diff options
Diffstat (limited to 'worktime.py')
-rwxr-xr-x | worktime.py | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/worktime.py b/worktime.py new file mode 100755 index 0000000..36436ae --- /dev/null +++ b/worktime.py | |||
@@ -0,0 +1,199 @@ | |||
1 | #!@python@/bin/python | ||
2 | |||
3 | import requests | ||
4 | from requests.auth import HTTPBasicAuth | ||
5 | from datetime import * | ||
6 | from xdg import (BaseDirectory) | ||
7 | import configparser | ||
8 | from uritools import uricompose | ||
9 | |||
10 | from dateutil.easter import * | ||
11 | from dateutil.tz import * | ||
12 | from dateutil.parser import isoparse | ||
13 | |||
14 | from enum import Enum | ||
15 | |||
16 | from math import (copysign, ceil) | ||
17 | |||
18 | class TogglAPISection(Enum): | ||
19 | TOGGL = '/api/v8' | ||
20 | REPORTS = '/reports/api/v2' | ||
21 | |||
22 | class TogglAPI(object): | ||
23 | def __init__(self, api_token, workspace_id): | ||
24 | self._api_token = api_token | ||
25 | self._workspace_id = workspace_id | ||
26 | |||
27 | def _make_url(self, api=TogglAPISection.TOGGL, section=['time_entries', 'current'], params={}): | ||
28 | if api is TogglAPISection.REPORTS: | ||
29 | params.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id}) | ||
30 | |||
31 | api_path = api.value | ||
32 | section_path = '/'.join(section) | ||
33 | return uricompose(scheme='https', host='www.toggl.com', path=f"{api_path}/{section_path}", query=params) | ||
34 | |||
35 | def _query(self, url, method): | ||
36 | |||
37 | headers = {'content-type': 'application/json'} | ||
38 | |||
39 | if method == 'GET': | ||
40 | return requests.get(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) | ||
41 | elif method == 'POST': | ||
42 | return requests.post(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) | ||
43 | else: | ||
44 | raise ValueError(f"Undefined HTTP method “{method}”") | ||
45 | |||
46 | def get_billable_hours(self, start_date, end_date=datetime.now(timezone.utc)): | ||
47 | url = self._make_url(api = TogglAPISection.REPORTS, section = ['summary'], params={'since': start_date.astimezone(timezone.utc).isoformat(), 'until': end_date.astimezone(timezone.utc).isoformat()}) | ||
48 | r = self._query(url = url, method='GET') | ||
49 | return timedelta(milliseconds=r.json()['total_billable']) | ||
50 | |||
51 | def get_running_clock(self, now=datetime.now(timezone.utc)): | ||
52 | url = self._make_url(api = TogglAPISection.TOGGL, section = ['time_entries', 'current']) | ||
53 | r = self._query(url = url, method='GET').json() | ||
54 | |||
55 | if not r or not r['data'] or not r['data']['billable']: | ||
56 | return None | ||
57 | |||
58 | start = isoparse(r['data']['start']) | ||
59 | |||
60 | return now - start if start <= now else None | ||
61 | |||
62 | class Worktime(object): | ||
63 | time_to_work = timedelta() | ||
64 | time_worked = timedelta() | ||
65 | running_entry = None | ||
66 | now = datetime.now(tzlocal()) | ||
67 | time_pulled_forward = timedelta() | ||
68 | |||
69 | def __init__(self, start_datetime=None, end_datetime=None, now=None): | ||
70 | if now: | ||
71 | self.now = now | ||
72 | |||
73 | config = configparser.ConfigParser() | ||
74 | config_dir = BaseDirectory.load_first_config('worktime') | ||
75 | config.read(f"{config_dir}/worktime.ini") | ||
76 | api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace']) | ||
77 | date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') | ||
78 | |||
79 | start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal()) | ||
80 | end_date = end_datetime or self.now | ||
81 | |||
82 | hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40)) | ||
83 | workdays = set([int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')]) | ||
84 | hours_per_day = hours_per_week / len(workdays) | ||
85 | |||
86 | holidays = set() | ||
87 | |||
88 | for year in range(start_date.year, end_date.year + 1): | ||
89 | y_easter = datetime.combine(easter(year), time(), tzinfo=tzlocal()) | ||
90 | |||
91 | # Legal holidays in munich, bavaria | ||
92 | holidays.add(datetime(year, 1, 1, tzinfo=tzlocal()).date()) | ||
93 | holidays.add(datetime(year, 1, 6, tzinfo=tzlocal()).date()) | ||
94 | holidays.add((y_easter+timedelta(days=-2)).date()) | ||
95 | holidays.add((y_easter+timedelta(days=+1)).date()) | ||
96 | holidays.add(datetime(year, 5, 1, tzinfo=tzlocal()).date()) | ||
97 | holidays.add((y_easter+timedelta(days=+39)).date()) | ||
98 | holidays.add((y_easter+timedelta(days=+50)).date()) | ||
99 | holidays.add((y_easter+timedelta(days=+60)).date()) | ||
100 | holidays.add(datetime(year, 8, 15, tzinfo=tzlocal()).date()) | ||
101 | holidays.add(datetime(year, 10, 3, tzinfo=tzlocal()).date()) | ||
102 | holidays.add(datetime(year, 11, 1, tzinfo=tzlocal()).date()) | ||
103 | holidays.add(datetime(year, 12, 25, tzinfo=tzlocal()).date()) | ||
104 | holidays.add(datetime(year, 12, 26, tzinfo=tzlocal()).date()) | ||
105 | |||
106 | try: | ||
107 | with open(f"{config_dir}/excused", 'r') as excused: | ||
108 | for line in excused: | ||
109 | stripped_line = line.strip() | ||
110 | if stripped_line: | ||
111 | holidays.add(datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()) | ||
112 | except IOError as e: | ||
113 | if e.errno != 2: | ||
114 | raise e | ||
115 | |||
116 | pull_forward = dict() | ||
117 | |||
118 | try: | ||
119 | with open(f"{config_dir}/pull-forward", 'r') as excused: | ||
120 | for line in excused: | ||
121 | stripped_line = line.strip() | ||
122 | if stripped_line: | ||
123 | [hours, date] = stripped_line.split(' ') | ||
124 | day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date() | ||
125 | if day > end_date.date(): | ||
126 | pull_forward[day] = timedelta(hours = float(hours)) | ||
127 | except IOError as e: | ||
128 | if e.errno != 2: | ||
129 | raise e | ||
130 | |||
131 | |||
132 | days_to_work = set() | ||
133 | |||
134 | start_day = start_date.date() | ||
135 | end_day = end_date.date() | ||
136 | if pull_forward: | ||
137 | end_day = max(end_day, max(list(pull_forward))) | ||
138 | |||
139 | for day in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1)]: | ||
140 | if day.isoweekday() in workdays and not day in holidays: | ||
141 | days_to_work.add(day) | ||
142 | |||
143 | self.time_to_work = timedelta(hours = len([day for day in days_to_work if day <= end_date.date()]) * hours_per_day) | ||
144 | for day in list(pull_forward): | ||
145 | days_forward = set([d for d in days_to_work if d >= end_date.date() and d < day and not d in pull_forward]) | ||
146 | hours_per_day_forward = pull_forward[day] / len(days_forward) if len(days_forward) > 0 else timedelta() | ||
147 | days_forward.discard(end_date.date()) | ||
148 | self.time_pulled_forward += pull_forward[day] - hours_per_day_forward * len(days_forward) | ||
149 | self.time_to_work += self.time_pulled_forward | ||
150 | |||
151 | self.time_worked = api.get_billable_hours(start_date, self.now) | ||
152 | self.running_entry = api.get_running_clock(self.now) | ||
153 | |||
154 | if self.running_entry: | ||
155 | self.time_worked += self.running_entry | ||
156 | |||
157 | def main(): | ||
158 | worktime = Worktime() | ||
159 | |||
160 | def difference_string(difference): | ||
161 | total_minutes_difference = round(difference / timedelta(minutes = 1)) | ||
162 | (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) | ||
163 | sign = '' if total_minutes_difference >= 0 else '-' | ||
164 | |||
165 | difference_string = f"{sign}" | ||
166 | if hours_difference != 0: | ||
167 | difference_string += f"{hours_difference}h" | ||
168 | if hours_difference == 0 or minutes_difference != 0: | ||
169 | difference_string += f"{minutes_difference}m" | ||
170 | |||
171 | return difference_string | ||
172 | |||
173 | |||
174 | difference = worktime.time_to_work - worktime.time_worked | ||
175 | total_minutes_difference = 5 * ceil(difference / timedelta(minutes = 5)) | ||
176 | |||
177 | if worktime.running_entry and abs(difference) < timedelta(days = 1) and (total_minutes_difference > 0 or abs(worktime.running_entry) >= abs(difference)) : | ||
178 | clockout_time = worktime.now + difference | ||
179 | clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1) | ||
180 | clockout_time = clockout_time.replace(second = 0, microsecond = 0) | ||
181 | |||
182 | if total_minutes_difference >= 0: | ||
183 | difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) | ||
184 | print("{difference_string}/{clockout_time}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))) | ||
185 | else: | ||
186 | difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) | ||
187 | print("{clockout_time}/{difference_string}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))) | ||
188 | else: | ||
189 | if worktime.running_entry: | ||
190 | difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) | ||
191 | indicator = '↘' if total_minutes_difference >= 0 else '↗' # '\u25b6' | ||
192 | |||
193 | print(f"{indicator} {difference_string}") | ||
194 | else: | ||
195 | difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) | ||
196 | print(difference_string) | ||
197 | |||
198 | if __name__ == "__main__": | ||
199 | sys.exit(main()) | ||