aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlib/canboot/flash_can.py91
1 files changed, 81 insertions, 10 deletions
diff --git a/lib/canboot/flash_can.py b/lib/canboot/flash_can.py
index 18100517..b7877097 100755
--- a/lib/canboot/flash_can.py
+++ b/lib/canboot/flash_can.py
@@ -496,10 +496,71 @@ class CanSocket:
self._loop.remove_reader(self.cansock.fileno())
self.cansock.close()
+class SerialSocket:
+ def __init__(self, loop: asyncio.AbstractEventLoop):
+ self._loop = loop
+ self.serial = self.serial_error = None
+ self.node = CanNode(0, self)
+
+ def _handle_response(self) -> None:
+ try:
+ data = self.serial.read(4096)
+ except self.serial_error as e:
+ logging.exception("Error on serial read")
+ self.close()
+ self.node.feed_data(data)
+
+ def send(self, can_id: int, payload: bytes = b"") -> None:
+ try:
+ self.serial.write(payload)
+ except self.serial_error as e:
+ logging.exception("Error on serial write")
+ self.close()
+
+ async def run(self, intf: str, baud: int, fw_path: pathlib.Path) -> None:
+ if not fw_path.is_file():
+ raise FlashCanError("Invalid firmware path '%s'" % (fw_path))
+ import serial
+ self.serial_error = serial.SerialException
+ try:
+ serial_dev = serial.Serial(baudrate=baud, timeout=0,
+ exclusive=True)
+ serial_dev.port = intf
+ serial_dev.open()
+ except (OSError, IOError, self.serial_error) as e:
+ raise FlashCanError("Unable to open serial port: %s" % (e,))
+ self.serial = serial_dev
+ self._loop.add_reader(self.serial.fileno(), self._handle_response)
+ flasher = CanFlasher(self.node, fw_path)
+ try:
+ await flasher.connect_btl()
+ await flasher.send_file()
+ await flasher.verify_file()
+ finally:
+ # always attempt to send the complete command. If
+ # there is an error it will exit the bootloader
+ # unless comms were broken
+ await flasher.finish()
+
+ def close(self):
+ if self.serial is None:
+ return
+ self._loop.remove_reader(self.serial.fileno())
+ self.serial.close()
+ self.serial = None
+
def main():
parser = argparse.ArgumentParser(
description="Can Bootloader Flash Utility")
parser.add_argument(
+ "-d", "--device", metavar='<serial device>',
+ help="Serial Device"
+ )
+ parser.add_argument(
+ "-b", "--baud", default=250000, metavar='<baud rate>',
+ help="Serial baud rate"
+ )
+ parser.add_argument(
"-i", "--interface", default="can0", metavar='<can interface>',
help="Can Interface"
)
@@ -522,27 +583,37 @@ def main():
args = parser.parse_args()
if not args.verbose:
- logging.getLogger().setLevel(logging.CRITICAL)
+ logging.getLogger().setLevel(logging.ERROR)
intf = args.interface
fpath = pathlib.Path(args.firmware).expanduser().resolve()
loop = asyncio.get_event_loop()
+ iscan = args.device is None
+ sock = None
try:
- cansock = CanSocket(loop)
- if args.query:
- loop.run_until_complete(cansock.run_query(intf))
+ if iscan:
+ sock = CanSocket(loop)
+ if args.query:
+ loop.run_until_complete(sock.run_query(intf))
+ else:
+ if args.uuid is None:
+ raise FlashCanError(
+ "The 'uuid' option must be specified to flash a device"
+ )
+ uuid = int(args.uuid, 16)
+ loop.run_until_complete(sock.run(intf, uuid, fpath))
else:
- if args.uuid is None:
+ if args.device is None:
raise FlashCanError(
- "The 'uuid' option must be specified to flash a device"
+ "The 'device' option must be specified to flash a device"
)
- uuid = int(args.uuid, 16)
- loop.run_until_complete(cansock.run(intf, uuid, fpath))
+ sock = SerialSocket(loop)
+ loop.run_until_complete(sock.run(args.device, args.baud, fpath))
except Exception as e:
logging.exception("Can Flash Error")
sys.exit(-1)
finally:
- if cansock is not None:
- cansock.close()
+ if sock is not None:
+ sock.close()
if args.query:
output_line("Query Complete")
else: