#!@python@/bin/python
"""Track working time against billable hours recorded in Toggl.

The script reads ``worktime.ini`` plus a few optional plain-text data files
(``reset``, ``excused``, ``pull-forward``, ``days-to-work``) from the first
XDG config directory named ``worktime``, queries the Toggl HTTP API for the
billable time recorded so far, and prints the difference between the time
that should have been worked and the time that was worked.
"""

import requests
from requests.exceptions import HTTPError
from requests.auth import HTTPBasicAuth
from datetime import *
from xdg import (BaseDirectory)
import configparser
from uritools import uricompose
from dateutil.easter import *
from dateutil.tz import *
from dateutil.parser import isoparse
from enum import Enum
from math import (copysign, ceil)
import calendar
import argparse
from copy import deepcopy
import sys
from tabulate import tabulate


class TogglAPISection(Enum):
    """Path prefixes of the two Toggl HTTP APIs used by this script."""

    TOGGL = '/api/v8'
    REPORTS = '/reports/api/v2'


class TogglAPIError(Exception):
    """Raised when a Toggl response is unusable.

    Args:
        http_error: The underlying HTTP error, or ``None`` if there is none.
        response: The response object; its ``text`` is used as the message
            when ``http_error`` is ``None``.
    """

    def __init__(self, http_error, response):
        super().__init__(http_error, response)
        self.http_error = http_error
        self.response = response

    def __str__(self):
        if self.http_error is not None:
            return str(self.http_error)
        return self.response.text


class TogglAPI(object):
    """Minimal client for the Toggl endpoints needed by ``Worktime``."""

    def __init__(self, api_token, workspace_id):
        self._api_token = api_token
        self._workspace_id = workspace_id

    def _make_url(self, api=TogglAPISection.TOGGL, section=('time_entries', 'current'), params=None):
        """Compose the request URL for ``section`` of ``api``.

        Args:
            api: Which API prefix to use.
            section: Path components below the API prefix.
            params: Optional mapping of query parameters; it is not modified.

        Returns:
            The composed ``https://api.track.toggl.com/...`` URI.
        """
        # Work on a copy: neither a shared default nor the caller's mapping
        # may pick up the REPORTS-only parameters added below.
        query = dict(params) if params is not None else {}
        if api is TogglAPISection.REPORTS:
            query.update({'user_agent': 'worktime', 'workspace_id': self._workspace_id})

        api_path = api.value
        section_path = '/'.join(section)
        return uricompose(scheme='https', host='api.track.toggl.com', path=f"{api_path}/{section_path}", query=query)

    def _query(self, url, method):
        """Send an authenticated request and return the response.

        Raises:
            ValueError: If ``method`` is neither ``'GET'`` nor ``'POST'``.
            requests.exceptions.HTTPError: Via ``raise_for_status()`` when the
                server answers with an error status.
        """
        senders = {'GET': requests.get, 'POST': requests.post}
        if method not in senders:
            raise ValueError(f"Undefined HTTP method “{method}”")

        headers = {'content-type': 'application/json'}
        response = senders[method](url, headers=headers, auth=HTTPBasicAuth(self._api_token, 'api_token'))
        response.raise_for_status()
        return response

    def get_billable_hours(self, start_date, end_date=None, rounding=False):
        """Sum the billable time reported between ``start_date`` and ``end_date``.

        The range is requested in steps of at most 365 days and the
        ``total_billable`` values (milliseconds) of the summary reports are
        accumulated.

        Args:
            start_date: Timezone-aware start of the range.
            end_date: Timezone-aware end of the range; defaults to the current
                time in UTC, evaluated at call time.
            rounding: Passed through as the ``rounding`` query parameter.

        Returns:
            The accumulated billable time as a ``timedelta``.

        Raises:
            TogglAPIError: If a response is falsy or has an empty JSON body.
        """
        if end_date is None:
            end_date = datetime.now(timezone.utc)

        billable_acc = timedelta(milliseconds=0)
        step = timedelta(days=365)
        utc_midnight = time(tzinfo=timezone.utc)

        for index in range(0, ceil((end_date - start_date) / step)):
            req_start = start_date + index * step
            req_end = end_date
            if end_date > req_start + step:
                # Not the last chunk: end it at UTC midnight of the step boundary.
                req_end = datetime.combine((req_start + step).astimezone(timezone.utc).date(), utc_midnight)
            elif req_start > start_date:
                # Last of several chunks: start at UTC midnight of the following day.
                req_start = datetime.combine(req_start.astimezone(timezone.utc).date(), utc_midnight) + timedelta(days=1)

            url = self._make_url(
                api=TogglAPISection.REPORTS,
                section=['summary'],
                params={
                    'since': req_start.astimezone(timezone.utc).isoformat(),
                    'until': req_end.astimezone(timezone.utc).isoformat(),
                    'rounding': rounding,
                },
            )
            r = self._query(url=url, method='GET')
            payload = r.json() if r else None
            if not payload:
                raise TogglAPIError(None, r)
            if payload['total_billable']:
                billable_acc += timedelta(milliseconds=payload['total_billable'])

        return billable_acc

    def get_running_clock(self, now=None):
        """Return how long the currently running billable entry has been running.

        Args:
            now: Timezone-aware reference time; defaults to the current time
                in UTC, evaluated at call time.

        Returns:
            ``now - start`` as a ``timedelta``, or ``None`` when there is no
            running entry, the entry is not billable, or it starts after ``now``.

        Raises:
            TogglAPIError: If the response is falsy or has an empty JSON body.
        """
        if now is None:
            now = datetime.now(timezone.utc)

        url = self._make_url(api=TogglAPISection.TOGGL, section=['time_entries', 'current'])
        r = self._query(url=url, method='GET')
        payload = r.json() if r else None
        if not payload:
            raise TogglAPIError(None, r)

        entry = payload['data']
        if not entry or not entry['billable']:
            return None

        start = isoparse(entry['start'])
        return now - start if start <= now else None


