summaryrefslogtreecommitdiff
path: root/overlays/worktime
diff options
context:
space:
mode:
Diffstat (limited to 'overlays/worktime')
-rw-r--r--overlays/worktime/default.nix19
-rwxr-xr-xoverlays/worktime/worktime.py430
2 files changed, 449 insertions, 0 deletions
diff --git a/overlays/worktime/default.nix b/overlays/worktime/default.nix
new file mode 100644
index 00000000..ab6fb40a
--- /dev/null
+++ b/overlays/worktime/default.nix
@@ -0,0 +1,19 @@
1final: prev: {
2 worktime = prev.stdenv.mkDerivation rec {
3 name = "worktime";
4 src = ./worktime.py;
5
6 phases = [ "buildPhase" "installPhase" ];
7
8 python = prev.python39.withPackages (ps: with ps; [pyxdg dateutil uritools requests configparser tabulate]);
9
10 buildPhase = ''
11 substituteAll $src worktime
12 '';
13
14 installPhase = ''
15 install -m 0755 -D -t $out/bin \
16 worktime
17 '';
18 };
19}
diff --git a/overlays/worktime/worktime.py b/overlays/worktime/worktime.py
new file mode 100755
index 00000000..c7b013f9
--- /dev/null
+++ b/overlays/worktime/worktime.py
@@ -0,0 +1,430 @@
1#!@python@/bin/python
2
3import requests
4from requests.exceptions import HTTPError
5from requests.auth import HTTPBasicAuth
6from datetime import *
7from xdg import (BaseDirectory)
8import configparser
9from uritools import uricompose
10
11from dateutil.easter import *
12from dateutil.tz import *
13from dateutil.parser import isoparse
14
15from enum import Enum
16
17from math import (copysign, ceil)
18
19import calendar
20
21import argparse
22
23from copy import deepcopy
24
25import sys
26
27from tabulate import tabulate
28
29class TogglAPISection(Enum):
30 TOGGL = '/api/v8'
31 REPORTS = '/reports/api/v2'
32
33class TogglAPIError(Exception):
34 def __init__(self, http_error, response):
35 self.http_error = http_error
36 self.response = response
37
38 def __str__(self):
39 if not self.http_error is None:
40 return str(self.http_error)
41 else:
42 return self.response.text
43
44class TogglAPI(object):
45 def __init__(self, api_token, workspace_id):
46 self._api_token = api_token
47 self._workspace_id = workspace_id
48
49 def _make_url(self, api=TogglAPISection.TOGGL, section=['time_entries', 'current'], params={}):
50 if api is TogglAPISection.REPORTS:
51 params.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id})
52
53 api_path = api.value
54 section_path = '/'.join(section)
55 uri = uricompose(scheme='https', host='api.track.toggl.com', path=f"{api_path}/{section_path}", query=params)
56
57 return uri
58
59 def _query(self, url, method):
60
61 headers = {'content-type': 'application/json'}
62 response = None
63
64 if method == 'GET':
65 response = requests.get(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token'))
66 elif method == 'POST':
67 response = requests.post(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token'))
68 else:
69 raise ValueError(f"Undefined HTTP method “{method}”")
70
71 response.raise_for_status()
72
73 return response
74
75 def get_billable_hours(self, start_date, end_date=datetime.now(timezone.utc), rounding=False):
76 billable_acc = timedelta(milliseconds = 0)
77 step = timedelta(days = 365)
78
79 for req_start in [start_date + x * step for x in range(0, ceil((end_date - start_date) / step))]:
80 req_end = end_date
81 if end_date > req_start + step:
82 req_end = datetime.combine((req_start + step).astimezone(timezone.utc).date(), time(tzinfo=timezone.utc))
83 elif req_start > start_date:
84 req_start = datetime.combine(req_start.astimezone(timezone.utc).date(), time(tzinfo=timezone.utc)) + timedelta(days = 1)
85
86 url = self._make_url(api = TogglAPISection.REPORTS, section = ['summary'], params={'since': req_start.astimezone(timezone.utc).isoformat(), 'until': req_end.astimezone(timezone.utc).isoformat(), 'rounding': rounding})
87 r = self._query(url = url, method='GET')
88 if not r or not r.json():
89 raise TogglAPIError(r)
90 billable_acc += timedelta(milliseconds=r.json()['total_billable']) if r.json()['total_billable'] else timedelta(milliseconds=0)
91
92 return billable_acc
93
94 def get_running_clock(self, now=datetime.now(timezone.utc)):
95 url = self._make_url(api = TogglAPISection.TOGGL, section = ['time_entries', 'current'])
96 r = self._query(url = url, method='GET')
97
98 if not r or not r.json():
99 raise TogglAPIError(r)
100
101 if not r.json()['data'] or not r.json()['data']['billable']:
102 return None
103
104 start = isoparse(r.json()['data']['start'])
105
106 return now - start if start <= now else None
107
108class Worktime(object):
109 time_worked = timedelta()
110 running_entry = None
111 now = datetime.now(tzlocal())
112 time_pulled_forward = timedelta()
113 is_workday = False
114 include_running = True
115 time_to_work = None
116 force_day_to_work = True
117
118 @staticmethod
119 def holidays(year):
120 holidays = dict()
121
122 y_easter = datetime.combine(easter(year), time(), tzinfo=tzlocal())
123
124 # Legal holidays in munich, bavaria
125 holidays[datetime(year, 1, 1, tzinfo=tzlocal()).date()] = 1
126 holidays[datetime(year, 1, 6, tzinfo=tzlocal()).date()] = 1
127 holidays[(y_easter+timedelta(days=-2)).date()] = 1
128 holidays[(y_easter+timedelta(days=+1)).date()] = 1
129 holidays[datetime(year, 5, 1, tzinfo=tzlocal()).date()] = 1
130 holidays[(y_easter+timedelta(days=+39)).date()] = 1
131 holidays[(y_easter+timedelta(days=+50)).date()] = 1
132 holidays[(y_easter+timedelta(days=+60)).date()] = 1
133 holidays[datetime(year, 8, 15, tzinfo=tzlocal()).date()] = 1
134 holidays[datetime(year, 10, 3, tzinfo=tzlocal()).date()] = 1
135 holidays[datetime(year, 11, 1, tzinfo=tzlocal()).date()] = 1
136 holidays[datetime(year, 12, 25, tzinfo=tzlocal()).date()] = 1
137 holidays[datetime(year, 12, 26, tzinfo=tzlocal()).date()] = 1
138
139 return holidays
140
141 @staticmethod
142 def config():
143 config = configparser.ConfigParser()
144 config_dir = BaseDirectory.load_first_config('worktime')
145 config.read(f"{config_dir}/worktime.ini")
146 return config
147
148 def __init__(self, start_datetime=None, end_datetime=None, now=None, include_running=True, force_day_to_work=True, **kwargs):
149 self.include_running = include_running
150 self.force_day_to_work = force_day_to_work
151
152 if now:
153 self.now = now
154
155 config = Worktime.config()
156 config_dir = BaseDirectory.load_first_config('worktime')
157 api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace'])
158 date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d')
159
160 start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal())
161 end_date = end_datetime or self.now
162
163 try:
164 with open(f"{config_dir}/reset", 'r') as reset:
165 for line in reset:
166 stripped_line = line.strip()
167 reset_date = datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal())
168
169 if reset_date > start_date and reset_date < end_date:
170 start_date = reset_date
171 except IOError as e:
172 if e.errno != 2:
173 raise e
174
175
176 hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40))
177 workdays = set([int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')])
178 time_per_day = timedelta(hours = hours_per_week) / len(workdays)
179
180 holidays = dict()
181
182 for year in range(start_date.year, end_date.year + 1):
183 holidays |= {k: v * time_per_day for k, v in Worktime.holidays(year).items()}
184
185 try:
186 with open(f"{config_dir}/excused", 'r') as excused:
187 for line in excused:
188 stripped_line = line.strip()
189 if stripped_line:
190 splitLine = stripped_line.split(' ')
191 if len(splitLine) == 2:
192 [hours, date] = splitLine
193 day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date()
194 holidays[day] = timedelta(hours = float(hours))
195 else:
196 holidays[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = time_per_day
197 except IOError as e:
198 if e.errno != 2:
199 raise e
200
201 pull_forward = dict()
202
203 start_day = start_date.date()
204 end_day = end_date.date()
205
206 try:
207 with open(f"{config_dir}/pull-forward", 'r') as excused:
208 for line in excused:
209 stripped_line = line.strip()
210 if stripped_line:
211 [hours, date] = stripped_line.split(' ')
212 constr = date.split(',')
213 for d in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1 + int(timedelta(hours = float(hours)).total_seconds() / 60 * (7 / len(workdays)) * 2))]:
214 for c in constr:
215 if c in calendar.day_abbr:
216 if not d.strftime('%a') == c: break
217 elif "--" in c:
218 [fromDay,toDay] = c.split('--')
219 if fromDay != "":
220 fromDay = datetime.strptime(fromDay, date_format).replace(tzinfo=tzlocal()).date()
221 if not fromDay <= d: break
222 if toDay != "":
223 toDay = datetime.strptime(toDay, date_format).replace(tzinfo=tzlocal()).date()
224 if not d <= toDay: break
225 else:
226 if not d == datetime.strptime(c, date_format).replace(tzinfo=tzlocal()).date(): break
227 else:
228 if d >= end_date.date():
229 pull_forward[d] = min(timedelta(hours = float(hours)), time_per_day - (holidays[d] if d in holidays else timedelta()))
230 except IOError as e:
231 if e.errno != 2:
232 raise e
233
234 days_to_work = dict()
235
236 if pull_forward:
237 end_day = max(end_day, max(list(pull_forward)))
238
239 for day in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1)]:
240 if day.isoweekday() in workdays:
241 time_to_work = time_per_day
242 if day in holidays.keys():
243 time_to_work -= holidays[day]
244 if time_to_work > timedelta():
245 days_to_work[day] = time_to_work
246
247 extra_days_to_work = dict()
248
249 try:
250 with open(f"{config_dir}/days-to-work", 'r') as extra_days_to_work_file:
251 for line in extra_days_to_work_file:
252 stripped_line = line.strip()
253 if stripped_line:
254 splitLine = stripped_line.split(' ')
255 if len(splitLine) == 2:
256 [hours, date] = splitLine
257 day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date()
258 extra_days_to_work[day] = timedelta(hours = float(hours))
259 else:
260 extra_days_to_work[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = time_per_day
261 except IOError as e:
262 if e.errno != 2:
263 raise e
264
265
266 self.is_workday = self.now.date() in days_to_work or self.now.date() in extra_days_to_work
267
268 self.time_worked = timedelta()
269
270 if self.include_running:
271 self.running_entry = api.get_running_clock(self.now)
272
273 if self.running_entry:
274 self.time_worked += self.running_entry
275
276 if self.running_entry and self.include_running and self.force_day_to_work and not (self.now.date() in days_to_work or self.now.date() in extra_days_to_work):
277 extra_days_to_work[self.now.date()] = timedelta()
278
279 self.time_to_work = sum([days_to_work[day] for day in days_to_work.keys() if day <= end_date.date()], timedelta())
280 for day in [d for d in list(pull_forward) if d > end_date.date()]:
281 days_forward = set([d for d in days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())])
282 extra_days_forward = set([d for d in extra_days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())])
283 days_forward = days_forward.union(extra_days_forward)
284
285 extra_day_time_left = timedelta()
286 for extra_day in extra_days_forward:
287 day_time = max(timedelta(), time_per_day - extra_days_to_work[extra_day])
288 extra_day_time_left += day_time
289 extra_day_time = min(extra_day_time_left, pull_forward[day])
290 time_forward = pull_forward[day] - extra_day_time
291 if extra_day_time_left > timedelta():
292 for extra_day in extra_days_forward:
293 day_time = max(timedelta(), time_per_day - extra_days_to_work[extra_day])
294 extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left)
295
296 hours_per_day_forward = time_forward / len(days_forward) if len(days_forward) > 0 else timedelta()
297 days_forward.discard(end_date.date())
298
299 self.time_pulled_forward += time_forward - hours_per_day_forward * len(days_forward)
300
301 if end_date.date() in extra_days_to_work:
302 self.time_pulled_forward += extra_days_to_work[end_date.date()]
303
304 self.time_to_work += self.time_pulled_forward
305
306 self.time_worked += api.get_billable_hours(start_date, self.now, rounding = config.getboolean('WORKTIME', 'rounding', fallback=True))
307
308def worktime(**args):
309 worktime = Worktime(**args)
310
311 def format_worktime(worktime):
312 def difference_string(difference):
313 total_minutes_difference = round(difference / timedelta(minutes = 1))
314 (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60)
315 sign = '' if total_minutes_difference >= 0 else '-'
316
317 difference_string = f"{sign}"
318 if hours_difference != 0:
319 difference_string += f"{hours_difference}h"
320 if hours_difference == 0 or minutes_difference != 0:
321 difference_string += f"{minutes_difference}m"
322
323 return difference_string
324
325 difference = worktime.time_to_work - worktime.time_worked
326 total_minutes_difference = 5 * ceil(difference / timedelta(minutes = 5))
327
328 if worktime.running_entry and abs(difference) < timedelta(days = 1) and (total_minutes_difference > 0 or abs(worktime.running_entry) >= abs(difference)) :
329 clockout_time = worktime.now + difference
330 clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1)
331 clockout_time = clockout_time.replace(second = 0, microsecond = 0)
332
333 if total_minutes_difference >= 0:
334 difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1))
335 return "{difference_string}/{clockout_time}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))
336 else:
337 difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1))
338 return "{clockout_time}/{difference_string}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))
339 else:
340 if worktime.running_entry:
341 difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1))
342 indicator = '↓' if total_minutes_difference >= 0 else '↑' # '\u25b6'
343
344 return f"{indicator}{difference_string}"
345 else:
346 difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1))
347 if worktime.is_workday:
348 return difference_string
349 else:
350 return f"({difference_string})"
351
352 if worktime.time_pulled_forward >= timedelta(minutes = 15):
353 worktime_no_pulled_forward = deepcopy(worktime)
354 worktime_no_pulled_forward.time_to_work -= worktime_no_pulled_forward.time_pulled_forward
355 worktime_no_pulled_forward.time_pulled_forward = timedelta()
356
357 difference_string = format_worktime(worktime)
358 difference_string_no_pulled_forward = format_worktime(worktime_no_pulled_forward)
359
360 print(f"{difference_string_no_pulled_forward}…{difference_string}")
361 else:
362 print(format_worktime(worktime))
363
364def time_worked(now, **args):
365 then = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0)
366 if now.time() == time():
367 now = now + timedelta(days = 1)
368
369 then = Worktime(**dict(args, now = then))
370 now = Worktime(**dict(args, now = now))
371
372 worked = now.time_worked - then.time_worked
373
374 if args['do_round']:
375 total_minutes_difference = 5 * ceil(worked / timedelta(minutes = 5))
376 (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60)
377 sign = '' if total_minutes_difference >= 0 else '-'
378
379 difference_string = f"{sign}"
380 if hours_difference != 0:
381 difference_string += f"{hours_difference}h"
382 if hours_difference == 0 or minutes_difference != 0:
383 difference_string += f"{minutes_difference}m"
384
385 print(difference_string)
386 else:
387 print(worked)
388
389def diff(now, **args):
390 now = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0)
391 then = now - timedelta.resolution
392
393 then = Worktime(**dict(args, now = then, include_running = False))
394 now = Worktime(**dict(args, now = now, include_running = False))
395
396 print(now.time_to_work - then.time_to_work)
397
398def holidays(now, **args):
399 config = Worktime.config()
400 date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d')
401
402 table_data = []
403
404 holidays = Worktime.holidays(now.year)
405 for k, v in holidays.items():
406 kstr = k.strftime(date_format)
407
408 table_data += [[kstr, v]]
409 print(tabulate(table_data, tablefmt="plain"))
410
411def main():
412 parser = argparse.ArgumentParser(prog = "worktime", description = 'Track worktime using toggl API')
413 parser.add_argument('--time', dest = 'now', metavar = 'TIME', type = lambda s: datetime.fromisoformat(s).replace(tzinfo=tzlocal()), help = 'Time to calculate status for (default: current time)', default = datetime.now(tzlocal()))
414 parser.add_argument('--no-running', dest = 'include_running', action = 'store_false')
415 parser.add_argument('--no-force-day-to-work', dest = 'force_day_to_work', action = 'store_false')
416 subparsers = parser.add_subparsers(help = 'Subcommands')
417 parser.set_defaults(cmd = worktime)
418 time_worked_parser = subparsers.add_parser('time_worked', aliases = ['time', 'worked', 'today'])
419 time_worked_parser.add_argument('--no-round', dest = 'do_round', action = 'store_false')
420 time_worked_parser.set_defaults(cmd = time_worked)
421 diff_parser = subparsers.add_parser('diff')
422 diff_parser.set_defaults(cmd = diff)
423 holidays_parser = subparsers.add_parser('holidays')
424 holidays_parser.set_defaults(cmd = holidays)
425 args = parser.parse_args()
426
427 args.cmd(**vars(args))
428
429if __name__ == "__main__":
430 sys.exit(main())