diff options
author | Kevin O'Connor <kevin@koconnor.net> | 2024-09-25 23:34:59 -0400 |
---|---|---|
committer | Kevin O'Connor <kevin@koconnor.net> | 2024-10-30 14:42:53 -0400 |
commit | 9adb313ee870fd7bfa6b7dd363d36e609788e531 (patch) | |
tree | f4f3770d52a5e4e018222d83ce2d533ed7415a53 | |
parent | faa89be816064b42bff1ba81478405490e49289c (diff) | |
download | kutter-9adb313ee870fd7bfa6b7dd363d36e609788e531.tar.gz kutter-9adb313ee870fd7bfa6b7dd363d36e609788e531.tar.xz kutter-9adb313ee870fd7bfa6b7dd363d36e609788e531.zip |
configfile: Split configfile code into three separate classes
Separate out the low-level parsing code to a new ConfigFileReader()
class.
Separate out the auto-save handling code to a new ConfigAutoSave()
class.
This simplifies the main PrinterConfig() class.
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
-rw-r--r-- | klippy/configfile.py | 363 |
1 files changed, 203 insertions, 160 deletions
diff --git a/klippy/configfile.py b/klippy/configfile.py index a8a4a4ff..5b9896a0 100644 --- a/klippy/configfile.py +++ b/klippy/configfile.py @@ -1,12 +1,17 @@ # Code for reading and writing the Klipper config file # -# Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net> +# Copyright (C) 2016-2024 Kevin O'Connor <kevin@koconnor.net> # # This file may be distributed under the terms of the GNU GPLv3 license. import sys, os, glob, re, time, logging, configparser, io error = configparser.Error + +###################################################################### +# Config section parsing helper +###################################################################### + class sentinel: pass @@ -134,30 +139,13 @@ class ConfigWrapper: pconfig = self.printer.lookup_object("configfile") pconfig.deprecate(self.section, option, value, msg) -AUTOSAVE_HEADER = """ -#*# <---------------------- SAVE_CONFIG ----------------------> -#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated. -#*# -""" -class PrinterConfig: - def __init__(self, printer): - self.printer = printer - self.autosave = None - self.deprecated = {} - self.runtime_warnings = [] - self.deprecate_warnings = [] - self.status_raw_config = {} - self.status_save_pending = {} - self.status_settings = {} - self.status_warnings = [] - self.save_config_pending = False - gcode = self.printer.lookup_object('gcode') - gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG, - desc=self.cmd_SAVE_CONFIG_help) - def get_printer(self): - return self.printer - def _read_config_file(self, filename): +###################################################################### +# Config file parsing (with include file support) +###################################################################### + +class ConfigFileReader: + def read_config_file(self, filename): try: f = open(filename, 'r') data = f.read() @@ -167,53 +155,6 @@ class PrinterConfig: logging.exception(msg) raise error(msg) return data.replace('\r\n', '\n') - def _find_autosave_data(self, data): - regular_data = data - autosave_data = "" - pos = data.find(AUTOSAVE_HEADER) - if pos >= 0: - regular_data = data[:pos] - autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip() - # Check for errors and strip line prefixes - if "\n#*# " in regular_data: - logging.warning("Can't read autosave from config file" - " - autosave state corrupted") - return data, "" - out = [""] - for line in autosave_data.split('\n'): - if ((not line.startswith("#*#") - or (len(line) >= 4 and not line.startswith("#*# "))) - and autosave_data): - logging.warning("Can't read autosave from config file" - " - modifications after header") - return data, "" - out.append(line[4:]) - out.append("") - return regular_data, "\n".join(out) - comment_r = re.compile('[#;].*$') - value_r = re.compile('[^A-Za-z0-9_].*$') - def _strip_duplicates(self, data, config): - # Comment out fields in 'data' that are defined in 'config' - lines = data.split('\n') - section = None - is_dup_field = False - for lineno, line in enumerate(lines): - pruned_line = self.comment_r.sub('', line).rstrip() - if not pruned_line: - continue - if pruned_line[0].isspace(): - if is_dup_field: - lines[lineno] = '#' + lines[lineno] - continue - is_dup_field = False - if pruned_line[0] == '[': - section = pruned_line[1:-1].strip() - continue - field = self.value_r.sub('', pruned_line) - if config.fileconfig.has_option(section, field): - is_dup_field = True - lines[lineno] = '#' + lines[lineno] - return "\n".join(lines) def _parse_config_buffer(self, buffer, filename, fileconfig): if not buffer: return @@ -235,7 +176,7 @@ class PrinterConfig: raise error("Include file '%s' does not exist" % (include_glob,)) include_filenames.sort() for include_filename in include_filenames: - include_data = self._read_config_file(include_filename) + include_data = self.read_config_file(include_filename) self._parse_config(include_data, include_filename, fileconfig, visited) return include_filenames @@ -265,97 +206,104 @@ class PrinterConfig: buffer.append(line) self._parse_config_buffer(buffer, filename, fileconfig) visited.remove(path) - def _build_config_wrapper(self, data, filename): + def build_fileconfig(self, data, filename): if sys.version_info.major >= 3: fileconfig = configparser.RawConfigParser( strict=False, inline_comment_prefixes=(';', '#')) else: fileconfig = configparser.RawConfigParser() self._parse_config(data, filename, fileconfig, set()) - return ConfigWrapper(self.printer, fileconfig, {}, 'printer') - def _build_config_string(self, config): + return fileconfig + def build_config_string(self, fileconfig): sfile = io.StringIO() - config.fileconfig.write(sfile) + fileconfig.write(sfile) return sfile.getvalue().strip() - def read_config(self, filename): - return self._build_config_wrapper(self._read_config_file(filename), - filename) - def read_main_config(self): + + +###################################################################### +# Config auto save helper +###################################################################### + +AUTOSAVE_HEADER = """ +#*# <---------------------- SAVE_CONFIG ----------------------> +#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated. +#*# +""" + +class ConfigAutoSave: + def __init__(self, printer): + self.printer = printer + self.fileconfig = None + self.status_save_pending = {} + self.save_config_pending = False + gcode = self.printer.lookup_object('gcode') + gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG, + desc=self.cmd_SAVE_CONFIG_help) + def _find_autosave_data(self, data): + regular_data = data + autosave_data = "" + pos = data.find(AUTOSAVE_HEADER) + if pos >= 0: + regular_data = data[:pos] + autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip() + # Check for errors and strip line prefixes + if "\n#*# " in regular_data: + logging.warning("Can't read autosave from config file" + " - autosave state corrupted") + return data, "" + out = [""] + for line in autosave_data.split('\n'): + if ((not line.startswith("#*#") + or (len(line) >= 4 and not line.startswith("#*# "))) + and autosave_data): + logging.warning("Can't read autosave from config file" + " - modifications after header") + return data, "" + out.append(line[4:]) + out.append("") + return regular_data, "\n".join(out) + comment_r = re.compile('[#;].*$') + value_r = re.compile('[^A-Za-z0-9_].*$') + def _strip_duplicates(self, data, fileconfig): + # Comment out fields in 'data' that are defined in 'config' + lines = data.split('\n') + section = None + is_dup_field = False + for lineno, line in enumerate(lines): + pruned_line = self.comment_r.sub('', line).rstrip() + if not pruned_line: + continue + if pruned_line[0].isspace(): + if is_dup_field: + lines[lineno] = '#' + lines[lineno] + continue + is_dup_field = False + if pruned_line[0] == '[': + section = pruned_line[1:-1].strip() + continue + field = self.value_r.sub('', pruned_line) + if fileconfig.has_option(section, field): + is_dup_field = True + lines[lineno] = '#' + lines[lineno] + return "\n".join(lines) + def load_main_config(self): filename = self.printer.get_start_args()['config_file'] - data = self._read_config_file(filename) + cfgrdr = ConfigFileReader() + data = cfgrdr.read_config_file(filename) regular_data, autosave_data = self._find_autosave_data(data) - regular_config = self._build_config_wrapper(regular_data, filename) - autosave_data = self._strip_duplicates(autosave_data, regular_config) - self.autosave = self._build_config_wrapper(autosave_data, filename) - cfg = self._build_config_wrapper(regular_data + autosave_data, filename) - return cfg - def check_unused_options(self, config): - fileconfig = config.fileconfig - objects = dict(self.printer.lookup_objects()) - # Determine all the fields that have been accessed - access_tracking = dict(config.access_tracking) - for section in self.autosave.fileconfig.sections(): - for option in self.autosave.fileconfig.options(section): - access_tracking[(section.lower(), option.lower())] = 1 - # Validate that there are no undefined parameters in the config file - valid_sections = { s: 1 for s, o in access_tracking } - for section_name in fileconfig.sections(): - section = section_name.lower() - if section not in valid_sections and section not in objects: - raise error("Section '%s' is not a valid config section" - % (section,)) - for option in fileconfig.options(section_name): - option = option.lower() - if (section, option) not in access_tracking: - raise error("Option '%s' is not valid in section '%s'" - % (option, section)) - # Setup get_status() - self._build_status(config) - def log_config(self, config): - lines = ["===== Config file =====", - self._build_config_string(config), - "======================="] - self.printer.set_rollover_info("config", "\n".join(lines)) - # Status reporting - def runtime_warning(self, msg): - logging.warning(msg) - res = {'type': 'runtime_warning', 'message': msg} - self.runtime_warnings.append(res) - self.status_warnings = self.runtime_warnings + self.deprecate_warnings - def deprecate(self, section, option, value=None, msg=None): - self.deprecated[(section, option, value)] = msg - def _build_status(self, config): - self.status_raw_config.clear() - for section in config.get_prefix_sections(''): - self.status_raw_config[section.get_name()] = section_status = {} - for option in section.get_prefix_options(''): - section_status[option] = section.get(option, note_valid=False) - self.status_settings = {} - for (section, option), value in config.access_tracking.items(): - self.status_settings.setdefault(section, {})[option] = value - self.deprecate_warnings = [] - for (section, option, value), msg in self.deprecated.items(): - if value is None: - res = {'type': 'deprecated_option'} - else: - res = {'type': 'deprecated_value', 'value': value} - res['message'] = msg - res['section'] = section - res['option'] = option - self.deprecate_warnings.append(res) - self.status_warnings = self.runtime_warnings + self.deprecate_warnings + regular_fileconfig = cfgrdr.build_fileconfig(regular_data, filename) + autosave_data = self._strip_duplicates(autosave_data, + regular_fileconfig) + self.fileconfig = cfgrdr.build_fileconfig(autosave_data, filename) + return cfgrdr.build_fileconfig(regular_data + autosave_data, filename) def get_status(self, eventtime): - return {'config': self.status_raw_config, - 'settings': self.status_settings, - 'warnings': self.status_warnings, - 'save_config_pending': self.save_config_pending, + return {'save_config_pending': self.save_config_pending, 'save_config_pending_items': self.status_save_pending} - # Autosave functions def set(self, section, option, value): - if not self.autosave.fileconfig.has_section(section): - self.autosave.fileconfig.add_section(section) + if not self.fileconfig.has_section(section): + self.fileconfig.add_section(section) svalue = str(value) - self.autosave.fileconfig.set(section, option, svalue) + self.fileconfig.set(section, option, svalue) pending = dict(self.status_save_pending) if not section in pending or pending[section] is None: pending[section] = {} @@ -366,8 +314,8 @@ class PrinterConfig: self.save_config_pending = True logging.info("save_config: set [%s] %s = %s", section, option, svalue) def remove_section(self, section): - if self.autosave.fileconfig.has_section(section): - self.autosave.fileconfig.remove_section(section) + if self.fileconfig.has_section(section): + self.fileconfig.remove_section(section) pending = dict(self.status_save_pending) pending[section] = None self.status_save_pending = pending @@ -379,20 +327,22 @@ class PrinterConfig: self.status_save_pending = pending self.save_config_pending = True def _disallow_include_conflicts(self, regular_data, cfgname, gcode): - config = self._build_config_wrapper(regular_data, cfgname) - for section in self.autosave.fileconfig.sections(): - for option in self.autosave.fileconfig.options(section): - if config.fileconfig.has_option(section, option): + cfgrdr = ConfigFileReader() + regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname) + for section in self.fileconfig.sections(): + for option in self.fileconfig.options(section): + if regular_fileconfig.has_option(section, option): msg = ("SAVE_CONFIG section '%s' option '%s' conflicts " "with included value" % (section, option)) raise gcode.error(msg) cmd_SAVE_CONFIG_help = "Overwrite config file and restart" def cmd_SAVE_CONFIG(self, gcmd): - if not self.autosave.fileconfig.sections(): + if not self.fileconfig.sections(): return gcode = self.printer.lookup_object('gcode') # Create string containing autosave data - autosave_data = self._build_config_string(self.autosave) + cfgrdr = ConfigFileReader() + autosave_data = cfgrdr.build_config_string(self.fileconfig) lines = [('#*# ' + l).strip() for l in autosave_data.split('\n')] lines.insert(0, "\n" + AUTOSAVE_HEADER.rstrip()) @@ -401,14 +351,14 @@ class PrinterConfig: # Read in and validate current config file cfgname = self.printer.get_start_args()['config_file'] try: - data = self._read_config_file(cfgname) + data = cfgrdr.read_config_file(cfgname) regular_data, old_autosave_data = self._find_autosave_data(data) - config = self._build_config_wrapper(regular_data, cfgname) + regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname) except error as e: msg = "Unable to parse existing config on SAVE_CONFIG" logging.exception(msg) raise gcode.error(msg) - regular_data = self._strip_duplicates(regular_data, self.autosave) + regular_data = self._strip_duplicates(regular_data, self.fileconfig) self._disallow_include_conflicts(regular_data, cfgname, gcode) data = regular_data.rstrip() + autosave_data # Determine filenames @@ -433,3 +383,96 @@ class PrinterConfig: raise gcode.error(msg) # Request a restart gcode.request_restart('restart') + + +###################################################################### +# Main printer config tracking +###################################################################### + +class PrinterConfig: + def __init__(self, printer): + self.printer = printer + self.autosave = ConfigAutoSave(printer) + self.deprecated = {} + self.runtime_warnings = [] + self.deprecate_warnings = [] + self.status_raw_config = {} + self.status_settings = {} + self.status_warnings = [] + def get_printer(self): + return self.printer + def read_config(self, filename): + cfgrdr = ConfigFileReader() + data = cfgrdr.read_config_file(filename) + fileconfig = cfgrdr.build_fileconfig(data, filename) + return ConfigWrapper(self.printer, fileconfig, {}, 'printer') + def read_main_config(self): + fileconfig = self.autosave.load_main_config() + return ConfigWrapper(self.printer, fileconfig, {}, 'printer') + def check_unused_options(self, config): + fileconfig = config.fileconfig + objects = dict(self.printer.lookup_objects()) + # Determine all the fields that have been accessed + access_tracking = dict(config.access_tracking) + for section in self.autosave.fileconfig.sections(): + for option in self.autosave.fileconfig.options(section): + access_tracking[(section.lower(), option.lower())] = 1 + # Validate that there are no undefined parameters in the config file + valid_sections = { s: 1 for s, o in access_tracking } + for section_name in fileconfig.sections(): + section = section_name.lower() + if section not in valid_sections and section not in objects: + raise error("Section '%s' is not a valid config section" + % (section,)) + for option in fileconfig.options(section_name): + option = option.lower() + if (section, option) not in access_tracking: + raise error("Option '%s' is not valid in section '%s'" + % (option, section)) + # Setup get_status() + self._build_status(config) + def log_config(self, config): + cfgrdr = ConfigFileReader() + lines = ["===== Config file =====", + cfgrdr.build_config_string(config.fileconfig), + "======================="] + self.printer.set_rollover_info("config", "\n".join(lines)) + # Status reporting + def runtime_warning(self, msg): + logging.warning(msg) + res = {'type': 'runtime_warning', 'message': msg} + self.runtime_warnings.append(res) + self.status_warnings = self.runtime_warnings + self.deprecate_warnings + def deprecate(self, section, option, value=None, msg=None): + self.deprecated[(section, option, value)] = msg + def _build_status(self, config): + self.status_raw_config.clear() + for section in config.get_prefix_sections(''): + self.status_raw_config[section.get_name()] = section_status = {} + for option in section.get_prefix_options(''): + section_status[option] = section.get(option, note_valid=False) + self.status_settings = {} + for (section, option), value in config.access_tracking.items(): + self.status_settings.setdefault(section, {})[option] = value + self.deprecate_warnings = [] + for (section, option, value), msg in self.deprecated.items(): + if value is None: + res = {'type': 'deprecated_option'} + else: + res = {'type': 'deprecated_value', 'value': value} + res['message'] = msg + res['section'] = section + res['option'] = option + self.deprecate_warnings.append(res) + self.status_warnings = self.runtime_warnings + self.deprecate_warnings + def get_status(self, eventtime): + status = {'config': self.status_raw_config, + 'settings': self.status_settings, + 'warnings': self.status_warnings} + status.update(self.autosave.get_status(eventtime)) + return status + # Autosave functions + def set(self, section, option, value): + self.autosave.set(section, option, value) + def remove_section(self, section): + self.autosave.remove_section(section) |