diff options
Diffstat (limited to 'tools/pyboard.py')
-rwxr-xr-x | tools/pyboard.py | 269 |
1 files changed, 168 insertions, 101 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py index a0b4ac320..de8b48836 100755 --- a/tools/pyboard.py +++ b/tools/pyboard.py @@ -77,35 +77,42 @@ except AttributeError: # Python2 doesn't have buffer attr stdout = sys.stdout + def stdout_write_bytes(b): b = b.replace(b"\x04", b"") stdout.write(b) stdout.flush() + class PyboardError(Exception): pass + class TelnetToSerial: def __init__(self, ip, user, password, read_timeout=None): self.tn = None import telnetlib + self.tn = telnetlib.Telnet(ip, timeout=15) self.read_timeout = read_timeout - if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): - self.tn.write(bytes(user, 'ascii') + b"\r\n") + if b"Login as:" in self.tn.read_until(b"Login as:", timeout=read_timeout): + self.tn.write(bytes(user, "ascii") + b"\r\n") - if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): + if b"Password:" in self.tn.read_until(b"Password:", timeout=read_timeout): # needed because of internal implementation details of the telnet server time.sleep(0.2) - self.tn.write(bytes(password, 'ascii') + b"\r\n") + self.tn.write(bytes(password, "ascii") + b"\r\n") - if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): + if b"for more information." in self.tn.read_until( + b'Type "help()" for more information.', timeout=read_timeout + ): # login successful from collections import deque + self.fifo = deque() return - raise PyboardError('Failed to establish a telnet connection with the board') + raise PyboardError("Failed to establish a telnet connection with the board") def __del__(self): self.close() @@ -127,7 +134,7 @@ class TelnetToSerial: break timeout_count += 1 - data = b'' + data = b"" while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data @@ -151,24 +158,33 @@ class ProcessToSerial: def __init__(self, cmd): import subprocess - self.subp = subprocess.Popen(cmd, bufsize=0, shell=True, preexec_fn=os.setsid, - stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.subp = subprocess.Popen( + cmd, + bufsize=0, + shell=True, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) # Initially was implemented with selectors, but that adds Python3 # dependency. However, there can be race conditions communicating # with a particular child process (like QEMU), and selectors may # still work better in that case, so left inplace for now. # - #import selectors - #self.sel = selectors.DefaultSelector() - #self.sel.register(self.subp.stdout, selectors.EVENT_READ) + # import selectors + # self.sel = selectors.DefaultSelector() + # self.sel.register(self.subp.stdout, selectors.EVENT_READ) import select + self.poll = select.poll() self.poll.register(self.subp.stdout.fileno()) def close(self): import signal + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) def read(self, size=1): @@ -182,7 +198,7 @@ class ProcessToSerial: return len(data) def inWaiting(self): - #res = self.sel.select(0) + # res = self.sel.select(0) res = self.poll.poll(0) if res: return 1 @@ -198,8 +214,16 @@ class ProcessPtyToTerminal: import subprocess import re import serial - self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=False, preexec_fn=os.setsid, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + self.subp = subprocess.Popen( + cmd.split(), + bufsize=0, + shell=False, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) pty_line = self.subp.stderr.readline().decode("utf-8") m = re.search(r"/dev/pts/[0-9]+", pty_line) if not m: @@ -213,6 +237,7 @@ class ProcessPtyToTerminal: def close(self): import signal + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) def read(self, size=1): @@ -226,36 +251,37 @@ class ProcessPtyToTerminal: class Pyboard: - def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0): + def __init__(self, device, baudrate=115200, user="micro", password="python", wait=0): if device.startswith("exec:"): - self.serial = ProcessToSerial(device[len("exec:"):]) + self.serial = ProcessToSerial(device[len("exec:") :]) elif device.startswith("execpty:"): - self.serial = ProcessPtyToTerminal(device[len("qemupty:"):]) - elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: + self.serial = ProcessPtyToTerminal(device[len("qemupty:") :]) + elif device and device[0].isdigit() and device[-1].isdigit() and device.count(".") == 3: # device looks like an IP address self.serial = TelnetToSerial(device, user, password, read_timeout=10) else: import serial + delayed = False for attempt in range(wait + 1): try: self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) break - except (OSError, IOError): # Py2 and Py3 have different errors + except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: continue if attempt == 0: - sys.stdout.write('Waiting {} seconds for pyboard '.format(wait)) + sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) delayed = True time.sleep(1) - sys.stdout.write('.') + sys.stdout.write(".") sys.stdout.flush() else: if delayed: - print('') - raise PyboardError('failed to access ' + device) + print("") + raise PyboardError("failed to access " + device) if delayed: - print('') + print("") def close(self): self.serial.close() @@ -287,7 +313,7 @@ class Pyboard: return data def enter_raw_repl(self): - self.serial.write(b'\r\x03\x03') # ctrl-C twice: interrupt any running program + self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program # flush input (without relying on serial.flushInput()) n = self.serial.inWaiting() @@ -295,38 +321,38 @@ class Pyboard: self.serial.read(n) n = self.serial.inWaiting() - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): + self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): print(data) - raise PyboardError('could not enter raw repl') + raise PyboardError("could not enter raw repl") - self.serial.write(b'\x04') # ctrl-D: soft reset - data = self.read_until(1, b'soft reboot\r\n') - if not data.endswith(b'soft reboot\r\n'): + self.serial.write(b"\x04") # ctrl-D: soft reset + data = self.read_until(1, b"soft reboot\r\n") + if not data.endswith(b"soft reboot\r\n"): print(data) - raise PyboardError('could not enter raw repl') + raise PyboardError("could not enter raw repl") # By splitting this into 2 reads, it allows boot.py to print stuff, # which will show up after the soft reboot and before the raw REPL. - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): print(data) - raise PyboardError('could not enter raw repl') + raise PyboardError("could not enter raw repl") def exit_raw_repl(self): - self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL + self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL def follow(self, timeout, data_consumer=None): # wait for normal output - data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) - if not data.endswith(b'\x04'): - raise PyboardError('timeout waiting for first EOF reception') + data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b"\x04"): + raise PyboardError("timeout waiting for first EOF reception") data = data[:-1] # wait for error output - data_err = self.read_until(1, b'\x04', timeout=timeout) - if not data_err.endswith(b'\x04'): - raise PyboardError('timeout waiting for second EOF reception') + data_err = self.read_until(1, b"\x04", timeout=timeout) + if not data_err.endswith(b"\x04"): + raise PyboardError("timeout waiting for second EOF reception") data_err = data_err[:-1] # return normal and error output @@ -336,67 +362,71 @@ class Pyboard: if isinstance(command, bytes): command_bytes = command else: - command_bytes = bytes(command, encoding='utf8') + command_bytes = bytes(command, encoding="utf8") # check we have a prompt - data = self.read_until(1, b'>') - if not data.endswith(b'>'): - raise PyboardError('could not enter raw repl') + data = self.read_until(1, b">") + if not data.endswith(b">"): + raise PyboardError("could not enter raw repl") # write command for i in range(0, len(command_bytes), 256): - self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) + self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) time.sleep(0.01) - self.serial.write(b'\x04') + self.serial.write(b"\x04") # check if we could exec command data = self.serial.read(2) - if data != b'OK': - raise PyboardError('could not exec command (response: %r)' % data) + if data != b"OK": + raise PyboardError("could not exec command (response: %r)" % data) def exec_raw(self, command, timeout=10, data_consumer=None): - self.exec_raw_no_follow(command); + self.exec_raw_no_follow(command) return self.follow(timeout, data_consumer) def eval(self, expression): - ret = self.exec_('print({})'.format(expression)) + ret = self.exec_("print({})".format(expression)) ret = ret.strip() return ret def exec_(self, command, data_consumer=None): ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: - raise PyboardError('exception', ret, ret_err) + raise PyboardError("exception", ret, ret_err) return ret def execfile(self, filename): - with open(filename, 'rb') as f: + with open(filename, "rb") as f: pyfile = f.read() return self.exec_(pyfile) def get_time(self): - t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') + t = str(self.eval("pyb.RTC().datetime()"), encoding="utf8")[1:-1].split(", ") return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) def fs_ls(self, src): - cmd = "import uos\nfor f in uos.ilistdir(%s):\n" \ - " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" % \ - (("'%s'" % src) if src else '') + cmd = ( + "import uos\nfor f in uos.ilistdir(%s):\n" + " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" + % (("'%s'" % src) if src else "") + ) self.exec_(cmd, data_consumer=stdout_write_bytes) def fs_cat(self, src, chunk_size=256): - cmd = "with open('%s') as f:\n while 1:\n" \ + cmd = ( + "with open('%s') as f:\n while 1:\n" " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) self.exec_(cmd, data_consumer=stdout_write_bytes) def fs_get(self, src, dest, chunk_size=256): self.exec_("f=open('%s','rb')\nr=f.read" % src) - with open(dest, 'wb') as f: + with open(dest, "wb") as f: while True: data = bytearray() - self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d:data.extend(d)) - assert data.endswith(b'\r\n\x04') - data = eval(str(data[:-3], 'ascii')) + self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) + assert data.endswith(b"\r\n\x04") + data = eval(str(data[:-3], "ascii")) if not data: break f.write(data) @@ -404,15 +434,15 @@ class Pyboard: def fs_put(self, src, dest, chunk_size=256): self.exec_("f=open('%s','wb')\nw=f.write" % dest) - with open(src, 'rb') as f: + with open(src, "rb") as f: while True: data = f.read(chunk_size) if not data: break if sys.version_info < (3,): - self.exec_('w(b' + repr(data) + ')') + self.exec_("w(b" + repr(data) + ")") else: - self.exec_('w(' + repr(data) + ')') + self.exec_("w(" + repr(data) + ")") self.exec_("f.close()") def fs_mkdir(self, dir): @@ -424,11 +454,13 @@ class Pyboard: def fs_rm(self, src): self.exec_("import uos\nuos.remove('%s')" % src) + # in Python2 exec is a keyword so one must use "exec_" # but for Python3 we want to provide the nicer version "exec" setattr(Pyboard, "exec", Pyboard.exec_) -def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): + +def execfile(filename, device="/dev/ttyACM0", baudrate=115200, user="micro", password="python"): pyb = Pyboard(device, baudrate, user, password) pyb.enter_raw_repl() output = pyb.execfile(filename) @@ -436,54 +468,62 @@ def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', pas pyb.exit_raw_repl() pyb.close() + def filesystem_command(pyb, args): def fname_remote(src): - if src.startswith(':'): + if src.startswith(":"): src = src[1:] return src + def fname_cp_dest(src, dest): - src = src.rsplit('/', 1)[-1] - if dest is None or dest == '': + src = src.rsplit("/", 1)[-1] + if dest is None or dest == "": dest = src - elif dest == '.': - dest = './' + src - elif dest.endswith('/'): + elif dest == ".": + dest = "./" + src + elif dest.endswith("/"): dest += src return dest cmd = args[0] args = args[1:] try: - if cmd == 'cp': + if cmd == "cp": srcs = args[:-1] dest = args[-1] - if srcs[0].startswith('./') or dest.startswith(':'): + if srcs[0].startswith("./") or dest.startswith(":"): op = pyb.fs_put - fmt = 'cp %s :%s' + fmt = "cp %s :%s" dest = fname_remote(dest) else: op = pyb.fs_get - fmt = 'cp :%s %s' + fmt = "cp :%s %s" for src in srcs: src = fname_remote(src) dest2 = fname_cp_dest(src, dest) print(fmt % (src, dest2)) op(src, dest2) else: - op = {'ls': pyb.fs_ls, 'cat': pyb.fs_cat, 'mkdir': pyb.fs_mkdir, - 'rmdir': pyb.fs_rmdir, 'rm': pyb.fs_rm}[cmd] - if cmd == 'ls' and not args: - args = [''] + op = { + "ls": pyb.fs_ls, + "cat": pyb.fs_cat, + "mkdir": pyb.fs_mkdir, + "rmdir": pyb.fs_rmdir, + "rm": pyb.fs_rm, + }[cmd] + if cmd == "ls" and not args: + args = [""] for src in args: src = fname_remote(src) - print('%s :%s' % (cmd, src)) + print("%s :%s" % (cmd, src)) op(src) except PyboardError as er: - print(str(er.args[2], 'ascii')) + print(str(er.args[2], "ascii")) pyb.exit_raw_repl() pyb.close() sys.exit(1) + _injected_import_hook_code = """\ import uos, uio class _FS: @@ -511,20 +551,44 @@ uos.umount('/_') del _injected_buf, _FS """ + def main(): import argparse - cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') - cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') - cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') - cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') - cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') - cmd_parser.add_argument('-c', '--command', help='program passed in as string') - cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') + + cmd_parser = argparse.ArgumentParser(description="Run scripts on the pyboard.") + cmd_parser.add_argument( + "--device", + default="/dev/ttyACM0", + help="the serial device or the IP address of the pyboard", + ) + cmd_parser.add_argument( + "-b", "--baudrate", default=115200, help="the baud rate of the serial device" + ) + cmd_parser.add_argument("-u", "--user", default="micro", help="the telnet login username") + cmd_parser.add_argument("-p", "--password", default="python", help="the telnet login password") + cmd_parser.add_argument("-c", "--command", help="program passed in as string") + cmd_parser.add_argument( + "-w", + "--wait", + default=0, + type=int, + help="seconds to wait for USB connected board to become available", + ) group = cmd_parser.add_mutually_exclusive_group() - group.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') - group.add_argument('--no-follow', action='store_true', help='Do not follow the output after running the scripts.') - cmd_parser.add_argument('-f', '--filesystem', action='store_true', help='perform a filesystem action') - cmd_parser.add_argument('files', nargs='*', help='input files') + group.add_argument( + "--follow", + action="store_true", + help="follow the output after running the scripts [default if no scripts given]", + ) + group.add_argument( + "--no-follow", + action="store_true", + help="Do not follow the output after running the scripts.", + ) + cmd_parser.add_argument( + "-f", "--filesystem", action="store_true", help="perform a filesystem action" + ) + cmd_parser.add_argument("files", nargs="*", help="input files") args = cmd_parser.parse_args() # open the connection to the pyboard @@ -551,7 +615,9 @@ def main(): pyb.exec_raw_no_follow(buf) ret_err = None else: - ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) + ret, ret_err = pyb.exec_raw( + buf, timeout=None, data_consumer=stdout_write_bytes + ) except PyboardError as er: print(er) pyb.close() @@ -571,14 +637,14 @@ def main(): # run the command, if given if args.command is not None: - execbuffer(args.command.encode('utf-8')) + execbuffer(args.command.encode("utf-8")) # run any files for filename in args.files: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: pyfile = f.read() - if filename.endswith('.mpy') and pyfile[0] == ord('M'): - pyb.exec_('_injected_buf=' + repr(pyfile)) + if filename.endswith(".mpy") and pyfile[0] == ord("M"): + pyb.exec_("_injected_buf=" + repr(pyfile)) pyfile = _injected_import_hook_code execbuffer(pyfile) @@ -602,5 +668,6 @@ def main(): # close the connection to the pyboard pyb.close() + if __name__ == "__main__": main() |