1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
import asyncclick as click
from dbus_next.aio import MessageBus
from dbus_next import BusType, Message, PropertyAccess
import asyncio
from functools import update_wrapper
from dbus_next.service import ServiceInterface, method, dbus_property
from dbus_next import Variant, DBusError
import os
import json
class BlockInterface(ServiceInterface):
    """D-Bus service that takes and drops a logind lid-switch inhibitor lock.

    Exported under the interface name ``li.yggdrasil.WaybarSystemdInhibit``.
    Only ``ToggleBlock`` and the read-only ``IsAcquired`` property are exposed
    on the bus; ``Acquire`` and ``Release`` are plain methods for in-process
    use.
    """

    def __init__(self, system_bus, logind):
        """Store the bus used to reach logind.

        Args:
            system_bus: Connected system-bus ``MessageBus``; the ``Inhibit``
                call is sent over it and its reply carries the lock
                descriptor.
            logind: Proxy interface for ``org.freedesktop.login1.Manager``
                (stored, not used by this class itself).
        """
        super().__init__('li.yggdrasil.WaybarSystemdInhibit')
        self.system_bus = system_bus
        self.logind = logind
        # File descriptor of the held inhibitor lock, or None when not held.
        self.fd = None

    def Release(self):
        """Close the lock descriptor, if held, and signal ``IsAcquired = False``."""
        # Compare against None: 0 is a valid descriptor and must not be
        # mistaken for "nothing held".
        if self.fd is None:
            return
        # Forget the descriptor before closing it so that a failing close()
        # cannot leave a stale fd behind that blocks every later toggle.
        fd, self.fd = self.fd, None
        try:
            os.close(fd)
        finally:
            self.emit_properties_changed({'IsAcquired': False})

    async def Acquire(self):
        """Request a blocking ``handle-lid-switch`` inhibitor lock from logind.

        Does nothing when a lock is already held. On success the descriptor
        from the reply is stored in ``self.fd`` and ``IsAcquired = True`` is
        signalled.

        Raises:
            DBusError: logind answered the ``Inhibit`` call with an error.
        """
        if self.fd is not None:
            return
        # Local import: the file's import block does not bring MessageType in.
        from dbus_next import MessageType

        res = await self.system_bus.call(Message(
            destination='org.freedesktop.login1',
            path='/org/freedesktop/login1',
            interface='org.freedesktop.login1.Manager',
            member='Inhibit',
            signature='ssss',
            body=[
                "handle-lid-switch",
                "waybar-systemd-inhibit",
                "User request",
                "block",
            ],
        ))
        # An error reply has no descriptor; surface logind's own error instead
        # of failing later while indexing unix_fds with the error text.
        if res.message_type == MessageType.ERROR:
            text = str(res.body[0]) if res.body else 'Inhibit call failed'
            raise DBusError(res.error_name, text, reply=res)
        # The body holds an index into the descriptors attached to the reply.
        self.fd = res.unix_fds[res.body[0]]
        self.emit_properties_changed({'IsAcquired': True})

    @method()
    async def ToggleBlock(self):
        if self.fd is not None:
            self.Release()
        else:
            await self.Acquire()

    @dbus_property(access=PropertyAccess.READ)
    def IsAcquired(self) -> 'b':
        return self.fd is not None
@click.command()
async def main():
    """Serve the lid-switch inhibit toggle and print Waybar status lines.

    Exports the li.yggdrasil.WaybarSystemdInhibit service on the session bus
    and writes one JSON object per line to stdout whenever logind's
    BlockInhibited property or the service's IsAcquired property changes.
    Runs until the session bus disconnects.
    """
    inhibit_name = 'li.yggdrasil.WaybarSystemdInhibit'
    inhibit_path = '/li/yggdrasil/WaybarSystemdInhibit'

    # The system bus must negotiate fd passing: the inhibitor lock is handed
    # back as a file descriptor attached to the Inhibit reply.
    system_bus = await MessageBus(bus_type=BusType.SYSTEM, negotiate_unix_fd=True).connect()
    session_bus = await MessageBus(bus_type=BusType.SESSION).connect()

    introspection = await system_bus.introspect('org.freedesktop.login1', '/org/freedesktop/login1')
    obj = system_bus.get_proxy_object('org.freedesktop.login1', '/org/freedesktop/login1', introspection)
    logind = obj.get_interface('org.freedesktop.login1.Manager')
    logind_properties = obj.get_interface('org.freedesktop.DBus.Properties')

    def is_blocked_logind(what: str):
        # BlockInhibited is a colon-separated list of inhibited operations.
        return "handle-lid-switch" in what.split(':')

    def print_state(is_blocked: bool, is_acquired: bool = False):
        # Emit one compact JSON line; flush so the bar sees it immediately.
        icon = "󰌢" if is_blocked else "󰛧"
        text = f"<span font=\"Symbols Nerd Font Mono\">{icon}</span>"
        if is_acquired:
            text = f"<span color=\"#f28a21\">{text}</span>"
        elif is_blocked:
            text = f"<span color=\"#ffffff\">{text}</span>"
        print(json.dumps({'text': text, 'tooltip': ("Manually inhibited" if is_acquired else None)}, separators=(',', ':')), flush=True)

    print_state(is_blocked_logind(await logind.get_block_inhibited()))

    inhibit_proxy = None

    async def get_inhibit():
        # Introspect the inhibit service once and reuse the proxy afterwards,
        # instead of repeating the round trip for every logind signal. The
        # proxy is addressed by well-known name, so it keeps following
        # whichever process currently owns that name. A failed attempt is not
        # cached and is simply retried on the next call.
        nonlocal inhibit_proxy
        if inhibit_proxy is None:
            inhibit_introspection = await session_bus.introspect(inhibit_name, inhibit_path)
            inhibit_proxy = session_bus.get_proxy_object(inhibit_name, inhibit_path, inhibit_introspection)
        return inhibit_proxy

    async def on_logind_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'BlockInhibited' not in changed_properties:
            return
        # Ask the service over the bus (not local state) whether the lock is
        # ours, so the answer reflects the current owner of the name.
        inhibit = (await get_inhibit()).get_interface(inhibit_name)
        print_state(is_blocked_logind(changed_properties['BlockInhibited'].value), await inhibit.get_is_acquired())

    logind_properties.on_properties_changed(on_logind_properties_changed)

    session_bus.export(inhibit_path, BlockInterface(system_bus, logind))
    await session_bus.request_name(inhibit_name)

    inhibit_properties = (await get_inhibit()).get_interface('org.freedesktop.DBus.Properties')

    async def on_inhibit_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'IsAcquired' not in changed_properties:
            return
        print_state(is_blocked_logind(await logind.get_block_inhibited()), changed_properties['IsAcquired'].value)

    inhibit_properties.on_properties_changed(on_inhibit_properties_changed)

    await session_bus.wait_for_disconnect()
# CLI command: connect to the session bus and invoke ToggleBlock on the
# li.yggdrasil.WaybarSystemdInhibit service.
@click.command()
async def toggle():
    # The interface is published under the same string as the bus name.
    service_name = 'li.yggdrasil.WaybarSystemdInhibit'
    object_path = '/li/yggdrasil/WaybarSystemdInhibit'

    bus = await MessageBus(bus_type=BusType.SESSION).connect()
    node = await bus.introspect(service_name, object_path)
    proxy = bus.get_proxy_object(service_name, object_path, node)
    await proxy.get_interface(service_name).call_toggle_block()
|