aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--klippy/extras/sos_filter.py232
-rw-r--r--src/Kconfig4
-rw-r--r--src/Makefile1
-rw-r--r--src/sos_filter.c158
-rw-r--r--src/sos_filter.h12
5 files changed, 407 insertions, 0 deletions
diff --git a/klippy/extras/sos_filter.py b/klippy/extras/sos_filter.py
new file mode 100644
index 00000000..f5ba3c27
--- /dev/null
+++ b/klippy/extras/sos_filter.py
@@ -0,0 +1,232 @@
+# Second Order Sections Filter
+#
+# Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+
+MAX_INT32 = (2 ** 31)
+MIN_INT32 = -(2 ** 31) - 1
+def assert_is_int32(value, error):
+ if value > MAX_INT32 or value < MIN_INT32:
+ raise OverflowError(error)
+ return value
+
+# convert a floating point value to a 32 bit fixed point representation
+# checks for overflow
+def to_fixed_32(value, int_bits):
+ fractional_bits = (32 - (1 + int_bits))
+ fixed_val = int(value * (2 ** fractional_bits))
+ return assert_is_int32(fixed_val, "Fixed point Q%i overflow"
+ % (int_bits,))
+
+
+# Digital filter designer and container
+class DigitalFilter:
+ def __init__(self, sps, cfg_error, highpass=None, highpass_order=1,
+ lowpass=None, lowpass_order=1, notches=None, notch_quality=2.0):
+ self.filter_sections = []
+ self.initial_state = []
+ self.sample_frequency = sps
+ # an empty filter can be created without SciPi/numpy
+ if not (highpass or lowpass or notches):
+ return
+ try:
+ import scipy.signal as signal
+ except:
+ raise cfg_error("DigitalFilter require the SciPy module")
+ if highpass:
+ self.filter_sections.append(
+ self._butter(highpass, "highpass", highpass_order))
+ if lowpass:
+ self.filter_sections.append(
+ self._butter(lowpass, "lowpass", lowpass_order))
+ for notch_freq in notches:
+ self.filter_sections.append(self._notch(notch_freq, notch_quality))
+ if len(self.filter_sections) > 0:
+ self.initial_state = signal.sosfilt_zi(self.filter_sections)
+
+ def _butter(self, frequency, btype, order):
+ import scipy.signal as signal
+ return signal.butter(order, Wn=frequency, btype=btype,
+ fs=self.sample_frequency, output='sos')[0]
+
+ def _notch(self, freq, quality):
+ import scipy.signal as signal
+ b, a = signal.iirnotch(freq, Q=quality, fs=self.sample_frequency)
+ return signal.tf2sos(b, a)[0]
+
+ def get_filter_sections(self):
+ return self.filter_sections
+
+ def get_initial_state(self):
+ return self.initial_state
+
+# container that accepts SciPy formatted SOS filter data and converts it to a
+# selected fixed point representation. This data could come from DigitalFilter,
+# static data, config etc.
+class FixedPointSosFilter:
+ # filter_sections is an array of SciPy formatted SOS filter sections (sos)
+ # initial_state is an array of SciPy formatted SOS state sections (zi)
+ def __init__(self, filter_sections=None, initial_state=None,
+ coeff_int_bits=2, value_int_bits=15):
+ filter_sections = [] if filter_sections is None else filter_sections
+ initial_state = [] if initial_state is None else initial_state
+ num_sections = len(filter_sections)
+ num_state = len(initial_state)
+ if num_state != num_sections:
+ raise ValueError("The number of filter sections (%i) and state "
+ "sections (%i) must be equal" % (
+ num_sections, num_state))
+ self._coeff_int_bits = self._validate_int_bits(coeff_int_bits)
+ self._value_int_bits = self._validate_int_bits(value_int_bits)
+ self._filter = self._convert_filter(filter_sections)
+ self._state = self._convert_state(initial_state)
+
+ def get_filter_sections(self):
+ return self._filter
+
+ def get_initial_state(self):
+ return self._state
+
+ def get_coeff_int_bits(self):
+ return self._coeff_int_bits
+
+ def get_value_int_bits(self):
+ return self._value_int_bits
+
+ def get_num_sections(self):
+ return len(self._filter)
+
+ def _validate_int_bits(self, int_bits):
+ if int_bits < 1 or int_bits > 30:
+ raise ValueError("The number of integer bits (%i) must be a"
+ " value between 1 and 30" % (int_bits,))
+ return int_bits
+
+ # convert the SciPi SOS filters to fixed point format
+ def _convert_filter(self, filter_sections):
+ sos_fixed = []
+ for section in filter_sections:
+ nun_coeff = len(section)
+ if nun_coeff != 6:
+ raise ValueError("The number of filter coefficients is %i"
+ ", must be 6" % (nun_coeff,))
+ fixed_section = []
+ for col, coeff in enumerate(section):
+ if col != 3: # omit column 3
+ fixed_coeff = to_fixed_32(coeff, self._coeff_int_bits)
+ fixed_section.append(fixed_coeff)
+ elif coeff != 1.0: # double check colum 3 is always 1.0
+ raise ValueError("Coefficient 3 is expected to be 1.0"
+ " but was %f" % (coeff,))
+ sos_fixed.append(fixed_section)
+ return sos_fixed
+
+ # convert the SOS filter state matrix (zi) to fixed point format
+ def _convert_state(self, filter_state):
+ sos_state = []
+ for section in filter_state:
+ nun_states = len(section)
+ if nun_states != 2:
+ raise ValueError(
+ "The number of state elements is %i, must be 2"
+ % (nun_states,))
+ fixed_state = []
+ for col, value in enumerate(section):
+ fixed_state.append(to_fixed_32(value, self._value_int_bits))
+ sos_state.append(fixed_state)
+ return sos_state
+
+
+# Control an `sos_filter` object on the MCU
+class SosFilter:
+ # fixed_point_filter should be an FixedPointSosFilter instance. A filter of
+ # size 0 will create a passthrough filter.
+ # max_sections should be the largest number of sections you expect
+ # to use at runtime. The default is the size of the fixed_point_filter.
+ def __init__(self, mcu, cmd_queue, fixed_point_filter, max_sections=None):
+ self._mcu = mcu
+ self._cmd_queue = cmd_queue
+ self._oid = self._mcu.create_oid()
+ self._filter = fixed_point_filter
+ self._max_sections = max_sections
+ if self._max_sections is None:
+ self._max_sections = self._filter.get_num_sections()
+ self._cmd_set_section = [
+ "sos_filter_set_section oid=%d section_idx=%d"
+ " sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i",
+ "sos_filter_set_section oid=%c section_idx=%c"
+ " sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i"]
+ self._cmd_config_state = [
+ "sos_filter_set_state oid=%d section_idx=%d state0=%i state1=%i",
+ "sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i"]
+ self._cmd_activate = [
+ "sos_filter_set_active oid=%d n_sections=%d coeff_int_bits=%d",
+ "sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c"]
+ self._mcu.register_config_callback(self._build_config)
+
+ def _build_config(self):
+ cmds = [self._cmd_set_section, self._cmd_config_state,
+ self._cmd_activate]
+ for cmd in cmds:
+ cmd.append(self._mcu.lookup_command(cmd[1], cq=self._cmd_queue))
+
+ def get_oid(self):
+ return self._oid
+
+ # create an uninitialized filter object
+ def create_filter(self):
+ self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%u"
+ % (self._oid, self._max_sections))
+ self._configure_filter(is_init=True)
+
+ # either setup an init command or send the command based on a flag
+ def _cmd(self, command, args, is_init=False):
+ if is_init:
+ self._mcu.add_config_cmd(command[0] % args, is_init=True)
+ else:
+ command[2].send(args)
+
+ def _set_filter_sections(self, is_init=False):
+ for i, section in enumerate(self._filter.get_filter_sections()):
+ args = (self._oid, i, section[0], section[1], section[2],
+ section[3], section[4])
+ self._cmd(self._cmd_set_section, args, is_init)
+
+ def _set_filter_state(self, is_init=False):
+ for i, state in enumerate(self._filter.get_initial_state()):
+ args = (self._oid, i, state[0], state[1])
+ self._cmd(self._cmd_config_state, args, is_init)
+
+ def _activate_filter(self, is_init=False):
+ args = (self._oid, self._filter.get_num_sections(),
+ self._filter.get_coeff_int_bits())
+ self._cmd(self._cmd_activate, args, is_init)
+
+ # configure the filter sections on the mcu
+ # filters should be an array of filter sections in SciPi SOS format
+ # sos_filter_state should be an array of zi filter state elements
+ def _configure_filter(self, is_init=False):
+ num_sections = self._filter.get_num_sections()
+ if num_sections > self._max_sections:
+ raise ValueError("Too many filter sections: %i, The max is %i"
+ % (num_sections, self._max_sections,))
+ # convert to fixed point to find errors
+ # no errors, state is accepted
+ # configure MCU filter and activate
+ self._set_filter_sections(is_init)
+ self._set_filter_state(is_init,)
+ self._activate_filter(is_init)
+
+ # Change the filter coefficients and state at runtime
+ # fixed_point_filter should be an FixedPointSosFilter instance
+ # cq is an optional command queue to for command sequencing
+ def change_filter(self, fixed_point_filter):
+ self._filter = fixed_point_filter
+ self._configure_filter(False)
+
+ # Resets the filter state back to initial conditions at runtime
+ # cq is an optional command queue to for command sequencing
+ def reset_filter(self):
+ self._set_filter_state(False)
+ self._activate_filter(False)
diff --git a/src/Kconfig b/src/Kconfig
index 05b9cb42..fba367d3 100644
--- a/src/Kconfig
+++ b/src/Kconfig
@@ -177,6 +177,10 @@ config NEED_SENSOR_BULK
depends on WANT_ADXL345 || WANT_LIS2DW || WANT_MPU9250 || WANT_ICM20948 \
|| WANT_HX71X || WANT_ADS1220 || WANT_LDC1612 || WANT_SENSOR_ANGLE
default y
+config NEED_SOS_FILTER
+ bool
+ depends on WANT_HX71X || WANT_ADS1220
+ default y
menu "Optional features (to reduce code size)"
depends on HAVE_LIMITED_CODE_SIZE
config WANT_ADC
diff --git a/src/Makefile b/src/Makefile
index ebebf14d..75f2c3b3 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -27,3 +27,4 @@ src-$(CONFIG_WANT_ADS1220) += sensor_ads1220.c
src-$(CONFIG_WANT_LDC1612) += sensor_ldc1612.c
src-$(CONFIG_WANT_SENSOR_ANGLE) += sensor_angle.c
src-$(CONFIG_NEED_SENSOR_BULK) += sensor_bulk.c
+src-$(CONFIG_NEED_SOS_FILTER) += sos_filter.c
diff --git a/src/sos_filter.c b/src/sos_filter.c
new file mode 100644
index 00000000..7272241e
--- /dev/null
+++ b/src/sos_filter.c
@@ -0,0 +1,158 @@
+// Second Order sections Filter implementation using Fixed Point math
+//
+// Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
+//
+// This file may be distributed under the terms of the GNU GPLv3 license.
+
+#include "basecmd.h" // oid_alloc
+#include "command.h" // DECL_COMMAND
+#include "sched.h" // shutdown
+#include "sos_filter.h" // sos_filter
+
+typedef int32_t fixedQ_coeff_t;
+typedef int32_t fixedQ_value_t;
+
+// filter strucutre sizes
+#define SECTION_WIDTH 5
+#define STATE_WIDTH 2
+
+struct sos_filter_section {
+ // filter composed of second order sections
+ fixedQ_coeff_t coeff[SECTION_WIDTH]; // aka sos
+ fixedQ_value_t state[STATE_WIDTH]; // aka zi
+};
+
+struct sos_filter {
+ uint8_t max_sections, n_sections, coeff_frac_bits, is_active;
+ uint32_t coeff_rounding;
+ // filter composed of second order sections
+ struct sos_filter_section filter[0];
+};
+
+inline uint8_t
+overflows_int32(int64_t value) {
+ return value > (int64_t)INT32_MAX || value < (int64_t)INT32_MIN;
+}
+
+// Multiply a coefficient in fixedQ_coeff_t by a value fixedQ_value_t
+static inline fixedQ_value_t
+fixed_mul(struct sos_filter *sf, const fixedQ_coeff_t coeff
+ , const fixedQ_value_t value) {
+ // This optimizes to single cycle SMULL on Arm Coretex M0+
+ int64_t product = (int64_t)coeff * (int64_t)value;
+ // round up at the last bit to be shifted away
+ product += sf->coeff_rounding;
+ // shift the decimal right to discard the coefficient fractional bits
+ int64_t result = product >> sf->coeff_frac_bits;
+ // check for overflow of int32_t
+ if (overflows_int32(result)) {
+ shutdown("fixed_mul: overflow");
+ }
+ // truncate significant 32 bits
+ return (fixedQ_value_t)result;
+}
+
+// Apply the sosfilt algorithm to a new datapoint
+// returns the fixedQ_value_t filtered value
+int32_t
+sosfilt(struct sos_filter *sf, const int32_t unfiltered_value) {
+ if (!sf->is_active) {
+ shutdown("sos_filter not property initialized");
+ }
+
+ // an empty filter performs no filtering
+ if (sf->n_sections == 0) {
+ return unfiltered_value;
+ }
+
+ fixedQ_value_t cur_val = unfiltered_value;
+ // foreach section
+ for (int section_idx = 0; section_idx < sf->n_sections; section_idx++) {
+ struct sos_filter_section *section = &(sf->filter[section_idx]);
+ // apply the section's filter coefficients to input
+ fixedQ_value_t next_val = fixed_mul(sf, section->coeff[0], cur_val);
+ next_val += section->state[0];
+ section->state[0] = fixed_mul(sf, section->coeff[1], cur_val)
+ - fixed_mul(sf, section->coeff[3], next_val)
+ + (section->state[1]);
+ section->state[1] = fixed_mul(sf, section->coeff[2], cur_val)
+ - fixed_mul(sf, section->coeff[4], next_val);
+ cur_val = next_val;
+ }
+
+ return (int32_t)cur_val;
+}
+
+// Create an sos_filter
+void
+command_config_sos_filter(uint32_t *args)
+{
+ uint32_t max_sections = args[1];
+ uint32_t size = offsetof(struct sos_filter, filter[max_sections]);
+ struct sos_filter *sf = oid_alloc(args[0]
+ , command_config_sos_filter, size);
+ sf->max_sections = max_sections;
+ sf->is_active = 0;
+}
+DECL_COMMAND(command_config_sos_filter, "config_sos_filter oid=%c"
+ " max_sections=%u");
+
+// Lookup an sos_filter
+struct sos_filter *
+sos_filter_oid_lookup(uint8_t oid)
+{
+ return oid_lookup(oid, command_config_sos_filter);
+}
+
+// Set one section of the filter
+void
+command_sos_filter_set_section(uint32_t *args)
+{
+ struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
+ // setting a section marks the filter as inactive
+ sf->is_active = 0;
+ uint8_t section_idx = args[1];
+ // copy section data
+ const uint8_t arg_base = 2;
+ for (uint8_t i = 0; i < SECTION_WIDTH; i++) {
+ sf->filter[section_idx].coeff[i] = args[i + arg_base];
+ }
+}
+DECL_COMMAND(command_sos_filter_set_section
+ , "sos_filter_set_section oid=%c section_idx=%c"
+ " sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i");
+
+// Set the state of one section of the filter
+void
+command_sos_filter_set_state(uint32_t *args)
+{
+ struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
+ // setting a section's state marks the filter as inactive
+ sf->is_active = 0;
+ // copy state data
+ uint8_t section_idx = args[1];
+ const uint8_t arg_base = 2;
+ sf->filter[section_idx].state[0] = args[0 + arg_base];
+ sf->filter[section_idx].state[1] = args[1 + arg_base];
+}
+DECL_COMMAND(command_sos_filter_set_state
+ , "sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i");
+
+// Set one section of the filter
+void
+command_sos_filter_activate(uint32_t *args)
+{
+ struct sos_filter *sf = sos_filter_oid_lookup(args[0]);
+ uint8_t n_sections = args[1];
+ if (n_sections > sf->max_sections) {
+ shutdown("Filter section count larger than max_sections");
+ }
+ sf->n_sections = n_sections;
+ const uint8_t coeff_int_bits = args[2];
+ sf->coeff_frac_bits = (31 - coeff_int_bits);
+ sf->coeff_rounding = (1 << (sf->coeff_frac_bits - 1));
+ // mark filter as ready to use
+ sf->is_active = 1;
+}
+DECL_COMMAND(command_sos_filter_activate
+ , "sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c");
diff --git a/src/sos_filter.h b/src/sos_filter.h
new file mode 100644
index 00000000..6c215dda
--- /dev/null
+++ b/src/sos_filter.h
@@ -0,0 +1,12 @@
+#ifndef __SOS_FILTER_H
+#define __SOS_FILTER_H
+
+#include <stdint.h>
+
+struct sos_filter;
+
+int32_t sosfilt(struct sos_filter *sf
+ , const int32_t unfiltered_value);
+struct sos_filter *sos_filter_oid_lookup(uint8_t oid);
+
+#endif // sos_filter.h