def _parse_day(text, date_format):
    """Parse ``text`` with ``date_format`` and return the calendar date."""
    return datetime.strptime(text, date_format).date()


def _read_day_hours(path, date_format, default_time):
    """Read a ``[HOURS ]DATE``-per-line file into a ``{date: timedelta}`` dict.

    Blank lines are skipped.  A line with exactly two space-separated fields
    is ``HOURS DATE``; any other line is parsed as a date as a whole and
    mapped to ``default_time``.  A missing file yields an empty dict.
    """
    result = dict()
    try:
        with open(path, 'r') as day_file:
            for line in day_file:
                stripped_line = line.strip()
                if not stripped_line:
                    continue
                fields = stripped_line.split(' ')
                if len(fields) == 2:
                    hours, day_text = fields
                    result[_parse_day(day_text, date_format)] = timedelta(hours=float(hours))
                else:
                    result[_parse_day(stripped_line, date_format)] = default_time
    except FileNotFoundError:
        # The data files are optional.
        pass
    return result


def _matches_constraints(day, constraints, date_format):
    """Return whether ``day`` satisfies every pull-forward constraint.

    A constraint is an abbreviated weekday name (as in ``calendar.day_abbr``),
    a ``FROM--TO`` range in which either bound may be empty, or a single date.
    """
    for constraint in constraints:
        if constraint in calendar.day_abbr:
            if day.strftime('%a') != constraint:
                return False
        elif "--" in constraint:
            from_text, to_text = constraint.split('--')
            if from_text != "" and not _parse_day(from_text, date_format) <= day:
                return False
            if to_text != "" and not day <= _parse_day(to_text, date_format):
                return False
        elif day != _parse_day(constraint, date_format):
            return False
    return True


