| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
 | import asyncclick as click
from dbus_next.aio import MessageBus
from dbus_next import BusType, Message, PropertyAccess
import asyncio
from functools import update_wrapper
from dbus_next.service import ServiceInterface, method, dbus_property
from dbus_next import Variant, DBusError
import os
import json
class BlockInterface(ServiceInterface):
    """D-Bus service object holding a logind ``handle-lid-switch`` inhibitor.

    The interface is published as ``li.yggdrasil.WaybarSystemdInhibit``.
    The inhibitor lock is represented by the file descriptor that logind's
    ``Inhibit`` call hands back; this class gives the lock up by closing
    that descriptor. Only ``ToggleBlock`` and ``IsAcquired`` carry the
    dbus_next ``@method`` / ``@dbus_property`` decorators; ``Acquire`` and
    ``Release`` are plain in-process helpers.
    """

    def __init__(self, system_bus, logind):
        """Store the bus used to talk to logind.

        Args:
            system_bus: Connected system message bus; must have been created
                with unix-fd negotiation so the lock descriptor can be
                received.
            logind: Proxy interface for ``org.freedesktop.login1.Manager``.
                Stored for callers; not used by this class itself.
        """
        super().__init__('li.yggdrasil.WaybarSystemdInhibit')
        self.system_bus = system_bus
        self.logind = logind
        # Descriptor of the currently held inhibitor lock, or None.
        self.fd = None

    def Release(self):
        """Close the held lock descriptor, if any, and announce the change."""
        # Compare against None rather than truthiness: 0 is a valid file
        # descriptor and must still count as "held" (matches IsAcquired).
        if self.fd is None:
            return
        os.close(self.fd)
        self.fd = None
        self.emit_properties_changed({'IsAcquired': False})

    async def Acquire(self):
        """Ask logind for a blocking lid-switch inhibitor and keep its fd.

        Does nothing when a lock is already held.

        Raises:
            DBusError: If logind answers the ``Inhibit`` call with an error
                reply (for example when the request is denied).
        """
        if self.fd is not None:
            return
        # Imported here so the block does not depend on a file-level import
        # that the module does not currently have.
        from dbus_next import MessageType

        res = await self.system_bus.call(Message(
            destination='org.freedesktop.login1',
            path='/org/freedesktop/login1',
            interface='org.freedesktop.login1.Manager',
            member='Inhibit',
            signature='ssss',
            body=[
                "handle-lid-switch",
                "waybar-systemd-inhibit",
                "User request",
                "block",
            ],
        ))
        # MessageBus.call returns error replies as ordinary messages; without
        # this check the error text would be used as an index below.
        if res.message_type == MessageType.ERROR:
            detail = str(res.body[0]) if res.body else 'Inhibit call failed'
            raise DBusError(res.error_name, detail, reply=res)
        # The reply body carries an index into the message's unix_fds list.
        new_fd = res.unix_fds[res.body[0]]
        if self.fd is not None:
            # Another Acquire completed while this one was awaiting logind.
            # Keep the existing lock and drop the duplicate so it is not
            # leaked (a leaked descriptor could never be released again).
            os.close(new_fd)
            return
        self.fd = new_fd
        self.emit_properties_changed({'IsAcquired': True})

    @method()
    async def ToggleBlock(self):
        """Release the lock when one is held, otherwise acquire one."""
        if self.fd is not None:
            self.Release()
        else:
            await self.Acquire()

    @dbus_property(access=PropertyAccess.READ)
    def IsAcquired(self) -> 'b':
        """Whether this service currently holds an inhibitor descriptor."""
        return self.fd is not None
