summaryrefslogtreecommitdiff
path: root/overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py
diff options
context:
space:
mode:
Diffstat (limited to 'overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py')
-rw-r--r--overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py b/overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py
new file mode 100644
index 00000000..35cc7fd1
--- /dev/null
+++ b/overlays/waybar-systemd-inhibit/waybar_systemd_inhibit/__main__.py
@@ -0,0 +1,117 @@
1import asyncclick as click
2from dbus_next.aio import MessageBus
3from dbus_next import BusType, Message, PropertyAccess
4import asyncio
5from functools import update_wrapper
6from dbus_next.service import ServiceInterface, method, dbus_property
7from dbus_next import Variant, DBusError
8import os
9import json
10
11class BlockInterface(ServiceInterface):
12 def __init__(self, system_bus, logind):
13 super().__init__('li.yggdrasil.WaybarSystemdInhibit')
14 self.system_bus = system_bus
15 self.logind = logind
16 self.fd = None
17
18 def Release(self):
19 if not self.fd:
20 return
21
22 os.close(self.fd)
23 self.fd = None
24 self.emit_properties_changed({'IsAcquired': False})
25
26 async def Acquire(self):
27 if self.fd:
28 return
29
30 res = await self.system_bus.call(Message(
31 destination='org.freedesktop.login1',
32 path='/org/freedesktop/login1',
33 interface='org.freedesktop.login1.Manager',
34 member='Inhibit',
35 signature='ssss',
36 body=[
37 "handle-lid-switch",
38 "waybar-systemd-inhibit",
39 "User request",
40 "block",
41 ],
42 ))
43 self.fd = res.unix_fds[res.body[0]]
44 self.emit_properties_changed({'IsAcquired': True})
45
46 @method()
47 async def ToggleBlock(self):
48 if self.fd:
49 self.Release()
50 else:
51 await self.Acquire()
52
53 @dbus_property(access=PropertyAccess.READ)
54 def IsAcquired(self) -> 'b':
55 return self.fd is not None
56
57
58@click.command()
59async def main():
60 system_bus = await MessageBus(bus_type=BusType.SYSTEM, negotiate_unix_fd=True).connect()
61 session_bus = await MessageBus(bus_type=BusType.SESSION).connect()
62
63 introspection = await system_bus.introspect('org.freedesktop.login1', '/org/freedesktop/login1')
64 obj = system_bus.get_proxy_object('org.freedesktop.login1', '/org/freedesktop/login1', introspection)
65 logind = obj.get_interface('org.freedesktop.login1.Manager')
66 properties = obj.get_interface('org.freedesktop.DBus.Properties')
67
68 def is_blocked_logind(what: str):
69 return "handle-lid-switch" in what.split(':')
70
71 def print_state(is_blocked: bool, is_acquired: bool = False):
72 icon = "󰌢" if is_blocked else "󰛧"
73 text = f"<span font=\"Symbols Nerd Font Mono\">{icon}</span>"
74 if is_acquired:
75 text = f"<span color=\"#f28a21\">{text}</span>"
76 elif is_blocked:
77 text = f"<span color=\"#ffffff\">{text}</span>"
78 print(json.dumps({'text': text, 'tooltip': ("Manually inhibited" if is_acquired else None)}, separators=(',', ':')), flush=True)
79
80 print_state(is_blocked_logind(await logind.get_block_inhibited()))
81
82 async def get_inhibit():
83 introspection = await session_bus.introspect('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit')
84 return session_bus.get_proxy_object('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit', introspection)
85
86 async def on_logind_properties_changed(interface_name, changed_properties, invalidated_properties):
87 if 'BlockInhibited' not in changed_properties:
88 return
89
90 properties = (await get_inhibit()).get_interface('li.yggdrasil.WaybarSystemdInhibit')
91
92 print_state(is_blocked_logind(changed_properties['BlockInhibited'].value), await properties.get_is_acquired())
93
94 properties.on_properties_changed(on_logind_properties_changed)
95
96 session_bus.export('/li/yggdrasil/WaybarSystemdInhibit', BlockInterface(system_bus, logind))
97 await session_bus.request_name('li.yggdrasil.WaybarSystemdInhibit')
98
99 properties = (await get_inhibit()).get_interface('org.freedesktop.DBus.Properties')
100
101 async def on_inhibit_properties_changed(interface_name, changed_properties, invalidated_properties):
102 if 'IsAcquired' not in changed_properties:
103 return
104
105 print_state(is_blocked_logind(await logind.get_block_inhibited()), changed_properties['IsAcquired'].value)
106
107 properties.on_properties_changed(on_inhibit_properties_changed)
108
109 await session_bus.wait_for_disconnect()
110
111@click.command()
112async def toggle():
113 session_bus = await MessageBus(bus_type=BusType.SESSION).connect()
114 introspection = await session_bus.introspect('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit')
115 obj = session_bus.get_proxy_object('li.yggdrasil.WaybarSystemdInhibit', '/li/yggdrasil/WaybarSystemdInhibit', introspection)
116 interface = obj.get_interface('li.yggdrasil.WaybarSystemdInhibit')
117 await interface.call_toggle_block()