summaryrefslogtreecommitdiff
path: root/overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py
blob: fd7398058173d34be059d76e9ff02c326afa821d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import click
from pathlib import Path
import tomllib
import requests
from urllib.parse import urljoin
from operator import itemgetter
import re
from frozendict import frozendict

class BearerAuth(requests.auth.AuthBase):
    """Auth hook for ``requests`` that sends a bearer token with each request."""

    def __init__(self, token):
        """Store *token*; it is concatenated into the header on every call."""
        self.token = token

    def __call__(self, r):
        """Set the ``authorization`` header on request *r* and return *r*."""
        header_value = "Bearer " + self.token
        r.headers["authorization"] = header_value
        return r

class ABSSession(requests.Session):
    """``requests.Session`` bound to one server instance and API token.

    ``config`` must provide the keys ``'instance'`` (base URL) and
    ``'api_token'`` (token handed to :class:`BearerAuth`).
    """

    def __init__(self, config):
        """Initialise the session, then record the base URL and auth hook."""
        super().__init__()
        instance, token = config['instance'], config['api_token']
        self.base_url = instance
        self.auth = BearerAuth(token)

    def request(self, method, url, *args, **kwargs):
        """Resolve *url* against the base URL and delegate to the parent.

        Resolution uses ``urllib.parse.urljoin``; every other argument is
        passed through untouched.
        """
        return super().request(
            method,
            urljoin(self.base_url, url),
            *args,
            **kwargs,
        )

@click.command()
@click.argument('config_file', type=click.Path(dir_okay=False, path_type=Path))
def main(config_file: Path):
    with config_file.open('rb') as fh:
        config = tomllib.load(fh)

    with ABSSession(config) as s:
        libraries = s.get('/api/libraries').json()['libraries']
        playlists = s.get('/api/playlists').json()['playlists']

        for library_config in config['libraries']:
            [library] = filter(lambda l: l['name'] == library_config['name'], libraries)
            filtered_playlists = list(filter(lambda p: p['name'] == library_config['playlist'] and p['libraryId'] == library['id'], playlists))
            def get_playlist():
                playlist = None
                if filtered_playlists:
                    [playlist] = filtered_playlists
                if not playlist:
                    playlist = s.post('/api/playlists', json={
                        'libraryId': library['id'],
                        'name': library_config['playlist'],
                    }).json()
                return playlist

            podcasts = dict()
            items = s.get('/api/libraries/{}/items'.format(library['id'])).json()['results']
            for item in items:
                item = s.get('/api/items/{}'.format(item['id']), json={'expanded': True}).json()
                episodes = list()
                for episode in sorted(item['media']['episodes'], key = itemgetter('publishedAt')):
                    progress = s.get('/api/me/progress/{}/{}'.format(episode['libraryItemId'], episode['id']))
                    if progress.ok and progress.json()["isFinished"]:
                        continue
                    episodes.append(episode)
                podcasts[item['media']['metadata']['title']] = list(map(lambda x: frozendict({ 'libraryItemId': x['libraryItemId'], 'episodeId': x['id']}), episodes))
            def lookup_podcast(expr):
                expr = re.compile(expr, flags=re.I)
                matches = filter(lambda t: expr.search(t), podcasts.keys())
                match list(matches):
                    case [x]:
                        return (x,)
                    case _:
                        raise RuntimeError("No unique match for ‘{}’".format(expr))

            priorities = [
                [
                    k
                    for item in (section if type(section) is list else [section])
                    for k in lookup_podcast(item)
                ]
                for section in library_config['priorities']
            ]

            playlist_items = list()
            for section in priorities:
                while any(map(lambda item: item in podcasts, section)):
                    for item in section:
                        if not item in podcasts:
                            continue

                        if not podcasts[item]:
                            del podcasts[item]
                            continue

                        playlist_items.append(podcasts[item].pop(0))

            playlist = get_playlist()
            current_playlist_items = map(lambda item: frozendict({ k: v for k, v in item.items() if k in {'libraryItemId', 'episodeId'}}), playlist['items'])

            if current_playlist_items == playlist_items:
                continue

            to_remove = set(current_playlist_items) - set(playlist_items)
            if to_remove:
                s.post('/api/playlists/{}/batch/remove'.format(playlist['id']), json={'items': list(to_remove)}).raise_for_status()
                playlist = get_playlist()
            to_add = set(playlist_items) - set(current_playlist_items)
            if to_add:
                s.post('/api/playlists/{}/batch/add'.format(playlist['id']), json={'items': list(to_add)}).raise_for_status()

            r = s.patch('/api/playlists/{}'.format(playlist['id']), json={'items': playlist_items}).raise_for_status()