From 33b4e0eb74fee27d8edacb581fd6eeb10cce9f4f Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 17 Dec 2019 00:17:57 +0100 Subject: worktime --- worktime.py | 199 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100755 worktime.py (limited to 'worktime.py') diff --git a/worktime.py b/worktime.py new file mode 100755 index 0000000..36436ae --- /dev/null +++ b/worktime.py @@ -0,0 +1,199 @@ +#!@python@/bin/python + +import requests +from requests.auth import HTTPBasicAuth +from datetime import * +from xdg import (BaseDirectory) +import configparser +from uritools import uricompose + +from dateutil.easter import * +from dateutil.tz import * +from dateutil.parser import isoparse + +from enum import Enum + +from math import (copysign, ceil) + +class TogglAPISection(Enum): + TOGGL = '/api/v8' + REPORTS = '/reports/api/v2' + +class TogglAPI(object): + def __init__(self, api_token, workspace_id): + self._api_token = api_token + self._workspace_id = workspace_id + + def _make_url(self, api=TogglAPISection.TOGGL, section=['time_entries', 'current'], params={}): + if api is TogglAPISection.REPORTS: + params.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id}) + + api_path = api.value + section_path = '/'.join(section) + return uricompose(scheme='https', host='www.toggl.com', path=f"{api_path}/{section_path}", query=params) + + def _query(self, url, method): + + headers = {'content-type': 'application/json'} + + if method == 'GET': + return requests.get(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) + elif method == 'POST': + return requests.post(url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token')) + else: + raise ValueError(f"Undefined HTTP method “{method}”") + + def get_billable_hours(self, start_date, end_date=datetime.now(timezone.utc)): + url = self._make_url(api = TogglAPISection.REPORTS, section = ['summary'], params={'since': start_date.astimezone(timezone.utc).isoformat(), 'until': end_date.astimezone(timezone.utc).isoformat()}) + r = self._query(url = url, method='GET') + return timedelta(milliseconds=r.json()['total_billable']) + + def get_running_clock(self, now=datetime.now(timezone.utc)): + url = self._make_url(api = TogglAPISection.TOGGL, section = ['time_entries', 'current']) + r = self._query(url = url, method='GET').json() + + if not r or not r['data'] or not r['data']['billable']: + return None + + start = isoparse(r['data']['start']) + + return now - start if start <= now else None + +class Worktime(object): + time_to_work = timedelta() + time_worked = timedelta() + running_entry = None + now = datetime.now(tzlocal()) + time_pulled_forward = timedelta() + + def __init__(self, start_datetime=None, end_datetime=None, now=None): + if now: + self.now = now + + config = configparser.ConfigParser() + config_dir = BaseDirectory.load_first_config('worktime') + config.read(f"{config_dir}/worktime.ini") + api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace']) + date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d') + + start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal()) + end_date = end_datetime or self.now + + hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40)) + workdays = set([int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')]) + hours_per_day = hours_per_week / len(workdays) + + holidays = set() + + for year in range(start_date.year, end_date.year + 1): + y_easter = datetime.combine(easter(year), time(), tzinfo=tzlocal()) + + # Legal holidays in munich, bavaria + holidays.add(datetime(year, 1, 1, tzinfo=tzlocal()).date()) + holidays.add(datetime(year, 1, 6, tzinfo=tzlocal()).date()) + holidays.add((y_easter+timedelta(days=-2)).date()) + holidays.add((y_easter+timedelta(days=+1)).date()) + holidays.add(datetime(year, 5, 1, tzinfo=tzlocal()).date()) + holidays.add((y_easter+timedelta(days=+39)).date()) + holidays.add((y_easter+timedelta(days=+50)).date()) + holidays.add((y_easter+timedelta(days=+60)).date()) + holidays.add(datetime(year, 8, 15, tzinfo=tzlocal()).date()) + holidays.add(datetime(year, 10, 3, tzinfo=tzlocal()).date()) + holidays.add(datetime(year, 11, 1, tzinfo=tzlocal()).date()) + holidays.add(datetime(year, 12, 25, tzinfo=tzlocal()).date()) + holidays.add(datetime(year, 12, 26, tzinfo=tzlocal()).date()) + + try: + with open(f"{config_dir}/excused", 'r') as excused: + for line in excused: + stripped_line = line.strip() + if stripped_line: + holidays.add(datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()) + except IOError as e: + if e.errno != 2: + raise e + + pull_forward = dict() + + try: + with open(f"{config_dir}/pull-forward", 'r') as excused: + for line in excused: + stripped_line = line.strip() + if stripped_line: + [hours, date] = stripped_line.split(' ') + day = datetime.strptime(date, date_format).replace(tzinfo=tzlocal()).date() + if day > end_date.date(): + pull_forward[day] = timedelta(hours = float(hours)) + except IOError as e: + if e.errno != 2: + raise e + + + days_to_work = set() + + start_day = start_date.date() + end_day = end_date.date() + if pull_forward: + end_day = max(end_day, max(list(pull_forward))) + + for day in [start_day + timedelta(days = x) for x in range(0, (end_day - start_day).days + 1)]: + if day.isoweekday() in workdays and not day in holidays: + days_to_work.add(day) + + self.time_to_work = timedelta(hours = len([day for day in days_to_work if day <= end_date.date()]) * hours_per_day) + for day in list(pull_forward): + days_forward = set([d for d in days_to_work if d >= end_date.date() and d < day and not d in pull_forward]) + hours_per_day_forward = pull_forward[day] / len(days_forward) if len(days_forward) > 0 else timedelta() + days_forward.discard(end_date.date()) + self.time_pulled_forward += pull_forward[day] - hours_per_day_forward * len(days_forward) + self.time_to_work += self.time_pulled_forward + + self.time_worked = api.get_billable_hours(start_date, self.now) + self.running_entry = api.get_running_clock(self.now) + + if self.running_entry: + self.time_worked += self.running_entry + +def main(): + worktime = Worktime() + + def difference_string(difference): + total_minutes_difference = round(difference / timedelta(minutes = 1)) + (hours_difference, minutes_difference) = divmod(abs(total_minutes_difference), 60) + sign = '' if total_minutes_difference >= 0 else '-' + + difference_string = f"{sign}" + if hours_difference != 0: + difference_string += f"{hours_difference}h" + if hours_difference == 0 or minutes_difference != 0: + difference_string += f"{minutes_difference}m" + + return difference_string + + + difference = worktime.time_to_work - worktime.time_worked + total_minutes_difference = 5 * ceil(difference / timedelta(minutes = 5)) + + if worktime.running_entry and abs(difference) < timedelta(days = 1) and (total_minutes_difference > 0 or abs(worktime.running_entry) >= abs(difference)) : + clockout_time = worktime.now + difference + clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes = 1) + clockout_time = clockout_time.replace(second = 0, microsecond = 0) + + if total_minutes_difference >= 0: + difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) + print("{difference_string}/{clockout_time}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))) + else: + difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) + print("{clockout_time}/{difference_string}".format(difference_string = difference_string, clockout_time = clockout_time.strftime("%H:%M"))) + else: + if worktime.running_entry: + difference_string = difference_string(abs(total_minutes_difference) * timedelta(minutes = 1)) + indicator = '↘' if total_minutes_difference >= 0 else '↗' # '\u25b6' + + print(f"{indicator} {difference_string}") + else: + difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) + print(difference_string) + +if __name__ == "__main__": + sys.exit(main()) -- cgit v1.2.3