class Worktime(object):
    """Time to work versus time worked, computed on construction."""

    time_worked = timedelta()
    running_entry = None
    # Evaluated once at import time; used when no ``now`` is passed to __init__.
    now = datetime.now(tzlocal())
    time_pulled_forward = timedelta()
    is_workday = False
    include_running = True
    time_to_work = None
    force_day_to_work = True

    @staticmethod
    def holidays(year):
        """Return ``{date: 1}`` for the holidays of ``year``."""
        y_easter = easter(year)
        # Legal holidays in munich, bavaria
        days = [
            date(year, 1, 1),
            date(year, 1, 6),
            y_easter + timedelta(days=-2),   # Good Friday
            y_easter + timedelta(days=+1),   # Easter Monday
            date(year, 5, 1),
            y_easter + timedelta(days=+39),  # Ascension Day
            y_easter + timedelta(days=+50),  # Whit Monday
            y_easter + timedelta(days=+60),  # Corpus Christi
            date(year, 8, 15),
            date(year, 10, 3),
            date(year, 11, 1),
            date(year, 12, 25),
            date(year, 12, 26),
        ]
        return {day: 1 for day in days}

    @staticmethod
    def config():
        """Load ``worktime.ini`` from the first ``worktime`` XDG config directory."""
        config = configparser.ConfigParser()
        config_dir = BaseDirectory.load_first_config('worktime')
        config.read(f"{config_dir}/worktime.ini")
        return config

    def __init__(self, start_datetime=None, end_datetime=None, now=None, include_running=True, force_day_to_work=True, **kwargs):
        """Compute ``time_to_work``, ``time_worked`` and related attributes.

        Args:
            start_datetime: Start of the accounting period; defaults to the
                configured ``StartDate``.
            end_datetime: End of the accounting period; defaults to ``now``.
            now: Reference time; defaults to the class attribute ``now``.
            include_running: Whether to query and count the running entry.
            force_day_to_work: Whether a running entry on a non-workday turns
                that day into a zero-hour extra day to work.
            **kwargs: Ignored; lets callers pass a full argparse namespace.
        """
        self.include_running = include_running
        self.force_day_to_work = force_day_to_work

        if now:
            self.now = now

        config = Worktime.config()
        config_dir = BaseDirectory.load_first_config('worktime')
        api = TogglAPI(api_token=config['TOGGL']['ApiToken'], workspace_id=config['TOGGL']['Workspace'])
        date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d')

        start_date = start_datetime or datetime.strptime(config['WORKTIME']['StartDate'], date_format).replace(tzinfo=tzlocal())
        end_date = end_datetime or self.now

        # The latest reset date strictly inside the period becomes the new start.
        try:
            with open(f"{config_dir}/reset", 'r') as reset_file:
                for line in reset_file:
                    stripped_line = line.strip()
                    if not stripped_line:
                        # Tolerate blank lines like the other data files do.
                        continue
                    reset_date = datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal())
                    if start_date < reset_date < end_date:
                        start_date = reset_date
        except FileNotFoundError:
            pass

        hours_per_week = float(config.get('WORKTIME', 'HoursPerWeek', fallback=40))
        workdays = {int(d.strip()) for d in config.get('WORKTIME', 'Workdays', fallback='1,2,3,4,5').split(',')}
        time_per_day = timedelta(hours=hours_per_week) / len(workdays)

        # Time excused per day: full days for holidays, then the "excused"
        # file, whose entries override holidays on the same date.
        holidays = dict()
        for year in range(start_date.year, end_date.year + 1):
            holidays |= {k: v * time_per_day for k, v in Worktime.holidays(year).items()}
        holidays.update(_read_day_hours(f"{config_dir}/excused", date_format, time_per_day))

        pull_forward = dict()

        start_day = start_date.date()
        end_day = end_date.date()
        last_day = end_date.date()

        try:
            with open(f"{config_dir}/pull-forward", 'r') as pull_forward_file:
                for line in pull_forward_file:
                    stripped_line = line.strip()
                    if not stripped_line:
                        continue
                    hours, constraint_spec = stripped_line.split(' ')
                    constraints = constraint_spec.split(',')
                    pulled_time = timedelta(hours=float(hours))
                    # How many days past the period are scanned for matches.
                    lookahead_days = int(pulled_time.total_seconds() / 60 * (7 / len(workdays)) * 2)
                    for offset in range(0, (end_day - start_day).days + 1 + lookahead_days):
                        d = start_day + timedelta(days=offset)
                        if _matches_constraints(d, constraints, date_format) and d >= last_day:
                            pull_forward[d] = min(pulled_time, time_per_day - holidays.get(d, timedelta()))
        except FileNotFoundError:
            pass

        days_to_work = dict()

        if pull_forward:
            end_day = max(end_day, max(pull_forward))

        for offset in range(0, (end_day - start_day).days + 1):
            day = start_day + timedelta(days=offset)
            if day.isoweekday() not in workdays:
                continue
            time_to_work = time_per_day - holidays.get(day, timedelta())
            if time_to_work > timedelta():
                days_to_work[day] = time_to_work

        extra_days_to_work = _read_day_hours(f"{config_dir}/days-to-work", date_format, time_per_day)

        today = self.now.date()
        self.is_workday = today in days_to_work or today in extra_days_to_work

        self.time_worked = timedelta()

        if self.include_running:
            self.running_entry = api.get_running_clock(self.now)

        if self.running_entry:
            self.time_worked += self.running_entry

        if self.running_entry and self.include_running and self.force_day_to_work and not self.is_workday:
            extra_days_to_work[today] = timedelta()

        self.time_to_work = sum([days_to_work[day] for day in days_to_work if day <= last_day], timedelta())

        for day in [d for d in pull_forward if d > last_day]:
            def in_window(d, day=day):
                return last_day <= d < day and (d not in pull_forward or d == last_day)

            extra_days_forward = {d for d in extra_days_to_work if in_window(d)}
            days_forward = {d for d in days_to_work if in_window(d)} | extra_days_forward

            # Capacity left on the extra days up to a full day each.
            extra_day_time_left = timedelta()
            for extra_day in extra_days_forward:
                extra_day_time_left += max(timedelta(), time_per_day - extra_days_to_work[extra_day])
            extra_day_time = min(extra_day_time_left, pull_forward[day])
            time_forward = pull_forward[day] - extra_day_time
            if extra_day_time_left > timedelta():
                # Spread the absorbed time over the extra days in proportion
                # to each day's remaining capacity.
                for extra_day in extra_days_forward:
                    day_time = max(timedelta(), time_per_day - extra_days_to_work[extra_day])
                    extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left)

            hours_per_day_forward = time_forward / len(days_forward) if len(days_forward) > 0 else timedelta()
            days_forward.discard(last_day)

            self.time_pulled_forward += time_forward - hours_per_day_forward * len(days_forward)

        if last_day in extra_days_to_work:
            self.time_pulled_forward += extra_days_to_work[last_day]

        self.time_to_work += self.time_pulled_forward

        self.time_worked += api.get_billable_hours(start_date, self.now, rounding=config.getboolean('WORKTIME', 'rounding', fallback=True))


