#!/bin/python3
"""Disconnect every currently connected BlueZ Bluetooth device.

When run as a script with ``pre`` as the first command-line argument, all
devices that BlueZ reports as connected are asked to disconnect, and the
script exits once every request has been answered (successfully or not).
Any other invocation does nothing.
"""

import sys
import threading
from queue import Queue

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

BUS = "org.bluez"
INTERFACE = "org.bluez.Device1"


class BTD(object):
    """Issue asynchronous ``Disconnect`` calls to connected BlueZ devices."""

    def __init__(self):
        """Install the GLib main loop as the D-Bus default and open the system bus."""
        # Must be set before the bus connection is created so that
        # asynchronous replies are dispatched by the GLib main loop.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        # Holds one entry per Disconnect call that has not been answered yet.
        self.disconnect_queue: Queue = Queue()
        self.loop = GLib.MainLoop()
        self.bus = dbus.SystemBus()

    def _callback(self, *_args) -> None:
        """Mark one pending disconnect request as finished.

        Registered as both ``reply_handler`` and ``error_handler``.  The
        error handler is invoked with the raised ``DBusException`` as an
        argument, so extra positional arguments are accepted and ignored;
        without that, a failed disconnect would never be marked as done and
        ``_shutdown`` would wait forever.
        """
        self.disconnect_queue.get()
        self.disconnect_queue.task_done()

    def _shutdown(self) -> None:
        """Block until every pending request is answered, then stop the loop."""
        self.disconnect_queue.join()
        self.loop.quit()

    def get_connected_devices(self) -> list:
        """Return the object paths of all devices whose ``Connected`` is truthy.

        Returns:
            A list of D-Bus object paths, or an empty list when the BlueZ
            object manager cannot be reached or queried.
        """
        try:
            manager = dbus.Interface(
                self.bus.get_object(BUS, "/"),
                "org.freedesktop.DBus.ObjectManager",
            )
            # Kept inside the ``try``: the remote call itself can fail with a
            # DBusException just like obtaining the proxy can.
            managed = manager.GetManagedObjects()
        except dbus.DBusException:
            return []
        return [
            path
            for path, ifaces in managed.items()
            if ifaces.get(INTERFACE, {}).get("Connected")
        ]

    def disconnect_all(self) -> None:
        """Ask every connected device to disconnect and wait for all replies.

        Returns immediately when no device is connected; otherwise runs the
        GLib main loop until each request has produced a reply or an error.
        """
        devices = self.get_connected_devices()
        for path in devices:
            # Register the request before sending it so that the matching
            # callback always finds an entry to consume.
            self.disconnect_queue.put(path)
            device = dbus.Interface(self.bus.get_object(BUS, path), INTERFACE)
            device.Disconnect(
                reply_handler=self._callback,
                error_handler=self._callback,
            )
        if devices:
            # Queue.join() blocks, so it runs on a helper thread while this
            # thread runs the main loop that delivers the callbacks.
            threading.Thread(target=self._shutdown).start()
            self.loop.run()


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "pre":
        r = BTD()
        r.disconnect_all()