aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/temperature_combined.py
blob: d81139810b4b0537a6dc98b8d71188bdfd30a191 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- coding: utf-8 -*-
# Support for combination of temperature sensors
#
# Copyright (C) 2023  Michael Jäger <michael@mjaeger.eu>
#
# This file may be distributed under the terms of the GNU GPLv3 license.

# Interval between two combined-temperature updates: the update timer
# reschedules itself at "measured time + REPORT_TIME", and the same value is
# returned by get_report_time_delta(). Presumably in seconds, matching the
# reactor's monotonic clock - confirm against the reactor implementation.
REPORT_TIME = 0.300


class PrinterSensorCombined:
    """Virtual temperature sensor that combines several other sensors.

    The objects named in the ``sensor_list`` option are polled from a reactor
    timer. Their temperatures are reduced to a single value with the
    configured ``combination_method`` (min, max or mean) and passed to the
    callback registered through setup_callback(). A printer shutdown is
    requested when the polled values differ by more than
    ``maximum_deviation`` or when the combined value leaves the range given
    to setup_minmax().
    """

    def __init__(self, config):
        """Read the config section and register the timer and event handlers.

        Required options: ``sensor_list``, ``maximum_deviation`` (> 0) and
        ``combination_method`` (one of "min", "max", "mean").
        """
        self.printer = config.get_printer()
        self.reactor = self.printer.get_reactor()
        # Last word of the section name, e.g. "[temperature_sensor foo]" -> foo
        self.name = config.get_name().split()[-1]
        # Names of the objects to combine; resolved later in _handle_connect
        self.sensor_names = config.getlist("sensor_list")
        # Largest allowed spread between the highest and lowest reading
        self.max_deviation = config.getfloat("maximum_deviation", above=0.0)
        # Expose this object under a "sensor" attribute as well (the original
        # author's note: "ensure compatibility with itself")
        self.sensor = self
        # Resolved sensor objects; could be any sensor class or a heater
        self.sensors = []
        # Reduction applied to the list of polled temperatures
        algos = {"min": min, "max": max, "mean": mean}
        self.apply_mode = config.getchoice("combination_method", algos)
        # Defaults until update_temp() / setup_minmax() provide real values
        self.last_temp = self.min_temp = self.max_temp = 0.0
        # Make this sensor available to object lookups by name
        self.printer.add_object("temperature_combined " + self.name, self)
        # Timer is only registered here; it is armed in _handle_ready
        self.temperature_update_timer = self.reactor.register_timer(
            self._temperature_update_event
        )
        self.printer.register_event_handler("klippy:connect", self._handle_connect)
        self.printer.register_event_handler("klippy:ready", self._handle_ready)

    def _handle_connect(self):
        """Resolve the configured sensor names and validate each of them.

        Raises a config error when an object has no get_status(), when its
        status has no 'temperature' entry, or when that entry is None.
        """
        for sensor_name in self.sensor_names:
            sensor = self.printer.lookup_object(sensor_name)
            # The temperature is read through get_status(), so it must exist
            if not hasattr(sensor, "get_status"):
                raise self.printer.config_error(
                    "'%s' does not have a status." % (sensor_name,)
                )
            status = sensor.get_status(self.reactor.monotonic())
            if "temperature" not in status:
                raise self.printer.config_error(
                    "'%s' does not report a temperature." % (sensor_name,)
                )
            # A None temperature is treated as a temperature monitor,
            # which cannot be combined
            if status["temperature"] is None:
                raise self.printer.config_error(
                    "Temperature monitor '%s' is not supported" % (sensor_name,)
                )

            self.sensors.append(sensor)

    def _handle_ready(self):
        """Arm the update timer one second after the printer became ready."""
        # There is a race condition with sensors where they can be not ready,
        # and return 0 or None - initialize a little bit later.
        self.reactor.update_timer(
            self.temperature_update_timer, self.reactor.monotonic() + 1.0
        )

    def setup_minmax(self, min_temp, max_temp):
        """Store the allowed range for the combined temperature."""
        self.min_temp = min_temp
        self.max_temp = max_temp

    def setup_callback(self, temperature_callback):
        """Store the callable that receives (print_time, temperature)."""
        self.temperature_callback = temperature_callback

    def get_report_time_delta(self):
        """Return the interval between two temperature reports."""
        return REPORT_TIME

    def update_temp(self, eventtime):
        """Poll all sensors and refresh the combined temperature.

        Requests a shutdown when the spread of the polled values exceeds
        ``max_deviation``. The combined value is stored in ``last_temp``
        unless the combination method returned None; a reading of exactly
        0.0 is a valid value and is stored as well.
        """
        values = [
            sensor.get_status(eventtime)["temperature"] for sensor in self.sensors
        ]

        # check if values are out of max_deviation range
        highest = max(values)
        lowest = min(values)
        if (highest - lowest) > self.max_deviation:
            self.printer.invoke_shutdown(
                "COMBINED SENSOR maximum deviation exceeded limit of %0.1f, "
                "max sensor value %0.1f, min sensor value %0.1f."
                % (
                    self.max_deviation,
                    highest,
                    lowest,
                )
            )

        temp = self.apply_mode(values)
        # Compare against None explicitly: a plain truthiness test would also
        # discard a genuine 0.0 reading and leave a stale value in last_temp,
        # hiding it from the min/max checks.
        if temp is not None:
            self.last_temp = temp

    def get_temp(self, eventtime):
        """Return (last combined temperature, 0.0)."""
        return self.last_temp, 0.0

    def get_status(self, eventtime):
        """Return the status dict with the temperature rounded to 2 digits."""
        return {
            "temperature": round(self.last_temp, 2),
        }

    def _temperature_update_event(self, eventtime):
        """Timer callback: update, range-check and report the temperature.

        Returns the reactor time at which the timer should fire again.
        """
        # update sensor value
        self.update_temp(eventtime)

        # check min / max temp values
        if self.last_temp < self.min_temp:
            self.printer.invoke_shutdown(
                "COMBINED SENSOR temperature %0.1f "
                "below minimum temperature of %0.1f."
                % (
                    self.last_temp,
                    self.min_temp,
                )
            )
        if self.last_temp > self.max_temp:
            self.printer.invoke_shutdown(
                "COMBINED SENSOR temperature %0.1f "
                "above maximum temperature of %0.1f."
                % (
                    self.last_temp,
                    self.max_temp,
                )
            )

        # this is copied from temperature_host to enable time triggered updates
        mcu = self.printer.lookup_object("mcu")
        measured_time = self.reactor.monotonic()
        # The callback is given the mcu's estimated print time for the
        # moment of measurement, not the raw reactor time
        self.temperature_callback(
            mcu.estimated_print_time(measured_time), self.last_temp
        )
        # set next update time
        return measured_time + REPORT_TIME


def mean(values):
    """Return the arithmetic mean of values, or None if values is empty."""
    return sum(values) / len(values) if values else None


def load_config(config):
    """Register the combined sensor as a factory on the "heaters" object."""
    printer = config.get_printer()
    heaters = printer.load_object(config, "heaters")
    heaters.add_sensor_factory("temperature_combined", PrinterSensorCombined)