def _format_minutes(total_minutes):
    """Format a signed number of minutes as e.g. ``1h5m``, ``-2h`` or ``0m``."""
    hours, minutes = divmod(abs(total_minutes), 60)
    text = '' if total_minutes >= 0 else '-'
    if hours != 0:
        text += f"{hours}h"
    if hours == 0 or minutes != 0:
        text += f"{minutes}m"
    return text


def _format_worktime(worktime):
    """Render the remaining/overtime status of a ``Worktime`` as a short string."""
    difference = worktime.time_to_work - worktime.time_worked
    # Rounded up to the next multiple of five minutes.
    total_minutes_difference = 5 * ceil(difference / timedelta(minutes=5))
    running_entry = worktime.running_entry

    if running_entry and abs(difference) < timedelta(days=1) and (total_minutes_difference > 0 or abs(running_entry) >= abs(difference)):
        clockout_time = worktime.now + difference
        clockout_time += (5 - clockout_time.minute % 5) * timedelta(minutes=1)
        clockout_time = clockout_time.replace(second=0, microsecond=0)
        clockout_string = clockout_time.strftime("%H:%M")

        if total_minutes_difference >= 0:
            return f"{_format_minutes(total_minutes_difference)}/{clockout_string}"
        return f"{clockout_string}/{_format_minutes(abs(total_minutes_difference))}"

    if running_entry:
        indicator = '↓' if total_minutes_difference >= 0 else '↑'  # '\u25b6'
        return f"{indicator}{_format_minutes(abs(total_minutes_difference))}"

    difference_string = _format_minutes(total_minutes_difference)
    if worktime.is_workday:
        return difference_string
    return f"({difference_string})"


