From 7e7656e22ced47bec5ea5bae1da08e3ef48d2e42 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 4 Apr 2023 10:23:20 +0200 Subject: worktime... --- overlays/worktime/worktime.py | 619 ------------------------------------------ 1 file changed, 619 deletions(-) delete mode 100755 overlays/worktime/worktime.py (limited to 'overlays/worktime/worktime.py') diff --git a/overlays/worktime/worktime.py b/overlays/worktime/worktime.py deleted file mode 100755 index 46197b6e..00000000 --- a/overlays/worktime/worktime.py +++ /dev/null @@ -1,619 +0,0 @@ -#!@python@/bin/python - -import requests -from requests.exceptions import HTTPError -from requests.auth import HTTPBasicAuth -from datetime import * -from xdg import (BaseDirectory) -import configparser -from uritools import uricompose - -from dateutil.easter import * -from dateutil.tz import * -from dateutil.parser import isoparse - -from enum import Enum - -from math import (copysign, ceil, floor) - -import calendar - -import argparse - -from copy import deepcopy - -import sys -from sys import stderr - -from tabulate import tabulate - -from itertools import groupby -from functools import cache - -import backoff - - -class TogglAPISection(Enum): - TOGGL = '/api/v8' - REPORTS = '/reports/api/v2' - -class TogglAPIError(Exception): - def __init__(self, http_error, response): - self.http_error = http_error - self.response = response - - def __str__(self): - if not self.http_error is None: - return str(self.http_error) - else: - return self.response.text - -class TogglAPI(object): - def __init__(self, api_token, workspace_id, client_ids): - self._api_token = api_token - self._workspace_id = workspace_id - self._client_ids = set(map(int, client_ids.split(','))) if client_ids else None - - def _make_url(self, api=TogglAPISection.TOGGL, section=['time_entries', 'current'], params={}): - if api is TogglAPISection.REPORTS: - params.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id}) - - api_path = api.value - section_path = '/'.join(section) - uri = uricompose(scheme='https', host='api.track.toggl.com', path=f"{api_path}/{section_path}", query=params) - - return uri - - def _query(self, url, method): - response = self._raw_query(url, method) - response.raise_for_status() - return response - - @backoff.on_predicate( - backoff.expo, - factor=0.1, max_value=2, - predicate=lambda r: r.status_code == 429, - max_time=10, - ) - def _raw_query(self, url, method): - headers = {'content-type': 'application/json'} - response = None - - if method == 'GET': - response = requests.get(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) - elif method == 'POST': - response = requests.post(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) - else: - raise ValueError(f"Undefined HTTP method “{method}”") - - return response - - def get_billable_hours(self, start_date, end_date=datetime.now(timezone.utc), rounding=False): - billable_acc = timedelta(milliseconds = 0) - step = timedelta(days = 365) - - for req_start in [start_date + x * step for x in range(0, ceil((end_date - start_date) / step))]: - req_end = end_date - if end_date > req_start + step: - req_end = datetime.combine((req_start + step).astimezone(timezone.utc).date(), time(tzinfo=timezone.utc)) - elif req_start > start_date: - req_start = datetime.combine(req_start.astimezone(timezone.utc).date(), time(tzinfo=timezone.utc)) + timedelta(days = 1) - - def get_report(client_ids = self._client_ids): - nonlocal req_start, req_end, rounding, self - - if client_ids is not None and not client_ids: - return timedelta(milliseconds = 0) - - params = { 'since': req_start.astimezone(timezone.utc).isoformat(), - 'until': req_end.astimezone(timezone.utc).isoformat(), - 'rounding': rounding, - 'billable': 'yes' - } - if client_ids is not None: - params |= { 'client_ids': ','.join(map(str, client_ids)) } - url = self._make_url(api = TogglAPISection.REPORTS, section = ['summary'], params = params) - r = self._query(url = url, method='GET') - if not r or not r.json(): - raise TogglAPIError(r) - res = timedelta(milliseconds=r.json()['total_billable']) if r.json()['total_billable'] else timedelta(milliseconds=0) - return res - - if 0 in self._client_ids: - url = self._make_url(api = TogglAPISection.TOGGL, section = ['workspaces', self._workspace_id, 'clients']) - r = self._query(url = url, method = 'GET') - if not r or not r.json(): - raise TogglAPIError(r) - - billable_acc += get_report(None) - get_report(set(map(lambda c: c['id'], r.json()))) - - billable_acc += get_report(self._client_ids - {0}) - - return billable_acc - - def get_running_clock(self, now=datetime.now(timezone.utc)): - url = self._make_url(api = TogglAPISection.TOGGL, section = ['time_entries', 'current']) - r = self._query(url = url, method='GET') - - if not r or not r.json(): - raise TogglAPIError(r) - - if not r.json()['data'] or not r.json()['data']['billable']: - return None - - if self._client_ids is not None: - if 'pid' in r.json()['data'] and r.json()['data']['pid']: - url = self._make_url(api = TogglAPISection.TOGGL, section = ['projects', str(r.json()['data']['pid'])]) - pr = self._query(url = url, method = 'GET') - if not pr or not pr.json(): - raise TogglAPIError(pr) - - if not pr.json()['data']: - return None - - if 'cid' in pr.json()['data'] and pr.json()['data']['cid']: - if pr.json()['data']['cid'] not in self._client_ids: - return None - elif 0 not in self._client_ids: - return None - elif 0 not in self._client_ids: - return None - - start = isoparse(r.json()['data']['start']) - - return now - start if start <= now else None - -class Worktime(object): - time_worked = timedelta() - running_entry = None - now = datetime.now(tzlocal()) - time_pulled_forward = timedelta() - is_workday = False - include_running = True - time_to_work = None - force_day_to_work = True - leave_days = set() - leave_budget = dict() - time_per_day = None - workdays = None - - @staticmethod - @cache - def holidays(year): - holidays = dict() - - y_easter = datetime.combine(easter(year), time(), tzinfo=tzlocal()) - - # Legal holidays in munich, bavaria - holidays[datetime(year, 1, 1, tzinfo=tzlocal()).date()] = 1 - holidays[datetime(year, 1, 6, tzinfo=tzlocal()).date()] = 1 - holidays[(y_easter+timedelta(days=-2)).date()] = 1 - holidays[(y_easter+timedelta(days=+1)).date()] = 1 - holidays[datetime(year, 5, 1, tzinfo=tzlocal()).date()] = 1 - holidays[(y_easter+timedelta(days=+39)).date()] = 1 - holidays[(y_easter+timedelta(days=+50)).date()] = 1 - holidays[(y_easter+timedelta(days=+60)).date()] = 1 - holidays[datetime(year, 8, 15, tzinfo=tzlocal()).date()] = 1 - holidays[datetime(year, 10, 3, tzinfo=tzlocal()).date()] = 1 - holidays[datetime(year, 11, 1, tzinfo=tzlocal()).date()] = 1 - holidays[datetime(year, 12, 25, tzinfo=tzlocal()).date()] = 1 - holidays[datetime(year, 12, 26, tzinfo=tzlocal()).date()] = 1 - - return holidays - - @staticmethod - def config(): - config = configparser.ConfigParser() - config_dir = BaseDirectory.load_first_config('worktime') - config.read(f"{config_dir}/worktime.ini") - return config - - def ordinal_workday(self, date): - start_date = datetime(date.year, 1, 1, tzinfo=tzlocal()).date() - return len([1 for offset in range(0, (date - start_date).days + 1) if self.would_be_workday(start_date + timedelta(days = offset))]) - - def would_be_workday(self, date): - return date.isoweekday() in self.workdays and date not in set(day for (day, val) in Worktime.holidays(date.year).items() if val >= 1) - - def __init__(self, start_datetime=None, end_datetime=None, now=None, include_running=True, force_day_to_work=True, **kwargs): - self.include_running = include_running - self.force_day_to_work = force_day_to_work - - if now: - self.now = now - - config = Worktime.config() - config_dir = BaseDirectory.load_first_config('worktime') - api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace'], client_ids=config.get('TOGGL', 'ClientIds', fallback=None)) - date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') - - start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal()) - end_date = end_datetime or self.now - - try: - with open(f"{config_dir}/reset", 'r') as reset: - for line in reset: - stripped_line = line.strip() - reset_date = datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()) - - if reset_date > start_date and reset_date <= end_date: - start_date = reset_date - except IOError as e: - if e.errno != 2: - raise e - - - hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40)) - self.workdays = set([int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')]) - self.time_per_day = timedelta(hours = hours_per_week) / len(self.workdays) - - holidays = dict() - - leave_per_year = int(config.get('WORKTIME', 'LeavePerYear', fallback=30)) - for year in range(start_date.year, end_date.year + 1): - holidays |= {k: v * self.time_per_day for k, v in Worktime.holidays(year).items()} - leave_frac = 1 - if date(year, 1, 1) < start_date.date(): - leave_frac = (date(year + 1, 1, 1) - start_date.date()) / (date(year + 1, 1, 1) - date(year, 1, 1)) - self.leave_budget |= {year: floor(leave_per_year * leave_frac)} - - try: - with open(f"{config_dir}/reset-leave", 'r') as excused: - for line in excused: - stripped_line = line.strip() - if stripped_line: - [datestr, count] = stripped_line.split(' ') - day = datetime.strptime(datestr, date_format).replace(tzinfo=tzlocal()).date() - if day != start_date.date(): - continue - - self.leave_budget[day.year] = (self.leave_budget[day.year] if day.year in self.leave_budget else 0) + int(count) - except IOError as e: - if e.errno != 2: - raise e - - - for excused_kind in {'excused', 'leave'}: - try: - with open(f"{config_dir}/{excused_kind}", 'r') as excused: - for line in excused: - stripped_line = line.strip() - if stripped_line: - splitLine = stripped_line.split(' ') - fromDay = toDay = None - def parse_datestr(datestr): - nonlocal fromDay, toDay - def parse_single(singlestr): - return datetime.strptime(singlestr, date_format).replace(tzinfo=tzlocal()).date() - if '--' in datestr: - [fromDay,toDay] = datestr.split('--') - fromDay = parse_single(fromDay) - toDay = parse_single(toDay) - else: - fromDay = toDay = parse_single(datestr) - time = self.time_per_day - if len(splitLine) == 2: - [hours, datestr] = splitLine - time = timedelta(hours = float(hours)) - parse_datestr(datestr) - else: - parse_datestr(stripped_line) - - for day in [fromDay + timedelta(days = x) for x in range(0, (toDay - fromDay).days + 1)]: - if end_date.date() < day or day < start_date.date(): - continue - - if excused_kind == 'leave' and self.would_be_workday(day): - self.leave_days.add(day) - holidays[day] = time - except IOError as e: - if e.errno != 2: - raise e - - pull_forward = dict() - - start_day = start_date.date() - end_day = end_date.date() - - try: - with open(f"{config_dir}/pull-forward", 'r') as excused: - for line in excused: - stripped_line = line.strip() - if stripped_line: - [hours, datestr] = stripped_line.split(' ') - constr = datestr.split(',') - for d in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1 + int(timedelta(hours = float(hours)).total_seconds() / 60 * (7 / len(self.workdays)) * 2))]: - for c in constr: - if c in calendar.day_abbr: - if not d.strftime('%a') == c: break - elif "--" in c: - [fromDay,toDay] = c.split('--') - if fromDay != "": - fromDay = datetime.strptime(fromDay, date_format).replace(tzinfo=tzlocal()).date() - if not fromDay <= d: break - if toDay != "": - toDay = datetime.strptime(toDay, date_format).replace(tzinfo=tzlocal()).date() - if not d <= toDay: break - else: - if not d == datetime.strptime(c, date_format).replace(tzinfo=tzlocal()).date(): break - else: - if d >= end_date.date(): - pull_forward[d] = min(timedelta(hours = float(hours)), self.time_per_day - (holidays[d] if d in holidays else timedelta())) - except IOError as e: - if e.errno != 2: - raise e - - days_to_work = dict() - - if pull_forward: - end_day = max(end_day, max(list(pull_forward))) - - for day in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1)]: - if day.isoweekday() in self.workdays: - time_to_work = self.time_per_day - if day in holidays.keys(): - time_to_work -= holidays[day] - if time_to_work > timedelta(): - days_to_work[day] = time_to_work - - extra_days_to_work = dict() - - try: - with open(f"{config_dir}/days-to-work", 'r') as extra_days_to_work_file: - for line in extra_days_to_work_file: - stripped_line = line.strip() - if stripped_line: - splitLine = stripped_line.split(' ') - if len(splitLine) == 2: - [hours, datestr] = splitLine - day = datetime.strptime(datestr, date_format).replace(tzinfo=tzlocal()).date() - extra_days_to_work[day] = timedelta(hours = float(hours)) - else: - extra_days_to_work[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = self.time_per_day - except IOError as e: - if e.errno != 2: - raise e - - - self.is_workday = self.now.date() in days_to_work or self.now.date() in extra_days_to_work - - self.time_worked = timedelta() - - if self.include_running: - self.running_entry = api.get_running_clock(self.now) - - if self.running_entry: - self.time_worked += self.running_entry - - if self.running_entry and self.include_running and self.force_day_to_work and not (self.now.date() in days_to_work or self.now.date() in extra_days_to_work): - extra_days_to_work[self.now.date()] = timedelta() - - self.time_to_work = sum([days_to_work[day] for day in days_to_work.keys() if day <= end_date.date()], timedelta()) - for day in [d for d in list(pull_forward) if d > end_date.date()]: - days_forward = set([d for d in days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) - extra_days_forward = set([d for d in extra_days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) - days_forward = days_forward.union(extra_days_forward) - - extra_day_time_left = timedelta() - for extra_day in extra_days_forward: - day_time = max(timedelta(), self.time_per_day - extra_days_to_work[extra_day]) - extra_day_time_left += day_time - extra_day_time = min(extra_day_time_left, pull_forward[day]) - time_forward = pull_forward[day] - extra_day_time - if extra_day_time_left > timedelta(): - for extra_day in extra_days_forward: - day_time = max(timedelta(), self.time_per_day - extra_days_to_work[extra_day]) - extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left) - - hours_per_day_forward = time_forward / len(days_forward) if len(days_forward) > 0 else timedelta() - days_forward.discard(end_date.date()) - - self.time_pulled_forward += time_forward - hours_per_day_forward * len(days_forward) - - if end_date.date() in extra_days_to_work: - self.time_pulled_forward += extra_days_to_work[end_date.date()] - - self.time_to_work += self.time_pulled_forward - - self.time_worked += api.get_billable_hours(start_date, self.now, rounding = config.getboolean('WORKTIME', 'rounding', fallback=True)) - -def worktime(**args): - worktime = Worktime(**args) - - def format_worktime(worktime): - def difference_string(difference): - total_minutes_difference = round(difference / timedelta(minutes = 1)) - (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) - sign = '' if total_minutes_difference >= 0 else '-' - - difference_string = f"{sign}" - if hours_difference != 0: - difference_string += f"{hours_difference}h" - if hours_difference == 0 or minutes_difference != 0: - difference_string += f"{minutes_difference}m" - - return difference_string - - difference = worktime.time_to_work - worktime.time_worked - total_minutes_difference = 5 * ceil(difference / timedelta(minutes = 5)) - - if worktime.running_entry and abs(difference) < timedelta(days = 1) and (total_minutes_difference > 0 or abs(worktime.running_entry) >= abs(difference)) : - clockout_time = worktime.now + difference - clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1) - clockout_time = clockout_time.replace(second = 0, microsecond = 0) - - if total_minutes_difference >= 0: - difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) - return f"{difference_string}/{clockout_time:%H:%M}" - else: - difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) - return f"{clockout_time:%H:%M}/{difference_string}" - else: - if worktime.running_entry: - difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) - indicator = '↓' if total_minutes_difference >= 0 else '↑' # '\u25b6' - - return f"{indicator}{difference_string}" - else: - difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) - if worktime.is_workday: - return difference_string - else: - return f"({difference_string})" - - if worktime.time_pulled_forward >= timedelta(minutes = 15): - worktime_no_pulled_forward = deepcopy(worktime) - worktime_no_pulled_forward.time_to_work -= worktime_no_pulled_forward.time_pulled_forward - worktime_no_pulled_forward.time_pulled_forward = timedelta() - - difference_string = format_worktime(worktime) - difference_string_no_pulled_forward = format_worktime(worktime_no_pulled_forward) - - print(f"{difference_string_no_pulled_forward}…{difference_string}") - else: - print(format_worktime(worktime)) - -def time_worked(now, **args): - then = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0) - if now.time() == time(): - now = now + timedelta(days = 1) - - then = Worktime(**dict(args, now = then)) - now = Worktime(**dict(args, now = now)) - - worked = now.time_worked - then.time_worked - - if args['do_round']: - total_minutes_difference = 5 * ceil(worked / timedelta(minutes = 5)) - (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) - sign = '' if total_minutes_difference >= 0 else '-' - - difference_string = f"{sign}" - if hours_difference != 0: - difference_string += f"{hours_difference}h" - if hours_difference == 0 or minutes_difference != 0: - difference_string += f"{minutes_difference}m" - - clockout_time = None - clockout_difference = None - if then.is_workday or now.is_workday: - target_time = max(then.time_per_day, now.time_per_day) if then.time_per_day and now.time_per_day else (then.time_per_day if then.time_per_day else now.time_per_day); - difference = target_time - worked - clockout_difference = 5 * ceil(difference / timedelta(minutes = 5)) - clockout_time = now.now + difference - clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1) - clockout_time = clockout_time.replace(second = 0, microsecond = 0) - - if now.running_entry and clockout_time and clockout_difference >= 0: - print(f"{difference_string}/{clockout_time:%H:%M}") - else: - print(difference_string) - else: - print(worked) - -def diff(now, **args): - now = now.replace(hour = 0, minute = 0, second = 0, microsecond = 0) - then = now - timedelta.resolution - - then = Worktime(**dict(args, now = then, include_running = False)) - now = Worktime(**dict(args, now = now, include_running = False)) - - print(now.time_to_work - then.time_to_work) - -def holidays(year, **args): - config = Worktime.config() - date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') - - table_data = [] - - holidays = Worktime.holidays(year) - for k, v in holidays.items(): - kstr = k.strftime(date_format) - table_data += [[kstr, v]] - print(tabulate(table_data, tablefmt="plain")) - -def leave(year, table, **args): - def_year = datetime.now(tzlocal()).year - worktime = Worktime(**dict(**args, end_datetime = datetime(year = (year if year else def_year) + 1, month = 1, day = 1, tzinfo=tzlocal()) - timedelta(microseconds=1))) - config = Worktime.config() - date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') - leave_expires = config.get('WORKTIME', 'LeaveExpires', fallback=None) - if leave_expires: - leave_expires = datetime.strptime(leave_expires, '%m-%d').date() - - leave_budget = deepcopy(worktime.leave_budget) - year_leave_budget = deepcopy(worktime.leave_budget) if year else None - years = sorted(leave_budget.keys()) - for day in sorted(worktime.leave_days): - for iyear in years: - if day > leave_expires.replace(year = iyear + 1): - continue - if leave_budget[iyear] <= 0: - continue - - leave_budget[iyear] -= 1 - if year_leave_budget and day.year < year: - year_leave_budget[iyear] -= 1 - break - else: - print(f'Unaccounted leave: {day}', file=stderr) - - if table and year: - table_data = [] - leave_days = sorted([day for day in worktime.leave_days if day.year == year and worktime.would_be_workday(day)]) - - count = 0 - for _, group in groupby(enumerate(leave_days), lambda kv: kv[0] - worktime.ordinal_workday(kv[1])): - group = list(map(lambda kv: kv[1], group)) - - for day in group: - for iyear in years: - if day > leave_expires.replace(year = iyear + 1): - continue - if year_leave_budget[iyear] <= 0: - continue - - year_leave_budget[iyear] -= 1 - break - - next_count = count + len(group) - if len(group) > 1: - table_data.append([count, group[0].strftime('%m-%d') + '--' + group[-1].strftime('%m-%d'), len(group), sum(year_leave_budget.values())]) - else: - table_data.append([count, group[0].strftime('%m-%d'), len(group), sum(year_leave_budget.values())]) - count = next_count - print(tabulate(table_data, tablefmt="plain")) - elif table: - table_data = [] - for year, days in leave_budget.items(): - leave_days = sorted([day for day in worktime.leave_days if day.year == year]) - table_data += [[year, days, ','.join(map(lambda d: d.strftime('%m-%d'), leave_days))]] - print(tabulate(table_data, tablefmt="plain")) - else: - print(leave_budget[year if year else def_year]) - -def main(): - parser = argparse.ArgumentParser(prog = "worktime", description = 'Track worktime using toggl API') - parser.add_argument('--time', dest = 'now', metavar = 'TIME', type = lambda s: datetime.fromisoformat(s).replace(tzinfo=tzlocal()), help = 'Time to calculate status for (default: current time)', default = datetime.now(tzlocal())) - parser.add_argument('--no-running', dest = 'include_running', action = 'store_false') - parser.add_argument('--no-force-day-to-work', dest = 'force_day_to_work', action = 'store_false') - subparsers = parser.add_subparsers(help = 'Subcommands') - parser.set_defaults(cmd = worktime) - time_worked_parser = subparsers.add_parser('time_worked', aliases = ['time', 'worked', 'today']) - time_worked_parser.add_argument('--no-round', dest = 'do_round', action = 'store_false') - time_worked_parser.set_defaults(cmd = time_worked) - diff_parser = subparsers.add_parser('diff') - diff_parser.set_defaults(cmd = diff) - holidays_parser = subparsers.add_parser('holidays') - holidays_parser.add_argument('year', metavar = 'YEAR', type = int, help = 'Year to evaluate holidays for (default: current year)', default = datetime.now(tzlocal()).year, nargs='?') - holidays_parser.set_defaults(cmd = holidays) - leave_parser = subparsers.add_parser('leave') - leave_parser.add_argument('year', metavar = 'YEAR', type = int, help = 'Year to evaluate leave days for (default: current year)', default = None, nargs='?') - leave_parser.add_argument('--table', action = 'store_true') - leave_parser.set_defaults(cmd = leave) - args = parser.parse_args() - - args.cmd(**vars(args)) - -if __name__ == "__main__": - sys.exit(main()) -- cgit v1.2.3