From 63d94084ed6237cec818659f34984db452021f1c Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Tue, 4 Apr 2023 12:05:53 +0200 Subject: ... --- overlays/worktime/worktime/__main__.py | 170 ++++++++++++++++++++++++++------- 1 file changed, 134 insertions(+), 36 deletions(-) (limited to 'overlays/worktime') diff --git a/overlays/worktime/worktime/__main__.py b/overlays/worktime/worktime/__main__.py index 2dc9ed72..9bc7ac3b 100755 --- a/overlays/worktime/worktime/__main__.py +++ b/overlays/worktime/worktime/__main__.py @@ -26,12 +26,14 @@ from sys import stderr from tabulate import tabulate from itertools import groupby -from functools import cache +from functools import cache, partial import backoff from pathlib import Path +from collections import defaultdict + class TogglAPISection(Enum): TOGGL = '/api/v8' @@ -168,7 +170,7 @@ class Worktime(object): running_entry = None now = datetime.now(tzlocal()) time_pulled_forward = timedelta() - is_workday = False + now_is_workday = False include_running = True time_to_work = None force_day_to_work = True @@ -213,6 +215,9 @@ class Worktime(object): def would_be_workday(self, date): return date.isoweekday() in self.workdays and date not in set(day for (day, val) in Worktime.holidays(date.year).items() if val >= 1) + def is_workday(self, date, extra=True): + return date in self.days_to_work or (extra and date in self.extra_days_to_work) + def __init__(self, start_datetime=None, end_datetime=None, now=None, include_running=True, force_day_to_work=True, **kwargs): self.include_running = include_running self.force_day_to_work = force_day_to_work @@ -229,8 +234,8 @@ class Worktime(object): ) date_format = config.get("WORKTIME", {}).get("DateFormat", '%Y-%m-%d') - start_date = start_datetime or datetime.strptime(config.get("WORKTIME", {}).get("StartDate"), date_format).replace(tzinfo=tzlocal()) - end_date = end_datetime or self.now + self.start_date = start_datetime or datetime.strptime(config.get("WORKTIME", {}).get("StartDate"), date_format).replace(tzinfo=tzlocal()) + self.end_date = end_datetime or self.now try: with open(Path(config_dir) / "reset", 'r') as reset: @@ -238,8 +243,8 @@ class Worktime(object): stripped_line = line.strip() reset_date = datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()) - if reset_date > start_date and reset_date <= end_date: - start_date = reset_date + if reset_date > self.start_date and reset_date <= self.end_date: + self.start_date = reset_date except IOError as e: if e.errno != 2: raise e @@ -252,11 +257,11 @@ class Worktime(object): holidays = dict() leave_per_year = int(config.get("WORKTIME", {}).get("LeavePerYear", 30)) - for year in range(start_date.year, end_date.year + 1): + for year in range(self.start_date.year, self.end_date.year + 1): holidays |= {k: v * self.time_per_day for k, v in Worktime.holidays(year).items()} leave_frac = 1 - if date(year, 1, 1) < start_date.date(): - leave_frac = (date(year + 1, 1, 1) - start_date.date()) / (date(year + 1, 1, 1) - date(year, 1, 1)) + if date(year, 1, 1) < self.start_date.date(): + leave_frac = (date(year + 1, 1, 1) - self.start_date.date()) / (date(year + 1, 1, 1) - date(year, 1, 1)) self.leave_budget |= {year: floor(leave_per_year * leave_frac)} try: @@ -266,7 +271,7 @@ class Worktime(object): if stripped_line: [datestr, count] = stripped_line.split(' ') day = datetime.strptime(datestr, date_format).replace(tzinfo=tzlocal()).date() - if day != start_date.date(): + if day != self.start_date.date(): continue self.leave_budget[day.year] = (self.leave_budget[day.year] if day.year in self.leave_budget else 0) + int(count) @@ -302,7 +307,7 @@ class Worktime(object): parse_datestr(stripped_line) for day in [fromDay + timedelta(days = x) for x in range(0, (toDay - fromDay).days + 1)]: - if end_date.date() < day or day < start_date.date(): + if self.end_date.date() < day or day < self.start_date.date(): continue if excused_kind == 'leave' and self.would_be_workday(day): @@ -314,8 +319,8 @@ class Worktime(object): pull_forward = dict() - start_day = start_date.date() - end_day = end_date.date() + start_day = self.start_date.date() + end_day = self.end_date.date() try: with open(Path(config_dir) / "pull-forward", 'r') as excused: @@ -339,13 +344,13 @@ class Worktime(object): else: if not d == datetime.strptime(c, date_format).replace(tzinfo=tzlocal()).date(): break else: - if d >= end_date.date(): + if d >= self.end_date.date(): pull_forward[d] = min(timedelta(hours = float(hours)), self.time_per_day - (holidays[d] if d in holidays else timedelta())) except IOError as e: if e.errno != 2: raise e - days_to_work = dict() + self.days_to_work = dict() if pull_forward: end_day = max(end_day, max(list(pull_forward))) @@ -356,9 +361,9 @@ class Worktime(object): if day in holidays.keys(): time_to_work -= holidays[day] if time_to_work > timedelta(): - days_to_work[day] = time_to_work + self.days_to_work[day] = time_to_work - extra_days_to_work = dict() + self.extra_days_to_work = dict() try: with open(Path(config_dir) / "days-to-work", 'r') as extra_days_to_work_file: @@ -369,15 +374,15 @@ class Worktime(object): if len(splitLine) == 2: [hours, datestr] = splitLine day = datetime.strptime(datestr, date_format).replace(tzinfo=tzlocal()).date() - extra_days_to_work[day] = timedelta(hours = float(hours)) + self.extra_days_to_work[day] = timedelta(hours = float(hours)) else: - extra_days_to_work[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = self.time_per_day + self.extra_days_to_work[datetime.strptime(stripped_line, date_format).replace(tzinfo=tzlocal()).date()] = self.time_per_day except IOError as e: if e.errno != 2: raise e - self.is_workday = self.now.date() in days_to_work or self.now.date() in extra_days_to_work + self.now_is_workday = self.is_workday(self.now.date()) self.time_worked = timedelta() @@ -387,37 +392,37 @@ class Worktime(object): if self.running_entry: self.time_worked += self.running_entry - if self.running_entry and self.include_running and self.force_day_to_work and not (self.now.date() in days_to_work or self.now.date() in extra_days_to_work): - extra_days_to_work[self.now.date()] = timedelta() + if self.running_entry and self.include_running and self.force_day_to_work and not (self.now.date() in self.days_to_work or self.now.date() in self.extra_days_to_work): + self.extra_days_to_work[self.now.date()] = timedelta() - self.time_to_work = sum([days_to_work[day] for day in days_to_work.keys() if day <= end_date.date()], timedelta()) - for day in [d for d in list(pull_forward) if d > end_date.date()]: - days_forward = set([d for d in days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) - extra_days_forward = set([d for d in extra_days_to_work.keys() if d >= end_date.date() and d < day and (not d in pull_forward or d == end_date.date())]) + self.time_to_work = sum([self.days_to_work[day] for day in self.days_to_work.keys() if day <= self.end_date.date()], timedelta()) + for day in [d for d in list(pull_forward) if d > self.end_date.date()]: + days_forward = set([d for d in self.days_to_work.keys() if d >= self.end_date.date() and d < day and (not d in pull_forward or d == self.end_date.date())]) + extra_days_forward = set([d for d in self.extra_days_to_work.keys() if d >= self.end_date.date() and d < day and (not d in pull_forward or d == self.end_date.date())]) days_forward = days_forward.union(extra_days_forward) extra_day_time_left = timedelta() for extra_day in extra_days_forward: - day_time = max(timedelta(), self.time_per_day - extra_days_to_work[extra_day]) + day_time = max(timedelta(), self.time_per_day - self.extra_days_to_work[extra_day]) extra_day_time_left += day_time extra_day_time = min(extra_day_time_left, pull_forward[day]) time_forward = pull_forward[day] - extra_day_time if extra_day_time_left > timedelta(): for extra_day in extra_days_forward: - day_time = max(timedelta(), self.time_per_day - extra_days_to_work[extra_day]) - extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left) + day_time = max(timedelta(), self.time_per_day - self.extra_days_to_work[extra_day]) + self.extra_days_to_work[extra_day] += extra_day_time * (day_time / extra_day_time_left) hours_per_day_forward = time_forward / len(days_forward) if len(days_forward) > 0 else timedelta() - days_forward.discard(end_date.date()) + days_forward.discard(self.end_date.date()) self.time_pulled_forward += time_forward - hours_per_day_forward * len(days_forward) - if end_date.date() in extra_days_to_work: - self.time_pulled_forward += extra_days_to_work[end_date.date()] + if self.end_date.date() in self.extra_days_to_work: + self.time_pulled_forward += self.extra_days_to_work[self.end_date.date()] self.time_to_work += self.time_pulled_forward - self.time_worked += api.get_billable_hours(start_date, self.now, rounding = config.get("WORKTIME", {}).get("rounding", True)) + self.time_worked += api.get_billable_hours(self.start_date, self.now, rounding = config.get("WORKTIME", {}).get("rounding", True)) def worktime(**args): worktime = Worktime(**args) @@ -458,7 +463,7 @@ def worktime(**args): return f"{indicator}{difference_string}" else: difference_string = difference_string(total_minutes_difference * timedelta(minutes = 1)) - if worktime.is_workday: + if worktime.now_is_workday: return difference_string else: return f"({difference_string})" @@ -498,7 +503,7 @@ def time_worked(now, **args): clockout_time = None clockout_difference = None - if then.is_workday or now.is_workday: + if then.now_is_workday or now.now_is_workday: target_time = max(then.time_per_day, now.time_per_day) if then.time_per_day and now.time_per_day else (then.time_per_day if then.time_per_day else now.time_per_day); difference = target_time - worked clockout_difference = 5 * ceil(difference / timedelta(minutes = 5)) @@ -543,6 +548,8 @@ def leave(year, table, **args): if leave_expires: leave_expires = datetime.strptime(leave_expires, '%m-%d').date() + days = [worktime.start_date.date() + timedelta(days = x) for x in range(0, (worktime.end_date.date() - worktime.start_date.date()).days + 1)] + leave_budget = deepcopy(worktime.leave_budget) year_leave_budget = deepcopy(worktime.leave_budget) if year else None years = sorted(leave_budget.keys()) @@ -589,12 +596,99 @@ def leave(year, table, **args): table_data = [] for year, days in leave_budget.items(): leave_days = sorted([day for day in worktime.leave_days if day.year == year]) - table_data += [[year, days, ','.join(map(lambda d: d.strftime('%m-%d'), leave_days))]] + would_be_workdays = [day for day in days if day.year == year and worktime.would_be_workday(day)] + table_data += [[year, days, f"{len(leave_days)}/{len(list(would_be_workdays))}", ','.join(map(lambda d: d.strftime('%m-%d'), leave_days))]] print(tabulate(table_data, tablefmt="plain")) else: print(leave_budget[year if year else def_year]) +def classification(classification_name, table, **args): + worktime = Worktime(**args) + config = Worktime.config() + date_format = config.get("WORKTIME", {}).get("DateFormat", '%Y-%m-%d') + config_dir = BaseDirectory.load_first_config('worktime') + days = [worktime.start_date.date() + timedelta(days = x) for x in range(0, (worktime.end_date.date() - worktime.start_date.date()).days + 1)] + + year_classification = defaultdict(dict) + year_offset = defaultdict(lambda: 0) + + extra_classified = dict() + classification_files = { + Path(config_dir) / classification_name: True, + Path(config_dir) / f"extra-{classification_name}": True, + Path(config_dir) / f"not-{classification_name}": False, + } + for path, val in classification_files.items(): + try: + with path.open('r') as fh: + for line in fh: + stripped_line = line.strip() + if stripped_line: + fromDay = toDay = None + def parse_single(singlestr): + return datetime.strptime(singlestr, date_format).replace(tzinfo=tzlocal()).date() + if '--' in stripped_line: + [fromDay,toDay] = stripped_line.split('--') + fromDay = parse_single(fromDay) + toDay = parse_single(toDay) + else: + fromDay = toDay = parse_single(stripped_line) + + for day in [fromDay + timedelta(days = x) for x in range(0, (toDay - fromDay).days + 1)]: + extra_classified[day] = val + except IOError as e: + if e.errno != 2: + raise e + + for day in days: + if not worktime.is_workday(day, extra=False): + continue + + classification_days = set() + for default_classification in reversed(config.get("day-classification", {}).get(classification_name, {}).get("default", [])): + from_date = None + to_date = None + + if "from" in default_classification: + from_date = datetime.strptime(default_classification["from"], date_format).replace(tzinfo=tzlocal()).date() + if day < from_date: + continue + + if "to" in default_classification: + to_date = datetime.strptime(default_classification["to"], date_format).replace(tzinfo=tzlocal()).date() + if day >= to_date: + continue + + classification_days = set([int(d.strip()) for d in default_classification.get("days", "").split(',') if d.strip()]) + + default_classification = day.isoweekday() in classification_days + override = None + if day in extra_classified: + override = extra_classified[day] + if override != default_classification: + year_offset[day.year] += 1 if override else -1 + year_classification[day.year][day] = override if override is not None else default_classification + + if not table: + print(sum(year_offset.values())) + else: + table_data = [] + for year in sorted(year_classification.keys()): + row_data = [year] + count_classified = len([1 for day, classified in year_classification[year].items() if classified]) + count_would_be_workdays = len([1 for day in days if day.year == year and worktime.would_be_workday(day) and day not in worktime.leave_days]) + row_data.append(year_offset[year]) + if len(year_classification[year]) != count_would_be_workdays: + row_data.append(f"{count_classified}/{len(year_classification[year])}/{count_would_be_workdays}") + else: + row_data.append(f"{count_classified}/{len(year_classification[year])}") + row_data.append(','.join(sorted([day.strftime('%m-%d') for day, classified in year_classification[year].items() if classified]))) + table_data.append(row_data) + print(tabulate(table_data, tablefmt="plain")) + def main(): + config = Worktime.config() + parser = argparse.ArgumentParser(prog = "worktime", description = 'Track worktime using toggl API') parser.add_argument('--time', dest = 'now', metavar = 'TIME', type = lambda s: datetime.fromisoformat(s).replace(tzinfo=tzlocal()), help = 'Time to calculate status for (default: current time)', default = datetime.now(tzlocal())) parser.add_argument('--no-running', dest = 'include_running', action = 'store_false') @@ -613,6 +707,10 @@ def main(): leave_parser.add_argument('year', metavar = 'YEAR', type = int, help = 'Year to evaluate leave days for (default: current year)', default = None, nargs='?') leave_parser.add_argument('--table', action = 'store_true') leave_parser.set_defaults(cmd = leave) + for classification_name in config.get('day-classification', {}).keys(): + classification_parser = subparsers.add_parser(classification_name) + classification_parser.add_argument('--table', action = 'store_true') + classification_parser.set_defaults(cmd = partial(classification, classification_name=classification_name)) args = parser.parse_args() args.cmd(**vars(args)) -- cgit v1.2.3