diff options
Diffstat (limited to 'klippy/webhooks.py')
-rw-r--r-- | klippy/webhooks.py | 194 |
1 files changed, 116 insertions, 78 deletions
diff --git a/klippy/webhooks.py b/klippy/webhooks.py index 9e17177a..1a471a93 100644 --- a/klippy/webhooks.py +++ b/klippy/webhooks.py @@ -19,47 +19,58 @@ except ImportError: # json_loads_byteify = None if sys.version_info.major < 3: + def json_loads_byteify(data, ignore_dicts=False): if isinstance(data, unicode): - return data.encode('utf-8') + return data.encode("utf-8") if isinstance(data, list): return [json_loads_byteify(i, True) for i in data] if isinstance(data, dict) and not ignore_dicts: - return {json_loads_byteify(k, True): json_loads_byteify(v, True) - for k, v in data.items()} + return { + json_loads_byteify(k, True): json_loads_byteify(v, True) + for k, v in data.items() + } return data + def json_dumps(obj): - return json.dumps(obj, separators=(',', ':')).encode() + return json.dumps(obj, separators=(",", ":")).encode() + def json_loads(data): return json.loads(data, object_hook=json_loads_byteify) + else: json_dumps = msgspec.json.encode json_loads = msgspec.json.decode REQUEST_LOG_SIZE = 20 + class WebRequestError(gcode.CommandError): - def __init__(self, message,): + def __init__( + self, + message, + ): Exception.__init__(self, message) def to_dict(self): - return { - 'error': 'WebRequestError', - 'message': str(self)} + return {"error": "WebRequestError", "message": str(self)} + class Sentinel: pass + class WebRequest: error = WebRequestError + def __init__(self, client_conn, request): self.client_conn = client_conn base_request = json_loads(request) if type(base_request) != dict: raise ValueError("Not a top-level dictionary") - self.id = base_request.get('id', None) - self.method = base_request.get('method') - self.params = base_request.get('params', {}) + self.id = base_request.get("id", None) + self.method = base_request.get("method") + self.params = base_request.get("params", {}) if type(self.method) != str or type(self.params) != dict: raise ValueError("Invalid request type") self.response = None @@ -72,8 +83,7 @@ class WebRequest: value = self.params.get(item, default) if value is Sentinel: raise WebRequestError("Missing Argument [%s]" % (item,)) - if (types is not None and type(value) not in types - and item in self.params): + if types is not None and type(value) not in types and item in self.params: raise WebRequestError("Invalid Argument Type [%s]" % (item,)) return value @@ -113,6 +123,7 @@ class WebRequest: self.response = {} return {"id": self.id, rtype: self.response} + class ServerSocket: def __init__(self, webhooks, printer): self.printer = printer @@ -121,8 +132,8 @@ class ServerSocket: self.sock = self.fd_handle = None self.clients = {} start_args = printer.get_start_args() - server_address = start_args.get('apiserver') - is_fileinput = (start_args.get('debuginput') is not None) + server_address = start_args.get("apiserver") + is_fileinput = start_args.get("debuginput") is not None if not server_address or is_fileinput: # Do not enable server return @@ -132,11 +143,10 @@ class ServerSocket: self.sock.bind(server_address) self.sock.listen(1) self.fd_handle = self.reactor.register_fd( - self.sock.fileno(), self._handle_accept) - printer.register_event_handler( - 'klippy:disconnect', self._handle_disconnect) - printer.register_event_handler( - "klippy:shutdown", self._handle_shutdown) + self.sock.fileno(), self._handle_accept + ) + printer.register_event_handler("klippy:disconnect", self._handle_disconnect) + printer.register_event_handler("klippy:shutdown", self._handle_shutdown) def _handle_accept(self, eventtime): try: @@ -167,8 +177,8 @@ class ServerSocket: except OSError: if os.path.exists(file_path): logging.exception( - "webhooks: Unable to delete socket file '%s'" - % (file_path)) + "webhooks: Unable to delete socket file '%s'" % (file_path) + ) raise def pop_client(self, client_id): @@ -184,6 +194,7 @@ class ServerSocket: client.close() return False, "" + class ClientConnection: def __init__(self, server, sock): self.printer = server.printer @@ -193,7 +204,8 @@ class ClientConnection: self.uid = id(self) self.sock = sock self.fd_handle = self.reactor.register_fd( - self.sock.fileno(), self.process_received, self._do_send) + self.sock.fileno(), self.process_received, self._do_send + ) self.partial_data = self.send_buffer = b"" self.is_blocking = False self.blocking_count = 0 @@ -202,8 +214,13 @@ class ClientConnection: def dump_request_log(self): out = [] - out.append("Dumping %d requests for client %d" - % (len(self.request_log), self.uid,)) + out.append( + "Dumping %d requests for client %d" + % ( + len(self.request_log), + self.uid, + ) + ) for eventtime, request in self.request_log: out.append("Received %f: %s" % (eventtime, request)) logging.info("\n".join(out)) @@ -248,7 +265,7 @@ class ClientConnection: # Socket Closed self.close() return - requests = data.split(b'\x03') + requests = data.split(b"\x03") requests[0] = self.partial_data + requests[0] self.partial_data = requests.pop() for req in requests: @@ -256,11 +273,11 @@ class ClientConnection: try: web_request = WebRequest(self, req) except Exception: - logging.exception("webhooks: Error decoding Server Request %s" - % (req)) + logging.exception("webhooks: Error decoding Server Request %s" % (req)) continue self.reactor.register_callback( - lambda e, s=self, wr=web_request: s._process_request(wr)) + lambda e, s=self, wr=web_request: s._process_request(wr) + ) def _process_request(self, web_request): try: @@ -269,8 +286,7 @@ class ClientConnection: except self.printer.command_error as e: web_request.set_error(WebRequestError(str(e))) except Exception as e: - msg = ("Internal Error on WebRequest: %s" - % (web_request.get_method())) + msg = "Internal Error on WebRequest: %s" % (web_request.get_method()) logging.exception(msg) web_request.set_error(WebRequestError(str(e))) self.printer.invoke_shutdown(msg) @@ -284,7 +300,7 @@ class ClientConnection: jmsg = json_dumps(data) self.send_buffer += jmsg + b"\x03" except (TypeError, ValueError) as e: - msg = ("json encoding error: %s" % (str(e),)) + msg = "json encoding error: %s" % (str(e),) logging.exception(msg) self.printer.invoke_shutdown(msg) return @@ -312,6 +328,7 @@ class ClientConnection: self.is_blocking = False self.send_buffer = self.send_buffer[sent:] + class WebHooks: def __init__(self, printer): self.printer = printer @@ -320,8 +337,7 @@ class WebHooks: self._mux_endpoints = {} self.register_endpoint("info", self._handle_info_request) self.register_endpoint("emergency_stop", self._handle_estop_request) - self.register_endpoint("register_remote_method", - self._handle_rpc_registration) + self.register_endpoint("register_remote_method", self._handle_rpc_registration) self.sconn = ServerSocket(self, printer) def register_endpoint(self, path, callback): @@ -338,11 +354,13 @@ class WebHooks: if prev_key != key: raise self.printer.config_error( "mux endpoint %s %s %s may have only one key (%s)" - % (path, key, value, prev_key)) + % (path, key, value, prev_key) + ) if value in prev_values: raise self.printer.config_error( "mux endpoint %s %s %s already registered (%s)" - % (path, key, value, prev_values)) + % (path, key, value, prev_values) + ) prev_values[value] = callback def _handle_mux(self, web_request): @@ -352,30 +370,33 @@ class WebHooks: else: key_param = web_request.get(key) if key_param not in values: - raise web_request.error("The value '%s' is not valid for %s" - % (key_param, key)) + raise web_request.error( + "The value '%s' is not valid for %s" % (key_param, key) + ) values[key_param](web_request) def _handle_list_endpoints(self, web_request): - web_request.send({'endpoints': list(self._endpoints.keys())}) + web_request.send({"endpoints": list(self._endpoints.keys())}) def _handle_info_request(self, web_request): - client_info = web_request.get_dict('client_info', None) + client_info = web_request.get_dict("client_info", None) if client_info is not None: web_request.get_client_connection().set_client_info(client_info) state_message, state = self.printer.get_state_message() src_path = os.path.dirname(__file__) klipper_path = os.path.normpath(os.path.join(src_path, "..")) - response = {'state': state, - 'state_message': state_message, - 'hostname': socket.gethostname(), - 'klipper_path': klipper_path, - 'python_path': sys.executable, - 'process_id': os.getpid(), - 'user_id': os.getuid(), - 'group_id': os.getgid()} + response = { + "state": state, + "state_message": state_message, + "hostname": socket.gethostname(), + "klipper_path": klipper_path, + "python_path": sys.executable, + "process_id": os.getpid(), + "user_id": os.getuid(), + "group_id": os.getgid(), + } start_args = self.printer.get_start_args() - for sa in ['log_file', 'config_file', 'software_version', 'cpu_info']: + for sa in ["log_file", "config_file", "software_version", "cpu_info"]: response[sa] = start_args.get(sa) web_request.send(response) @@ -383,11 +404,13 @@ class WebHooks: self.printer.invoke_shutdown("Shutdown due to webhooks request") def _handle_rpc_registration(self, web_request): - template = web_request.get_dict('response_template') - method = web_request.get_str('remote_method') + template = web_request.get_dict("response_template") + method = web_request.get_str("remote_method") new_conn = web_request.get_client_connection() - logging.info("webhooks: registering remote method '%s' " - "for connection id: %d" % (method, id(new_conn))) + logging.info( + "webhooks: registering remote method '%s' " + "for connection id: %d" % (method, id(new_conn)) + ) self._remote_methods.setdefault(method, {})[new_conn] = template def get_connection(self): @@ -403,7 +426,7 @@ class WebHooks: def get_status(self, eventtime): state_message, state = self.printer.get_state_message() - return {'state': state, 'state_message': state_message} + return {"state": state, "state_message": state_message} def stats(self, eventtime): return self.sconn.stats(eventtime) @@ -411,21 +434,24 @@ class WebHooks: def call_remote_method(self, method, **kwargs): if method not in self._remote_methods: raise self.printer.command_error( - "Remote method '%s' not registered" % (method)) + "Remote method '%s' not registered" % (method) + ) conn_map = self._remote_methods[method] valid_conns = {} for conn, template in conn_map.items(): if not conn.is_closed(): valid_conns[conn] = template - out = {'params': kwargs} + out = {"params": kwargs} out.update(template) conn.send(out) if not valid_conns: del self._remote_methods[method] raise self.printer.command_error( - "No active connections for method '%s'" % (method)) + "No active connections for method '%s'" % (method) + ) self._remote_methods[method] = valid_conns + class GCodeHelper: def __init__(self, printer): self.printer = printer @@ -434,39 +460,45 @@ class GCodeHelper: self.is_output_registered = False self.clients = {} # Register webhooks - wh = printer.lookup_object('webhooks') + wh = printer.lookup_object("webhooks") wh.register_endpoint("gcode/help", self._handle_help) wh.register_endpoint("gcode/script", self._handle_script) wh.register_endpoint("gcode/restart", self._handle_restart) - wh.register_endpoint("gcode/firmware_restart", - self._handle_firmware_restart) - wh.register_endpoint("gcode/subscribe_output", - self._handle_subscribe_output) + wh.register_endpoint("gcode/firmware_restart", self._handle_firmware_restart) + wh.register_endpoint("gcode/subscribe_output", self._handle_subscribe_output) + def _handle_help(self, web_request): web_request.send(self.gcode.get_command_help()) + def _handle_script(self, web_request): - self.gcode.run_script(web_request.get_str('script')) + self.gcode.run_script(web_request.get_str("script")) + def _handle_restart(self, web_request): - self.gcode.run_script('restart') + self.gcode.run_script("restart") + def _handle_firmware_restart(self, web_request): - self.gcode.run_script('firmware_restart') + self.gcode.run_script("firmware_restart") + def _output_callback(self, msg): for cconn, template in list(self.clients.items()): if cconn.is_closed(): del self.clients[cconn] continue tmp = dict(template) - tmp['params'] = {'response': msg} + tmp["params"] = {"response": msg} cconn.send(tmp) + def _handle_subscribe_output(self, web_request): cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) + template = web_request.get_dict("response_template", {}) self.clients[cconn] = template if not self.is_output_registered: self.gcode.register_output_handler(self._output_callback) self.is_output_registered = True -SUBSCRIPTION_REFRESH_TIME = .25 + +SUBSCRIPTION_REFRESH_TIME = 0.25 + class QueryStatusHelper: def __init__(self, printer): @@ -476,14 +508,17 @@ class QueryStatusHelper: self.query_timer = None self.last_query = {} # Register webhooks - webhooks = printer.lookup_object('webhooks') + webhooks = printer.lookup_object("webhooks") webhooks.register_endpoint("objects/list", self._handle_list) webhooks.register_endpoint("objects/query", self._handle_query) webhooks.register_endpoint("objects/subscribe", self._handle_subscribe) + def _handle_list(self, web_request): - objects = [n for n, o in self.printer.lookup_objects() - if hasattr(o, 'get_status')] - web_request.send({'objects': objects}) + objects = [ + n for n, o in self.printer.lookup_objects() if hasattr(o, "get_status") + ] + web_request.send({"objects": objects}) + def _do_query(self, eventtime): last_query = self.last_query query = self.last_query = {} @@ -502,7 +537,7 @@ class QueryStatusHelper: res = query.get(obj_name, None) if res is None: po = self.printer.lookup_object(obj_name, None) - if po is None or not hasattr(po, 'get_status'): + if po is None or not hasattr(po, "get_status"): res = query[obj_name] = {} else: res = query[obj_name] = po.get_status(eventtime) @@ -521,7 +556,7 @@ class QueryStatusHelper: # Send data if cquery or is_query: tmp = dict(template) - tmp['params'] = {'eventtime': eventtime, 'status': cquery} + tmp["params"] = {"eventtime": eventtime, "status": cquery} send_func(tmp) if not query: # Unregister timer if there are no longer any subscriptions @@ -530,8 +565,9 @@ class QueryStatusHelper: self.query_timer = None return reactor.NEVER return eventtime + SUBSCRIPTION_REFRESH_TIME + def _handle_query(self, web_request, is_subscribe=False): - objects = web_request.get_dict('objects') + objects = web_request.get_dict("objects") # Validate subscription format for k, v in objects.items(): if type(k) != str or (v is not None and type(v) != list): @@ -542,7 +578,7 @@ class QueryStatusHelper: raise web_request.error("Invalid argument") # Add to pending queries cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) + template = web_request.get_dict("response_template", {}) if is_subscribe and cconn in self.clients: del self.clients[cconn] reactor = self.printer.get_reactor() @@ -554,13 +590,15 @@ class QueryStatusHelper: self.query_timer = qt # Wait for data to be queried msg = complete.wait() - web_request.send(msg['params']) + web_request.send(msg["params"]) if is_subscribe: self.clients[cconn] = (cconn, objects, cconn.send, template) + def _handle_subscribe(self, web_request): self._handle_query(web_request, is_subscribe=True) + def add_early_printer_objects(printer): - printer.add_object('webhooks', WebHooks(printer)) + printer.add_object("webhooks", WebHooks(printer)) GCodeHelper(printer) QueryStatusHelper(printer) |