aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/configfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/configfile.py')
-rw-r--r--klippy/configfile.py363
1 files changed, 203 insertions, 160 deletions
diff --git a/klippy/configfile.py b/klippy/configfile.py
index a8a4a4ff..5b9896a0 100644
--- a/klippy/configfile.py
+++ b/klippy/configfile.py
@@ -1,12 +1,17 @@
# Code for reading and writing the Klipper config file
#
-# Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net>
+# Copyright (C) 2016-2024 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, os, glob, re, time, logging, configparser, io
error = configparser.Error
+
+######################################################################
+# Config section parsing helper
+######################################################################
+
class sentinel:
pass
@@ -134,30 +139,13 @@ class ConfigWrapper:
pconfig = self.printer.lookup_object("configfile")
pconfig.deprecate(self.section, option, value, msg)
-AUTOSAVE_HEADER = """
-#*# <---------------------- SAVE_CONFIG ---------------------->
-#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated.
-#*#
-"""
-class PrinterConfig:
- def __init__(self, printer):
- self.printer = printer
- self.autosave = None
- self.deprecated = {}
- self.runtime_warnings = []
- self.deprecate_warnings = []
- self.status_raw_config = {}
- self.status_save_pending = {}
- self.status_settings = {}
- self.status_warnings = []
- self.save_config_pending = False
- gcode = self.printer.lookup_object('gcode')
- gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG,
- desc=self.cmd_SAVE_CONFIG_help)
- def get_printer(self):
- return self.printer
- def _read_config_file(self, filename):
+######################################################################
+# Config file parsing (with include file support)
+######################################################################
+
+class ConfigFileReader:
+ def read_config_file(self, filename):
try:
f = open(filename, 'r')
data = f.read()
@@ -167,53 +155,6 @@ class PrinterConfig:
logging.exception(msg)
raise error(msg)
return data.replace('\r\n', '\n')
- def _find_autosave_data(self, data):
- regular_data = data
- autosave_data = ""
- pos = data.find(AUTOSAVE_HEADER)
- if pos >= 0:
- regular_data = data[:pos]
- autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip()
- # Check for errors and strip line prefixes
- if "\n#*# " in regular_data:
- logging.warning("Can't read autosave from config file"
- " - autosave state corrupted")
- return data, ""
- out = [""]
- for line in autosave_data.split('\n'):
- if ((not line.startswith("#*#")
- or (len(line) >= 4 and not line.startswith("#*# ")))
- and autosave_data):
- logging.warning("Can't read autosave from config file"
- " - modifications after header")
- return data, ""
- out.append(line[4:])
- out.append("")
- return regular_data, "\n".join(out)
- comment_r = re.compile('[#;].*$')
- value_r = re.compile('[^A-Za-z0-9_].*$')
- def _strip_duplicates(self, data, config):
- # Comment out fields in 'data' that are defined in 'config'
- lines = data.split('\n')
- section = None
- is_dup_field = False
- for lineno, line in enumerate(lines):
- pruned_line = self.comment_r.sub('', line).rstrip()
- if not pruned_line:
- continue
- if pruned_line[0].isspace():
- if is_dup_field:
- lines[lineno] = '#' + lines[lineno]
- continue
- is_dup_field = False
- if pruned_line[0] == '[':
- section = pruned_line[1:-1].strip()
- continue
- field = self.value_r.sub('', pruned_line)
- if config.fileconfig.has_option(section, field):
- is_dup_field = True
- lines[lineno] = '#' + lines[lineno]
- return "\n".join(lines)
def _parse_config_buffer(self, buffer, filename, fileconfig):
if not buffer:
return
@@ -235,7 +176,7 @@ class PrinterConfig:
raise error("Include file '%s' does not exist" % (include_glob,))
include_filenames.sort()
for include_filename in include_filenames:
- include_data = self._read_config_file(include_filename)
+ include_data = self.read_config_file(include_filename)
self._parse_config(include_data, include_filename, fileconfig,
visited)
return include_filenames
@@ -265,97 +206,104 @@ class PrinterConfig:
buffer.append(line)
self._parse_config_buffer(buffer, filename, fileconfig)
visited.remove(path)
- def _build_config_wrapper(self, data, filename):
+ def build_fileconfig(self, data, filename):
if sys.version_info.major >= 3:
fileconfig = configparser.RawConfigParser(
strict=False, inline_comment_prefixes=(';', '#'))
else:
fileconfig = configparser.RawConfigParser()
self._parse_config(data, filename, fileconfig, set())
- return ConfigWrapper(self.printer, fileconfig, {}, 'printer')
- def _build_config_string(self, config):
+ return fileconfig
+ def build_config_string(self, fileconfig):
sfile = io.StringIO()
- config.fileconfig.write(sfile)
+ fileconfig.write(sfile)
return sfile.getvalue().strip()
- def read_config(self, filename):
- return self._build_config_wrapper(self._read_config_file(filename),
- filename)
- def read_main_config(self):
+
+
+######################################################################
+# Config auto save helper
+######################################################################
+
+AUTOSAVE_HEADER = """
+#*# <---------------------- SAVE_CONFIG ---------------------->
+#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated.
+#*#
+"""
+
+class ConfigAutoSave:
+ def __init__(self, printer):
+ self.printer = printer
+ self.fileconfig = None
+ self.status_save_pending = {}
+ self.save_config_pending = False
+ gcode = self.printer.lookup_object('gcode')
+ gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG,
+ desc=self.cmd_SAVE_CONFIG_help)
+ def _find_autosave_data(self, data):
+ regular_data = data
+ autosave_data = ""
+ pos = data.find(AUTOSAVE_HEADER)
+ if pos >= 0:
+ regular_data = data[:pos]
+ autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip()
+ # Check for errors and strip line prefixes
+ if "\n#*# " in regular_data:
+ logging.warning("Can't read autosave from config file"
+ " - autosave state corrupted")
+ return data, ""
+ out = [""]
+ for line in autosave_data.split('\n'):
+ if ((not line.startswith("#*#")
+ or (len(line) >= 4 and not line.startswith("#*# ")))
+ and autosave_data):
+ logging.warning("Can't read autosave from config file"
+ " - modifications after header")
+ return data, ""
+ out.append(line[4:])
+ out.append("")
+ return regular_data, "\n".join(out)
+ comment_r = re.compile('[#;].*$')
+ value_r = re.compile('[^A-Za-z0-9_].*$')
+ def _strip_duplicates(self, data, fileconfig):
+ # Comment out fields in 'data' that are defined in 'config'
+ lines = data.split('\n')
+ section = None
+ is_dup_field = False
+ for lineno, line in enumerate(lines):
+ pruned_line = self.comment_r.sub('', line).rstrip()
+ if not pruned_line:
+ continue
+ if pruned_line[0].isspace():
+ if is_dup_field:
+ lines[lineno] = '#' + lines[lineno]
+ continue
+ is_dup_field = False
+ if pruned_line[0] == '[':
+ section = pruned_line[1:-1].strip()
+ continue
+ field = self.value_r.sub('', pruned_line)
+ if fileconfig.has_option(section, field):
+ is_dup_field = True
+ lines[lineno] = '#' + lines[lineno]
+ return "\n".join(lines)
+ def load_main_config(self):
filename = self.printer.get_start_args()['config_file']
- data = self._read_config_file(filename)
+ cfgrdr = ConfigFileReader()
+ data = cfgrdr.read_config_file(filename)
regular_data, autosave_data = self._find_autosave_data(data)
- regular_config = self._build_config_wrapper(regular_data, filename)
- autosave_data = self._strip_duplicates(autosave_data, regular_config)
- self.autosave = self._build_config_wrapper(autosave_data, filename)
- cfg = self._build_config_wrapper(regular_data + autosave_data, filename)
- return cfg
- def check_unused_options(self, config):
- fileconfig = config.fileconfig
- objects = dict(self.printer.lookup_objects())
- # Determine all the fields that have been accessed
- access_tracking = dict(config.access_tracking)
- for section in self.autosave.fileconfig.sections():
- for option in self.autosave.fileconfig.options(section):
- access_tracking[(section.lower(), option.lower())] = 1
- # Validate that there are no undefined parameters in the config file
- valid_sections = { s: 1 for s, o in access_tracking }
- for section_name in fileconfig.sections():
- section = section_name.lower()
- if section not in valid_sections and section not in objects:
- raise error("Section '%s' is not a valid config section"
- % (section,))
- for option in fileconfig.options(section_name):
- option = option.lower()
- if (section, option) not in access_tracking:
- raise error("Option '%s' is not valid in section '%s'"
- % (option, section))
- # Setup get_status()
- self._build_status(config)
- def log_config(self, config):
- lines = ["===== Config file =====",
- self._build_config_string(config),
- "======================="]
- self.printer.set_rollover_info("config", "\n".join(lines))
- # Status reporting
- def runtime_warning(self, msg):
- logging.warning(msg)
- res = {'type': 'runtime_warning', 'message': msg}
- self.runtime_warnings.append(res)
- self.status_warnings = self.runtime_warnings + self.deprecate_warnings
- def deprecate(self, section, option, value=None, msg=None):
- self.deprecated[(section, option, value)] = msg
- def _build_status(self, config):
- self.status_raw_config.clear()
- for section in config.get_prefix_sections(''):
- self.status_raw_config[section.get_name()] = section_status = {}
- for option in section.get_prefix_options(''):
- section_status[option] = section.get(option, note_valid=False)
- self.status_settings = {}
- for (section, option), value in config.access_tracking.items():
- self.status_settings.setdefault(section, {})[option] = value
- self.deprecate_warnings = []
- for (section, option, value), msg in self.deprecated.items():
- if value is None:
- res = {'type': 'deprecated_option'}
- else:
- res = {'type': 'deprecated_value', 'value': value}
- res['message'] = msg
- res['section'] = section
- res['option'] = option
- self.deprecate_warnings.append(res)
- self.status_warnings = self.runtime_warnings + self.deprecate_warnings
+ regular_fileconfig = cfgrdr.build_fileconfig(regular_data, filename)
+ autosave_data = self._strip_duplicates(autosave_data,
+ regular_fileconfig)
+ self.fileconfig = cfgrdr.build_fileconfig(autosave_data, filename)
+ return cfgrdr.build_fileconfig(regular_data + autosave_data, filename)
def get_status(self, eventtime):
- return {'config': self.status_raw_config,
- 'settings': self.status_settings,
- 'warnings': self.status_warnings,
- 'save_config_pending': self.save_config_pending,
+ return {'save_config_pending': self.save_config_pending,
'save_config_pending_items': self.status_save_pending}
- # Autosave functions
def set(self, section, option, value):
- if not self.autosave.fileconfig.has_section(section):
- self.autosave.fileconfig.add_section(section)
+ if not self.fileconfig.has_section(section):
+ self.fileconfig.add_section(section)
svalue = str(value)
- self.autosave.fileconfig.set(section, option, svalue)
+ self.fileconfig.set(section, option, svalue)
pending = dict(self.status_save_pending)
if not section in pending or pending[section] is None:
pending[section] = {}
@@ -366,8 +314,8 @@ class PrinterConfig:
self.save_config_pending = True
logging.info("save_config: set [%s] %s = %s", section, option, svalue)
def remove_section(self, section):
- if self.autosave.fileconfig.has_section(section):
- self.autosave.fileconfig.remove_section(section)
+ if self.fileconfig.has_section(section):
+ self.fileconfig.remove_section(section)
pending = dict(self.status_save_pending)
pending[section] = None
self.status_save_pending = pending
@@ -379,20 +327,22 @@ class PrinterConfig:
self.status_save_pending = pending
self.save_config_pending = True
def _disallow_include_conflicts(self, regular_data, cfgname, gcode):
- config = self._build_config_wrapper(regular_data, cfgname)
- for section in self.autosave.fileconfig.sections():
- for option in self.autosave.fileconfig.options(section):
- if config.fileconfig.has_option(section, option):
+ cfgrdr = ConfigFileReader()
+ regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname)
+ for section in self.fileconfig.sections():
+ for option in self.fileconfig.options(section):
+ if regular_fileconfig.has_option(section, option):
msg = ("SAVE_CONFIG section '%s' option '%s' conflicts "
"with included value" % (section, option))
raise gcode.error(msg)
cmd_SAVE_CONFIG_help = "Overwrite config file and restart"
def cmd_SAVE_CONFIG(self, gcmd):
- if not self.autosave.fileconfig.sections():
+ if not self.fileconfig.sections():
return
gcode = self.printer.lookup_object('gcode')
# Create string containing autosave data
- autosave_data = self._build_config_string(self.autosave)
+ cfgrdr = ConfigFileReader()
+ autosave_data = cfgrdr.build_config_string(self.fileconfig)
lines = [('#*# ' + l).strip()
for l in autosave_data.split('\n')]
lines.insert(0, "\n" + AUTOSAVE_HEADER.rstrip())
@@ -401,14 +351,14 @@ class PrinterConfig:
# Read in and validate current config file
cfgname = self.printer.get_start_args()['config_file']
try:
- data = self._read_config_file(cfgname)
+ data = cfgrdr.read_config_file(cfgname)
regular_data, old_autosave_data = self._find_autosave_data(data)
- config = self._build_config_wrapper(regular_data, cfgname)
+ regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname)
except error as e:
msg = "Unable to parse existing config on SAVE_CONFIG"
logging.exception(msg)
raise gcode.error(msg)
- regular_data = self._strip_duplicates(regular_data, self.autosave)
+ regular_data = self._strip_duplicates(regular_data, self.fileconfig)
self._disallow_include_conflicts(regular_data, cfgname, gcode)
data = regular_data.rstrip() + autosave_data
# Determine filenames
@@ -433,3 +383,96 @@ class PrinterConfig:
raise gcode.error(msg)
# Request a restart
gcode.request_restart('restart')
+
+
+######################################################################
+# Main printer config tracking
+######################################################################
+
+class PrinterConfig:
+ def __init__(self, printer):
+ self.printer = printer
+ self.autosave = ConfigAutoSave(printer)
+ self.deprecated = {}
+ self.runtime_warnings = []
+ self.deprecate_warnings = []
+ self.status_raw_config = {}
+ self.status_settings = {}
+ self.status_warnings = []
+ def get_printer(self):
+ return self.printer
+ def read_config(self, filename):
+ cfgrdr = ConfigFileReader()
+ data = cfgrdr.read_config_file(filename)
+ fileconfig = cfgrdr.build_fileconfig(data, filename)
+ return ConfigWrapper(self.printer, fileconfig, {}, 'printer')
+ def read_main_config(self):
+ fileconfig = self.autosave.load_main_config()
+ return ConfigWrapper(self.printer, fileconfig, {}, 'printer')
+ def check_unused_options(self, config):
+ fileconfig = config.fileconfig
+ objects = dict(self.printer.lookup_objects())
+ # Determine all the fields that have been accessed
+ access_tracking = dict(config.access_tracking)
+ for section in self.autosave.fileconfig.sections():
+ for option in self.autosave.fileconfig.options(section):
+ access_tracking[(section.lower(), option.lower())] = 1
+ # Validate that there are no undefined parameters in the config file
+ valid_sections = { s: 1 for s, o in access_tracking }
+ for section_name in fileconfig.sections():
+ section = section_name.lower()
+ if section not in valid_sections and section not in objects:
+ raise error("Section '%s' is not a valid config section"
+ % (section,))
+ for option in fileconfig.options(section_name):
+ option = option.lower()
+ if (section, option) not in access_tracking:
+ raise error("Option '%s' is not valid in section '%s'"
+ % (option, section))
+ # Setup get_status()
+ self._build_status(config)
+ def log_config(self, config):
+ cfgrdr = ConfigFileReader()
+ lines = ["===== Config file =====",
+ cfgrdr.build_config_string(config.fileconfig),
+ "======================="]
+ self.printer.set_rollover_info("config", "\n".join(lines))
+ # Status reporting
+ def runtime_warning(self, msg):
+ logging.warning(msg)
+ res = {'type': 'runtime_warning', 'message': msg}
+ self.runtime_warnings.append(res)
+ self.status_warnings = self.runtime_warnings + self.deprecate_warnings
+ def deprecate(self, section, option, value=None, msg=None):
+ self.deprecated[(section, option, value)] = msg
+ def _build_status(self, config):
+ self.status_raw_config.clear()
+ for section in config.get_prefix_sections(''):
+ self.status_raw_config[section.get_name()] = section_status = {}
+ for option in section.get_prefix_options(''):
+ section_status[option] = section.get(option, note_valid=False)
+ self.status_settings = {}
+ for (section, option), value in config.access_tracking.items():
+ self.status_settings.setdefault(section, {})[option] = value
+ self.deprecate_warnings = []
+ for (section, option, value), msg in self.deprecated.items():
+ if value is None:
+ res = {'type': 'deprecated_option'}
+ else:
+ res = {'type': 'deprecated_value', 'value': value}
+ res['message'] = msg
+ res['section'] = section
+ res['option'] = option
+ self.deprecate_warnings.append(res)
+ self.status_warnings = self.runtime_warnings + self.deprecate_warnings
+ def get_status(self, eventtime):
+ status = {'config': self.status_raw_config,
+ 'settings': self.status_settings,
+ 'warnings': self.status_warnings}
+ status.update(self.autosave.get_status(eventtime))
+ return status
+ # Autosave functions
+ def set(self, section, option, value):
+ self.autosave.set(section, option, value)
+ def remove_section(self, section):
+ self.autosave.remove_section(section)