asda?‰PNG
IHDR ? f ??C1 sRGB ??é gAMA ±?üa pHYs ? ??o¨d GIDATx^íüL”÷e÷Y?a?("Bh?_ò???¢§?q5k?*:t0A-o??¥]VkJ¢M??f?±8\k2íll£1]q?ù???T
dbus/constants.py 0000644 00000002700 15102715602 0010065 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.constants -- constant values
#
# Copyright © 2011 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""This module contains some constant values."""
# The maximum value of a 32bit signed integer is the magic value to indicate an
# infinite timeout for dbus. Unlike the C interface which deals with
# milliseconds as integers, the python interface uses seconds as floats for the
# timeout. Therefore we need to use the Python float (C double) value that
# gives 0x7FFFFFFF if multiplied by 1000.0 and cast into an integer.
#
# This calculation should be precise enough to get a value of 0x7FFFFFFF on the
# C side. If not, it will still amount to a very long time (not quite 25 days)
# which should be enough for all intents and purposes.
# 0x7FFFFFFF milliseconds expressed in seconds, i.e. 2147483.647.
method_call_no_timeout = 0x7FFFFFFF / 1000.0
dbus/bus.py 0000644 00000002531 15102715602 0006644 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.bus -- augmented dbus buses
#
# Copyright © 2009, 2011 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""This module contains functions which create monkey-patched/augmented D-Bus
buses."""
from __future__ import absolute_import
import dbus
from . import proxies
from . import constants
def _make_bus_factory(bus_name, default_timeout):
    """Build a drop-in replacement for the dbus bus constructor `bus_name`.

    The returned function accepts the same arguments as ``dbus.<bus_name>``,
    creates the bus and then sets its ``ProxyObjectClass`` to
    ``proxies.ProxyObject`` and its ``default_timeout`` to `default_timeout`.
    """

    def bus_factory(*args, **kwargs):
        busobj = getattr(dbus, bus_name)(*args, **kwargs)
        busobj.ProxyObjectClass = proxies.ProxyObject
        busobj.default_timeout = default_timeout
        return busobj

    # Make the function look like it was defined under its public name.
    bus_factory.__name__ = bus_name
    bus_factory.__qualname__ = bus_name
    return bus_factory


# Publish Bus(), SystemBus(), SessionBus() and StarterBus() at module level.
# A closure replaces the former exec()-based string template, so the
# functions have real source lines and the timeout is passed as a value
# instead of being formatted into code.
for name in ("Bus", "SystemBus", "SessionBus", "StarterBus"):
    globals()[name] = _make_bus_factory(
        name, constants.method_call_no_timeout)
dbus/introspection.py 0000644 00000010330 15102715602 0010747 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.introspection -- access dbus introspection data
#
# Copyright © 2011 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""Classes and functions to easily access DBus introspection data."""
from __future__ import absolute_import
from xml.etree.ElementTree import ElementTree
from io import StringIO
from six import with_metaclass
class IElemMeta(type):
    """Metaclass for introspection elements.

    Derives the `elemname` class member from the class name unless the class
    sets it explicitly, and records every class created through it in
    `elemnames_to_classes`, keyed by that element name."""

    # element name -> class registered for it
    elemnames_to_classes = {}

    @classmethod
    def clsname_to_elemname(cls, clsname):
        """Turn a CamelCase name into its lower-case, "_"-separated form."""
        pieces = []
        for char in clsname:
            lowered = char.lower()
            # An upper-case character starts a new word, except at the start.
            if lowered != char and pieces:
                pieces.append("_")
            pieces.append(lowered)
        return "".join(pieces)

    def __new__(cls, name, bases, dct):
        # The base class "IElem" itself is created without being registered.
        if name == "IElem":
            return type.__new__(cls, name, bases, dct)

        derive_elemname = 'elemname' not in dct
        if derive_elemname and not name.startswith("IElem"):
            raise TypeError(
                "Class '%s' needs to set elemname (or be called "
                "'IElem...'))" % name)
        if derive_elemname:
            # Strip the "IElem" prefix, convert the rest.
            dct['elemname'] = IElemMeta.clsname_to_elemname(name[5:])

        elemname = dct['elemname']
        registry = IElemMeta.elemnames_to_classes
        if elemname in registry:
            raise TypeError(
                "Class '%s' tries to register duplicate elemname '%s'" %
                (name, elemname))

        kls = type.__new__(cls, name, bases, dct)
        registry[elemname] = kls
        return kls
class IElem(with_metaclass(IElemMeta, object)):
    """Base class for introspection elements.

    Instantiating IElem (or any subclass) with an XML element returns an
    instance of the class registered for the element's tag in
    IElemMeta.elemnames_to_classes, falling back to the class registered
    under None for unknown tags.

    Attributes:
        elem: the wrapped XML element.
        parent: the wrapping object of the parent element, or None.
        child_elements: wrapping objects for all direct child elements.
    """

    def __new__(cls, elem, parent=None):
        registry = IElemMeta.elemnames_to_classes
        kls = registry.get(elem.tag, registry[None])
        # object.__new__() must only be given the class: on Python 3 it
        # raises TypeError for extra arguments when __new__ is overridden.
        # elem and parent still reach __init__ through the normal call.
        return super(IElem, cls).__new__(kls)

    def __init__(self, elem, parent=None):
        self.elem = elem
        self.parent = parent
        # Recursively wrap the children; each is dispatched on its own tag.
        self.child_elements = [IElem(c, parent=self) for c in elem]

    def __str__(self):
        label = self.elemname if self.elemname else "unknown:%s" % \
            self.elem.tag
        s = "%s %r" % (label, self.attrib)
        # Indent every line of each child's representation by one level.
        for child in self.child_elements:
            for line in str(child).split("\n"):
                s += "\n %s" % line
        return s

    @property
    def attrib(self):
        """The attribute mapping of the wrapped XML element."""
        return self.elem.attrib
class IElemUnknown(IElem):
    """Catch-all for unknown introspection elements."""

    # Setting elemname explicitly to None registers this class under the None
    # key, which IElem.__new__ uses as fallback for unregistered tags.
    elemname = None
class IElemNameMixin(object):
    """Mixin for introspection elements which carry a "name" attribute."""

    @property
    def name(self):
        """The value of the element's "name" attribute."""
        attributes = self.attrib
        return attributes['name']
class IElemNode(IElem, IElemNameMixin):
    """Introspection node.

    Besides `child_elements`, keeps the subset of children that are nodes
    themselves in `child_nodes`."""

    def __init__(self, elem, parent=None):
        super(IElemNode, self).__init__(elem, parent)
        self.child_nodes = [
            child for child in self.child_elements
            if isinstance(child, IElemNode)
        ]
class IElemInterface(IElem):
    """Introspection interface.

    Registered by IElemMeta under the element name "interface", which is
    derived from the class name."""
class IElemMethod(IElem):
    """Introspection interface method.

    Registered by IElemMeta under the element name "method", which is
    derived from the class name."""
class IElemArg(IElem):
    """Introspection method argument.

    Registered by IElemMeta under the element name "arg", which is derived
    from the class name."""
class IElemSignal(IElem, IElemNameMixin):
    """Introspection interface signal.

    Registered by IElemMeta under the element name "signal", which is
    derived from the class name. The `name` property comes from
    IElemNameMixin."""
def introspect(string_or_file):
    """Parse introspection XML and return the wrapped root element.

    `string_or_file` is either an object with a read() method or a string
    (str, unicode, dbus.String) holding the XML data. The result is the
    IElem object built for the root element of the parsed document."""
    source = string_or_file
    # Anything without read() is taken to be the XML text itself.
    if not hasattr(source, "read"):
        source = StringIO(source)

    xml_root = ElementTree().parse(source)
    return IElem(xml_root)
dbus/mainloop.py 0000644 00000006443 15102715602 0007677 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.mainloop -- mainloop wrappers
#
# Copyright © 2009, 2012 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""This module contains mainloop wrappers.
Currently only glib main loops are supported."""
from __future__ import absolute_import
__all__ = ("MainLoop", "set_type")
class MainLoop(object):
    """An abstract main loop wrapper class and factory.

    Use MainLoop() to get a main loop wrapper object for a main loop type
    previously registered with set_type(). Defaults to glib main loops.

    Actual main loop wrapper classes are derived from this class."""

    # Wrapper class instantiated by MainLoop(); None until set_type() was
    # called (explicitly, or implicitly on first instantiation).
    __mainloop_class = None

    def __new__(cls, *args, **kwargs):
        # Use the same (name-mangled) attribute that set_type() writes; the
        # former lookup of "_mainloop_class" raised AttributeError.
        if MainLoop.__mainloop_class is None:
            MainLoop.set_type("glib")
        # object.__new__() only takes the class; any further arguments are
        # handed to __init__ by the regular instantiation protocol.
        return super(MainLoop, cls).__new__(MainLoop.__mainloop_class)

    @classmethod
    def set_type(cls, mltype):
        """Set a main loop type for non-blocking interfaces.

        mltype: "glib" (currently only glib main loops are supported)

        Raises RuntimeError if a type was set before and ValueError if
        `mltype` is not a known main loop type."""
        if MainLoop.__mainloop_class is not None:
            raise RuntimeError("The main loop type can only be set once.")

        ml_type_class = {"glib": GlibMainLoop}

        if mltype in ml_type_class:
            MainLoop.__mainloop_class = ml_type_class[mltype]
        else:
            raise ValueError(
                "'%s' is not one of the valid main loop types:\n%s" %
                (mltype, ", ".join(ml_type_class)))

    def pending(self):
        """Returns if there are pending events."""
        raise NotImplementedError()

    def iterate(self):
        """Iterates over one pending event."""
        raise NotImplementedError()

    def iterate_over_pending_events(self):
        """Iterates over all pending events."""
        while self.pending():
            self.iterate()

    def run(self):
        """Runs the main loop."""
        raise NotImplementedError()

    def quit(self):
        """Quits the main loop."""
        raise NotImplementedError()


class GlibMainLoop(MainLoop):
    """Main loop wrapper around a glib main loop.

    The abstract methods of MainLoop are replaced per instance by the bound
    methods of the glib main loop and its context."""

    def __init__(self):
        # Imported here so that glib is only needed once a loop is created.
        from .._wrappers import _glib
        ml = _glib.MainLoop()
        ctx = ml.get_context()

        self._mainloop = ml
        self.pending = ctx.pending
        self.iterate = ctx.iteration
        self.run = ml.run
        self.quit = ml.quit
def set_type(mltype):
    """Set a main loop type for non-blocking interfaces.

    mltype: "glib" (currently only glib main loops are supported)

    Deprecated, use MainLoop.set_type() instead."""
    import warnings

    warnings.warn("use MainLoop.set_type() instead", DeprecationWarning)
    MainLoop.set_type(mltype)
dbus/polkit.py 0000644 00000022111 15102715602 0007351 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.polkit -- convenience decorators and functions for using PolicyKit
# with dbus services and clients
#
# Copyright © 2008, 2009, 2012, 2013, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""This module contains convenience decorators and functions for using
PolicyKit with dbus services and clients."""
from __future__ import absolute_import
import collections
import dbus
from decorator import decorator
from functools import reduce
from .constants import method_call_no_timeout
__all__ = ["require_auth", "enable_proxy", "AUTHFAIL_DONTCATCH",
"NotAuthorizedException", "AreAuthorizationsObtainable",
"IsSystemBusNameAuthorizedAsync"]
def require_auth(polkit_auth):
    """Decorator for DBus service methods.

    Marks the decorated method as requiring the PolicyKit authorization
    `polkit_auth` by storing it in the method's
    `_slip_polkit_auth_required` attribute. The method must already be a
    DBus method, i.e. carry a `_dbus_is_method` attribute."""

    def require_auth_decorator(method):
        assert hasattr(method, "_dbus_is_method")
        method._slip_polkit_auth_required = polkit_auth
        return method

    return require_auth_decorator
# Prefix of the DBus error names used to report a missing authorization; the
# id of the PolicyKit action follows the prefix.
AUTH_EXC_PREFIX = \
    "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException."


class AUTHFAIL_DONTCATCH(object):
    """Marker for enable_proxy(): don't catch authorization failures."""
    pass


def enable_proxy(
        func=None, authfail_result=AUTHFAIL_DONTCATCH, authfail_exception=None,
        authfail_callback=None):
    """Decorator for DBus proxy methods.

    Lets you (optionally) specify either a result value or an exception type
    and a callback which is returned, thrown or called respectively if a
    PolicyKit authorization doesn't exist or can't be obtained in the DBus
    mechanism, i.e. an appropriate DBus exception is thrown.

    An exception constructor may and a callback must accept an `action_id`
    parameter which will be set to the id of the PolicyKit action for which
    authorization could not be obtained.

    Can be used both bare (`@enable_proxy`) and with arguments
    (`@enable_proxy(...)`).

    Examples:

    1) Return `False` in the event of an authorization problem, and call
    `error_handler`:

        def error_handler(action_id=None):
            print("Authorization problem:", action_id)

        class MyProxy(object):
            @polkit.enable_proxy(authfail_result=False,
                                 authfail_callback=error_handler)
            def some_method(self, ...):
                ...

    2) Throw a `MyAuthError` instance in the event of an authorization
    problem:

        class MyAuthError(Exception):
            def __init__(self, *args, **kwargs):
                action_id = kwargs.pop("action_id")
                super(MyAuthError, self).__init__(*args, **kwargs)
                self.action_id = action_id

        class MyProxy(object):
            @polkit.enable_proxy(authfail_exception=MyAuthError)
            def some_method(self, ...):
                ..."""

    # callable() replaces isinstance(..., collections.Callable): that alias
    # no longer exists in the collections namespace since Python 3.10.
    assert func is None or callable(func)

    # A result value and an exception type are mutually exclusive.
    assert (
        authfail_result in (None, AUTHFAIL_DONTCATCH) or
        authfail_exception is None)
    assert authfail_callback is None or callable(authfail_callback)
    assert (
        authfail_exception is None or
        issubclass(authfail_exception, Exception))

    def _enable_proxy(func, *p, **k):
        try:
            return func(*p, **k)
        except dbus.DBusException as e:
            exc_name = e.get_dbus_name()

            # Only authorization failures are handled here.
            if not exc_name.startswith(AUTH_EXC_PREFIX):
                raise

            action_id = exc_name[len(AUTH_EXC_PREFIX):]

            if authfail_callback is not None:
                authfail_callback(action_id=action_id)

            if authfail_exception is not None:
                try:
                    af_exc = authfail_exception(action_id=action_id)
                except Exception:
                    # The constructor may not accept action_id, retry
                    # without it. Unlike a bare except, this lets
                    # KeyboardInterrupt and SystemExit through.
                    af_exc = authfail_exception()
                raise af_exc

            if authfail_result is AUTHFAIL_DONTCATCH:
                raise

            return authfail_result

    if func is not None:
        return decorator(_enable_proxy, func)
    else:
        def decorate(func):
            return decorator(_enable_proxy, func)
        return decorate
class NotAuthorizedException(dbus.DBusException):
    """Exception which a DBus service method throws if an authorization
    required for executing it can't be obtained.

    The DBus error name of an instance is the class-level error name with
    "." and the id of the PolicyKit action appended."""

    _dbus_error_name = (
        "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException")

    def __init__(self, action_id, *p, **k):
        class_error_name = self.__class__._dbus_error_name
        self._dbus_error_name = ".".join((class_error_name, action_id))
        super(NotAuthorizedException, self).__init__(*p, **k)
class PolKit(object):
    """Convenience wrapper around polkit.

    The system bus connection, the signal receiver watching the polkit
    service, this process' unique bus name and the polkit authority
    interface are created on first use and kept in class-private attributes
    shared by all instances."""

    _dbus_name = 'org.freedesktop.PolicyKit1'
    _dbus_path = '/org/freedesktop/PolicyKit1/Authority'
    _dbus_interface = 'org.freedesktop.PolicyKit1.Authority'

    # Lazily initialized, shared state (see the properties below).
    __interface = None
    __bus = None
    __bus_name = None
    __signal_receiver = None

    @classmethod
    def _on_name_owner_changed(cls, name, old_owner, new_owner):
        """Drop the cached bus and interface if the polkit name changes owner.

        They are set up again on next use."""
        if name == cls._dbus_name and PolKit.__bus:
            PolKit.__bus.remove_signal_receiver(PolKit.__signal_receiver)
            PolKit.__bus = None
            PolKit.__signal_receiver = None
            PolKit.__interface = None

    @property
    def _bus(self):
        """The system bus, connected on first access."""
        if not PolKit.__bus:
            PolKit.__bus = dbus.SystemBus()
            PolKit.__signal_receiver = PolKit.__bus.add_signal_receiver(
                handler_function=self._on_name_owner_changed,
                signal_name='NameOwnerChanged',
                dbus_interface='org.freedesktop.DBus',
                arg0=self._dbus_name)
        return PolKit.__bus

    @property
    def _bus_name(self):
        """The unique name of this process' system bus connection."""
        if not PolKit.__bus_name:
            PolKit.__bus_name = self._bus.get_unique_name()
        return PolKit.__bus_name

    @property
    def _interface(self):
        """The polkit authority interface, or None if it can't be obtained."""
        if not PolKit.__interface:
            try:
                PolKit.__interface = dbus.Interface(self._bus.get_object(
                    self._dbus_name, self._dbus_path),
                    self._dbus_interface)
            except dbus.DBusException:
                # Deliberately best-effort: callers check _polkit_present.
                pass
        return PolKit.__interface

    @property
    def _polkit_present(self):
        """Whether the polkit authority interface is available."""
        return bool(self._interface)

    def __dbus_system_bus_name_uid(self, system_bus_name):
        """Return the uid owning `system_bus_name`, None if the query fails."""
        bus_object = self._bus.get_object(
            'org.freedesktop.DBus', '/org/freedesktop/DBus')
        bus_interface = dbus.Interface(bus_object, 'org.freedesktop.DBus')
        try:
            uid = bus_interface.GetConnectionUnixUser(system_bus_name)
        except Exception:
            # Best effort, but don't swallow KeyboardInterrupt/SystemExit
            # like the former bare except did.
            uid = None
        return uid

    def __authorization_is_obtainable(self, authorization):
        """Whether this process is, or can get, authorized for one action."""
        if not self._polkit_present:
            return True

        (is_authorized, is_challenge, details) = \
            self._interface.CheckAuthorization(
                ("system-bus-name", {"name": self._bus_name}),
                authorization, {}, 0, "")

        return is_authorized or is_challenge

    def AreAuthorizationsObtainable(self, authorizations):
        """Whether all of `authorizations` are granted or can be obtained.

        `authorizations` is a single action id or a tuple, list or set of
        them. Returns True if polkit isn't available, and True for an empty
        collection."""
        if not self._polkit_present:
            return True

        if not isinstance(authorizations, (tuple, list, set)):
            authorizations = (authorizations,)

        # Stop at the first authorization which can't be obtained, like the
        # former reduce() over "x and ..." did.
        for authorization in authorizations:
            if not self.__authorization_is_obtainable(authorization):
                return False
        return True

    def IsSystemBusNameAuthorizedAsync(
            self, system_bus_name, action_id, reply_handler, error_handler,
            challenge=True, details=None):
        """Asynchronously check if a bus name is authorized for an action.

        `reply_handler` is called with the authorization result,
        `error_handler` is passed on to the DBus call. If `challenge` is
        true, flag 0x1 is set in the request. `details` is passed on to
        CheckAuthorization() and defaults to an empty dict.

        If polkit isn't available, `reply_handler` is called right away: the
        result is true if `action_id` is None or the bus name is owned by
        uid 0."""
        if details is None:
            # A fresh dict per call instead of a shared mutable default.
            details = {}

        if not self._polkit_present:
            return reply_handler(action_id is None or
                                 self.__dbus_system_bus_name_uid(
                                     system_bus_name) == 0)

        flags = 0
        if challenge:
            flags |= 0x1

        def reply_cb(args):
            (is_authorized, is_challenge, reply_details) = args
            reply_handler(is_authorized)

        self._interface.CheckAuthorization(
            ("system-bus-name", {"name": system_bus_name}),
            action_id, details, flags, "",
            reply_handler=reply_cb, error_handler=error_handler,
            timeout=method_call_no_timeout)
# Module-wide PolKit object backing the convenience functions below.
__polkit = PolKit()


def AreAuthorizationsObtainable(authorizations):
    """Module-level shortcut for PolKit.AreAuthorizationsObtainable()."""
    obtainable = __polkit.AreAuthorizationsObtainable(authorizations)
    return obtainable
def IsSystemBusNameAuthorizedAsync(
        system_bus_name, action_id, reply_handler, error_handler, challenge=True,
        details={}):
    """Module-level shortcut for PolKit.IsSystemBusNameAuthorizedAsync().

    All arguments are handed to the module-wide PolKit object unchanged and
    its return value is passed back."""
    check = __polkit.IsSystemBusNameAuthorizedAsync
    return check(
        system_bus_name, action_id, reply_handler, error_handler,
        challenge, details)
dbus/proxies.py 0000644 00000003540 15102715602 0007545 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.proxies -- slightly augmented dbus proxy classes
#
# Copyright © 2005-2007 Collabora Ltd.
# Copyright © 2009, 2011 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"""This module contains D-Bus proxy classes which implement the default
timeout of the augmented bus classes in slip.dbus.bus."""
from __future__ import absolute_import
import dbus.proxies
from . import constants
class _ProxyMethod(dbus.proxies._ProxyMethod):
    """Proxy method which applies a default timeout to its calls.

    The default is looked up once per connection: the `default_timeout` of
    the proxy's bus if it has one which isn't None, otherwise
    constants.method_call_no_timeout."""

    # connection -> default timeout, shared by all proxy methods
    _connections_default_timeouts = {}

    @property
    def default_timeout(self):
        """The default timeout for this proxy method's connection."""
        cache = self._connections_default_timeouts
        connection = self._connection
        if connection not in cache:
            timeout = getattr(self._proxy._bus, "default_timeout", None)
            if timeout is None:
                timeout = constants.method_call_no_timeout
            cache[connection] = timeout
        return cache[connection]

    def __call__(self, *args, **kwargs):
        # A missing timeout and an explicit timeout=None both get the
        # default.
        if kwargs.get('timeout') is None:
            kwargs["timeout"] = self.default_timeout

        parent_call = dbus.proxies._ProxyMethod.__call__
        return parent_call(self, *args, **kwargs)
class ProxyObject(dbus.proxies.ProxyObject):
    """Proxy object whose methods are created from _ProxyMethod."""

    # Replaces the proxy method class of the base class with the one
    # defined in this module.
    ProxyMethodClass = _ProxyMethod
dbus/service.py 0000644 00000017640 15102715602 0007522 0 ustar 00 # -*- coding: utf-8 -*-
# slip.dbus.service -- convenience functions for using dbus-activated
# services
#
# Copyright © 2008, 2009, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen
"This module contains convenience functions for using dbus-activated services."
from __future__ import absolute_import
import dbus
import dbus.service
from six import with_metaclass
from .._wrappers import _glib as GLib
from . import polkit
__all__ = ["Object", "InterfaceType", "set_mainloop"]
# Main loop registered with set_mainloop(); None until then.
__mainloop__ = None


def __glib_quit_cb__():
    """Default quit callback: quit the registered main loop."""
    # assume a Glib mainloop
    __mainloop__.quit()


# Callback run by quit_cb(), can be replaced with set_quit_cb().
__quit_cb__ = __glib_quit_cb__
def set_mainloop(mainloop):
    """Register `mainloop` as the main loop of this module."""
    globals()["__mainloop__"] = mainloop
def set_quit_cb(quit_cb):
    """Register `quit_cb` as the callback run by quit_cb()."""
    globals()["__quit_cb__"] = quit_cb
def quit_cb():
    """Run the currently registered quit callback."""
    # Looked up at call time, so set_quit_cb() takes effect immediately.
    __quit_cb__()
# Keyword names used by wrap_method() for methods which don't declare a
# sender keyword or async callbacks of their own.
SENDER_KEYWORD = "__slip_dbus_service_sender__"
ASYNC_CALLBACKS = (
    "__slip_dbus_service_reply_cb__",
    "__slip_dbus_service_error_cb__",
)
def wrap_method(method):
    """Wrap a dbus service method with sender tracking and PolicyKit checks.

    The returned function always carries a sender keyword and a pair of
    async callbacks in its `_dbus_*` attributes. Where `method` didn't
    declare them itself, the module defaults SENDER_KEYWORD and
    ASYNC_CALLBACKS are used and the respective keyword arguments are
    removed again before `method` is called.

    If the call has a sender and an action id is found (the method's
    `_slip_polkit_auth_required`, else the object's
    `default_polkit_auth_required`), `method` is only executed after
    polkit.IsSystemBusNameAuthorizedAsync() reported the sender as
    authorized. Otherwise `method` is called directly."""
    global SENDER_KEYWORD
    global ASYNC_CALLBACKS

    # Use the method's own sender keyword if it declared one, otherwise the
    # module default, which then must be hidden from the method.
    if method._dbus_sender_keyword is not None:
        sender_keyword = method._dbus_sender_keyword
        hide_sender_keyword = False
    else:
        sender_keyword = SENDER_KEYWORD
        hide_sender_keyword = True

    # Same for the async callbacks: a method which declared its own is
    # asynchronous and gets them passed on.
    if method._dbus_async_callbacks is not None:
        async_callbacks = method._dbus_async_callbacks
        method_is_async = True
    else:
        async_callbacks = ASYNC_CALLBACKS
        method_is_async = False
    hide_async_callbacks = not method_is_async

    def wrapped_method(self, *p, **k):
        sender = k.get(sender_keyword)
        if sender is not None:
            # i.e. called over the bus, not locally
            reply_cb = k[async_callbacks[0]]
            error_cb = k[async_callbacks[1]]

            # Remove the keyword arguments the wrapped method doesn't know.
            if hide_sender_keyword:
                del k[sender_keyword]

            if hide_async_callbacks:
                del k[async_callbacks[0]]
                del k[async_callbacks[1]]

            self.sender_seen(sender)

        # The method's own requirement takes precedence over the object's
        # default.
        action_id = getattr(method, "_slip_polkit_auth_required",
                            getattr(self, "default_polkit_auth_required",
                                    None))

        if sender is not None and action_id:

            def reply_handler(is_auth):
                if is_auth:
                    if method_is_async:
                        # k contains async callbacks, simply pass on reply_cb
                        # and error_cb
                        method(self, *p, **k)
                    else:
                        # execute the synchronous method ...
                        error = None
                        try:
                            result = method(self, *p, **k)
                        except Exception as e:
                            error = e

                        # ... and call the reply or error callback
                        if error:
                            error_cb(error)
                        else:
                            # reply_cb((None,)) != reply_cb()
                            if result is None:
                                reply_cb()
                            else:
                                reply_cb(result)
                else:
                    error_cb(polkit.NotAuthorizedException(action_id))
                self.timeout_restart()

            def error_handler(error):
                error_cb(error)
                self.timeout_restart()

            polkit.IsSystemBusNameAuthorizedAsync(
                sender, action_id,
                reply_handler=reply_handler, error_handler=error_handler)
        else:
            # no action id, or run locally, no need to do anything fancy
            # NOTE(review): for a call over the bus without action id,
            # reply_cb/error_cb are not invoked here, only the return value
            # is handed back -- confirm the caller delivers it as the reply.
            retval = method(self, *p, **k)
            self.timeout_restart()
            return retval

    # Copy the dbus metadata over to the wrapper, but advertise the sender
    # keyword and async callbacks the wrapper actually uses.
    for attr in (x for x in dir(method) if x[:6] == "_dbus_"):
        if attr == "_dbus_sender_keyword":
            wrapped_method._dbus_sender_keyword = sender_keyword
        elif attr == "_dbus_async_callbacks":
            wrapped_method._dbus_async_callbacks = async_callbacks
        else:
            setattr(wrapped_method, attr, getattr(method, attr))

        # delattr (method, attr)

    wrapped_method.__name__ = method.__name__

    return wrapped_method
class InterfaceType(dbus.service.InterfaceType):
    """Metaclass which passes all dbus methods of a class to wrap_method()."""

    def __new__(cls, name, bases, dct):
        # Members flagged with a true _dbus_is_method are replaced by their
        # wrapped counterparts before the class is created.
        wrapped = {
            attrname: wrap_method(attr)
            for (attrname, attr) in dct.items()
            if getattr(attr, "_dbus_is_method", False)
        }
        dct.update(wrapped)
        return super(InterfaceType, cls).__new__(cls, name, bases, dct)
class Object(with_metaclass(InterfaceType, dbus.service.Object)):
    """dbus service object which tracks its senders and an idle timeout.

    Senders, the timeout source and the current timeout duration are kept
    on the Object class itself, i.e. they are shared by all service objects.
    Unless an object is persistent, quit_cb() is called once no senders are
    left and the timeout ran out."""

    # timeout & persistence
    # persistent: if true, this object never triggers quit_cb().
    persistent = False
    # Timeout durations are multiplied by 1000 before being passed to
    # GLib.timeout_add() -- presumably seconds, TODO confirm.
    default_duration = 5
    duration = default_duration
    # Id of the pending timeout source, None if there is none.
    current_source = None
    # (sender, connection) pairs seen so far whose owner is still around.
    senders = set()
    # connection -> set of senders seen on that connection
    connections_senders = {}
    # connection -> NameOwnerChanged signal receiver added on it
    connections_smobjs = {}

    # PolicyKit
    # Action id required for methods without one of their own, see
    # wrap_method().
    default_polkit_auth_required = None

    def __init__(
            self, conn=None, object_path=None, bus_name=None, persistent=None):
        """Set up the service object.

        `conn`, `object_path` and `bus_name` are passed on to the parent
        class. `persistent` overrides the class-level setting unless it is
        None."""
        super(Object, self).__init__(conn, object_path, bus_name)
        if persistent is None:
            self.persistent = self.__class__.persistent
        else:
            self.persistent = persistent

    def _timeout_cb(self):
        """Timeout handler: quit if idle, otherwise reset the timeout state.

        Always returns False."""
        if not self.persistent and len(Object.senders) == 0:
            quit_cb()
            return False

        Object.current_source = None
        Object.duration = self.default_duration

        return False

    def _name_owner_changed(self, name, old_owner, new_owner):
        """Forget a sender whose bus name lost its owner.

        Removes the signal receiver of the connection along with its last
        sender and calls quit_cb() if nothing keeps the service running."""
        conn = self.connection

        if not new_owner and (old_owner, conn) in Object.senders:
            Object.senders.remove((old_owner, conn))
            Object.connections_senders[conn].remove(old_owner)

            if len(Object.connections_senders[conn]) == 0:
                Object.connections_smobjs[conn].remove()
                del Object.connections_senders[conn]
                del Object.connections_smobjs[conn]

            # Only quit right away if no timeout is pending, otherwise
            # _timeout_cb() takes care of it.
            if not self.persistent and len(Object.senders) == 0 and \
                    Object.current_source is None:
                quit_cb()

    def timeout_restart(self, duration=None):
        """Restart the shared timeout.

        `duration` defaults to the class' default_duration; the shared
        duration is only ever raised by it, not lowered."""
        if not duration:
            duration = self.__class__.default_duration
        if not Object.duration or duration > Object.duration:
            Object.duration = duration

        if not self.persistent or len(Object.senders) == 0:
            # Replace a pending timeout instead of stacking a second one.
            if Object.current_source:
                GLib.source_remove(Object.current_source)
            Object.current_source = \
                GLib.timeout_add(Object.duration * 1000,
                                 self._timeout_cb)

    def sender_seen(self, sender):
        """Record `sender` as a user of this object's connection."""
        if (sender, self.connection) not in Object.senders:
            Object.senders.add((sender, self.connection))
            if self.connection not in Object.connections_senders:
                Object.connections_senders[self.connection] = set()
                # NOTE(review): the receiver is added once per connection
                # and matches arg1 against the first sender seen only --
                # confirm later senders on the same connection are noticed
                # when they go away.
                Object.connections_smobjs[self.connection] = \
                    self.connection.add_signal_receiver(
                        handler_function=self._name_owner_changed,
                        signal_name='NameOwnerChanged',
                        dbus_interface='org.freedesktop.DBus',
                        arg1=sender)
            Object.connections_senders[self.connection].add(sender)
dbus/__pycache__/introspection.cpython-36.opt-1.pyc 0000644 00000011116 15102715602 0016175 0 ustar 00 3
uAc @ s d Z ddlmZ ddlmZ ddlmZ ddlmZ G dd de Z
G dd d ee
eZG d
d deZ
G dd
d
eZG dd deeZG dd deZG dd deZG dd deZG dd deeZdd ZdS )z?Classes and functions to easily access DBus introspection data. )absolute_import)ElementTree)StringIO)with_metaclassc @ s( e Zd ZdZi Zedd Zdd ZdS ) IElemMetazMetaclass for introspection elements.
Sets elemname class member automatically from class name if not set
explicitly. Registers classes for their element names.c C s> d}x4|D ],}|j }||kr.t|r.|d7 }||7 }q
W |S )N _)lowerlen)clsZclsnameelemnamecZc_lower r #/usr/lib/python3.6/introspection.pyclsname_to_elemname( s
zIElemMeta.clsname_to_elemnamec C s |dkrt j| |||S d|krL|jds6td| tj|dd |d<