aboutsummaryrefslogtreecommitdiffstats
path: root/klippy
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2020-08-11 21:31:09 -0400
committerKevin O'Connor <kevin@koconnor.net>2020-08-16 13:23:05 -0400
commitb0e3effb5324df0ed7371c3c0103139d65f2480e (patch)
treeec1e4fc3071bf35009d79c72be0cdd079753cc6a /klippy
parent16a53e6918a6514941095692a2ea588ca5274413 (diff)
downloadkutter-b0e3effb5324df0ed7371c3c0103139d65f2480e.tar.gz
kutter-b0e3effb5324df0ed7371c3c0103139d65f2480e.tar.xz
kutter-b0e3effb5324df0ed7371c3c0103139d65f2480e.zip
webhooks: Rework get_status() subscriptions
Implement a new subscription system for get_status() updates. Subscriptions are per-client. After an initial update, only changes will be transmitted. Responses are only transmitted to the client that issued the subscription. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy')
-rw-r--r--klippy/webhooks.py205
1 files changed, 89 insertions, 116 deletions
diff --git a/klippy/webhooks.py b/klippy/webhooks.py
index d10f5c2c..358e9646 100644
--- a/klippy/webhooks.py
+++ b/klippy/webhooks.py
@@ -357,128 +357,101 @@ class GCodeHelper:
SUBSCRIPTION_REFRESH_TIME = .25
-class StatusHandler:
+class QueryStatusHelper:
def __init__(self, printer):
self.printer = printer
- self.webhooks = webhooks = printer.lookup_object('webhooks')
- self.ready = self.timer_started = False
- self.reactor = self.printer.get_reactor()
- self.available_objects = {}
- self.subscriptions = {}
- self.subscription_timer = self.reactor.register_timer(
- self._batch_subscription_handler, self.reactor.NEVER)
-
- # Register events
- self.printer.register_event_handler(
- "klippy:ready", self._handle_ready)
- self.printer.register_event_handler(
- "gcode:request_restart", self._handle_restart)
-
+ self.clients = {}
+ self.pending_queries = []
+ self.query_timer = None
+ self.last_query = {}
# Register webhooks
- webhooks.register_endpoint("objects/list", self._handle_object_request)
- webhooks.register_endpoint("objects/status",
- self._handle_status_request)
- webhooks.register_endpoint("objects/subscription",
- self._handle_subscription_request)
- webhooks.register_endpoint("objects/list_subscription",
- self._handle_list_subscription_request)
-
- def _handle_ready(self):
- eventtime = self.reactor.monotonic()
- self.available_objects = {}
- objs = self.printer.lookup_objects()
- status_objs = {n: o for n, o in objs if hasattr(o, "get_status")}
- for name, obj in status_objs.items():
- attrs = obj.get_status(eventtime)
- self.available_objects[name] = attrs.keys()
- self.ready = True
-
- def _handle_restart(self, eventtime):
- self.ready = False
- self.reactor.update_timer(self.subscription_timer, self.reactor.NEVER)
-
- def _batch_subscription_handler(self, eventtime):
- status = self._process_status_request(self.subscriptions, eventtime)
- self.webhooks.call_remote_method(
- "process_status_update", status=status)
- return eventtime + SUBSCRIPTION_REFRESH_TIME
-
- def _process_status_request(self, requested_objects, eventtime):
- result = {}
- if self.ready:
- for name, req_items in requested_objects.items():
- obj = self.printer.lookup_object(name, None)
- if obj is not None and name in self.available_objects:
- status = obj.get_status(eventtime)
- if not req_items:
- # return all items excluding callables
- result[name] = {k: v for k, v in status.items()
- if not callable(v)}
+ webhooks = printer.lookup_object('webhooks')
+ webhooks.register_endpoint("objects/list", self._handle_list)
+ webhooks.register_endpoint("objects/query", self._handle_query)
+ webhooks.register_endpoint("objects/subscribe", self._handle_subscribe)
+ def _handle_list(self, web_request):
+ objects = [n for n, o in self.printer.lookup_objects()
+ if hasattr(o, 'get_status')]
+ web_request.send({'objects': objects})
+ def _do_query(self, eventtime):
+ last_query = self.last_query
+ query = self.last_query = {}
+ msglist = self.pending_queries
+ self.pending_queries = []
+ msglist.extend(self.clients.values())
+ # Generate get_status() info for each client
+ for cconn, subscription, send_func, template in msglist:
+ is_query = cconn is None
+ if not is_query and cconn.is_closed():
+ del self.clients[cconn]
+ continue
+ # Query each requested printer object
+ cquery = {}
+ for obj_name, req_items in subscription.items():
+ res = query.get(obj_name, None)
+ if res is None:
+ po = self.printer.lookup_object(obj_name, None)
+ if po is None or not hasattr(po, 'get_status'):
+ res = query[obj_name] = {}
else:
- # return requested items excluding callables
- result[name] = {k: v for k, v in status.items()
- if k in req_items and not callable(v)}
- else:
- result = {"status": "Klippy Not Ready"}
- return result
-
- def _handle_object_request(self, web_request):
- web_request.send(dict(self.available_objects))
-
- def _handle_status_request(self, web_request):
- args = web_request.get_args()
- eventtime = self.reactor.monotonic()
- result = self._process_status_request(args, eventtime)
- web_request.send(result)
-
- def _handle_subscription_request(self, web_request):
- args = web_request.get_args()
- if args:
- self.add_subscripton(args)
- else:
+ res = query[obj_name] = po.get_status(eventtime)
+ if req_items is None:
+ req_items = list(res.keys())
+ if req_items:
+ subscription[obj_name] = req_items
+ lres = last_query.get(obj_name, {})
+ cres = {}
+ for ri in req_items:
+ rd = res.get(ri, None)
+ if not callable(rd) and (is_query or rd != lres.get(ri)):
+ cres[ri] = rd
+ if cres or is_query:
+ cquery[obj_name] = cres
+ # Send data
+ if cquery or is_query:
+ tmp = dict(template)
+ tmp['params'] = {'eventtime': eventtime, 'status': cquery}
+ send_func(tmp)
+ if not query:
+ # Unregister timer if there are no longer any subscriptions
+ reactor = self.printer.get_reactor()
+ reactor.unregister_timer(self.query_timer)
+ self.query_timer = None
+ return reactor.NEVER
+ return eventtime + SUBSCRIPTION_REFRESH_TIME
+ def _handle_query(self, web_request, is_subscribe=False):
+ objects = web_request.get('objects')
+ # Validate subscription format
+ if type(objects) != type({}):
raise web_request.error("Invalid argument")
-
- def _handle_list_subscription_request(self, web_request):
- web_request.send(dict(self.subscriptions))
-
- def add_subscripton(self, new_sub):
- if not new_sub:
- return
- for obj_name, req_items in new_sub.items():
- if obj_name not in self.available_objects:
- logging.info(
- "webhooks: Object {%s} not available for subscription"
- % (obj_name))
- continue
- # validate requested items
- if req_items:
- avail_items = set(self.available_objects[obj_name])
- invalid_items = set(req_items) - avail_items
- if invalid_items:
- logging.info(
- "webhooks: Removed invalid items [%s] from "
- "subscription request %s" %
- (", ".join(invalid_items), obj_name))
- req_items = list(set(req_items) - invalid_items)
- if not req_items:
- # No valid items remaining
- continue
- # Add or update subscription
- existing_items = self.subscriptions.get(obj_name, None)
- if existing_items is not None:
- if req_items == [] or existing_items == []:
- # Subscribe to all items
- self.subscriptions[obj_name] = []
- else:
- req_items = list(set(req_items) | set(existing_items))
- self.subscriptions[obj_name] = req_items
- else:
- self.subscriptions[obj_name] = req_items
- if not self.timer_started:
- self.reactor.update_timer(self.subscription_timer, self.reactor.NOW)
- self.timer_started = True
+ for k, v in objects.items():
+ if type(k) != type("") or type(v) not in [type([]), type(None)]:
+ raise web_request.error("Invalid argument")
+ if v is not None:
+ for ri in v:
+ if type(ri) != type(""):
+ raise web_request.error("Invalid argument")
+ # Add to pending queries
+ cconn = web_request.get_client_connection()
+ template = web_request.get('response_template', {})
+ if is_subscribe and cconn in self.clients:
+ del self.clients[cconn]
+ reactor = self.printer.get_reactor()
+ complete = reactor.completion()
+ self.pending_queries.append((None, objects, complete.complete, {}))
+ # Start timer if needed
+ if self.query_timer is None:
+ qt = reactor.register_timer(self._do_query, reactor.NOW)
+ self.query_timer = qt
+ # Wait for data to be queried
+ msg = complete.wait()
+ web_request.send(msg['params'])
+ if is_subscribe:
+ self.clients[cconn] = (cconn, objects, cconn.send, template)
+ def _handle_subscribe(self, web_request):
+ self._handle_query(web_request, is_subscribe=True)
def add_early_printer_objects(printer):
printer.add_object('webhooks', WebHooks(printer))
GCodeHelper(printer)
- StatusHandler(printer)
+ QueryStatusHelper(printer)