@click.command()
async def main():
    """Run the Waybar module and the session-bus inhibit service.

    Prints one JSON object per line on stdout (``text`` and ``tooltip``
    keys), once at startup and again whenever logind's ``BlockInhibited``
    property or this service's ``IsAcquired`` property changes. Also
    exports a ``BlockInterface`` on the session bus as
    ``li.yggdrasil.WaybarSystemdInhibit`` and then runs until the session
    bus disconnects.
    """
    # unix-fd negotiation is required so the inhibitor descriptor returned
    # by logind can be received on this connection.
    system_bus = await MessageBus(bus_type=BusType.SYSTEM, negotiate_unix_fd=True).connect()
    session_bus = await MessageBus(bus_type=BusType.SESSION).connect()

    logind_node = await system_bus.introspect('org.freedesktop.login1', '/org/freedesktop/login1')
    logind_obj = system_bus.get_proxy_object('org.freedesktop.login1', '/org/freedesktop/login1', logind_node)
    logind = logind_obj.get_interface('org.freedesktop.login1.Manager')
    logind_properties = logind_obj.get_interface('org.freedesktop.DBus.Properties')

    # Keep the service object so its state can be read in-process instead of
    # introspecting and calling our own service over the bus on every event.
    block = BlockInterface(system_bus, logind)

    def is_blocked_logind(what: str):
        """Return True when the colon-separated list names handle-lid-switch."""
        return "handle-lid-switch" in what.split(':')

    def print_state(is_blocked: bool, is_acquired: bool = False):
        """Emit one status line as compact JSON and flush it immediately.

        The glyph depends on ``is_blocked``. It is coloured ``#f28a21`` when
        this service holds the lock, ``#ffffff`` when lid handling is
        blocked but not by this service, and left uncoloured otherwise.
        """
        icon = "󰌢" if is_blocked else "󰛧"
        text = f"<span font=\"Symbols Nerd Font Mono\">{icon}</span>"
        if is_acquired:
            text = f"<span color=\"#f28a21\">{text}</span>"
        elif is_blocked:
            text = f"<span color=\"#ffffff\">{text}</span>"
        print(json.dumps({'text': text, 'tooltip': ("Manually inhibited" if is_acquired else None)}, separators=(',', ':')), flush=True)

    # Initial line, printed before anything can have been acquired here.
    print_state(is_blocked_logind(await logind.get_block_inhibited()))

    def on_logind_properties_changed(interface_name, changed_properties, invalidated_properties):
        """Reprint the state when logind's BlockInhibited value changes."""
        if 'BlockInhibited' not in changed_properties:
            return
        # Reading the local object also works before the service name has
        # been requested, which a bus round-trip to ourselves would not.
        print_state(is_blocked_logind(changed_properties['BlockInhibited'].value), block.IsAcquired)

    logind_properties.on_properties_changed(on_logind_properties_changed)

    session_bus.export('/li/yggdrasil/WaybarSystemdInhibit', block)
    await session_bus.request_name('li.yggdrasil.WaybarSystemdInhibit')

    # Subscribe to our own PropertiesChanged signal so a toggle is reflected
    # in the output even when logind's BlockInhibited value stays the same.
    inhibit_node = await session_bus.introspect('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit')
    inhibit_obj = session_bus.get_proxy_object('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit', inhibit_node)
    inhibit_properties = inhibit_obj.get_interface('org.freedesktop.DBus.Properties')

    async def on_inhibit_properties_changed(interface_name, changed_properties, invalidated_properties):
        """Reprint the state when this service's IsAcquired value changes."""
        if 'IsAcquired' not in changed_properties:
            return
        print_state(is_blocked_logind(await logind.get_block_inhibited()), changed_properties['IsAcquired'].value)

    inhibit_properties.on_properties_changed(on_inhibit_properties_changed)
    await session_bus.wait_for_disconnect()
@click.command()
async def toggle():
    """Call ``ToggleBlock`` on the running inhibit service.

    Connects to the session bus, introspects the
    ``li.yggdrasil.WaybarSystemdInhibit`` object and invokes its
    ``ToggleBlock`` method once.
    """
    bus = await MessageBus(bus_type=BusType.SESSION).connect()
    # The proxy is built from live introspection data, so the service must
    # already be present on the session bus.
    node = await bus.introspect(
        'li.yggdrasil.WaybarSystemdInhibit',
        '/li/yggdrasil/WaybarSystemdInhibit',
    )
    proxy = bus.get_proxy_object(
        'li.yggdrasil.WaybarSystemdInhibit',
        '/li/yggdrasil/WaybarSystemdInhibit',
        node,
    )
    await proxy.get_interface('li.yggdrasil.WaybarSystemdInhibit').call_toggle_block()
 |