summaryrefslogtreecommitdiff
path: root/overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py
blob: 35cc7fd1024f955daee5b402e644ad425cdc358f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import asyncclick as click
from dbus_next.aio import MessageBus
from dbus_next import BusType, Message, PropertyAccess
import asyncio
from functools import update_wrapper
from dbus_next.service import ServiceInterface, method, dbus_property
from dbus_next import Variant, DBusError
import os
import json

class BlockInterface(ServiceInterface):
    def __init__(self, system_bus, logind):
        super().__init__('li.yggdrasil.WaybarSystemdInhibit')
        self.system_bus = system_bus
        self.logind = logind
        self.fd = None

    def Release(self):
        if not self.fd:
            return

        os.close(self.fd)
        self.fd = None
        self.emit_properties_changed({'IsAcquired': False})

    async def Acquire(self):
        if self.fd:
            return

        res = await self.system_bus.call(Message(
            destination='org.freedesktop.login1',
            path='/org/freedesktop/login1',
            interface='org.freedesktop.login1.Manager',
            member='Inhibit',
            signature='ssss',
            body=[
                "handle-lid-switch",
                "waybar-systemd-inhibit",
                "User request",
                "block",
            ],
        ))
        self.fd = res.unix_fds[res.body[0]]
        self.emit_properties_changed({'IsAcquired': True})

    @method()
    async def ToggleBlock(self):
        if self.fd:
            self.Release()
        else:
            await self.Acquire()

    @dbus_property(access=PropertyAccess.READ)
    def IsAcquired(self) -> 'b':
        return self.fd is not None


@click.command()
async def main():
    system_bus = await MessageBus(bus_type=BusType.SYSTEM, negotiate_unix_fd=True).connect()
    session_bus = await MessageBus(bus_type=BusType.SESSION).connect()

    introspection = await system_bus.introspect('org.freedesktop.login1', '/org/freedesktop/login1')
    obj = system_bus.get_proxy_object('org.freedesktop.login1', '/org/freedesktop/login1', introspection)
    logind = obj.get_interface('org.freedesktop.login1.Manager')
    properties = obj.get_interface('org.freedesktop.DBus.Properties')

    def is_blocked_logind(what: str):
        return "handle-lid-switch" in what.split(':')

    def print_state(is_blocked: bool, is_acquired: bool = False):
        icon = "󰌢" if is_blocked else "󰛧"
        text = f"<span font=\"Symbols Nerd Font Mono\">{icon}</span>"
        if is_acquired:
            text = f"<span color=\"#f28a21\">{text}</span>"
        elif is_blocked:
            text = f"<span color=\"#ffffff\">{text}</span>"
        print(json.dumps({'text': text, 'tooltip': ("Manually inhibited" if is_acquired else None)}, separators=(',', ':')), flush=True)

    print_state(is_blocked_logind(await logind.get_block_inhibited()))

    async def get_inhibit():
        introspection = await session_bus.introspect('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit')
        return session_bus.get_proxy_object('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit', introspection)

    async def on_logind_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'BlockInhibited' not in changed_properties:
            return

        properties = (await get_inhibit()).get_interface('li.yggdrasil.WaybarSystemdInhibit')

        print_state(is_blocked_logind(changed_properties['BlockInhibited'].value), await properties.get_is_acquired())

    properties.on_properties_changed(on_logind_properties_changed)

    session_bus.export('/li/yggdrasil/WaybarSystemdInhibit', BlockInterface(system_bus, logind))
    await session_bus.request_name('li.yggdrasil.WaybarSystemdInhibit')

    properties = (await get_inhibit()).get_interface('org.freedesktop.DBus.Properties')

    async def on_inhibit_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'IsAcquired' not in changed_properties:
            return

        print_state(is_blocked_logind(await logind.get_block_inhibited()), changed_properties['IsAcquired'].value)

    properties.on_properties_changed(on_inhibit_properties_changed)

    await session_bus.wait_for_disconnect()

@click.command()
async def toggle():
    session_bus = await MessageBus(bus_type=BusType.SESSION).connect()
    introspection = await session_bus.introspect('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit')
    obj = session_bus.get_proxy_object('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit', introspection)
    interface = obj.get_interface('li.yggdrasil.WaybarSystemdInhibit')
    await interface.call_toggle_block()