asda?‰PNG IHDR ? f ??C1 sRGB ??é gAMA ±?üa pHYs ? ??o¨d GIDATx^íüL”÷e÷Y?a?("Bh?_ò???¢§?q5k?*:t0A-o??¥]VkJ¢M??f?±8\k2íll£1]q?ù???T server.py 0000644 00000110003 15102763701 0006421 0 ustar 00 r"""XML-RPC Servers. This module can be used to create simple XML-RPC servers by creating a server and either installing functions, a class instance, or by extending the SimpleXMLRPCServer class. It can also be used to handle XML-RPC requests in a CGI environment using CGIXMLRPCRequestHandler. The Doc* classes can be used to create XML-RPC servers that serve pydoc-style documentation in response to HTTP GET requests. This documentation is dynamically generated based on the functions and methods registered with the server. A list of possible usage patterns follows: 1. Install functions: server = SimpleXMLRPCServer(("localhost", 8000)) server.register_function(pow) server.register_function(lambda x,y: x+y, 'add') server.serve_forever() 2. Install an instance: class MyFuncs: def __init__(self): # make all of the sys functions available through sys.func_name import sys self.sys = sys def _listMethods(self): # implement this method so that system.listMethods # knows to advertise the sys methods return list_public_methods(self) + \ ['sys.' + method for method in list_public_methods(self.sys)] def pow(self, x, y): return pow(x, y) def add(self, x, y) : return x + y server = SimpleXMLRPCServer(("localhost", 8000)) server.register_introspection_functions() server.register_instance(MyFuncs()) server.serve_forever() 3. 
Install an instance with custom dispatch method: class Math: def _listMethods(self): # this method must be present for system.listMethods # to work return ['add', 'pow'] def _methodHelp(self, method): # this method must be present for system.methodHelp # to work if method == 'add': return "add(2,3) => 5" elif method == 'pow': return "pow(x, y[, z]) => number" else: # By convention, return empty # string if no help is available return "" def _dispatch(self, method, params): if method == 'pow': return pow(*params) elif method == 'add': return params[0] + params[1] else: raise ValueError('bad method') server = SimpleXMLRPCServer(("localhost", 8000)) server.register_introspection_functions() server.register_instance(Math()) server.serve_forever() 4. Subclass SimpleXMLRPCServer: class MathServer(SimpleXMLRPCServer): def _dispatch(self, method, params): try: # We are forcing the 'export_' prefix on methods that are # callable through XML-RPC to prevent potential security # problems func = getattr(self, 'export_' + method) except AttributeError: raise Exception('method "%s" is not supported' % method) else: return func(*params) def export_add(self, x, y): return x + y server = MathServer(("localhost", 8000)) server.serve_forever() 5. CGI script: server = CGIXMLRPCRequestHandler() server.register_function(pow) server.handle_request() """ # Written by Brian Quinlan (brian@sweetapp.com). # Based on code written by Fredrik Lundh. from xmlrpc.client import Fault, dumps, loads, gzip_encode, gzip_decode from http.server import BaseHTTPRequestHandler from functools import partial from inspect import signature import html import http.server import socketserver import sys import os import re import pydoc import traceback try: import fcntl except ImportError: fcntl = None def resolve_dotted_attribute(obj, attr, allow_dotted_names=True): """resolve_dotted_attribute(a, 'b.c.d') => a.b.c.d Resolves a dotted attribute name to an object. 
Raises an AttributeError if any attribute in the chain starts with a '_'. If the optional allow_dotted_names argument is false, dots are not supported and this function operates similar to getattr(obj, attr). """ if allow_dotted_names: attrs = attr.split('.') else: attrs = [attr] for i in attrs: if i.startswith('_'): raise AttributeError( 'attempt to access private attribute "%s"' % i ) else: obj = getattr(obj,i) return obj def list_public_methods(obj): """Returns a list of attribute strings, found in the specified object, which represent callable attributes""" return [member for member in dir(obj) if not member.startswith('_') and callable(getattr(obj, member))] class SimpleXMLRPCDispatcher: """Mix-in class that dispatches XML-RPC requests. This class is used to register XML-RPC method handlers and then to dispatch them. This class doesn't need to be instanced directly when used by SimpleXMLRPCServer but it can be instanced when used by the MultiPathXMLRPCServer """ def __init__(self, allow_none=False, encoding=None, use_builtin_types=False): self.funcs = {} self.instance = None self.allow_none = allow_none self.encoding = encoding or 'utf-8' self.use_builtin_types = use_builtin_types def register_instance(self, instance, allow_dotted_names=False): """Registers an instance to respond to XML-RPC requests. Only one instance can be installed at a time. If the registered instance has a _dispatch method then that method will be called with the name of the XML-RPC method and its parameters as a tuple e.g. instance._dispatch('add',(2,3)) If the registered instance does not have a _dispatch method then the instance will be searched to find a matching method and, if found, will be called. Methods beginning with an '_' are considered private and will not be called by SimpleXMLRPCServer. If a registered function matches an XML-RPC request, then it will be called instead of the registered instance. 
If the optional allow_dotted_names argument is true and the instance does not have a _dispatch method, method names containing dots are supported and resolved, as long as none of the name segments start with an '_'. *** SECURITY WARNING: *** Enabling the allow_dotted_names options allows intruders to access your module's global variables and may allow intruders to execute arbitrary code on your machine. Only use this option on a secure, closed network. """ self.instance = instance self.allow_dotted_names = allow_dotted_names def register_function(self, function=None, name=None): """Registers a function to respond to XML-RPC requests. The optional name argument can be used to set a Unicode name for the function. """ # decorator factory if function is None: return partial(self.register_function, name=name) if name is None: name = function.__name__ self.funcs[name] = function return function def register_introspection_functions(self): """Registers the XML-RPC introspection methods in the system namespace. see http://xmlrpc.usefulinc.com/doc/reserved.html """ self.funcs.update({'system.listMethods' : self.system_listMethods, 'system.methodSignature' : self.system_methodSignature, 'system.methodHelp' : self.system_methodHelp}) def register_multicall_functions(self): """Registers the XML-RPC multicall method in the system namespace. see http://www.xmlrpc.com/discuss/msgReader$1208""" self.funcs.update({'system.multicall' : self.system_multicall}) def _marshaled_dispatch(self, data, dispatch_method = None, path = None): """Dispatches an XML-RPC method from marshalled (XML) data. XML-RPC methods are dispatched from the marshalled (XML) data using the _dispatch method and the result is returned as marshalled data. For backwards compatibility, a dispatch function can be provided as an argument (see comment in SimpleXMLRPCRequestHandler.do_POST) but overriding the existing method through subclassing is the preferred means of changing method dispatch behavior. 
""" try: params, method = loads(data, use_builtin_types=self.use_builtin_types) # generate response if dispatch_method is not None: response = dispatch_method(method, params) else: response = self._dispatch(method, params) # wrap response in a singleton tuple response = (response,) response = dumps(response, methodresponse=1, allow_none=self.allow_none, encoding=self.encoding) except Fault as fault: response = dumps(fault, allow_none=self.allow_none, encoding=self.encoding) except BaseException as exc: response = dumps( Fault(1, "%s:%s" % (type(exc), exc)), encoding=self.encoding, allow_none=self.allow_none, ) return response.encode(self.encoding, 'xmlcharrefreplace') def system_listMethods(self): """system.listMethods() => ['add', 'subtract', 'multiple'] Returns a list of the methods supported by the server.""" methods = set(self.funcs.keys()) if self.instance is not None: # Instance can implement _listMethod to return a list of # methods if hasattr(self.instance, '_listMethods'): methods |= set(self.instance._listMethods()) # if the instance has a _dispatch method then we # don't have enough information to provide a list # of methods elif not hasattr(self.instance, '_dispatch'): methods |= set(list_public_methods(self.instance)) return sorted(methods) def system_methodSignature(self, method_name): """system.methodSignature('add') => [double, int, int] Returns a list describing the signature of the method. In the above example, the add method takes two integers as arguments and returns a double result. 
This server does NOT support system.methodSignature.""" # See http://xmlrpc.usefulinc.com/doc/sysmethodsig.html return 'signatures not supported' def system_methodHelp(self, method_name): """system.methodHelp('add') => "Adds two integers together" Returns a string containing documentation for the specified method.""" method = None if method_name in self.funcs: method = self.funcs[method_name] elif self.instance is not None: # Instance can implement _methodHelp to return help for a method if hasattr(self.instance, '_methodHelp'): return self.instance._methodHelp(method_name) # if the instance has a _dispatch method then we # don't have enough information to provide help elif not hasattr(self.instance, '_dispatch'): try: method = resolve_dotted_attribute( self.instance, method_name, self.allow_dotted_names ) except AttributeError: pass # Note that we aren't checking that the method actually # be a callable object of some kind if method is None: return "" else: return pydoc.getdoc(method) def system_multicall(self, call_list): """system.multicall([{'methodName': 'add', 'params': [2, 2]}, ...]) => \ [[4], ...] Allows the caller to package multiple XML-RPC calls into a single request. See http://www.xmlrpc.com/discuss/msgReader$1208 """ results = [] for call in call_list: method_name = call['methodName'] params = call['params'] try: # XXX A marshalling error in any response will fail the entire # multicall. If someone cares they should fix this. results.append([self._dispatch(method_name, params)]) except Fault as fault: results.append( {'faultCode' : fault.faultCode, 'faultString' : fault.faultString} ) except BaseException as exc: results.append( {'faultCode' : 1, 'faultString' : "%s:%s" % (type(exc), exc)} ) return results def _dispatch(self, method, params): """Dispatches the XML-RPC method. XML-RPC calls are forwarded to a registered function that matches the called XML-RPC method name. 
If no such function exists then the call is forwarded to the registered instance, if available. If the registered instance has a _dispatch method then that method will be called with the name of the XML-RPC method and its parameters as a tuple e.g. instance._dispatch('add',(2,3)) If the registered instance does not have a _dispatch method then the instance will be searched to find a matching method and, if found, will be called. Methods beginning with an '_' are considered private and will not be called. """ try: # call the matching registered function func = self.funcs[method] except KeyError: pass else: if func is not None: return func(*params) raise Exception('method "%s" is not supported' % method) if self.instance is not None: if hasattr(self.instance, '_dispatch'): # call the `_dispatch` method on the instance return self.instance._dispatch(method, params) # call the instance's method directly try: func = resolve_dotted_attribute( self.instance, method, self.allow_dotted_names ) except AttributeError: pass else: if func is not None: return func(*params) raise Exception('method "%s" is not supported' % method) class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler): """Simple XML-RPC request handler class. Handles all HTTP POST requests and attempts to decode them as XML-RPC requests. """ # Class attribute listing the accessible path components; # paths not on this list will result in a 404 error. rpc_paths = ('/', '/RPC2', '/pydoc.css') #if not None, encode responses larger than this, if possible encode_threshold = 1400 #a common MTU #Override form StreamRequestHandler: full buffering of output #and no Nagle. wbufsize = -1 disable_nagle_algorithm = True # a re to match a gzip Accept-Encoding aepattern = re.compile(r""" \s* ([^\s;]+) \s* #content-coding (;\s* q \s*=\s* ([0-9\.]+))? 
#q """, re.VERBOSE | re.IGNORECASE) def accept_encodings(self): r = {} ae = self.headers.get("Accept-Encoding", "") for e in ae.split(","): match = self.aepattern.match(e) if match: v = match.group(3) v = float(v) if v else 1.0 r[match.group(1)] = v return r def is_rpc_path_valid(self): if self.rpc_paths: return self.path in self.rpc_paths else: # If .rpc_paths is empty, just assume all paths are legal return True def do_POST(self): """Handles the HTTP POST request. Attempts to interpret all HTTP POST requests as XML-RPC calls, which are forwarded to the server's _dispatch method for handling. """ # Check that the path is legal if not self.is_rpc_path_valid(): self.report_404() return try: # Get arguments by reading body of request. # We read this in chunks to avoid straining # socket.read(); around the 10 or 15Mb mark, some platforms # begin to have problems (bug #792570). max_chunk_size = 10*1024*1024 size_remaining = int(self.headers["content-length"]) L = [] while size_remaining: chunk_size = min(size_remaining, max_chunk_size) chunk = self.rfile.read(chunk_size) if not chunk: break L.append(chunk) size_remaining -= len(L[-1]) data = b''.join(L) data = self.decode_request_content(data) if data is None: return #response has been sent # In previous versions of SimpleXMLRPCServer, _dispatch # could be overridden in this class, instead of in # SimpleXMLRPCDispatcher. To maintain backwards compatibility, # check to see if a subclass implements _dispatch and dispatch # using that method if present. 
response = self.server._marshaled_dispatch( data, getattr(self, '_dispatch', None), self.path ) except Exception as e: # This should only happen if the module is buggy # internal error, report as HTTP server error self.send_response(500) # Send information about the exception if requested if hasattr(self.server, '_send_traceback_header') and \ self.server._send_traceback_header: self.send_header("X-exception", str(e)) trace = traceback.format_exc() trace = str(trace.encode('ASCII', 'backslashreplace'), 'ASCII') self.send_header("X-traceback", trace) self.send_header("Content-length", "0") self.end_headers() else: self.send_response(200) self.send_header("Content-type", "text/xml") if self.encode_threshold is not None: if len(response) > self.encode_threshold: q = self.accept_encodings().get("gzip", 0) if q: try: response = gzip_encode(response) self.send_header("Content-Encoding", "gzip") except NotImplementedError: pass self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) def decode_request_content(self, data): #support gzip encoding of request encoding = self.headers.get("content-encoding", "identity").lower() if encoding == "identity": return data if encoding == "gzip": try: return gzip_decode(data) except NotImplementedError: self.send_response(501, "encoding %r not supported" % encoding) except ValueError: self.send_response(400, "error decoding gzip content") else: self.send_response(501, "encoding %r not supported" % encoding) self.send_header("Content-length", "0") self.end_headers() def report_404 (self): # Report a 404 error self.send_response(404) response = b'No such page' self.send_header("Content-type", "text/plain") self.send_header("Content-length", str(len(response))) self.end_headers() self.wfile.write(response) def log_request(self, code='-', size='-'): """Selectively log an accepted request.""" if self.server.logRequests: BaseHTTPRequestHandler.log_request(self, code, size) class 
SimpleXMLRPCServer(socketserver.TCPServer, SimpleXMLRPCDispatcher): """Simple XML-RPC server. Simple XML-RPC server that allows functions and a single instance to be installed to handle requests. The default implementation attempts to dispatch XML-RPC calls to the functions or instance installed in the server. Override the _dispatch method inherited from SimpleXMLRPCDispatcher to change this behavior. """ allow_reuse_address = True # Warning: this is for debugging purposes only! Never set this to True in # production code, as will be sending out sensitive information (exception # and stack trace details) when exceptions are raised inside # SimpleXMLRPCRequestHandler.do_POST _send_traceback_header = False def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler, logRequests=True, allow_none=False, encoding=None, bind_and_activate=True, use_builtin_types=False): self.logRequests = logRequests SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types) socketserver.TCPServer.__init__(self, addr, requestHandler, bind_and_activate) class MultiPathXMLRPCServer(SimpleXMLRPCServer): """Multipath XML-RPC Server This specialization of SimpleXMLRPCServer allows the user to create multiple Dispatcher instances and assign them to different HTTP request paths. This makes it possible to run two or more 'virtual XML-RPC servers' at the same port. Make sure that the requestHandler accepts the paths in question. 
""" def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler, logRequests=True, allow_none=False, encoding=None, bind_and_activate=True, use_builtin_types=False): SimpleXMLRPCServer.__init__(self, addr, requestHandler, logRequests, allow_none, encoding, bind_and_activate, use_builtin_types) self.dispatchers = {} self.allow_none = allow_none self.encoding = encoding or 'utf-8' def add_dispatcher(self, path, dispatcher): self.dispatchers[path] = dispatcher return dispatcher def get_dispatcher(self, path): return self.dispatchers[path] def _marshaled_dispatch(self, data, dispatch_method = None, path = None): try: response = self.dispatchers[path]._marshaled_dispatch( data, dispatch_method, path) except BaseException as exc: # report low level exception back to server # (each dispatcher should have handled their own # exceptions) response = dumps( Fault(1, "%s:%s" % (type(exc), exc)), encoding=self.encoding, allow_none=self.allow_none) response = response.encode(self.encoding, 'xmlcharrefreplace') return response class CGIXMLRPCRequestHandler(SimpleXMLRPCDispatcher): """Simple handler for XML-RPC data passed through CGI.""" def __init__(self, allow_none=False, encoding=None, use_builtin_types=False): SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types) def handle_xmlrpc(self, request_text): """Handle a single XML-RPC request""" response = self._marshaled_dispatch(request_text) print('Content-Type: text/xml') print('Content-Length: %d' % len(response)) print() sys.stdout.flush() sys.stdout.buffer.write(response) sys.stdout.buffer.flush() def handle_get(self): """Handle a single HTTP GET request. Default implementation indicates an error because XML-RPC uses the POST method. 
""" code = 400 message, explain = BaseHTTPRequestHandler.responses[code] response = http.server.DEFAULT_ERROR_MESSAGE % \ { 'code' : code, 'message' : message, 'explain' : explain } response = response.encode('utf-8') print('Status: %d %s' % (code, message)) print('Content-Type: %s' % http.server.DEFAULT_ERROR_CONTENT_TYPE) print('Content-Length: %d' % len(response)) print() sys.stdout.flush() sys.stdout.buffer.write(response) sys.stdout.buffer.flush() def handle_request(self, request_text=None): """Handle a single XML-RPC request passed through a CGI post method. If no XML data is given then it is read from stdin. The resulting XML-RPC response is printed to stdout along with the correct HTTP headers. """ if request_text is None and \ os.environ.get('REQUEST_METHOD', None) == 'GET': self.handle_get() else: # POST data is normally available through stdin try: length = int(os.environ.get('CONTENT_LENGTH', None)) except (ValueError, TypeError): length = -1 if request_text is None: request_text = sys.stdin.read(length) self.handle_xmlrpc(request_text) # ----------------------------------------------------------------------------- # Self documenting XML-RPC Server. class ServerHTMLDoc(pydoc.HTMLDoc): """Class used to generate pydoc HTML document for a server""" def markup(self, text, escape=None, funcs={}, classes={}, methods={}): """Mark up some plain text, given a context of symbols to look for. Each context dictionary maps object names to anchor names.""" escape = escape or self.escape results = [] here = 0 # XXX Note that this regular expression does not allow for the # hyperlinking of arbitrary strings being used as method # names. Only methods with names consisting of word characters # and '.'s are hyperlinked. 
pattern = re.compile(r'\b((http|https|ftp)://\S+[\w/]|' r'RFC[- ]?(\d+)|' r'PEP[- ]?(\d+)|' r'(self\.)?((?:\w|\.)+))\b') while 1: match = pattern.search(text, here) if not match: break start, end = match.span() results.append(escape(text[here:start])) all, scheme, rfc, pep, selfdot, name = match.groups() if scheme: url = escape(all).replace('"', '"') results.append('%s' % (url, url)) elif rfc: url = 'https://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) results.append('%s' % (url, escape(all))) elif pep: url = 'https://peps.python.org/pep-%04d/' % int(pep) results.append('%s' % (url, escape(all))) elif text[end:end+1] == '(': results.append(self.namelink(name, methods, funcs, classes)) elif selfdot: results.append('self.%s' % name) else: results.append(self.namelink(name, classes)) here = end results.append(escape(text[here:])) return ''.join(results) def docroutine(self, object, name, mod=None, funcs={}, classes={}, methods={}, cl=None): """Produce HTML documentation for a function or method object.""" anchor = (cl and cl.__name__ or '') + '-' + name note = '' title = '%s' % ( self.escape(anchor), self.escape(name)) if callable(object): argspec = str(signature(object)) else: argspec = '(...)' if isinstance(object, tuple): argspec = object[0] or argspec docstring = object[1] or "" else: docstring = pydoc.getdoc(object) decl = title + argspec + (note and self.grey( '%s' % note)) doc = self.markup( docstring, self.preformat, funcs, classes, methods) doc = doc and '
%s
\n' % doc contents = [] method_items = sorted(methods.items()) for key, value in method_items: contents.append(self.docroutine(value, key, funcs=fdict)) result = result + self.bigsection( 'Methods', 'functions', ''.join(contents)) return result def page(self, title, contents): """Format an HTML page.""" css_path = "/pydoc.css" css_link = ( '' % css_path) return '''\
# The value can be given as a datetime object, as a string in the
# format "yyyymmddThh:mm:ss", as a 9-item time tuple (as returned by
# time.localtime()), or an integer value (as returned by time.time()).
# The wrapper uses time.localtime() to convert an integer to a time
# tuple.
#
# @param value The time, given as a datetime object, an ISO 8601 string,
# a time tuple, or an integer time value.
# Issue #13305: different format codes across platforms.
#
# strftime() does not render small years the same way everywhere, so the C
# library is probed once at import time and a formatter is selected that
# yields a zero-padded four-digit year ("0001", not "1").
_day0 = datetime(1, 1, 1)


def _try(fmt):
    """Return True if strftime(fmt) renders year 1 as '0001'."""
    try:
        return _day0.strftime(fmt) == '0001'
    except ValueError:
        # The format code was rejected outright.
        return False


if _try('%Y'):      # Mac OS X
    def _iso8601_format(value):
        """Format *value* as 'YYYYMMDDTHH:MM:SS' using plain %Y."""
        return value.strftime("%Y%m%dT%H:%M:%S")
elif _try('%4Y'):   # Linux
    def _iso8601_format(value):
        """Format *value* as 'YYYYMMDDTHH:MM:SS' using the %4Y code."""
        return value.strftime("%4Y%m%dT%H:%M:%S")
else:
    def _iso8601_format(value):
        """Format *value* as 'YYYYMMDDTHH:MM:SS', zero-filling the year."""
        # Neither probe produced a padded year, so left-pad the whole
        # string to the 17 characters a four-digit year would give.
        return value.strftime("%Y%m%dT%H:%M:%S").zfill(17)

# The probe helpers are only needed while the module is being imported.
del _day0
del _try
def _strftime(value):
    """Convert *value* to a 'yyyymmddThh:mm:ss' timestamp string.

    *value* may be a datetime object, a time tuple / time.struct_time, or a
    number that time.localtime() accepts.  The number 0 is treated as
    "now" (time.time()) rather than as the epoch.
    """
    if isinstance(value, datetime):
        return _iso8601_format(value)

    if not isinstance(value, (tuple, time.struct_time)):
        # Numeric timestamp: 0 is the "no value supplied" marker.
        if value == 0:
            value = time.time()
        value = time.localtime(value)

    # Only year..second are used; any remaining tuple fields are ignored.
    return "%04d%02d%02dT%02d:%02d:%02d" % value[:6]
class DateTime:
    """DateTime wrapper for an ISO 8601 string or time tuple or
    localtime integer value to generate 'dateTime.iso8601' XML-RPC
    value.

    The timestamp is held as a string in the ``value`` attribute.
    """

    def __init__(self, value=0):
        # Strings are stored verbatim (no validation); everything else is
        # converted by _strftime().
        if isinstance(value, str):
            self.value = value
        else:
            self.value = _strftime(value)

    def make_comparable(self, other):
        """Return a pair ``(s, o)`` of mutually comparable operands.

        ``s`` represents this object and ``o`` represents *other*:

        * DateTime, datetime or str -> both operands are timestamp strings;
        * any object with a ``timetuple`` attribute -> both are time tuples;
        * anything else -> ``o`` is NotImplemented so the caller can defer
          to Python's default comparison handling.
        """
        if isinstance(other, DateTime):
            s = self.value
            o = other.value
        elif isinstance(other, datetime):
            s = self.value
            o = _iso8601_format(other)
        elif isinstance(other, str):
            s = self.value
            o = other
        elif hasattr(other, "timetuple"):
            s = self.timetuple()
            o = other.timetuple()
        else:
            s = self
            o = NotImplemented
        return s, o

    # Rich comparisons all follow the same pattern: normalise both sides
    # with make_comparable() and return NotImplemented for unsupported
    # operand types.

    def __lt__(self, other):
        s, o = self.make_comparable(other)
        if o is NotImplemented:
            return NotImplemented
        return s < o

    def __le__(self, other):
        s, o = self.make_comparable(other)
        if o is NotImplemented:
            return NotImplemented
        return s <= o

    def __gt__(self, other):
        s, o = self.make_comparable(other)
        if o is NotImplemented:
            return NotImplemented
        return s > o

    def __ge__(self, other):
        s, o = self.make_comparable(other)
        if o is NotImplemented:
            return NotImplemented
        return s >= o

    def __eq__(self, other):
        s, o = self.make_comparable(other)
        if o is NotImplemented:
            return NotImplemented
        return s == o

    def timetuple(self):
        """Parse the stored string and return it as a time.struct_time.

        time.strptime() raises ValueError if the stored value is not in
        'YYYYMMDDTHH:MM:SS' form.
        """
        return time.strptime(self.value, "%Y%m%dT%H:%M:%S")

    ##
    # Get date/time value.
    #
    # @return Date/time value, as an ISO 8601 string.

    def __str__(self):
        return self.value

    def __repr__(self):
        return "<%s %r at %#x>" % (self.__class__.__name__, self.value, id(self))

    def decode(self, data):
        """Replace the stored value with str(data), whitespace-stripped."""
        self.value = str(data).strip()
def encode(self, out):
out.write("
# You can create custom transports by subclassing this class and
# overriding selected methods.
class Transport:
    """Handles an HTTP transaction to an XML-RPC server."""

    # client identifier (may be overridden)
    user_agent = "Python-xmlrpc/%s" % __version__

    # if true, we'll request gzip encoding
    accept_gzip_encoding = True

    # if positive, encode request using gzip if it exceeds this threshold
    # note that many servers will get confused, so only use it if you know
    # that they can decode such a request
    encode_threshold = None  # None = don't encode

    # Debugging flag read by parse_response().  single_request() stores the
    # caller's value on the instance before parsing; this class-level
    # default keeps parse_response() usable when it is called directly.
    verbose = False

    def __init__(self, use_datetime=False, use_builtin_types=False,
                 *, headers=()):
        self._use_datetime = use_datetime
        self._use_builtin_types = use_builtin_types
        # (host descriptor, connection) pair cached for reuse.
        self._connection = (None, None)
        # Headers supplied by the caller; copied so later appends cannot
        # affect the caller's sequence.
        self._headers = list(headers)
        # Per-host headers computed by get_host_info().
        self._extra_headers = []

    def request(self, host, handler, request_body, verbose=False):
        """Send a complete request, and parse the response.

        The request is retried once if the first attempt fails with
        RemoteDisconnected or with an OSError whose errno is ECONNRESET,
        ECONNABORTED or EPIPE (a cached connection that has gone cold).

        host -- target host descriptor.
        handler -- target RPC handler.
        request_body -- XML-RPC request body.
        verbose -- debugging flag.

        Returns the parsed response.
        """
        for i in (0, 1):
            try:
                return self.single_request(host, handler, request_body, verbose)
            except http.client.RemoteDisconnected:
                if i:
                    raise
            except OSError as e:
                if i or e.errno not in (errno.ECONNRESET, errno.ECONNABORTED,
                                        errno.EPIPE):
                    raise

    def single_request(self, host, handler, request_body, verbose=False):
        """Issue one XML-RPC request and return the parsed response.

        Raises ProtocolError when the HTTP status is not 200.  Any exception
        other than Fault raised while sending or parsing closes the cached
        connection before being re-raised.
        """
        try:
            http_conn = self.send_request(host, handler, request_body, verbose)
            resp = http_conn.getresponse()
            if resp.status == 200:
                self.verbose = verbose
                return self.parse_response(resp)

        except Fault:
            raise
        except Exception:
            # All unexpected errors leave connection in
            # a strange state, so we clear it.
            self.close()
            raise

        # We got an error response.
        # Discard any response data and raise exception
        if resp.getheader("content-length", ""):
            resp.read()
        raise ProtocolError(
            host + handler,
            resp.status, resp.reason,
            dict(resp.getheaders())
        )

    def getparser(self):
        """Return a 2-tuple containing a parser and an unmarshaller."""
        return getparser(use_datetime=self._use_datetime,
                         use_builtin_types=self._use_builtin_types)

    def get_host_info(self, host):
        """Get authorization info from the host parameter.

        *host* may be a string, or a (host, x509-dict) tuple; a string is
        checked for a "user:pw@host" format, and a "Basic Authentication"
        header is added if appropriate.

        Returns a 3-tuple (actual host, extra headers, x509 info).  The
        header list is empty when no credentials are present, and the x509
        dict is empty when *host* is not a tuple.
        """
        x509 = {}
        if isinstance(host, tuple):
            host, x509 = host

        auth, host = urllib.parse._splituser(host)

        if auth:
            auth = urllib.parse.unquote_to_bytes(auth)
            auth = base64.encodebytes(auth).decode("utf-8")
            auth = "".join(auth.split())  # get rid of whitespace
            extra_headers = [
                ("Authorization", "Basic " + auth)
            ]
        else:
            extra_headers = []

        return host, extra_headers, x509

    def make_connection(self, host):
        """Return an HTTPConnection object for *host*.

        The cached connection is returned when *host* equals the cached
        host descriptor; this allows HTTP/1.1 keep-alive.  Otherwise a new
        connection object is created and cached.
        """
        if self._connection and host == self._connection[0]:
            return self._connection[1]
        # create a HTTP connection object from a host descriptor
        chost, self._extra_headers, x509 = self.get_host_info(host)
        self._connection = host, http.client.HTTPConnection(chost)
        return self._connection[1]

    def close(self):
        """Clear any cached connection object.

        Used in the event of socket errors.
        """
        host, connection = self._connection
        if connection:
            self._connection = (None, None)
            connection.close()

    def send_request(self, host, handler, request_body, debug):
        """Send the HTTP request and return the HTTPConnection used.

        host -- host descriptor (URL or (URL, x509 info) tuple).
        handler -- target RPC handler (a path relative to host).
        request_body -- the XML-RPC request body.
        debug -- enable connection debugging if true.
        """
        connection = self.make_connection(host)
        # Concatenation builds a fresh list, so the appends below do not
        # accumulate on the instance across requests.
        headers = self._headers + self._extra_headers
        if debug:
            connection.set_debuglevel(1)
        if self.accept_gzip_encoding and gzip:
            connection.putrequest("POST", handler, skip_accept_encoding=True)
            headers.append(("Accept-Encoding", "gzip"))
        else:
            connection.putrequest("POST", handler)
        headers.append(("Content-Type", "text/xml"))
        headers.append(("User-Agent", self.user_agent))
        self.send_headers(connection, headers)
        self.send_content(connection, request_body)
        return connection

    def send_headers(self, connection, headers):
        """Send request headers (a list of key, value pairs).

        This function provides a useful hook for subclassing.
        """
        for key, val in headers:
            connection.putheader(key, val)

    def send_content(self, connection, request_body):
        """Send the request body, gzip-encoding it first when configured.

        This function provides a useful hook for subclassing.
        """
        # optionally encode the request
        if (self.encode_threshold is not None and
                self.encode_threshold < len(request_body) and
                gzip):
            connection.putheader("Content-Encoding", "gzip")
            request_body = gzip_encode(request_body)

        connection.putheader("Content-Length", str(len(request_body)))
        connection.endheaders(request_body)

    def parse_response(self, response):
        """Read response data and return the unmarshalled result.

        *response* is either an HTTP response object (recognised by its
        ``getheader`` attribute) or a plain file-like object.
        """
        # Check for new http response object, otherwise it is a file object.
        if hasattr(response, 'getheader'):
            if response.getheader("Content-Encoding", "") == "gzip":
                stream = GzipDecodedResponse(response)
            else:
                stream = response
        else:
            stream = response

        p, u = self.getparser()

        while 1:
            data = stream.read(1024)
            if not data:
                break
            if self.verbose:
                print("body:", repr(data))
            p.feed(data)

        # Only the gzip wrapper is closed here; the response object itself
        # is left open.
        if stream is not response:
            stream.close()
        p.close()

        return u.close()
##
# Standard transport class for XML-RPC over HTTPS.
class SafeTransport(Transport):
    """Handles an HTTPS transaction to an XML-RPC server."""

    def __init__(self, use_datetime=False, use_builtin_types=False,
                 *, headers=(), context=None):
        super().__init__(use_datetime=use_datetime,
                         use_builtin_types=use_builtin_types,
                         headers=headers)
        # Passed as the ``context`` argument of every HTTPSConnection
        # created by make_connection().
        self.context = context

    # FIXME: mostly untested

    def make_connection(self, host):
        """Return an HTTPSConnection object for *host*.

        *host* may be a string, or a (host, x509-dict) tuple; the entries of
        the x509 dict are forwarded to HTTPSConnection as keyword arguments.
        The cached connection is reused when *host* equals the cached host
        descriptor.

        Raises NotImplementedError if http.client has no HTTPSConnection.
        """
        if self._connection and host == self._connection[0]:
            return self._connection[1]

        if not hasattr(http.client, "HTTPSConnection"):
            raise NotImplementedError(
                "your version of http.client doesn't support HTTPS")
        # create a HTTPS connection object from a host descriptor
        # host may be a string, or a (host, x509-dict) tuple
        chost, self._extra_headers, x509 = self.get_host_info(host)
        self._connection = host, http.client.HTTPSConnection(
            chost, None, context=self.context, **(x509 or {}))
        return self._connection[1]
##
# Standard server proxy. This class establishes a virtual connection
# to an XML-RPC server.
#
# This class is available as ServerProxy and Server. New code should
# use ServerProxy, to avoid confusion.
#
# @def ServerProxy(uri, **options)
# @param uri The connection point on the server.
# @keyparam transport A transport factory, compatible with the
# standard transport class.
# @keyparam encoding The default encoding used for 8-bit strings
# (default is UTF-8).
# @keyparam verbose Use a true value to enable debugging output.
# (printed to standard output).
# @see Transport
class ServerProxy:
    """uri [,options] -> a logical connection to an XML-RPC server

    uri is the connection point on the server, written as
    scheme://host/target.  Only the "http" and "https" schemes are
    accepted; any other scheme raises OSError.

    When the uri has no target part, "/RPC2" is used.

    Keyword options include:

        transport: a ready-made transport object; when omitted, a
            Transport (http) or SafeTransport (https) is created
        encoding: the request encoding (default is UTF-8)
        verbose: passed through to the transport on every request

    Attribute access on a proxy yields a remote-method object, so the
    proxy's own "close" and "transport" are reached by calling the
    proxy: proxy("close")() and proxy("transport").
    """

    def __init__(self, uri, transport=None, encoding=None, verbose=False,
                 allow_none=False, use_datetime=False, use_builtin_types=False,
                 *, headers=(), context=None):
        # establish a "logical" server connection

        # split the url into its components
        parts = urllib.parse.urlsplit(uri)
        if parts.scheme not in ("http", "https"):
            raise OSError("unsupported XML-RPC protocol")
        self.__host = parts.netloc
        # Everything after the network location; default to /RPC2 if empty.
        self.__handler = urllib.parse.urlunsplit(["", "", *parts[2:]]) or "/RPC2"

        if transport is None:
            if parts.scheme == "https":
                # Only the HTTPS transport accepts a context.
                transport = SafeTransport(
                    use_datetime=use_datetime,
                    use_builtin_types=use_builtin_types,
                    headers=headers,
                    context=context,
                )
            else:
                transport = Transport(
                    use_datetime=use_datetime,
                    use_builtin_types=use_builtin_types,
                    headers=headers,
                )
        self.__transport = transport

        self.__encoding = encoding or 'utf-8'
        self.__verbose = verbose
        self.__allow_none = allow_none

    def __close(self):
        """Close the underlying transport."""
        self.__transport.close()

    def __request(self, methodname, params):
        """Call *methodname* with *params* on the remote server."""
        payload = dumps(params, methodname, encoding=self.__encoding,
                        allow_none=self.__allow_none)
        body = payload.encode(self.__encoding, 'xmlcharrefreplace')

        result = self.__transport.request(
            self.__host,
            self.__handler,
            body,
            verbose=self.__verbose,
        )

        # A single-element response is unwrapped for the caller.
        return result[0] if len(result) == 1 else result

    def __repr__(self):
        description = (self.__class__.__name__, self.__host, self.__handler)
        return "<%s for %s%s>" % description

    def __getattr__(self, name):
        # magic method dispatcher
        return _Method(self.__request, name)

    # note: to call a remote method whose name is not a valid Python
    # identifier, use getattr(server, "strange-python-name")(args)

    def __call__(self, attr):
        """A workaround to get special attributes on the ServerProxy
           without interfering with the magic __getattr__
        """
        if attr == "close":
            return self.__close
        if attr == "transport":
            return self.__transport
        raise AttributeError("Attribute %r not found" % (attr,))

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__close()
# Backward-compatibility alias: Server is the same class as ServerProxy.
# New code should use ServerProxy, to avoid confusion.
Server = ServerProxy
# --------------------------------------------------------------------
# test code
if __name__ == "__main__":

    # simple test program (from the XML-RPC specification)

    # Expects a local server listening on localhost:8000; one is
    # available from Lib/xmlrpc/server.py.
    #
    # The proxy is used as a context manager so that its transport is
    # closed when the script finishes, including when a call raises
    # something other than Error.
    with ServerProxy("http://localhost:8000") as server:

        try:
            print(server.currentTime.getCurrentTime())
        except Error as v:
            print("ERROR", v)

        # Register calls on the MultiCall object, then invoke it and
        # print each response it yields.
        multi = MultiCall(server)
        multi.getData()
        multi.pow(2, 9)
        multi.add(1, 2)
        try:
            for response in multi():
                print(response)
        except Error as v:
            print("ERROR", v)
__pycache__/server.cpython-311.opt-1.pyc 0000644 00000133342 15102763701 0013733 0 ustar 00
!A?h d Z ddlmZmZmZmZmZ ddlmZ ddl m
Z
ddlmZ ddl
Z
ddlZddlZddlZddlZddlZddlZddlZ ddlZn
# e$ r dZY nw xY wd(dZd Z G d
d Z G d d
e Z G d dej e Z G d de Z G d de Z G d dej Z G d d Z! G d de Z" G d dee! Z# G d dee! Z$e%dk rddl&Z& G d d Z' ed! 5 Z(e() e* e() d" d# e(+ e' d$ e(, e-d% e-d& e(. n