diff options
Diffstat (limited to 'scripts/spi_flash/spi_flash.py')
-rw-r--r-- | scripts/spi_flash/spi_flash.py | 917 |
1 files changed, 515 insertions, 402 deletions
diff --git a/scripts/spi_flash/spi_flash.py b/scripts/spi_flash/spi_flash.py index 729dd2bb..f784f0a4 100644 --- a/scripts/spi_flash/spi_flash.py +++ b/scripts/spi_flash/spi_flash.py @@ -29,14 +29,17 @@ import mcu # ########################################################### + def output_line(msg): sys.stdout.write("%s\n" % (msg,)) sys.stdout.flush() + def output(msg): sys.stdout.write("%s" % (msg,)) sys.stdout.flush() + def calc_crc7(data, with_padding=True): # G(x) = x^7 + x^3 + 1 # Shift left as we are only calculating a 7 bit CRC @@ -52,6 +55,7 @@ def calc_crc7(data, with_padding=True): return crc return crc | 1 + def calc_crc16(data): poly = 0b10001000000100001 crc = 0 @@ -61,32 +65,34 @@ def calc_crc16(data): crc = (crc << 1) ^ poly if crc & 0x8000 else crc << 1 return crc & 0xFFFF + # Translate a serial device name to a stable serial name in # /dev/serial/by-path/ # Borrowed from klipper/scripts/flash_usb.py def translate_serial_to_tty(device): ttyname = os.path.realpath(device) - if os.path.exists('/dev/serial/by-path/'): - for fname in os.listdir('/dev/serial/by-path/'): - fname = '/dev/serial/by-path/' + fname + if os.path.exists("/dev/serial/by-path/"): + for fname in os.listdir("/dev/serial/by-path/"): + fname = "/dev/serial/by-path/" + fname if os.path.realpath(fname) == ttyname: return ttyname, fname return ttyname, ttyname + def check_need_convert(board_name, config): conv_script = config.get("conversion_script") if conv_script is None: return conv_util = os.path.join(fatfs_lib.KLIPPER_DIR, conv_script) - klipper_bin = config['klipper_bin_path'] + klipper_bin = config["klipper_bin_path"] dest_bin = os.path.join( - os.path.dirname(klipper_bin), - os.path.basename(config['firmware_path'])) + os.path.dirname(klipper_bin), os.path.basename(config["firmware_path"]) + ) cmd = "%s %s %s %s" % (sys.executable, conv_util, klipper_bin, dest_bin) output("Converting Klipper binary to custom format...") os.system(cmd) output_line("Done") - config['klipper_bin_path'] = dest_bin + config["klipper_bin_path"] = dest_bin ########################################################### @@ -102,14 +108,14 @@ SD_SPI_SPEED = 400000 # MCU Command Constants RESET_CMD = "reset" GET_CFG_CMD = "get_config" -GET_CFG_RESPONSES = ( # Supported responses (sorted by newer revisions first). - "config is_config=%c crc=%u is_shutdown=%c move_count=%hu", # d4aee4f - "config is_config=%c crc=%u move_count=%hu is_shutdown=%c" # Original +GET_CFG_RESPONSES = ( # Supported responses (sorted by newer revisions first). + "config is_config=%c crc=%u is_shutdown=%c move_count=%hu", # d4aee4f + "config is_config=%c crc=%u move_count=%hu is_shutdown=%c", # Original ) ALLOC_OIDS_CMD = "allocate_oids count=%d" SPI_CFG_CMDS = ( - "config_spi oid=%d pin=%s cs_active_high=%d", # 7793784 - "config_spi oid=%d pin=%s" # Original + "config_spi oid=%d pin=%s cs_active_high=%d", # 7793784 + "config_spi oid=%d pin=%s", # Original ) SPI_BUS_CMD = "spi_set_bus oid=%d spi_bus=%s mode=%d rate=%d" SW_SPI_BUS_CMDS = ( @@ -124,31 +130,34 @@ SPI_XFER_RESPONSE = "spi_transfer_response oid=%c response=%*s" SDIO_CFG_CMD = "config_sdio oid=%d blocksize=%u" SDIO_BUS_CMD = "sdio_set_bus oid=%d sdio_bus=%s" SDIO_SEND_CMD = "sdio_send_command oid=%c cmd=%c argument=%u wait=%c" -SDIO_SEND_CMD_RESPONSE = "sdio_send_command_response oid=%c error=%c " \ - "response=%*s" -SDIO_READ_DATA="sdio_read_data oid=%c cmd=%c argument=%u" -SDIO_READ_DATA_RESPONSE="sdio_read_data_response oid=%c error=%c read=%u" -SDIO_WRITE_DATA="sdio_write_data oid=%c cmd=%c argument=%u" -SDIO_WRITE_DATA_RESPONSE="sdio_write_data_response oid=%c error=%c write=%u" -SDIO_READ_DATA_BUFFER="sdio_read_data_buffer oid=%c offset=%u len=%c" -SDIO_READ_DATA_BUFFER_RESPONSE="sdio_read_data_buffer_response oid=%c data=%*s" -SDIO_WRITE_DATA_BUFFER="sdio_write_data_buffer oid=%c offset=%u data=%*s" -SDIO_SET_SPEED="sdio_set_speed oid=%c speed=%u" +SDIO_SEND_CMD_RESPONSE = "sdio_send_command_response oid=%c error=%c " "response=%*s" +SDIO_READ_DATA = "sdio_read_data oid=%c cmd=%c argument=%u" +SDIO_READ_DATA_RESPONSE = "sdio_read_data_response oid=%c error=%c read=%u" +SDIO_WRITE_DATA = "sdio_write_data oid=%c cmd=%c argument=%u" +SDIO_WRITE_DATA_RESPONSE = "sdio_write_data_response oid=%c error=%c write=%u" +SDIO_READ_DATA_BUFFER = "sdio_read_data_buffer oid=%c offset=%u len=%c" +SDIO_READ_DATA_BUFFER_RESPONSE = "sdio_read_data_buffer_response oid=%c data=%*s" +SDIO_WRITE_DATA_BUFFER = "sdio_write_data_buffer oid=%c offset=%u data=%*s" +SDIO_SET_SPEED = "sdio_set_speed oid=%c speed=%u" FINALIZE_CFG_CMD = "finalize_config crc=%d" + class SPIFlashError(Exception): pass + class MCUConfigError(SPIFlashError): pass + class SPIDirect: def __init__(self, ser): self.oid = SPI_OID self._spi_send_cmd = mcu.CommandWrapper(ser, SPI_SEND_CMD) self._spi_transfer_cmd = mcu.CommandQueryWrapper( - ser, SPI_XFER_CMD, SPI_XFER_RESPONSE, self.oid) + ser, SPI_XFER_CMD, SPI_XFER_RESPONSE, self.oid + ) def spi_send(self, data): self._spi_send_cmd.send([self.oid, data]) @@ -156,20 +165,23 @@ class SPIDirect: def spi_transfer(self, data): return self._spi_transfer_cmd.send([self.oid, data]) + class SDIODirect: def __init__(self, ser): self.oid = SDIO_OID self._sdio_send_cmd = mcu.CommandQueryWrapper( - ser, SDIO_SEND_CMD, SDIO_SEND_CMD_RESPONSE, self.oid) + ser, SDIO_SEND_CMD, SDIO_SEND_CMD_RESPONSE, self.oid + ) self._sdio_read_data = mcu.CommandQueryWrapper( - ser, SDIO_READ_DATA, SDIO_READ_DATA_RESPONSE, self.oid) + ser, SDIO_READ_DATA, SDIO_READ_DATA_RESPONSE, self.oid + ) self._sdio_write_data = mcu.CommandQueryWrapper( - ser, SDIO_WRITE_DATA, SDIO_WRITE_DATA_RESPONSE, self.oid) + ser, SDIO_WRITE_DATA, SDIO_WRITE_DATA_RESPONSE, self.oid + ) self._sdio_read_data_buffer = mcu.CommandQueryWrapper( - ser, SDIO_READ_DATA_BUFFER, SDIO_READ_DATA_BUFFER_RESPONSE, - self.oid) - self._sdio_write_data_buffer = mcu.CommandWrapper(ser, - SDIO_WRITE_DATA_BUFFER) + ser, SDIO_READ_DATA_BUFFER, SDIO_READ_DATA_BUFFER_RESPONSE, self.oid + ) + self._sdio_write_data_buffer = mcu.CommandWrapper(ser, SDIO_WRITE_DATA_BUFFER) self._sdio_set_speed = mcu.CommandWrapper(ser, SDIO_SET_SPEED) def sdio_send_cmd(self, cmd, argument, wait): @@ -192,14 +204,28 @@ class SDIODirect: # FatFs Constants. Enums are implemented as lists. The item's index is its value -DRESULT = ['RES_OK', 'RES_ERROR', 'RES_WRPRT', 'RES_NOTRDY', 'RES_PARERR'] +DRESULT = ["RES_OK", "RES_ERROR", "RES_WRPRT", "RES_NOTRDY", "RES_PARERR"] FRESULT = [ - 'FR_OK', 'FR_DISK_ERR', 'FR_INT_ERR', 'FR_NOT_READY', 'FR_NO_FILE', - 'FR_NO_PATH', 'FR_INVALID_NAME', 'FR_DENIED', 'FR_EXIST', - 'FR_INVALID_OBJECT', 'FR_WRITE_PROTECTED', 'FR_INVALID_DRIVE', - 'FR_NOT_ENABLED', 'FR_NO_FILESYSTEM', 'FR_MKFS_ABORTED', 'FR_TIMEOUT', - 'FR_LOCKED', 'FR_NOT_ENOUGH_CORE', 'FR_TOO_MANY_OPEN_FILES', - 'FR_INVALID_PARAMETER' + "FR_OK", + "FR_DISK_ERR", + "FR_INT_ERR", + "FR_NOT_READY", + "FR_NO_FILE", + "FR_NO_PATH", + "FR_INVALID_NAME", + "FR_DENIED", + "FR_EXIST", + "FR_INVALID_OBJECT", + "FR_WRITE_PROTECTED", + "FR_INVALID_DRIVE", + "FR_NOT_ENABLED", + "FR_NO_FILESYSTEM", + "FR_MKFS_ABORTED", + "FR_TIMEOUT", + "FR_LOCKED", + "FR_NOT_ENOUGH_CORE", + "FR_TOO_MANY_OPEN_FILES", + "FR_INVALID_PARAMETER", ] FS_TYPES = {1: "FAT12", 2: "FAT16", 3: "FAT32", 4: "EXFAT"} STA_NO_INIT = 1 << 0 @@ -207,6 +233,7 @@ STA_NO_DISK = 1 << 1 STA_WRITE_PROTECT = 1 << 2 SECTOR_SIZE = 512 + # FAT16/32 File System Support class FatFS: def __init__(self, ser, spi=True): @@ -220,25 +247,23 @@ class FatFS: self._register_callbacks() def _register_callbacks(self): - status_cb = self.ffi_main.callback( - "uint8_t(void)", self._fatfs_cb_status) - init_cb = self.ffi_main.callback( - "uint8_t(void)", self._fatfs_cb_initialize) + status_cb = self.ffi_main.callback("uint8_t(void)", self._fatfs_cb_status) + init_cb = self.ffi_main.callback("uint8_t(void)", self._fatfs_cb_initialize) read_cb = self.ffi_main.callback( - "uint8_t(uint8_t*, uint32_t, unsigned int)", - self._fatfs_cb_disk_read) + "uint8_t(uint8_t*, uint32_t, unsigned int)", self._fatfs_cb_disk_read + ) write_cb = self.ffi_main.callback( - "uint8_t(const uint8_t*, uint32_t, unsigned int)", - self._fatfs_cb_disk_write) + "uint8_t(const uint8_t*, uint32_t, unsigned int)", self._fatfs_cb_disk_write + ) ioctl_cb = self.ffi_main.callback( - "uint8_t(uint8_t, void*)", self._fatfs_cb_disk_ioctl) - ftime_cb = self.ffi_main.callback( - "uint32_t(void)", self._fatfs_cb_get_fattime) + "uint8_t(uint8_t, void*)", self._fatfs_cb_disk_ioctl + ) + ftime_cb = self.ffi_main.callback("uint32_t(void)", self._fatfs_cb_get_fattime) # Keep a reference to the callbacks so they don't get gc'ed - self.ffi_callbacks = [status_cb, init_cb, read_cb, write_cb, - ioctl_cb, ftime_cb] + self.ffi_callbacks = [status_cb, init_cb, read_cb, write_cb, ioctl_cb, ftime_cb] self.ffi_lib.fatfs_set_callbacks( - status_cb, init_cb, read_cb, write_cb, ioctl_cb, ftime_cb) + status_cb, init_cb, read_cb, write_cb, ioctl_cb, ftime_cb + ) def clear_callbacks(self): self.ffi_lib.fatfs_clear_callbacks() @@ -306,18 +331,27 @@ class FatFS: # this module to take any actions for incoming IOCTL # commands. ioctl_cmds = [ - 'CTRL_SYNC', 'GET_SECTOR_COUNT', 'GET_SECTOR_SIZE', - 'GET_BLOCK_SIZE', 'CTRL_TRIM'] - logging.debug("flash_sdcard: Received IOCTL command %s" - % (ioctl_cmds[cmd])) + "CTRL_SYNC", + "GET_SECTOR_COUNT", + "GET_SECTOR_SIZE", + "GET_BLOCK_SIZE", + "CTRL_TRIM", + ] + logging.debug("flash_sdcard: Received IOCTL command %s" % (ioctl_cmds[cmd])) return 0 def _fatfs_cb_get_fattime(self): tobj = time.localtime() year = tobj[0] - 1980 sec = min(tobj[5], 59) // 2 - return (year << 25) | (tobj[1] << 21) | (tobj[2] << 16) \ - | (tobj[3] << 11) | (tobj[4] << 5) | sec + return ( + (year << 25) + | (tobj[1] << 21) + | (tobj[2] << 16) + | (tobj[3] << 11) + | (tobj[4] << 5) + | sec + ) def mount(self, print_func=logging.info): ret = self.ffi_lib.fatfs_mount() @@ -327,8 +361,9 @@ class FatFS: for key, val in sorted(dinfo.items(), key=lambda x: x[0]): print_func("%s: %s" % (key, val)) else: - raise OSError("flash_sdcard: failed to mount SD Card, returned %s" - % (FRESULT[ret])) + raise OSError( + "flash_sdcard: failed to mount SD Card, returned %s" % (FRESULT[ret]) + ) def unmount(self): self.ffi_lib.fatfs_unmount() @@ -344,9 +379,10 @@ class FatFS: # Can be path to directory or file ret = self.ffi_lib.fatfs_remove(sd_path.encode()) if ret != 0: - raise OSError("flash_sdcard: Error deleting item at path '%s'," - " result: %s" - % (sd_path, FRESULT[ret])) + raise OSError( + "flash_sdcard: Error deleting item at path '%s'," + " result: %s" % (sd_path, FRESULT[ret]) + ) def get_file_info(self, sd_file_path): finfo = self.ffi_main.new("struct ff_file_info *") @@ -355,16 +391,25 @@ class FatFS: raise OSError( "flash_sdcard: Failed to retreive file info for path '%s'," " result: %s" - % (sd_file_path, FRESULT[ret],)) + % ( + sd_file_path, + FRESULT[ret], + ) + ) return self._parse_ff_info(finfo) def list_sd_directory(self, sd_dir_path): flist = self.ffi_main.new("struct ff_file_info[128]") ret = self.ffi_lib.fatfs_list_dir(flist, 128, sd_dir_path.encode()) if ret != 0: - raise OSError("flash_sdcard: Failed to retreive file list at path" - " '%s', result: %s" - % (sd_dir_path, FRESULT[ret],)) + raise OSError( + "flash_sdcard: Failed to retreive file list at path" + " '%s', result: %s" + % ( + sd_dir_path, + FRESULT[ret], + ) + ) convlist = [] for f in flist: if f.size == 0 and f.modified_date == 0 and f.modified_time == 0: @@ -377,37 +422,49 @@ class FatFS: fdate = finfo.modified_date ftime = finfo.modified_time dstr = "%d-%d-%d %d:%d:%d" % ( - (fdate >> 5) & 0xF, fdate & 0x1F, ((fdate >> 9) & 0x7F) + 1980, - (ftime >> 11) & 0x1F, (ftime >> 5) & 0x3F, ftime & 0x1F) + (fdate >> 5) & 0xF, + fdate & 0x1F, + ((fdate >> 9) & 0x7F) + 1980, + (ftime >> 11) & 0x1F, + (ftime >> 5) & 0x3F, + ftime & 0x1F, + ) return { - 'name': self.ffi_main.string(finfo.name, 256), - 'size': finfo.size, - 'modified': dstr, - 'is_dir': bool(finfo.attrs & 0x10), - 'is_read_only': bool(finfo.attrs & 0x01), - 'is_hidden': bool(finfo.attrs & 0x02), - 'is_system': bool(finfo.attrs & 0x04) + "name": self.ffi_main.string(finfo.name, 256), + "size": finfo.size, + "modified": dstr, + "is_dir": bool(finfo.attrs & 0x10), + "is_read_only": bool(finfo.attrs & 0x01), + "is_hidden": bool(finfo.attrs & 0x02), + "is_system": bool(finfo.attrs & 0x04), } def get_disk_info(self): disk_info = self.ffi_main.new("struct ff_disk_info *") ret = self.ffi_lib.fatfs_get_disk_info(disk_info) if ret != 0: - logging.info("flash_sdcard: Failed to retreive disk info: %s" - % (FRESULT[ret],)) + logging.info( + "flash_sdcard: Failed to retreive disk info: %s" % (FRESULT[ret],) + ) return {} return { - 'volume_label': self.ffi_main.string(disk_info.label, 12), - 'volume_serial': disk_info.serial_number, - 'fs_type': FS_TYPES.get(disk_info.fs_type, "UNKNOWN") + "volume_label": self.ffi_main.string(disk_info.label, 12), + "volume_serial": disk_info.serial_number, + "fs_type": FS_TYPES.get(disk_info.fs_type, "UNKNOWN"), } SD_FILE_MODES = { - 'w+x': 0x01 | 0x02 | 0x04, 'wx': 0x02 | 0x04, - 'r+': 0x01 | 0x02, 'w+': 0x01 | 0x02 | 0x08, - 'a+': 0x01 | 0x02 | 0x30, 'r': 0x01, - 'w': 0x02 | 0x08, 'a': 0x02 | 0x30} + "w+x": 0x01 | 0x02 | 0x04, + "wx": 0x02 | 0x04, + "r+": 0x01 | 0x02, + "w+": 0x01 | 0x02 | 0x08, + "a+": 0x01 | 0x02 | 0x30, + "r": 0x01, + "w": 0x02 | 0x08, + "a": 0x02 | 0x30, +} + class SDCardFile: def __init__(self, ffi_main, ffi_lib, sd_path, mode="r"): @@ -415,7 +472,7 @@ class SDCardFile: self.ffi_lib = ffi_lib self.path = sd_path mode = mode.lower() - if mode[-1] == 'b': + if mode[-1] == "b": mode = mode[:-1] self.mode = SD_FILE_MODES.get(mode, 0) self.fhdl = None @@ -429,8 +486,7 @@ class SDCardFile: self.eof = False if self.fhdl == self.ffi_main.NULL: self.fhdl = None - raise OSError("flash_sdcard: Could not open file '%s':" - % (self.path)) + raise OSError("flash_sdcard: Could not open file '%s':" % (self.path)) def read(self, length=None): if self.fhdl is None: @@ -448,9 +504,8 @@ class SDCardFile: while True: bytes_read = self.ffi_lib.fatfs_read(self.fhdl, cdata_buf, length) if bytes_read < 0: - raise OSError("flash_sdcard: Error Reading file '%s'" - % (self.path)) - self.eof = (bytes_read < length) + raise OSError("flash_sdcard: Error Reading file '%s'" % (self.path)) + self.eof = bytes_read < length ret_buf += byte_buf[0:bytes_read] if self.eof or not full_read: break @@ -473,9 +528,10 @@ class SDCardFile: ret = self.ffi_lib.fatfs_close(self.fhdl) self.fhdl = None if ret != 0: - logging.info("flash_sdcard: Error closing sd file '%s', " - "returned %d" - % (self.path, FRESULT[ret])) + logging.info( + "flash_sdcard: Error closing sd file '%s', " + "returned %d" % (self.path, FRESULT[ret]) + ) def __enter__(self): self.open() @@ -486,24 +542,25 @@ class SDCardFile: SD_COMMANDS = { - 'GO_IDLE_STATE': 0, - 'ALL_SEND_CID': 2, - 'SET_REL_ADDR': 3, - 'SET_BUS_WIDTH': 6, - 'SEL_DESEL_CARD': 7, - 'SEND_IF_COND': 8, - 'SEND_CSD': 9, - 'SEND_CID': 10, - 'SD_SEND_OP_COND': 41, - 'SEND_STATUS': 13, - 'SET_BLOCKLEN': 16, - 'READ_SINGLE_BLOCK': 17, - 'WRITE_BLOCK': 24, - 'APP_CMD': 55, - 'READ_OCR': 58, - 'CRC_ON_OFF': 59, + "GO_IDLE_STATE": 0, + "ALL_SEND_CID": 2, + "SET_REL_ADDR": 3, + "SET_BUS_WIDTH": 6, + "SEL_DESEL_CARD": 7, + "SEND_IF_COND": 8, + "SEND_CSD": 9, + "SEND_CID": 10, + "SD_SEND_OP_COND": 41, + "SEND_STATUS": 13, + "SET_BLOCKLEN": 16, + "READ_SINGLE_BLOCK": 17, + "WRITE_BLOCK": 24, + "APP_CMD": 55, + "READ_OCR": 58, + "CRC_ON_OFF": 59, } + class SDCardSPI: def __init__(self, ser): self.spi = SPIDirect(ser) @@ -522,18 +579,20 @@ class SDCardSPI: if self.initialized: return # Send reset command (CMD0) - if not self._check_command(1, 'GO_IDLE_STATE', 0): + if not self._check_command(1, "GO_IDLE_STATE", 0): raise OSError( "flash_sdcard: failed to reset SD Card\n" "Note that older (Version 1.0) SD cards can not be\n" "hot swapped. Execute FIRMWARE_RESTART with the card\n" - "inserted for successful initialization.") + "inserted for successful initialization." + ) # Check Voltage Range (CMD8). Only Cards meeting the v2.0 spec # support this. V1.0 cards (and MMC) will return illegal command. check_pattern = 0b1010 resp = self._send_command_with_response( - 'SEND_IF_COND', (1 << 8) | check_pattern) - resp = resp.strip(b'\xFF') + "SEND_IF_COND", (1 << 8) | check_pattern + ) + resp = resp.strip(b"\xff") if resp and resp[0] & (1 << 2): # CMD8 is illegal, this is a version 1.0 card self.sd_version = 1 @@ -541,54 +600,57 @@ class SDCardSPI: if resp[0] == 1: self.sd_version = 2 if not (resp[-2] == 1 and resp[-1] == check_pattern): - raise OSError("flash_sdcard: SD Card not running in a " - "compatible voltage range") + raise OSError( + "flash_sdcard: SD Card not running in a " + "compatible voltage range" + ) else: raise OSError("flash_sdcard: CMD8 Error 0x%X" % (resp[0],)) else: - raise OSError("flash_sdcard: Invalid CMD8 response: %s" - % (repr(resp))) + raise OSError("flash_sdcard: Invalid CMD8 response: %s" % (repr(resp))) if self.enable_crc: # Enable SD crc checks (CMD59) - if not self._check_command(1, 'CRC_ON_OFF', 1): + if not self._check_command(1, "CRC_ON_OFF", 1): logging.info("flash_sdcard: failed to enable CRC checks") if self.sd_version == 2: # Init card and come out of idle (ACMD41) # Version 2 Cards may init before checking the OCR - if not self._check_command(0, 'SD_SEND_OP_COND', 1 << 30, - is_app_cmd=True): - raise OSError("flash_sdcard: SD Card did not come" - " out of IDLE after reset") + if not self._check_command( + 0, "SD_SEND_OP_COND", 1 << 30, is_app_cmd=True + ): + raise OSError( + "flash_sdcard: SD Card did not come" " out of IDLE after reset" + ) # Read OCR Register (CMD58) - resp = self._send_command_with_response('READ_OCR', 0) - resp = resp.strip(b'\xFF') + resp = self._send_command_with_response("READ_OCR", 0) + resp = resp.strip(b"\xff") # If 'READ_OCR' is illegal then this is likely MMC. # At this time MMC is not supported if len(resp) == 5: if self.sd_version == 1 and resp[0] == 1: # Check acceptable voltage range for V1 cards if resp[2] != 0xFF: - raise OSError("flash_sdcard: card does not support" - " 3.3v range") + raise OSError( + "flash_sdcard: card does not support" " 3.3v range" + ) elif self.sd_version == 2 and resp[0] == 0: # Determine if this is a high capacity sdcard if resp[1] & 0x40: self.high_capacity = True else: - raise OSError("flash_sdcard: READ_OCR Error 0x%X" - % (resp[0],)) + raise OSError("flash_sdcard: READ_OCR Error 0x%X" % (resp[0],)) else: raise OSError("flash_sdcard: Invalid OCR Response") if self.sd_version == 1: # Init card and come out of idle (ACMD41) # Version 1 Cards do this after checking the OCR - if not self._check_command(0, 'SD_SEND_OP_COND', 0, - is_app_cmd=True): - raise OSError("flash_sdcard: SD Card did not come" - " out of IDLE after reset") + if not self._check_command(0, "SD_SEND_OP_COND", 0, is_app_cmd=True): + raise OSError( + "flash_sdcard: SD Card did not come" " out of IDLE after reset" + ) # Set block size to 512 (CMD16) - if self._check_command(0, 'SET_BLOCKLEN', SECTOR_SIZE, tries=5): + if self._check_command(0, "SET_BLOCKLEN", SECTOR_SIZE, tries=5): self.initialized = True else: raise OSError("flash_sdcard: failed to set block size") @@ -601,10 +663,10 @@ class SDCardSPI: if self.initialized: # Reset the SD Card try: - if not self._check_command(1, 'GO_IDLE_STATE', 0): + if not self._check_command(1, "GO_IDLE_STATE", 0): logging.info("flash_sdcard: failed to reset SD Card") # Disable CRC Checks - if not self._check_command(1, 'CRC_ON_OFF', 0): + if not self._check_command(1, "CRC_ON_OFF", 0): logging.info("flash_sdcard: failed to disable CRC") except Exception: logging.exception("Error resetting SD Card") @@ -615,26 +677,29 @@ class SDCardSPI: self.card_info.clear() def _check_command(self, expected, cmd, args, is_app_cmd=False, tries=15): - func = self._send_app_cmd_with_response if is_app_cmd else \ - self._send_command_with_response + func = ( + self._send_app_cmd_with_response + if is_app_cmd + else self._send_command_with_response + ) while True: resp, rt = func(cmd, args, get_rt=True) # logging.info("flash_sdcard: Check cmd %s, response: %s" # % (cmd, repr(resp))) - resp = resp.strip(b'\xFF') + resp = resp.strip(b"\xff") if resp and expected == resp[0]: return True tries -= 1 if tries < 1: return False - self.reactor.pause(rt + .1) + self.reactor.pause(rt + 0.1) def _send_command(self, cmd, args): command = SD_COMMANDS[cmd] | 0x40 request = [command] if isinstance(args, int): for i in range(3, -1, -1): - request.append((args >> (8*i)) & 0xFF) + request.append((args >> (8 * i)) & 0xFF) elif isinstance(args, list) and len(args) == 4: request += args else: @@ -645,22 +710,22 @@ class SDCardSPI: def _send_command_with_response(self, cmd, args, get_rt=False): self._send_command(cmd, args) - params = self.spi.spi_transfer([0xFF]*8) + params = self.spi.spi_transfer([0xFF] * 8) if get_rt: - return bytearray(params['response']), params['#receive_time'] + return bytearray(params["response"]), params["#receive_time"] else: - return bytearray(params['response']) + return bytearray(params["response"]) def _send_app_cmd_with_response(self, cmd, args, get_rt=False): # CMD55 tells the SD Card that the next command is an # Application Specific Command. - self._send_command_with_response('APP_CMD', 0) + self._send_command_with_response("APP_CMD", 0) return self._send_command_with_response(cmd, args, get_rt) def _find_sd_token(self, token, tries=10): while tries: params = self.spi.spi_transfer([0xFF]) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) if resp[0] == token: return True tries -= 1 @@ -669,36 +734,35 @@ class SDCardSPI: def _find_sd_response(self, tries=10): while tries: params = self.spi.spi_transfer([0xFF]) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) if resp[0] != 0xFF: return resp[0] tries -= 1 return 0xFF def _process_cid_reg(self): - self._send_command('SEND_CID', 0) + self._send_command("SEND_CID", 0) reg = self._do_block_read(size=16) if reg is None: raise OSError("flash_sdcard: Error reading CID register") cid = collections.OrderedDict() - cid['manufacturer_id'] = reg[0] - cid['oem_id'] = reg[1:3].decode(encoding='ascii', errors='ignore') - cid['product_name'] = reg[3:8].decode( - encoding='ascii', errors='ignore') - cid['product_revision'] = str(reg[8] >> 4 & 0xFF) + "." \ - + str(reg[8] & 0xFF) - cid['serial_number'] = "".join(["%02X" % (c,) for c in reg[9:13]]) + cid["manufacturer_id"] = reg[0] + cid["oem_id"] = reg[1:3].decode(encoding="ascii", errors="ignore") + cid["product_name"] = reg[3:8].decode(encoding="ascii", errors="ignore") + cid["product_revision"] = str(reg[8] >> 4 & 0xFF) + "." + str(reg[8] & 0xFF) + cid["serial_number"] = "".join(["%02X" % (c,) for c in reg[9:13]]) mfg_year = (((reg[13] & 0xF) << 4) | ((reg[14] >> 4) & 0xF)) + 2000 mfg_month = reg[14] & 0xF - cid['manufacturing_date'] = "%d/%d" % (mfg_month, mfg_year) + cid["manufacturing_date"] = "%d/%d" % (mfg_month, mfg_year) crc = calc_crc7(reg[:15]) if crc != reg[15]: - raise OSError("flash_sdcard: CID crc mismatch: 0x%02X, recd: 0x%02X" - % (crc, reg[15])) + raise OSError( + "flash_sdcard: CID crc mismatch: 0x%02X, recd: 0x%02X" % (crc, reg[15]) + ) self.card_info.update(cid) def _process_csd_reg(self): - self._send_command('SEND_CSD', 0) + self._send_command("SEND_CSD", 0) reg = self._do_block_read(size=16) if reg is None: raise OSError("flash_sdcard: Error reading CSD register") @@ -707,10 +771,9 @@ class SDCardSPI: csd_type = (reg[0] >> 6) & 0x3 if csd_type == 0: # Standard Capacity (CSD Version 1.0) - max_block_len = 2**(reg[5] & 0xF) - c_size = ((reg[6] & 0x3) << 10) | (reg[7] << 2) | \ - ((reg[8] >> 6) & 0x3) - c_mult = 2**((((reg[9] & 0x3) << 1) | (reg[10] >> 7)) + 2) + max_block_len = 2 ** (reg[5] & 0xF) + c_size = ((reg[6] & 0x3) << 10) | (reg[7] << 2) | ((reg[8] >> 6) & 0x3) + c_mult = 2 ** ((((reg[9] & 0x3) << 1) | (reg[10] >> 7)) + 2) max_capacity = (c_size + 1) * c_mult * max_block_len str_capacity = "%.1f MiB" % (max_capacity / (1024.0**2)) elif csd_type == 1: @@ -723,9 +786,10 @@ class SDCardSPI: self.write_protected = (reg[14] & 0x30) != 0 crc = calc_crc7(reg[:15]) if crc != reg[15]: - raise OSError("flash_sdcard: CSD crc mismatch: 0x%02X, recd: 0x%02X" - % (crc, reg[15])) - self.card_info['capacity'] = str_capacity + raise OSError( + "flash_sdcard: CSD crc mismatch: 0x%02X, recd: 0x%02X" % (crc, reg[15]) + ) + self.card_info["capacity"] = str_capacity self.total_sectors = max_capacity // SECTOR_SIZE def print_card_info(self, print_func=logging.info): @@ -749,7 +813,7 @@ class SDCardSPI: offset = sector if not self.high_capacity: offset = sector * SECTOR_SIZE - self._send_command('READ_SINGLE_BLOCK', offset) + self._send_command("READ_SINGLE_BLOCK", offset) buf = self._do_block_read() if buf is None: raise OSError(err_msg) @@ -759,12 +823,12 @@ class SDCardSPI: valid_response = True sd_resp = self._find_sd_response() if sd_resp != 0: - logging.info("flash_sdcard: invalid read block response: 0x%02X" - % (sd_resp)) + logging.info( + "flash_sdcard: invalid read block response: 0x%02X" % (sd_resp) + ) valid_response = False if not self._find_sd_token(0xFE): - logging.info("flash_sdcard: read error, unable to find " - "start token") + logging.info("flash_sdcard: read error, unable to find " "start token") valid_response = False if not valid_response: # In the event of an invalid response we will still @@ -773,26 +837,27 @@ class SDCardSPI: bcount = size + 2 while bcount: sent = min(32, bcount) - self.spi.spi_send([0xFF]*sent) + self.spi.spi_send([0xFF] * sent) bcount -= sent self._find_sd_token(0xFF) return None buf = bytearray() while len(buf) < size: count = min(32, size - len(buf)) - params = self.spi.spi_transfer([0xFF]*count) - buf += bytearray(params['response']) + params = self.spi.spi_transfer([0xFF] * count) + buf += bytearray(params["response"]) # Get the CRC params = self.spi.spi_transfer([0xFF, 0xFF]) # Make sure we leave the busy state self._find_sd_token(0xFF) - crc = bytearray(params['response']) + crc = bytearray(params["response"]) crc_int = (crc[0] << 8) | crc[1] calculated_crc = calc_crc16(buf) if calculated_crc != crc_int: logging.info( "flash_sdcard: CRC Mismatch, Received: %04X, Calculated: %04X" - % (crc_int, calculated_crc)) + % (crc_int, calculated_crc) + ) return None return buf @@ -800,24 +865,23 @@ class SDCardSPI: with self.mutex: if not 0 <= sector < self.total_sectors: raise OSError( - "flash_sdcard: write error, sector number %d invalid" - % (sector)) + "flash_sdcard: write error, sector number %d invalid" % (sector) + ) if not self.initialized: - raise OSError("flash_sdcard: write error, SD Card not" - " initialized") + raise OSError("flash_sdcard: write error, SD Card not" " initialized") outbuf = bytearray(data) if len(outbuf) > SECTOR_SIZE: - raise OSError("sd_card: Cannot write sector larger" - " than %d bytes" - % (SECTOR_SIZE)) + raise OSError( + "sd_card: Cannot write sector larger" + " than %d bytes" % (SECTOR_SIZE) + ) elif len(outbuf) < SECTOR_SIZE: outbuf += bytearray([0] * (SECTOR_SIZE - len(outbuf))) offset = sector if not self.high_capacity: offset = sector * SECTOR_SIZE - if not self._check_command(0, 'WRITE_BLOCK', offset, tries=2): - raise OSError("flash_sdcard: Error writing to sector %d" - % (sector,)) + if not self._check_command(0, "WRITE_BLOCK", offset, tries=2): + raise OSError("flash_sdcard: Error writing to sector %d" % (sector,)) crc = calc_crc16(outbuf) outbuf.insert(0, 0xFE) outbuf.append((crc >> 8) & 0xFF) @@ -827,26 +891,30 @@ class SDCardSPI: outbuf = outbuf[32:] resp = self._find_sd_response() err_msgs = [] - if (resp & 0x1f) != 5: + if (resp & 0x1F) != 5: err_msgs.append("flash_sdcard: write error 0x%02X" % (resp,)) # wait until the card leaves the busy state if not self._find_sd_token(0xFF, tries=128): - err_msgs.append("flash_sdcard: could not leave busy" - " state after write") + err_msgs.append( + "flash_sdcard: could not leave busy" " state after write" + ) else: - status = self._send_command_with_response('SEND_STATUS', 0) - status = status.strip(b'\xFF') + status = self._send_command_with_response("SEND_STATUS", 0) + status = status.strip(b"\xff") if len(status) == 2: if status[1] != 0: err_msgs.append( - "flash_sdcard: write error 0x%02X" - % (status[1],)) + "flash_sdcard: write error 0x%02X" % (status[1],) + ) else: - err_msgs.append("flash_sdcard: Invalid status response" - " after write: %s" % (repr(status),)) + err_msgs.append( + "flash_sdcard: Invalid status response" + " after write: %s" % (repr(status),) + ) if err_msgs: raise OSError("\n".join(err_msgs)) + class SDCardSDIO: def __init__(self, ser): self.sdio = SDIODirect(ser) @@ -864,49 +932,61 @@ class SDCardSDIO: def init_sd(self): def check_for_ocr_errors(reg): # returns False if an error flag is set - return ((reg[0]&0xFD) | (reg[1]&0xFF) | - (reg[2]&0xE0) | (reg[3]&0x08)) == 0 + return ( + (reg[0] & 0xFD) | (reg[1] & 0xFF) | (reg[2] & 0xE0) | (reg[3] & 0x08) + ) == 0 + with self.mutex: if self.initialized: return # Send reset command (CMD0) - if not self._send_command('GO_IDLE_STATE', 0): + if not self._send_command("GO_IDLE_STATE", 0): raise OSError( "flash_sdcard: failed to reset SD Card\n" "Note that older (Version 1.0) SD cards can not be\n" "hot swapped. Execute FIRMWARE_RESTART with the card\n" - "inserted for successful initialization.") + "inserted for successful initialization." + ) # Check Voltage Range (CMD8). Only Cards meeting the v2.0 spec # support this. V1.0 cards (and MMC) will return illegal command. check_pattern = 0b1010 resp = self._send_command_with_response( - 'SEND_IF_COND', (1 << 8) | check_pattern) - resp = resp.strip(b'\xFF') + "SEND_IF_COND", (1 << 8) | check_pattern + ) + resp = resp.strip(b"\xff") if len(resp) != 4: # CMD8 is illegal, this is a version 1.0 card self.sd_version = 1 else: self.sd_version = 2 if not (resp[-2] == 1 and resp[-1] == check_pattern): - raise OSError("flash_sdcard: SD Card not running in a " - "compatible voltage range") + raise OSError( + "flash_sdcard: SD Card not running in a " + "compatible voltage range" + ) if self.sd_version == 2: # Init card and come out of idle (ACMD41) # Version 2 Cards may init before checking the OCR # Allow vor LVDS card with 1.8v, too. - resp = self._check_command(lambda x: x[0]>>7 == 1, - 'SD_SEND_OP_COND', 0xC1100000, is_app_cmd=True, - ignoreCRC=True) + resp = self._check_command( + lambda x: x[0] >> 7 == 1, + "SD_SEND_OP_COND", + 0xC1100000, + is_app_cmd=True, + ignoreCRC=True, + ) if resp is None: - raise OSError("flash_sdcard: SD Card did not come" - " out of IDLE after reset") + raise OSError( + "flash_sdcard: SD Card did not come" " out of IDLE after reset" + ) if len(resp) == 4: if self.sd_version == 1: # Check acceptable voltage range for V1 cards if resp[1] != 0xFF: - raise OSError("flash_sdcard: card does not support" - " 3.3v range") + raise OSError( + "flash_sdcard: card does not support" " 3.3v range" + ) elif self.sd_version == 2: # Determine if this is a high capacity sdcard if resp[0] & 0x40: @@ -916,46 +996,46 @@ class SDCardSDIO: if self.sd_version == 1: # Init card and come out of idle (ACMD41) # Version 1 Cards do this after checking the OCR - if not self._check_command(0, 'SD_SEND_OP_COND', 0, - is_app_cmd=True): - raise OSError("flash_sdcard: SD Card did not come" - " out of IDLE after reset") + if not self._check_command(0, "SD_SEND_OP_COND", 0, is_app_cmd=True): + raise OSError( + "flash_sdcard: SD Card did not come" " out of IDLE after reset" + ) # Read out CID information register self._process_cid_reg() # Get card's relative address (RCA) - resp = self._send_command_with_response('SET_REL_ADDR', 0) + resp = self._send_command_with_response("SET_REL_ADDR", 0) # Check if bits 15:13 have some error set - if (resp[-2] & 0xe0) != 0: - raise OSError("flash_sdcard: set card's " - "relative address failed") - self.rca = resp[0]<<8 | resp[1] + if (resp[-2] & 0xE0) != 0: + raise OSError("flash_sdcard: set card's " "relative address failed") + self.rca = resp[0] << 8 | resp[1] # Read out CSD information register self._process_csd_reg() # Select the current card - if not self._check_command(check_for_ocr_errors, 'SEL_DESEL_CARD', - self.rca << 16, tries=1): + if not self._check_command( + check_for_ocr_errors, "SEL_DESEL_CARD", self.rca << 16, tries=1 + ): raise OSError("flash_sdcard: failed to select the card") # Set SDIO clk speed to approx. 1 MHz self.sdio.sdio_set_speed(1000000) - if self._check_command(check_for_ocr_errors, 'SET_BLOCKLEN', - SECTOR_SIZE, tries=5): + if self._check_command( + check_for_ocr_errors, "SET_BLOCKLEN", SECTOR_SIZE, tries=5 + ): self.initialized = True else: raise OSError("flash_sdcard: failed to set block size") - def deinit(self): with self.mutex: if self.initialized: # Reset the SD Card try: - if not self._send_command('GO_IDLE_STATE', 0): + if not self._send_command("GO_IDLE_STATE", 0): logging.info("flash_sdcard: failed to reset SD Card") except Exception: logging.exception("Error resetting SD Card") @@ -965,20 +1045,24 @@ class SDCardSDIO: self.total_sectors = 0 self.card_info.clear() - def _check_command(self, check_func, cmd, args, is_app_cmd=False, tries=15, - ignoreCRC=False): - func = self._send_app_cmd_with_response if is_app_cmd else \ - self._send_command_with_response + def _check_command( + self, check_func, cmd, args, is_app_cmd=False, tries=15, ignoreCRC=False + ): + func = ( + self._send_app_cmd_with_response + if is_app_cmd + else self._send_command_with_response + ) while True: resp, rt = func(cmd, args, get_rt=True, ignoreCRC=ignoreCRC) - #logging.info("flash_sdcard: Check cmd %s, response: %s" + # logging.info("flash_sdcard: Check cmd %s, response: %s" # % (cmd, repr(resp))) if resp and check_func(resp): return resp tries -= 1 if tries < 1: return None - self.reactor.pause(rt + .1) + self.reactor.pause(rt + 0.1) def _send_command(self, cmd, args, wait=0): cmd_code = SD_COMMANDS[cmd] @@ -986,68 +1070,70 @@ class SDCardSDIO: if isinstance(args, int) or isinstance(args, long): argument = args & 0xFFFFFFFF elif isinstance(args, list) and len(args) == 4: - argument = ((args[0] << 24) & 0xFF000000) | \ - ((args[1] << 16) & 0x00FF0000) | \ - ((args[2] << 8) & 0x0000FF00) | \ - ((args[3] << 0) & 0x000000FF) + argument = ( + ((args[0] << 24) & 0xFF000000) + | ((args[1] << 16) & 0x00FF0000) + | ((args[2] << 8) & 0x0000FF00) + | ((args[3] << 0) & 0x000000FF) + ) else: raise OSError("flash_sdcard: Invalid SD Card Command argument") params = self.sdio.sdio_send_cmd(cmd_code, argument, wait) - #logging.debug(f'_send_command({cmd=}, {args=}, {wait=}) -> ' + # logging.debug(f'_send_command({cmd=}, {args=}, {wait=}) -> ' # 'CMD: {cmd_code} ARG: {argument} -> {params=}') - if (wait == 0): + if wait == 0: # Just return the error code if no response was requested - return params['error'] == 0 + return params["error"] == 0 return params - def _send_command_with_response(self, cmd, args, check_error=True, - ignoreCRC=False, get_rt=False): + def _send_command_with_response( + self, cmd, args, check_error=True, ignoreCRC=False, get_rt=False + ): # Wait for a short response params = self._send_command(cmd, args, wait=1) - response = params['response'] + response = params["response"] if check_error: - if params['error'] != 0: - if ignoreCRC and params['error'] != 4: + if params["error"] != 0: + if ignoreCRC and params["error"] != 4: response = [] if get_rt: - return bytearray(response), params['#receive_time'] + return bytearray(response), params["#receive_time"] else: return bytearray(response) - def _send_app_cmd_with_response(self, cmd, args, - ignoreCRC=False, get_rt=False): + def _send_app_cmd_with_response(self, cmd, args, ignoreCRC=False, get_rt=False): # CMD55 tells the SD Card that the next command is an # Application Specific Command. - self._send_command_with_response('APP_CMD', self.rca << 16) + self._send_command_with_response("APP_CMD", self.rca << 16) return self._send_command_with_response( - cmd, args, ignoreCRC=ignoreCRC, get_rt=get_rt) + cmd, args, ignoreCRC=ignoreCRC, get_rt=get_rt + ) def _process_cid_reg(self): - params = self._send_command('ALL_SEND_CID', 0, wait=2) - reg = bytearray(params['response']) + params = self._send_command("ALL_SEND_CID", 0, wait=2) + reg = bytearray(params["response"]) if reg is None: raise OSError("flash_sdcard: Error reading CID register") cid = collections.OrderedDict() - cid['manufacturer_id'] = reg[0] - cid['oem_id'] = reg[1:3].decode(encoding='ascii', errors='ignore') - cid['product_name'] = reg[3:8].decode( - encoding='ascii', errors='ignore') - cid['product_revision'] = str(reg[8] >> 4 & 0xFF) + "." \ - + str(reg[8] & 0xFF) - cid['serial_number'] = "".join(["%02X" % (c,) for c in reg[9:13]]) + cid["manufacturer_id"] = reg[0] + cid["oem_id"] = reg[1:3].decode(encoding="ascii", errors="ignore") + cid["product_name"] = reg[3:8].decode(encoding="ascii", errors="ignore") + cid["product_revision"] = str(reg[8] >> 4 & 0xFF) + "." + str(reg[8] & 0xFF) + cid["serial_number"] = "".join(["%02X" % (c,) for c in reg[9:13]]) mfg_year = (((reg[13] & 0xF) << 4) | ((reg[14] >> 4) & 0xF)) + 2000 mfg_month = reg[14] & 0xF - cid['manufacturing_date'] = "%d/%d" % (mfg_month, mfg_year) + cid["manufacturing_date"] = "%d/%d" % (mfg_month, mfg_year) crc = calc_crc7(reg[:15], with_padding=False) if crc != reg[15]: - raise OSError("flash_sdcard: CID crc mismatch: 0x%02X, recd: 0x%02X" - % (crc, reg[15])) + raise OSError( + "flash_sdcard: CID crc mismatch: 0x%02X, recd: 0x%02X" % (crc, reg[15]) + ) self.card_info.update(cid) def _process_csd_reg(self): - params = self._send_command('SEND_CSD', self.rca << 16, wait=2) - reg = bytearray(params['response']) + params = self._send_command("SEND_CSD", self.rca << 16, wait=2) + reg = bytearray(params["response"]) if reg is None: raise OSError("flash_sdcard: Error reading CSD register") str_capacity = "Invalid" @@ -1055,10 +1141,9 @@ class SDCardSDIO: csd_type = (reg[0] >> 6) & 0x3 if csd_type == 0: # Standard Capacity (CSD Version 1.0) - max_block_len = 2**(reg[5] & 0xF) - c_size = ((reg[6] & 0x3) << 10) | (reg[7] << 2) | \ - ((reg[8] >> 6) & 0x3) - c_mult = 2**((((reg[9] & 0x3) << 1) | (reg[10] >> 7)) + 2) + max_block_len = 2 ** (reg[5] & 0xF) + c_size = ((reg[6] & 0x3) << 10) | (reg[7] << 2) | ((reg[8] >> 6) & 0x3) + c_mult = 2 ** ((((reg[9] & 0x3) << 1) | (reg[10] >> 7)) + 2) max_capacity = (c_size + 1) * c_mult * max_block_len str_capacity = "%.1f MiB" % (max_capacity / (1024.0**2)) elif csd_type == 1: @@ -1071,9 +1156,10 @@ class SDCardSDIO: self.write_protected = (reg[14] & 0x30) != 0 crc = calc_crc7(reg[:15], with_padding=False) if crc != reg[15]: - raise OSError("flash_sdcard: CSD crc mismatch: 0x%02X, recd: 0x%02X" - % (crc, reg[15])) - self.card_info['capacity'] = str_capacity + raise OSError( + "flash_sdcard: CSD crc mismatch: 0x%02X, recd: 0x%02X" % (crc, reg[15]) + ) + self.card_info["capacity"] = str_capacity self.total_sectors = max_capacity // SECTOR_SIZE def print_card_info(self, print_func=logging.info): @@ -1099,22 +1185,24 @@ class SDCardSDIO: offset = sector * SECTOR_SIZE params = self.sdio.sdio_read_data( - SD_COMMANDS['READ_SINGLE_BLOCK'], offset) - if params['error'] != 0: + SD_COMMANDS["READ_SINGLE_BLOCK"], offset + ) + if params["error"] != 0: raise OSError( - 'Read data failed. Error code=%d' %(params['error'],) ) - if params['read'] != SECTOR_SIZE: + "Read data failed. Error code=%d" % (params["error"],) + ) + if params["read"] != SECTOR_SIZE: raise OSError( - 'Read data failed. Expected %d bytes but got %d.' % - (SECTOR_SIZE, params['read']) ) + "Read data failed. Expected %d bytes but got %d." + % (SECTOR_SIZE, params["read"]) + ) buf = bytearray() offset = 0 - while SECTOR_SIZE-len(buf)>0: - rest = min(SECTOR_SIZE-len(buf), 32) - params = self.sdio.sdio_read_data_buffer( - offset, length=rest) - temp = bytearray(params['data']) + while SECTOR_SIZE - len(buf) > 0: + rest = min(SECTOR_SIZE - len(buf), 32) + params = self.sdio.sdio_read_data_buffer(offset, length=rest) + temp = bytearray(params["data"]) if len(temp) == 0: raise OSError("Read zero bytes from buffer") buf += temp @@ -1127,16 +1215,16 @@ class SDCardSDIO: with self.mutex: if not 0 <= sector < self.total_sectors: raise OSError( - "flash_sdcard: write error, sector number %d invalid" - % (sector)) + "flash_sdcard: write error, sector number %d invalid" % (sector) + ) if not self.initialized: - raise OSError("flash_sdcard: write error, SD Card not" - " initialized") + raise OSError("flash_sdcard: write error, SD Card not" " initialized") outbuf = bytearray(data) if len(outbuf) > SECTOR_SIZE: - raise OSError("sd_card: Cannot write sector larger" - " than %d bytes" - % (SECTOR_SIZE)) + raise OSError( + "sd_card: Cannot write sector larger" + " than %d bytes" % (SECTOR_SIZE) + ) elif len(outbuf) < SECTOR_SIZE: outbuf += bytearray([0] * (SECTOR_SIZE - len(outbuf))) offset = sector @@ -1145,23 +1233,25 @@ class SDCardSDIO: CHUNKSIZE = 32 for i in range(0, SECTOR_SIZE, CHUNKSIZE): - self.sdio.sdio_write_data_buffer(i, outbuf[i:i+CHUNKSIZE]) - params = self.sdio.sdio_write_data( - SD_COMMANDS['WRITE_BLOCK'], offset) - if (params['error'] != 0) or (params['write'] != SECTOR_SIZE): - raise OSError( - "flash_sdcard: Error writing to sector %d"% (sector,)) + self.sdio.sdio_write_data_buffer(i, outbuf[i : i + CHUNKSIZE]) + params = self.sdio.sdio_write_data(SD_COMMANDS["WRITE_BLOCK"], offset) + if (params["error"] != 0) or (params["write"] != SECTOR_SIZE): + raise OSError("flash_sdcard: Error writing to sector %d" % (sector,)) - status = self._send_command_with_response( - 'SEND_STATUS', self.rca << 16) + status = self._send_command_with_response("SEND_STATUS", self.rca << 16) if len(status) != 4: - raise OSError("flash_sdcard: Failed to get status response" - " after write: %s" % (repr(status),)) - if ((status[3]>>1) & 0x0F) != 0: + raise OSError( + "flash_sdcard: Failed to get status response" + " after write: %s" % (repr(status),) + ) + if ((status[3] >> 1) & 0x0F) != 0: # Bit 12:9 are not "0" (card is in idle) - raise OSError("flash_sdcard: Write error." - " Card is not in transfer state: 0x%02X" - % (((status[3]>>1) & 0x0F))) + raise OSError( + "flash_sdcard: Write error." + " Card is not in transfer state: 0x%02X" + % (((status[3] >> 1) & 0x0F)) + ) + SDIO_WARNING = """ This board requires a manual reboot to complete the flash process. @@ -1170,6 +1260,7 @@ power cycle is required. Please perform the power cycle now and then rerun this utility with the 'check' option to verify flash. """ + class MCUConnection: def __init__(self, k_reactor, device, baud, board_cfg): self.reactor = k_reactor @@ -1194,7 +1285,7 @@ class MCUConnection: self.reactor.register_callback(self._do_serial_connect) curtime = self.reactor.monotonic() while True: - curtime = self.reactor.pause(curtime + 1.) + curtime = self.reactor.pause(curtime + 1.0) output(".") if self.connect_completion.test(): self.connected = self.connect_completion.wait() @@ -1205,18 +1296,19 @@ class MCUConnection: raise SPIFlashError("Unable to connect to MCU") output_line("Connected") msgparser = self._serial.get_msgparser() - mcu_type = msgparser.get_constant('MCU') - build_mcu_type = self.board_config['mcu'] + mcu_type = msgparser.get_constant("MCU") + build_mcu_type = self.board_config["mcu"] if mcu_type != build_mcu_type: raise SPIFlashError( "MCU Type mismatch: Build MCU = %s, Connected MCU = %s" - % (build_mcu_type, mcu_type)) + % (build_mcu_type, mcu_type) + ) self.enumerations = msgparser.get_enumerations() self.raw_dictionary = msgparser.get_raw_data_dictionary() self.proto_error = msgparser.error def _do_serial_connect(self, eventtime): - endtime = eventtime + 60. + endtime = eventtime + 60.0 while True: try: self._serial.connect_uart(self.serial_device, self.baud) @@ -1228,7 +1320,7 @@ class MCUConnection: return output("Connection Error, retrying..") self._serial.disconnect() - self.reactor.pause(curtime + 2.) + self.reactor.pause(curtime + 2.0) else: break self.connect_completion.complete(True) @@ -1255,7 +1347,8 @@ class MCUConnection: for response in GET_CFG_RESPONSES: try: get_cfg_cmd = mcu.CommandQueryWrapper( - self._serial, GET_CFG_CMD, response) + self._serial, GET_CFG_CMD, response + ) break except Exception as err: # Raise an exception if we hit the end of the list. @@ -1268,25 +1361,26 @@ class MCUConnection: output("Checking Current MCU Configuration...") params = self.get_mcu_config() output_line("Done") - if params['is_config'] or params['is_shutdown']: - output_line("MCU needs restart: is_config=%d, is_shutdown=%d" - % (params['is_config'], params['is_shutdown'])) + if params["is_config"] or params["is_shutdown"]: + output_line( + "MCU needs restart: is_config=%d, is_shutdown=%d" + % (params["is_config"], params["is_shutdown"]) + ) return True return False def _configure_mcu_spibus(self, printfunc=logging.info): # TODO: add commands for buttons? Or perhaps an endstop? We # just need to be able to query the status of the detect pin - cs_pin = self.board_config['cs_pin'].upper() - bus = self.board_config['spi_bus'] - bus_enums = self.enumerations.get( - 'spi_bus', self.enumerations.get('bus')) - pin_enums = self.enumerations.get('pin') + cs_pin = self.board_config["cs_pin"].upper() + bus = self.board_config["spi_bus"] + bus_enums = self.enumerations.get("spi_bus", self.enumerations.get("bus")) + pin_enums = self.enumerations.get("pin") if bus == "swspi": mcu_freq = self.clocksync.print_time_to_clock(1) - pulse_ticks = mcu_freq//SD_SPI_SPEED - cfgpins = self.board_config['spi_pins'] - pins = [p.strip().upper() for p in cfgpins.split(',') if p.strip()] + pulse_ticks = mcu_freq // SD_SPI_SPEED + cfgpins = self.board_config["spi_pins"] + pins = [p.strip().upper() for p in cfgpins.split(",") if p.strip()] pin_err_msg = "Invalid Software SPI Pins: %s" % (cfgpins,) if len(pins) != 3: raise SPIFlashError(pin_err_msg) @@ -1294,10 +1388,10 @@ class MCUConnection: if p not in pin_enums: raise SPIFlashError(pin_err_msg) bus_cmds = [ - SW_SPI_BUS_CMDS[0] % (SPI_OID, pins[0], pins[1], pins[2], - SPI_MODE, pulse_ticks), - SW_SPI_BUS_CMDS[1] % (SPI_OID, pins[0], pins[1], pins[2], - SPI_MODE, SD_SPI_SPEED) + SW_SPI_BUS_CMDS[0] + % (SPI_OID, pins[0], pins[1], pins[2], SPI_MODE, pulse_ticks), + SW_SPI_BUS_CMDS[1] + % (SPI_OID, pins[0], pins[1], pins[2], SPI_MODE, SD_SPI_SPEED), ] else: if bus not in bus_enums: @@ -1307,7 +1401,9 @@ class MCUConnection: ] if cs_pin not in pin_enums: raise SPIFlashError("Invalid CS Pin: %s" % (cs_pin,)) - cfg_cmds = [ALLOC_OIDS_CMD % (1,),] + cfg_cmds = [ + ALLOC_OIDS_CMD % (1,), + ] self._serial.send(cfg_cmds[0]) spi_cfg_cmds = [ SPI_CFG_CMDS[0] % (SPI_OID, cs_pin, False), @@ -1315,20 +1411,19 @@ class MCUConnection: ] cfg_cmds.append(self._try_send_command(spi_cfg_cmds)) cfg_cmds.append(self._try_send_command(bus_cmds)) - config_crc = zlib.crc32('\n'.join(cfg_cmds).encode()) & 0xffffffff + config_crc = zlib.crc32("\n".join(cfg_cmds).encode()) & 0xFFFFFFFF self._serial.send(FINALIZE_CFG_CMD % (config_crc,)) config = self.get_mcu_config() if not config["is_config"] or config["is_shutdown"]: raise MCUConfigError("Failed to configure MCU") printfunc("Initializing SD Card and Mounting file system...") self.fatfs = FatFS(self._serial) - self.reactor.pause(self.reactor.monotonic() + .5) + self.reactor.pause(self.reactor.monotonic() + 0.5) try: self.fatfs.mount(printfunc) except OSError: logging.exception("SD Card Mount Failure") - raise SPIFlashError( - "Failed to Initialize SD Card. Is it inserted?") + raise SPIFlashError("Failed to Initialize SD Card. Is it inserted?") def _try_send_command(self, cmd_list): for cmd in cmd_list: @@ -1341,10 +1436,9 @@ class MCUConnection: return cmd def _configure_mcu_sdiobus(self, printfunc=logging.info): - bus = self.board_config['sdio_bus'] - bus_enums = self.enumerations.get( - 'sdio_bus', self.enumerations.get('bus')) - pin_enums = self.enumerations.get('pin') + bus = self.board_config["sdio_bus"] + bus_enums = self.enumerations.get("sdio_bus", self.enumerations.get("bus")) + pin_enums = self.enumerations.get("pin") if bus not in bus_enums: raise SPIFlashError("Invalid SDIO Bus: %s" % (bus,)) bus_cmd = SDIO_BUS_CMD % (SDIO_OID, bus) @@ -1352,25 +1446,24 @@ class MCUConnection: cfg_cmds = [ALLOC_OIDS_CMD % (1,), sdio_cfg_cmd, bus_cmd] for cmd in cfg_cmds: self._serial.send(cmd) - config_crc = zlib.crc32('\n'.join(cfg_cmds).encode()) & 0xffffffff + config_crc = zlib.crc32("\n".join(cfg_cmds).encode()) & 0xFFFFFFFF self._serial.send(FINALIZE_CFG_CMD % (config_crc,)) config = self.get_mcu_config() if not config["is_config"] or config["is_shutdown"]: raise MCUConfigError("Failed to configure MCU") printfunc("Initializing SD Card and Mounting file system...") - self.fatfs = FatFS(self._serial,spi=False) - self.reactor.pause(self.reactor.monotonic() + .5) + self.fatfs = FatFS(self._serial, spi=False) + self.reactor.pause(self.reactor.monotonic() + 0.5) try: self.fatfs.mount(printfunc) except OSError: logging.exception("SD Card Mount Failure") - raise SPIFlashError( - "Failed to Initialize SD Card. Is it inserted?") + raise SPIFlashError("Failed to Initialize SD Card. Is it inserted?") def configure_mcu(self, printfunc=logging.info): - if 'spi_bus' in self.board_config: + if "spi_bus" in self.board_config: self._configure_mcu_spibus(printfunc=printfunc) - elif 'sdio_bus' in self.board_config: + elif "sdio_bus" in self.board_config: self._configure_mcu_sdiobus(printfunc=printfunc) else: raise SPIFlashError("Unknown bus defined in board_defs.py.") @@ -1379,10 +1472,10 @@ class MCUConnection: output("Uploading Klipper Firmware to SD Card...") input_sha = hashlib.sha1() sd_sha = hashlib.sha1() - klipper_bin_path = self.board_config['klipper_bin_path'] - fw_path = self.board_config.get('firmware_path', "firmware.bin") + klipper_bin_path = self.board_config["klipper_bin_path"] + fw_path = self.board_config.get("firmware_path", "firmware.bin") try: - with open(klipper_bin_path, 'rb') as local_f: + with open(klipper_bin_path, "rb") as local_f: with self.fatfs.open_file(fw_path, "wb") as sd_f: while True: buf = local_f.read(4096) @@ -1397,7 +1490,7 @@ class MCUConnection: output("Validating Upload...") try: finfo = self.fatfs.get_file_info(fw_path) - with self.fatfs.open_file(fw_path, 'r') as sd_f: + with self.fatfs.open_file(fw_path, "r") as sd_f: while True: buf = sd_f.read(4096) if not buf: @@ -1406,20 +1499,22 @@ class MCUConnection: except Exception: logging.exception("SD Card Download Error") raise SPIFlashError("Error reading %s from SD" % (fw_path)) - sd_size = finfo.get('size', -1) + sd_size = finfo.get("size", -1) input_chksm = input_sha.hexdigest().upper() sd_chksm = sd_sha.hexdigest().upper() if input_chksm != sd_chksm: - raise SPIFlashError("Checksum Mismatch: Got '%s', expected '%s'" - % (sd_chksm, input_chksm)) + raise SPIFlashError( + "Checksum Mismatch: Got '%s', expected '%s'" % (sd_chksm, input_chksm) + ) output_line("Done") output_line( "Firmware Upload Complete: %s, Size: %d, Checksum (SHA1): %s" - % (fw_path, sd_size, sd_chksm)) + % (fw_path, sd_size, sd_chksm) + ) return sd_chksm def verify_flash(self, req_chksm, old_dictionary, req_dictionary): - if bool(self.board_config.get('skip_verify', False)): + if bool(self.board_config.get("skip_verify", False)): output_line(SDIO_WARNING) return output("Verifying Flash...") @@ -1429,10 +1524,14 @@ class MCUConnection: # If we have a dictionary, check that it matches. if req_dictionary: if cur_dictionary != req_dictionary: - raise SPIFlashError("Version Mismatch: Got '%s...', " - "expected '%s...'" - % (msgparser.get_version_info()[0], - json.loads(req_dictionary)['version'])) + raise SPIFlashError( + "Version Mismatch: Got '%s...', " + "expected '%s...'" + % ( + msgparser.get_version_info()[0], + json.loads(req_dictionary)["version"], + ) + ) output("Version matched...") validation_passed = True # Otherwise check that the MCU dictionary changed @@ -1444,10 +1543,9 @@ class MCUConnection: # If the version didn't change, look for current firmware to checksum cur_fw_sha = None if not validation_passed: - cur_fw_path = self.board_config.get('current_firmware_path', - "FIRMWARE.CUR") + cur_fw_path = self.board_config.get("current_firmware_path", "FIRMWARE.CUR") try: - with self.fatfs.open_file(cur_fw_path, 'r') as sd_f: + with self.fatfs.open_file(cur_fw_path, "r") as sd_f: cur_fw_sha = hashlib.sha1() while True: buf = sd_f.read(4096) @@ -1464,16 +1562,17 @@ class MCUConnection: validation_passed = True output("Checksum matched...") else: - raise SPIFlashError("Checksum Mismatch: Got '%s', " - "expected '%s'" - % (cur_fw_chksm, req_chksm)) + raise SPIFlashError( + "Checksum Mismatch: Got '%s', " + "expected '%s'" % (cur_fw_chksm, req_chksm) + ) if not validation_passed: raise SPIFlashError("Validation failure.") output_line("Done") # Remove firmware file if MCU bootloader failed to rename. if cur_fw_sha is None: try: - fw_path = self.board_config.get('firmware_path', "firmware.bin") + fw_path = self.board_config.get("firmware_path", "firmware.bin") self.fatfs.remove_item(fw_path) output_line("Found and deleted %s after reset" % (fw_path,)) except Exception: @@ -1481,17 +1580,19 @@ class MCUConnection: output_line("Firmware Flash Successful") output_line("Current Firmware: %s" % (msgparser.get_version_info()[0],)) + class SPIFlash: def __init__(self, args): self.board_config = args - if not os.path.exists(args['device']): - raise SPIFlashError("No device found at '%s'" % (args['device'],)) - if not os.path.isfile(args['klipper_bin_path']): - raise SPIFlashError("Unable to locate klipper binary at '%s'" - % (args['klipper_bin_path'],)) - tty_name, dev_by_path = translate_serial_to_tty(args['device']) + if not os.path.exists(args["device"]): + raise SPIFlashError("No device found at '%s'" % (args["device"],)) + if not os.path.isfile(args["klipper_bin_path"]): + raise SPIFlashError( + "Unable to locate klipper binary at '%s'" % (args["klipper_bin_path"],) + ) + tty_name, dev_by_path = translate_serial_to_tty(args["device"]) self.device_path = dev_by_path - self.baud_rate = args['baud'] + self.baud_rate = args["baud"] self.mcu_conn = None self.firmware_checksum = None self.task_complete = False @@ -1499,24 +1600,26 @@ class SPIFlash: self.need_verify = True self.old_dictionary = None self.new_dictionary = None - if args['klipper_dict_path'] is not None: + if args["klipper_dict_path"] is not None: try: - with open(args['klipper_dict_path'], 'rb') as dict_f: - self.new_dictionary = dict_f.read(32*1024) + with open(args["klipper_dict_path"], "rb") as dict_f: + self.new_dictionary = dict_f.read(32 * 1024) except Exception: - raise SPIFlashError("Missing or invalid dictionary at '%s'" - % (args['klipper_dict_path'],)) + raise SPIFlashError( + "Missing or invalid dictionary at '%s'" + % (args["klipper_dict_path"],) + ) def _wait_for_reconnect(self): output("Waiting for device to reconnect...") - time.sleep(1.) + time.sleep(1.0) if os.path.exists(self.device_path): # device is already available, this could be a UART - time.sleep(2.) + time.sleep(2.0) else: wait_left = 30 while wait_left: - time.sleep(1.) + time.sleep(1.0) output(".") if os.path.exists(self.device_path): break @@ -1568,16 +1671,18 @@ class SPIFlash: if not self.mcu_conn.connected: self.mcu_conn.connect() self.mcu_conn.configure_mcu() - self.mcu_conn.verify_flash(self.firmware_checksum, self.old_dictionary, - self.new_dictionary) + self.mcu_conn.verify_flash( + self.firmware_checksum, self.old_dictionary, self.new_dictionary + ) self.mcu_conn.reset() self.task_complete = True def run_reactor_task(self, run_cb): self.task_complete = False k_reactor = reactor.Reactor() - self.mcu_conn = MCUConnection(k_reactor, self.device_path, - self.baud_rate, self.board_config) + self.mcu_conn = MCUConnection( + k_reactor, self.device_path, self.baud_rate, self.board_config + ) k_reactor.register_callback(run_cb) try: k_reactor.run() @@ -1593,7 +1698,7 @@ class SPIFlash: self.mcu_conn = k_reactor = None def run(self): - if not bool(self.board_config.get('verify_only', False)): + if not bool(self.board_config.get("verify_only", False)): self.run_reactor_task(self.run_reset_upload) self._wait_for_reconnect() if self.need_upload: @@ -1606,12 +1711,12 @@ class SPIFlash: self._wait_for_reconnect() self.run_reactor_task(self.run_verify) + def main(): - parser = argparse.ArgumentParser( - description="SD Card Firmware Upload Utility") + parser = argparse.ArgumentParser(description="SD Card Firmware Upload Utility") parser.add_argument( - "-l", "--list", action="store_true", - help="List Supported Boards") + "-l", "--list", action="store_true", help="List Supported Boards" + ) args = parser.parse_known_args() if args[0].list: blist = board_defs.list_boards() @@ -1620,24 +1725,32 @@ def main(): output_line(board) return parser.add_argument( - "-b", "--baud", metavar="<baud rate>", type=int, - default=250000, help="Serial Baud Rate") - parser.add_argument( - "-v", "--verbose", action="store_true", - help="Enable verbose output") - parser.add_argument( - "-d", "--dict_path", metavar="<klipper.dict>", type=str, - default=None, help="Klipper firmware dictionary") + "-b", + "--baud", + metavar="<baud rate>", + type=int, + default=250000, + help="Serial Baud Rate", + ) parser.add_argument( - "-c","--check", action="store_true", - help="Perform flash check/verify only") + "-v", "--verbose", action="store_true", help="Enable verbose output" + ) parser.add_argument( - "device", metavar="<device>", help="Device Serial Port") + "-d", + "--dict_path", + metavar="<klipper.dict>", + type=str, + default=None, + help="Klipper firmware dictionary", + ) parser.add_argument( - "board", metavar="<board>", help="Board Type") + "-c", "--check", action="store_true", help="Perform flash check/verify only" + ) + parser.add_argument("device", metavar="<device>", help="Device Serial Port") + parser.add_argument("board", metavar="<board>", help="Board Type") parser.add_argument( - "klipper_bin_path", metavar="<klipper.bin>", - help="Klipper firmware binary") + "klipper_bin_path", metavar="<klipper.bin>", help="Klipper firmware binary" + ) args = parser.parse_args() log_level = logging.DEBUG if args.verbose else logging.CRITICAL logging.basicConfig(level=log_level) @@ -1645,14 +1758,14 @@ def main(): if flash_args is None: output_line("Unable to find definition for board: %s" % (args.board,)) sys.exit(-1) - flash_args['device'] = args.device - flash_args['baud'] = args.baud - flash_args['klipper_bin_path'] = args.klipper_bin_path - flash_args['klipper_dict_path'] = args.dict_path - flash_args['verify_only'] = args.check + flash_args["device"] = args.device + flash_args["baud"] = args.baud + flash_args["klipper_bin_path"] = args.klipper_bin_path + flash_args["klipper_dict_path"] = args.dict_path + flash_args["verify_only"] = args.check if args.check: # override board_defs setting when doing verify-only: - flash_args['skip_verify'] = False + flash_args["skip_verify"] = False check_need_convert(args.board, flash_args) fatfs_lib.check_fatfs_build(output) try: |