import asyncio
import json
import os
from functools import update_wrapper

import asyncclick as click
from dbus_next import BusType, DBusError, Message, MessageType, PropertyAccess, Variant
from dbus_next.aio import MessageBus
from dbus_next.service import ServiceInterface, dbus_property, method
class BlockInterface(ServiceInterface):
    """D-Bus service interface that takes and drops a logind inhibitor lock.

    Exported under the interface name ``li.yggdrasil.WaybarSystemdInhibit``.
    It offers one bus method, ``ToggleBlock``, and one read-only boolean
    property, ``IsAcquired``.  The lock is requested from
    ``org.freedesktop.login1.Manager.Inhibit`` for ``handle-lid-switch`` in
    ``block`` mode; the file descriptor returned by that call is kept in
    ``self.fd`` and closed again on release.
    """

    def __init__(self, system_bus, logind):
        """Store the bus used to talk to logind.

        Args:
            system_bus: Connected message bus on which the ``Inhibit`` call
                is sent.  The reply carries a unix file descriptor, so the
                bus must have been opened with fd passing enabled.
            logind: Proxy interface for the login1 manager.  Only stored;
                nothing in this class reads it.
        """
        super().__init__('li.yggdrasil.WaybarSystemdInhibit')
        self.system_bus = system_bus
        self.logind = logind
        # Descriptor of the inhibitor lock currently held, or None.
        self.fd = None

    def Release(self):
        """Close the held inhibitor descriptor, if any, and announce it.

        Does nothing when no descriptor is held.
        """
        # Compare against None instead of using truthiness: 0 is a valid
        # file descriptor and must still be closed.
        if self.fd is None:
            return
        os.close(self.fd)
        self.fd = None
        self.emit_properties_changed({'IsAcquired': False})

    async def Acquire(self):
        """Ask logind for a lid-switch inhibitor and keep its descriptor.

        Does nothing when a descriptor is already held.

        Raises:
            DBusError: If logind answers the ``Inhibit`` call with an error
                reply (for example because the request was refused).
        """
        if self.fd is not None:
            return
        res = await self.system_bus.call(Message(
            destination='org.freedesktop.login1',
            path='/org/freedesktop/login1',
            interface='org.freedesktop.login1.Manager',
            member='Inhibit',
            signature='ssss',
            body=[
                "handle-lid-switch",
                "waybar-systemd-inhibit",
                "User request",
                "block",
            ],
        ))
        # An error reply has the error text in its body and no descriptors;
        # surface it as a D-Bus error instead of failing on the lookup below.
        if res.message_type == MessageType.ERROR:
            detail = res.body[0] if res.body else ''
            raise DBusError(res.error_name, detail, reply=res)
        # The reply body holds an index into the descriptors sent alongside
        # the message, not the descriptor number itself.
        self.fd = res.unix_fds[res.body[0]]
        self.emit_properties_changed({'IsAcquired': True})

    @method()
    async def ToggleBlock(self):
        """Release the inhibitor if one is held, otherwise acquire one."""
        if self.fd is not None:
            self.Release()
        else:
            await self.Acquire()

    @dbus_property(access=PropertyAccess.READ)
    def IsAcquired(self) -> 'b':
        """Whether an inhibitor descriptor is currently held."""
        return self.fd is not None
# Entry point of the long-running status process.
#
# Connects to the system and session buses, exports a BlockInterface on the
# session bus, and prints one compact JSON object (keys "text" and "tooltip")
# to stdout at start-up and again whenever either logind's BlockInhibited
# property or the exported IsAcquired property changes.  Runs until the
# session bus disconnects.
#
# Documentation is kept in comments on purpose: click uses a command's
# docstring as its --help text.
@click.command()
async def main():
    login1_name = 'org.freedesktop.login1'
    login1_path = '/org/freedesktop/login1'
    inhibit_name = 'li.yggdrasil.WaybarSystemdInhibit'
    inhibit_path = '/li/yggdrasil/WaybarSystemdInhibit'
    props_iface = 'org.freedesktop.DBus.Properties'

    # The system bus needs fd passing because BlockInterface receives a file
    # descriptor over it.
    system_bus = await MessageBus(bus_type=BusType.SYSTEM, negotiate_unix_fd=True).connect()
    session_bus = await MessageBus(bus_type=BusType.SESSION).connect()

    login1_node = await system_bus.introspect(login1_name, login1_path)
    login1_obj = system_bus.get_proxy_object(login1_name, login1_path, login1_node)
    logind = login1_obj.get_interface('org.freedesktop.login1.Manager')
    logind_props = login1_obj.get_interface(props_iface)

    def is_blocked_logind(what: str):
        # `what` is a colon-separated list; report whether the lid switch is
        # one of its entries.
        return any(entry == "handle-lid-switch" for entry in what.split(':'))

    def print_state(is_blocked: bool, is_acquired: bool = False):
        # NOTE(review): both icon literals are empty here and both branches
        # below leave `text` unchanged -- glyphs or markup may have been lost
        # from this copy of the file; confirm against the intended output.
        if is_blocked:
            icon = ""
        else:
            icon = ""
        text = f"{icon}"
        if is_acquired:
            text = f"{text}"
        elif is_blocked:
            text = f"{text}"
        tooltip = "Manually inhibited" if is_acquired else None
        payload = {'text': text, 'tooltip': tooltip}
        print(json.dumps(payload, separators=(',', ':')), flush=True)

    # Initial line, before any change notification has arrived.
    initially_blocked = is_blocked_logind(await logind.get_block_inhibited())
    print_state(initially_blocked)

    async def get_inhibit():
        # Fresh proxy for the object this process exports on the session bus.
        node = await session_bus.introspect(inhibit_name, inhibit_path)
        return session_bus.get_proxy_object(inhibit_name, inhibit_path, node)

    async def on_logind_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'BlockInhibited' in changed_properties:
            inhibit = (await get_inhibit()).get_interface(inhibit_name)
            blocked = is_blocked_logind(changed_properties['BlockInhibited'].value)
            print_state(blocked, await inhibit.get_is_acquired())

    logind_props.on_properties_changed(on_logind_properties_changed)

    # Publish the toggle service, then watch its own property changes through
    # the session bus.
    session_bus.export(inhibit_path, BlockInterface(system_bus, logind))
    await session_bus.request_name(inhibit_name)
    inhibit_props = (await get_inhibit()).get_interface(props_iface)

    async def on_inhibit_properties_changed(interface_name, changed_properties, invalidated_properties):
        if 'IsAcquired' in changed_properties:
            blocked = is_blocked_logind(await logind.get_block_inhibited())
            print_state(blocked, changed_properties['IsAcquired'].value)

    inhibit_props.on_properties_changed(on_inhibit_properties_changed)

    await session_bus.wait_for_disconnect()
# One-shot client command: connects to the session bus and calls ToggleBlock
# on the li.yggdrasil.WaybarSystemdInhibit object.
#
# Documentation is kept in comments on purpose: click uses a command's
# docstring as its --help text.
@click.command()
async def toggle():
    service = 'li.yggdrasil.WaybarSystemdInhibit'
    object_path = '/li/yggdrasil/WaybarSystemdInhibit'

    bus = await MessageBus(bus_type=BusType.SESSION).connect()
    node = await bus.introspect(service, object_path)
    proxy = bus.get_proxy_object(service, object_path, node)
    # The interface name is the same string as the bus name.
    inhibit = proxy.get_interface(service)
    await inhibit.call_toggle_block()