diff options
Diffstat (limited to 'overlays/worktime/worktime.py')
-rwxr-xr-x | overlays/worktime/worktime.py | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/overlays/worktime/worktime.py b/overlays/worktime/worktime.py new file mode 100755 index 00000000..c7b013f9 --- /dev/null +++ b/overlays/worktime/worktime.py | |||
@@ -0,0 +1,430 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import requests | ||
4 | from requests.exceptions import HTTPError | ||
5 | from requests.auth import HTTPBasicAuth | ||
6 | from datetime import * | ||
7 | from xdg import (BaseDirectory) | ||
8 | import configparser | ||
9 | from uritools import uricompose | ||
10 | |||
11 | from dateutil.easter import * | ||
12 | from dateutil.tz import * | ||
13 | from dateutil.parser import isoparse | ||
14 | |||
15 | from enum import Enum | ||
16 | |||
17 | from math import (copysign, ceil) | ||
18 | |||
19 | import calendar | ||
20 | |||
21 | import argparse | ||
22 | |||
23 | from copy import deepcopy | ||
24 | |||
25 | import sys | ||
26 | |||
27 | from tabulate import tabulate | ||
28 | |||
29 | class TogglAPISection(Enum): | ||
30 | TOGGL = '/api/v8' | ||
31 | REPORTS = '/reports/api/v2' | ||
32 | |||
33 | class TogglAPIError(Exception): | ||
34 | def __init__(self, http_error, response): | ||
35 | self.http_error = http_error | ||
36 | self.response = response | ||
37 | |||
38 | def __str__(self): | ||
39 | if not self.http_error is None: | ||
40 | return str(self.http_error) | ||
41 | else: | ||
42 | return self.response.text | ||
43 | |||
44 | class TogglAPI(object): | ||
45 | def __init__(self, api_token, workspace_id): | ||
46 | self._api_token = api_token | ||
47 | self._workspace_id = workspace_id | ||
48 | |||
49 | def _make_url(self, api=TogglAPISection.TOGGL, section=['time_entries', 'current'], params={}): | ||
50 | if api is TogglAPISection.REPORTS: | ||
51 | params.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id}) | ||
52 | |||
53 | api_path = api.value | ||
54 | section_path = '/'.join(section) | ||
55 | uri = uricompose(scheme='https', host='api.track.toggl.com', path=f"{api_path}/{section_path}", query=params) | ||
56 | |||
57 | return uri | ||
58 | |||
59 | def _query(self, url, method): | ||
60 | |||
61 | headers = {'content-type': 'application/json'} | ||
62 | response = None | ||
63 | |||
64 | if method == 'GET': | ||
65 | response = requests.get(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) | ||
66 | elif method == 'POST': | ||
67 | response = requests.post(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) | ||
68 | else: | ||
69 | raise ValueError(f"Undefined HTTP method “{method}”") | ||
70 | |||
71 | response.raise_for_status() | ||
72 | |||
73 | return response | ||
74 | |||
75 | def get_billable_hours(self, start_date, end_date=datetime.now(timezone.utc), rounding=False): | ||
76 | billable_acc = timedelta(milliseconds = 0) | ||
77 | step = timedelta(days = 365) | ||
78 | |||
79 | for req_start in [start_date + x * step for x in range(0, ceil((end_date - start_date) / step))]: | ||
80 | req_end = end_date | ||
81 | if end_date > req_start + step: | ||
82 | req_end = datetime.combine((req_start + step).astimezone(timezone.utc).date(), time(tzinfo=timezone.utc)) | ||
83 | elif req_start > start_date: | ||
84 | req_start = datetime.combine(req_start.astimezone(timezone.utc).date(), time(tzinfo=timezone.utc)) + timedelta(days = 1) | ||
85 | |||
86 | url = self._make_url(api = TogglAPISection.REPORTS, section = ['summary'], params={'since': req_start.astimezone(timezone.utc).isoformat(), 'until': req_end.astimezone(timezone.utc).isoformat(), 'rounding': rounding}) | ||
87 | r = self._query(url = url, method='GET') | ||
88 | if not r or not r.json(): | ||
89 | raise TogglAPIError(r) | ||
90 | billable_acc += timedelta(milliseconds=r.json()['total_billable']) if r.json()['total_billable'] else timedelta(milliseconds=0) | ||
91 | |||
92 | return billable_acc | ||
93 | |||
94 | def get_running_clock(self, now=datetime.now(timezone.utc)): | ||
95 | url = self._make_url(api = TogglAPISection.TOGGL, section = ['time_entries', 'current']) | ||
96 | r = self._query(url = url, method='GET') | ||
97 | |||
98 | if not r or not r.json(): | ||
99 | raise TogglAPIError(r) | ||
100 | |||
101 | if not r.json()['data'] or not r.json()['data']['billable']: | ||
102 | return None | ||
103 | |||
104 | start = isoparse(r.json()['data']['start']) | ||
105 | |||
106 | return now - start if start <= now else None | ||
107 | |||
108 | class Worktime(object): | ||
109 | time_worked = timedelta() | ||
110 | running_entry = None | ||
111 | now = datetime.now(tzlocal()) | ||
112 | time_pulled_forward = timedelta() | ||
113 | is_workday = False | ||
114 | include_running = True | ||
115 | time_to_work = None | ||
116 | force_day_to_work = True | ||
117 | |||
118 | @staticmethod | ||
119 | def holidays(year): | ||
120 | holidays = dict() | ||
121 | |||
122 | y_easter = datetime.combine(easter(year), time(), tzinfo=tzlocal()) | ||
123 | |||
124 | # Legal holidays in munich, bavaria | ||
125 | holidays[datetime(year, 1, 1, tzinfo=tzlocal()).date()] = 1 | ||
126 | holidays[datetime(year, 1, 6, tzinfo=tzlocal()).date()] = 1 | ||
127 | holidays[(y_easter+timedelta(days=-2)).date()] = 1 | ||
128 | holidays[(y_easter+timedelta(days=+1)).date()] = 1 | ||
129 | holidays[datetime(year, 5, 1, tzinfo=tzlocal()).date()] = 1 | ||
130 | holidays[(y_easter+timedelta(days=+39)).date()] = 1 | ||
131 | holidays[(y_easter+timedelta(days=+50)).date()] = 1 | ||
132 | holidays[(y_easter+timedelta(days=+60)).date()] = 1 | ||
133 | holidays[datetime(year, 8, 15, tzinfo=tzlocal()).date()] = 1 | ||
134 | holidays[datetime(year, 10, 3, tzinfo=tzlocal()).date()] = 1 | ||
135 | holidays[datetime(year, 11, 1, tzinfo=tzlocal()).date()] = 1 | ||
136 | holidays[datetime(year, 12, 25, tzinfo=tzlocal()).date()] = 1 | ||
137 | holidays[datetime(year, 12, 26, tzinfo=tzlocal()).date()] = 1 | ||
138 | |||
139 | return holidays | ||
140 | |||
141 | @staticmethod | ||
142 | def config(): | ||
143 | config = configparser.ConfigParser() | ||
144 | config_dir = BaseDirectory.load_first_config('worktime') | ||
145 | config.read(f"{config_dir}/worktime.ini") | ||
146 | return config | ||
147 | |||
148 | def __init__(self, start_datetime=None, end_datetime=None, now=None, include_running=True, force_day_to_work=True, **kwargs): | ||
149 | self.include_running = include_running | ||
150 | self.force_day_to_work = force_day_to_work | ||
151 | |||
152 | if now: | ||
153 | self.now = now | ||
154 | |||
155 | config = Worktime.config() | ||
156 | config_dir = BaseDirectory.load_first_config('worktime') | ||
157 | api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace']) | ||
158 | date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') | ||
159 | |||
160 | start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal()) | ||
161 | end_date = end_datetime or self.now | ||
162 | |||
163 | try: | ||
164 | with open(f"{config_dir}/reset", 'r') as reset: | ||
165 | for line in reset: | ||
166 | stripped_line = line.strip() | ||
167 | reset_date = datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()) | ||
168 | |||
169 | if reset_date > start_date and reset_date < end_date: | ||
170 | start_date = reset_date | ||
171 | except IOError as e: | ||
172 | if e.errno != 2: | ||
173 | raise e | ||
174 | |||
175 | |||
176 | hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40)) | ||
177 | workdays = set([int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')]) | ||
178 | time_per_day = timedelta(hours = hours_per_week) / len(workdays) | ||
179 | |||
180 | holidays = dict() | ||
181 | |||
182 | for year in range(start_date.year, end_date.year + 1): | ||
183 | holidays |= {k: v * time_per_day for k, v in Worktime.holidays(year).items()} | ||
184 | |||
185 | try: | ||
186 | with open(f"{config_dir}/excused", 'r') as excused: | ||
187 | for line in excused: | ||
188 | stripped_line = line.strip() | ||
189 | if stripped_line: | ||
190 | splitLine = stripped_line.split(' ') | ||
191 | if len(splitLine) == 2: | ||
192 | [hours, date] = splitLine | ||
193 | day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date() | ||
194 | holidays[day] = timedelta(hours = float(hours)) | ||
195 | else: | ||
196 | holidays[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = time_per_day | ||
197 | except IOError as e: | ||
198 | if e.errno != 2: | ||
199 | raise e | ||
200 | |||
201 | pull_forward = dict() | ||
202 | |||
203 | start_day = start_date.date() | ||
204 | end_day = end_date.date() | ||
205 | |||
206 | try: | ||
207 | with open(f"{config_dir}/pull-forward", 'r') as excused: | ||
208 | for line in excused: | ||
209 | stripped_line = line.strip() | ||
210 | if stripped_line: | ||
211 | [hours, date] = stripped_line.split(' ') | ||
212 | constr = date.split(',') | ||
213 | for d in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1 + int(timedelta(hours = float(hours)).total_seconds() / 60 * (7 / len(workdays)) * 2))]: | ||
214 | for c in constr: | ||
215 | if c in calendar.day_abbr: | ||
216 | if not d.strftime('%a') == c: break | ||
217 | elif "--" in c: | ||
218 | [fromDay,toDay] = c.split('--') | ||
219 | if fromDay != "": | ||
220 | fromDay = datetime.strptime(fromDay, date_format).replace(tzinfo=tzlocal()).date() | ||
221 | if not fromDay <= d: break | ||
222 | if toDay != "": | ||
223 | toDay = datetime.strptime(toDay, date_format).replace(tzinfo=tzlocal()).date() | ||
224 | if not d <= toDay: break | ||
225 | else: | ||
226 | if not d == datetime.strptime(c, date_format).replace(tzinfo=tzlocal()).date(): break | ||
227 | else: | ||
228 | if d >= end_date.date(): | ||
229 | pull_forward[d] = min(timedelta(hours = float(hours)), time_per_day - (holidays[d] if d in holidays else timedelta())) | ||
230 | except IOError as e: | ||
231 | if e.errno != 2: | ||
232 | raise e | ||
233 | |||
234 | days_to_work = dict() | ||
235 | |||
236 | if pull_forward: | ||
237 | end_day = max(end_day, max(list(pull_forward))) | ||
238 | |||
239 | for day in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1)]: | ||
240 | if day.isoweekday() in workdays: | ||
241 | time_to_work = time_per_day | ||
242 | if day in holidays.keys(): | ||
243 | time_to_work -= holidays[day] | ||
244 | if time_to_work > timedelta(): | ||
245 | days_to_work[day] = time_to_work | ||
246 | |||
247 | extra_days_to_work = dict() | ||
248 | |||
249 | try: | ||
250 | with open(f"{config_dir}/days-to-work", 'r') as extra_days_to_work_file: | ||
251 | for line in extra_days_to_work_file: | ||
252 | stripped_line = line.strip() | ||
253 | if stripped_line: | ||
254 | splitLine = stripped_line.split(' ') | ||
255 | if len(splitLine) == 2: | ||
256 | [hours, date] = splitLine | ||
257 | day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date() | ||
258 | extra_days_to_work[day] = timedelta(hours = float(hours)) | ||
259 | else: | ||
260 | extra_days_to_work[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = time_per_day | ||
261 | except IOError as e: | ||
262 | if e.errno != 2: | ||
263 | raise e | ||
264 | |||
265 | |||
266 | self.is_workday = self.now.date() in days_to_work or self.now.date() in extra_days_to_work | ||
267 | |||
268 | self.time_worked = timedelta() | ||
269 | |||
270 | if self.include_running: | ||
271 | self.running_entry = api.get_running_clock(self.now) | ||
272 | |||
273 | if self.running_entry: | ||
274 | self.time_worked += self.running_entry | ||
275 | |||
276 | if self.running_entry and self.include_running and self.force_day_to_work and not (self.now.date() in days_to_work or self.now.date() in extra_days_to_work): | ||
277 | extra_days_to_work[self.now.date()] = timedelta() | ||
278 | |||
279 | self.time_to_work = sum([days_to_work[day] for day in days_to_work.keys() if day <= end_date.date()], timedelta()) | ||
280 | for day in [d for d in list(pull_forward) if d > end_date.date()]: | ||
281 | days_forward = set([d for d in days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) | ||
282 | extra_days_forward = set([d for d in extra_days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) | ||
283 | days_forward = days_forward.union(extra_days_forward) | ||
284 | |||
285 | extra_day_time_left = timedelta() | ||
286 | for extra_day in extra_days_forward: | ||
287 | day_time = max(timedelta(), time_per_day - extra_days_to_work[extra_day]) | ||
288 | extra_day_time_left += day_time | ||
289 | extra_day_time = min(extra_day_time_left, pull_forward[day]) | ||
290 | time_forward = pull_forward[day] - extra_day_time | ||
291 | if extra_day_time_left > timedelta(): | ||
292 | for extra_day in extra_days_forward: | ||
293 | day_time = max(timedelta(), time_per_day - extra_days_to_work[extra_day]) | ||
294 | extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left) | ||
295 | |||
296 | hours_per_day_forward = time_forward / len(days_forward) if len(days_forward) > 0 else timedelta() | ||
297 | days_forward.discard(end_date.date()) | ||
298 | |||
299 | self.time_pulled_forward += time_forward - hours_per_day_forward * len(days_forward) | ||
300 | |||
301 | if end_date.date() in extra_days_to_work: | ||
302 | self.time_pulled_forward += extra_days_to_work[end_date.date()] | ||
303 | |||
304 | self.time_to_work += self.time_pulled_forward | ||
305 | |||
306 | self.time_worked += api.get_billable_hours(start_date, self.now, rounding = config.getboolean('WORKTIME', 'rounding', fallback=True)) | ||
307 | |||
308 | def worktime(**args): | ||
309 | worktime = Worktime(**args) | ||
310 | |||
311 | def format_worktime(worktime): | ||
312 | def difference_string(difference): | ||
313 | total_minutes_difference = round(difference / timedelta(minutes = 1)) | ||
314 | (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) | ||
315 | sign = '' if total_minutes_difference >= 0 else '-' | ||
316 | |||
317 | difference_string = f"{sign}" | ||
318 | if hours_difference != 0: | ||
319 | difference_string += f"{hours_difference}h" | ||
320 | if hours_difference == 0 or minutes_difference != 0: | ||
321 | difference_string += f"{minutes_difference}m" | ||
322 | |||
323 | return difference_string | ||
324 | |||
325 | difference = worktime.time_to_work - worktime.time_worked | ||
326 | total_minutes_difference = 5 * ceil(difference / timedelta(minutes = 5)) | ||
327 | |||
328 | if worktime.running_entry and abs(difference) < timedelta(days = 1) and (total_minutes_difference > 0 or abs(worktime.running_entry) >= abs(difference)) : | ||
329 | clockout_time = worktime.now + difference | ||
330 | clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1) | ||
331 | clockout_time = clockout_time.replace(second = 0, microsecond = 0) | ||
332 | |||
333 | if total_minutes_difference >= 0: | ||
334 | difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) | ||
335 | return "{difference_string}/{clockout_time}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M")) | ||
336 | else: | ||
337 | difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) | ||
338 | return "{clockout_time}/{difference_string}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M")) | ||
339 | else: | ||
340 | if worktime.running_entry: | ||
341 | difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) | ||
342 | indicator = '↓' if total_minutes_difference >= 0 else '↑' # '\u25b6' | ||
343 | |||
344 | return f"{indicator}{difference_string}" | ||
345 | else: | ||
346 | difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) | ||
347 | if worktime.is_workday: | ||
348 | return difference_string | ||
349 | else: | ||
350 | return f"({difference_string})" | ||
351 | |||
352 | if worktime.time_pulled_forward >= timedelta(minutes = 15): | ||
353 | worktime_no_pulled_forward = deepcopy(worktime) | ||
354 | worktime_no_pulled_forward.time_to_work -= worktime_no_pulled_forward.time_pulled_forward | ||
355 | worktime_no_pulled_forward.time_pulled_forward = timedelta() | ||
356 | |||
357 | difference_string = format_worktime(worktime) | ||
358 | difference_string_no_pulled_forward = format_worktime(worktime_no_pulled_forward) | ||
359 | |||
360 | print(f"{difference_string_no_pulled_forward}…{difference_string}") | ||
361 | else: | ||
362 | print(format_worktime(worktime)) | ||
363 | |||
364 | def time_worked(now, **args): | ||
365 | then = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0) | ||
366 | if now.time() == time(): | ||
367 | now = now + timedelta(days = 1) | ||
368 | |||
369 | then = Worktime(**dict(args, now = then)) | ||
370 | now = Worktime(**dict(args, now = now)) | ||
371 | |||
372 | worked = now.time_worked - then.time_worked | ||
373 | |||
374 | if args['do_round']: | ||
375 | total_minutes_difference = 5 * ceil(worked / timedelta(minutes = 5)) | ||
376 | (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) | ||
377 | sign = '' if total_minutes_difference >= 0 else '-' | ||
378 | |||
379 | difference_string = f"{sign}" | ||
380 | if hours_difference != 0: | ||
381 | difference_string += f"{hours_difference}h" | ||
382 | if hours_difference == 0 or minutes_difference != 0: | ||
383 | difference_string += f"{minutes_difference}m" | ||
384 | |||
385 | print(difference_string) | ||
386 | else: | ||
387 | print(worked) | ||
388 | |||
389 | def diff(now, **args): | ||
390 | now = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0) | ||
391 | then = now - timedelta.resolution | ||
392 | |||
393 | then = Worktime(**dict(args, now = then, include_running = False)) | ||
394 | now = Worktime(**dict(args, now = now, include_running = False)) | ||
395 | |||
396 | print(now.time_to_work - then.time_to_work) | ||
397 | |||
398 | def holidays(now, **args): | ||
399 | config = Worktime.config() | ||
400 | date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') | ||
401 | |||
402 | table_data = [] | ||
403 | |||
404 | holidays = Worktime.holidays(now.year) | ||
405 | for k, v in holidays.items(): | ||
406 | kstr = k.strftime(date_format) | ||
407 | |||
408 | table_data += [[kstr, v]] | ||
409 | print(tabulate(table_data, tablefmt="plain")) | ||
410 | |||
411 | def main(): | ||
412 | parser = argparse.ArgumentParser(prog = "worktime", description = 'Track worktime using toggl API') | ||
413 | parser.add_argument('--time', dest = 'now', metavar = 'TIME', type = lambda s: datetime.fromisoformat(s).replace(tzinfo=tzlocal()), help = 'Time to calculate status for (default: current time)', default = datetime.now(tzlocal())) | ||
414 | parser.add_argument('--no-running', dest = 'include_running', action = 'store_false') | ||
415 | parser.add_argument('--no-force-day-to-work', dest = 'force_day_to_work', action = 'store_false') | ||
416 | subparsers = parser.add_subparsers(help = 'Subcommands') | ||
417 | parser.set_defaults(cmd = worktime) | ||
418 | time_worked_parser = subparsers.add_parser('time_worked', aliases = ['time', 'worked', 'today']) | ||
419 | time_worked_parser.add_argument('--no-round', dest = 'do_round', action = 'store_false') | ||
420 | time_worked_parser.set_defaults(cmd = time_worked) | ||
421 | diff_parser = subparsers.add_parser('diff') | ||
422 | diff_parser.set_defaults(cmd = diff) | ||
423 | holidays_parser = subparsers.add_parser('holidays') | ||
424 | holidays_parser.set_defaults(cmd = holidays) | ||
425 | args = parser.parse_args() | ||
426 | |||
427 | args.cmd(**vars(args)) | ||
428 | |||
429 | if __name__ == "__main__": | ||
430 | sys.exit(main()) | ||