def worktime(**args):
    """Print the current worktime status (default subcommand)."""
    current = Worktime(**args)

    if current.time_pulled_forward >= timedelta(minutes=15):
        # Show the status both without and with the pulled-forward time.
        without_pulled_forward = deepcopy(current)
        without_pulled_forward.time_to_work -= without_pulled_forward.time_pulled_forward
        without_pulled_forward.time_pulled_forward = timedelta()

        difference_string = _format_worktime(current)
        difference_string_no_pulled_forward = _format_worktime(without_pulled_forward)

        print(f"{difference_string_no_pulled_forward}…{difference_string}")
    else:
        print(_format_worktime(current))


def time_worked(now, **args):
    """Print the time worked between the start of ``now``'s day and ``now``."""
    then = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if now.time() == time():
        # Exactly midnight: report the whole following day instead of nothing.
        now = now + timedelta(days=1)

    then = Worktime(**dict(args, now=then))
    now = Worktime(**dict(args, now=now))

    worked = now.time_worked - then.time_worked

    if args['do_round']:
        total_minutes_difference = 5 * ceil(worked / timedelta(minutes=5))
        print(_format_minutes(total_minutes_difference))
    else:
        print(worked)


def diff(now, **args):
    """Print how much ``time_to_work`` grows at the start of ``now``'s day."""
    now = now.replace(hour=0, minute=0, second=0, microsecond=0)
    then = now - timedelta.resolution

    then = Worktime(**dict(args, now=then, include_running=False))
    now = Worktime(**dict(args, now=now, include_running=False))

    print(now.time_to_work - then.time_to_work)


def holidays(now, **args):
    """Print the holidays of ``now``'s year as a plain table."""
    config = Worktime.config()
    date_format = config.get('WORKTIME', 'DateFormat', fallback='%Y-%m-%d')

    table_data = [[day.strftime(date_format), value] for day, value in Worktime.holidays(now.year).items()]
    print(tabulate(table_data, tablefmt="plain"))


def main():
    """Parse the command line and run the selected subcommand."""
    parser = argparse.ArgumentParser(prog="worktime", description='Track worktime using toggl API')
    parser.add_argument('--time', dest='now', metavar='TIME', type=lambda s: datetime.fromisoformat(s).replace(tzinfo=tzlocal()), help='Time to calculate status for (default: current time)', default=datetime.now(tzlocal()))
    parser.add_argument('--no-running', dest='include_running', action='store_false')
    parser.add_argument('--no-force-day-to-work', dest='force_day_to_work', action='store_false')
    subparsers = parser.add_subparsers(help='Subcommands')
    parser.set_defaults(cmd=worktime)
    time_worked_parser = subparsers.add_parser('time_worked', aliases=['time', 'worked', 'today'])
    time_worked_parser.add_argument('--no-round', dest='do_round', action='store_false')
    time_worked_parser.set_defaults(cmd=time_worked)
    diff_parser = subparsers.add_parser('diff')
    diff_parser.set_defaults(cmd=diff)
    holidays_parser = subparsers.add_parser('holidays')
    holidays_parser.set_defaults(cmd=holidays)
    args = parser.parse_args()

    args.cmd(**vars(args))


if __name__ == "__main__":
    sys.exit(main())