diff options
author | Dan Williams <dcbw@redhat.com> | 2012-01-21 12:55:16 -0600 |
---|---|---|
committer | Dan Williams <dcbw@redhat.com> | 2012-01-21 12:55:16 -0600 |
commit | c37fdf5f94711ffbdf966f2ab366bbd047dfc147 (patch) | |
tree | 6fdac73a3c57f1415dddb7002d018e7f27a06e25 | |
parent | a9d6b5a8b62fc0fc07d7c3fab99b25c67474c68d (diff) |
decode: updates all around
Rewrite packet handling so packets can span multiple USB URBs
(which sometimes happens with WMC) and also add a bunch more
WMC decoding stuff.
-rwxr-xr-x | decode/decode.py | 34 | ||||
-rw-r--r-- | decode/packet.py | 130 | ||||
-rw-r--r-- | decode/qmux.py | 6 | ||||
-rw-r--r-- | decode/wmc.py | 215 |
4 files changed, 244 insertions, 141 deletions
diff --git a/decode/decode.py b/decode/decode.py index 5be72cdc..7196f9de 100755 --- a/decode/decode.py +++ b/decode/decode.py @@ -41,31 +41,17 @@ if __name__ == "__main__": lines = f.readlines() f.close() - in_packet = False - finish_packet = False - pkt_lines = [] + packet = None for l in lines: - if l[0] == '[': - # Start of a packet - if "] >>> URB" in l or "] <<< URB" in l: - if in_packet == True: - in_packet = False - finish_packet = True - else: - in_packet = True - elif "] UsbSnoop - " in l: - # Packet done? - if in_packet == True: - in_packet = False - finish_packet = True - - if finish_packet == True: - packets.append(Packet(pkt_lines, control, transfer)) - pkt_lines = [] - finish_packet = False - - if in_packet == True: - pkt_lines.append(l) + if packet: + done = packet.add_line(l) + if done: + packets.append(packet) + packet = None + else: + packet = Packet(l, control, transfer) + if packet.direction == defs.TO_UNKNOWN: + packet = None for p in packets: p.show() diff --git a/decode/packet.py b/decode/packet.py index a139021a..bc702faa 100644 --- a/decode/packet.py +++ b/decode/packet.py @@ -53,8 +53,9 @@ def get_urb_info(l): if idx >= 0: direction = defs.TO_HOST else: - raise Exception("Invalid packet start line") + return (defs.TO_UNKNOWN, -1) + # Yay, valid packet, grab URB number numstr = "" for c in l[idx + 9:]: if c.isdigit(): @@ -68,66 +69,90 @@ def get_urb_info(l): return (direction, int(numstr)) class Packet: - def __init__(self, lines, control_prot, transfer_prot): + def __init__(self, line, control_prot, transfer_prot): self.direction = defs.TO_UNKNOWN self.func = URBF_UNKNOWN + self.control_prot = control_prot + self.transfer_prot = transfer_prot self.extra = [] self.data = None self.urbnum = 0 self.protocol = None self.has_data = False self.typecode = None - - # Parse the packet - - (self.direction, self.urbnum) = get_urb_info(lines[0]) - - try: - (self.func, self.has_data, self.typecode) = funcs[lines[1].strip()] - except KeyError: - raise KeyError("URB function %s not handled" % lines[1].strip()) - - if self.func == URBF_TRANSFER: - self.protocol = transfer_prot - elif self.func == URBF_CONTROL: - self.protocol = control_prot - - # Parse transfer buffer data - in_data = False - data = "" - for i in range(2, len(lines)): - l = lines[i].strip() - if self.has_data: - if l.startswith("TransferBufferMDL"): - if in_data == True: - raise Exception("Already in data") - in_data = True - elif l.startswith("UrbLink"): - in_data = False - elif in_data and len(l) and not "no data supplied" in l: - d = l[l.index(": ") + 2:] # get data alone - data += d.replace(" ", "") + self.lines = [] + self.in_data = False + self.data_complete = False + self.tmpdata = "" + self.fcomplete = None + self.funpack = None + self.fshow = None + + # Check if this is actually a packet + self.lines.append(line) + (self.direction, self.urbnum) = get_urb_info(line) + + def add_line(self, line): + line = line.strip() + if not len(line): + return + self.lines.append(line) + + if line[0] == '[': + # Usually the end of a packet, but if we need data from the next + # packet keep going + if self.has_data and not self.data_complete: + return False + return True + + if not self.typecode: + # haven't gotten our "-- URB_FUNCTION_xxxx" line yet + if line.find("-- URB_FUNCTION_") >= 0: + try: + (self.func, self.has_data, self.typecode) = funcs[line] + except KeyError: + raise KeyError("URB function %s not handled" % line) + + if self.func == URBF_TRANSFER: + self.protocol = self.transfer_prot + elif self.func == URBF_CONTROL: + self.protocol = self.control_prot + + if self.protocol: + exec "from %s import get_funcs" % self.protocol + (self.fcomplete, self.funpack, self.fshow) = get_funcs() else: - self.extra.append(l) - - if len(data) > 0: - self.parse_data(data) - - def get_funcs(self): - if self.protocol: - exec "from %s import get_funcs" % self.protocol - return get_funcs() - return (None, None) + return False # not done; need more lines + + if line.find("TransferBufferMDL = ") >= 0 and self.has_data: + self.in_data = True + return False # not done; need more lines + + if line.find("UrbLink = ") >= 0: + if self.in_data: + self.in_data = False + + # special case: zero-length data means complete + if len(self.tmpdata) == 0: + self.data_complete = True + return True + + if self.fcomplete: + self.data_complete = self.fcomplete(self.tmpdata, self.direction) + if self.data_complete: + self.data = self.funpack(self.tmpdata, self.direction) + return self.data_complete + else: + self.data = binascii.unhexlify(self.tmpdata) + self.data_complete = True + return False # not done; need more lines - def parse_data(self, data): - if not self.has_data: - raise Exception("Data only valid for URBF_TRANSFER or URBF_CONTROL") + if self.in_data: + if len(line) and not "no data supplied" in line: + d = line[line.index(": ") + 2:] # get data alone + self.tmpdata += d.replace(" ", "") - (unpack, show) = self.get_funcs() - if unpack: - self.data = unpack(data, self.direction) - else: - self.data = binascii.unhexlify(data) + return False # not done; need more lines def add_ascii(self, line, items): if len(line) < 53: @@ -188,7 +213,6 @@ class Packet: print prefix + output print "" - (unpack, show) = self.get_funcs() - if show: - show(self.data, " " * 8, self.direction) + if self.fshow: + self.fshow(self.data, " " * 8, self.direction) diff --git a/decode/qmux.py b/decode/qmux.py index 36a2f302..4ff5cad7 100644 --- a/decode/qmux.py +++ b/decode/qmux.py @@ -24,6 +24,10 @@ TP_REQUEST = 0x00 TP_RESPONSE = 0x02 TP_INDICATION = 0x04 +def complete(data, direction): + # We don't handle QMUX frames spanning packets yet + return True + def unpack(data, direction): return binascii.unhexlify(data) @@ -183,5 +187,5 @@ def show(data, prefix, direction): print "" def get_funcs(): - return (unpack, show) + return (complete, unpack, show) diff --git a/decode/wmc.py b/decode/wmc.py index a0759d67..1a64c80f 100644 --- a/decode/wmc.py +++ b/decode/wmc.py @@ -18,6 +18,18 @@ import binascii import struct import defs +def complete(data, direction): + if direction == defs.TO_MODEM: + if data[len(data) - 2:] == "0d": + return True + elif direction == defs.TO_HOST: + if data[len(data) - 6:] == "30307e": + return True + else: + raise ValueError("No data direction") + return False + + def unpack(data, direction): # unpack the data if direction == defs.TO_MODEM: @@ -74,37 +86,63 @@ def show_device_info(data, prefix, direction): fmt = fmt + "H" # eri_ver? fmt = fmt + "3s" # unknown6 fmt = fmt + "64s" # unknown7 - fmt = fmt + "20s" # meid - fmt = fmt + "22s" # imei - fmt = fmt + "16s" # unknown9 - fmt = fmt + "22s" # iccid - fmt = fmt + "4s" # unknown10 - fmt = fmt + "16s" # MCC - fmt = fmt + "16s" # MNC - fmt = fmt + "4s" # unknown11 - fmt = fmt + "4s" # unknown12 - fmt = fmt + "4s" # unknown13 - fmt = fmt + "1s" + fmt = fmt + "s" # unknown8 + fmt = fmt + "14s" # meid + fmt = fmt + "6s" # unknown9 + fmt = fmt + "16s" # imei + fmt = fmt + "6s" # unknown10 + fmt = fmt + "16s" # unknown11 + fmt = fmt + "20s" # iccid + fmt = fmt + "6s" # unknown12 expected = struct.calcsize(fmt) - if len(data) != expected: + if len(data) >= expected: + (u1, manf, model, fwrev, hwrev, u2, u3, cdmamin, u4, homesid, u5, eriver, \ + u6, u7, u8, meid, u9, imei, u10, u11, iccid, u12) = struct.unpack(fmt, data[:expected]) + print prefix + " Manf: %s" % manf + print prefix + " Model: %s" % model + print prefix + " FW Rev: %s" % fwrev + print prefix + " HW Rev: %s" % hwrev + print prefix + " MIN: %s" % cdmamin + print prefix + " Home SID: %d" % homesid + print prefix + " ERI Ver: %d" % eriver + print prefix + " MEID: %s" % meid + print prefix + " IMEI: %s" % imei + print prefix + " U11 : %s" % u11 + print prefix + " ICCID: %s" % iccid + else: raise ValueError("Unexpected Info command response len (got %d expected %d)" % (len(data), expected)) - (u1, manf, model, fwrev, hwrev, u2, u3, cdmamin, u4, homesid, u5, eriver, \ - u6, u7, meid, imei, u9, iccid, u10, mcc, mnc, u11, u12, u13, u14) = struct.unpack(fmt, data) - - print prefix + " Manf: %s" % manf - print prefix + " Model: %s" % model - print prefix + " FW Rev: %s" % fwrev - print prefix + " HW Rev: %s" % hwrev - print prefix + " MIN: %s" % cdmamin - print prefix + " Home SID: %d" % homesid - print prefix + " ERI Ver: %d" % eriver - print prefix + " MEID: %s" % meid - print prefix + " IMEI: %s" % imei - print prefix + " Unk9: %s" % u9 - print prefix + " ICCID: %s" % iccid - print prefix + " MCC: %s" % mcc - print prefix + " MNC: %s" % mnc + + fmt3 = "<" + fmt3 = fmt3 + "16s" # MCC + fmt3 = fmt3 + "16s" # MNC + fmt3 = fmt3 + "4s" # unknown11 + fmt3 = fmt3 + "4s" # unknown12 + fmt3 = fmt3 + "4s" # unknown13 + expected3 = struct.calcsize(fmt3) + if len(data) >= expected + expected3: + (mcc, mnc, u11, u12, u13) = struct.unpack(fmt3, data[expected:]) + print prefix + " MCC: %s" % mcc + print prefix + " MNC: %s" % mnc + + +def state_to_string(state): + states = { 0: "unknown", + 1: "idle", + 2: "connecting", + 3: "authenticating", + 4: "connected", + 5: "dormant", + 6: "updating NAM", + 7: "updating PRL", + 8: "disconnecting", + 9: "error", + 10: "updating UICC", + 11: "updating PLMN" } + try: + return states[state] + except KeyError: + return "unknown" def show_connection_info(data, prefix, direction): if direction != defs.TO_HOST: @@ -113,22 +151,29 @@ def show_connection_info(data, prefix, direction): fmt = "<" fmt = fmt + "I" # rx_bytes fmt = fmt + "I" # tx_bytes - fmt = fmt + "8s" # unknown3 - fmt = fmt + "B" # unknown4 - fmt = fmt + "7s" # unknown7 - fmt = fmt + "16s" # ip4_address - fmt = fmt + "8s" # netmask? - fmt = fmt + "40s" # ip6_address + fmt = fmt + "8s" # unknown1 + fmt = fmt + "B" # state + fmt = fmt + "3s" # unknown2 expected = struct.calcsize(fmt) - if len(data) != expected: - raise ValueError("Unexpected IP Info command response len (got %d expected %d)" % (len(data), expected)) - (rxb, txb, u3, u4, u7, ip4addr, netmask, ip6addr) = struct.unpack(fmt, data) - - print prefix + " RX Bytes: %d" % rxb - print prefix + " TX Bytes: %d" % txb - print prefix + " IP4 Addr: %s" % ip4addr - print prefix + " IP6 Addr: %s" % ip6addr + if len(data) >= expected: + (rxb, txb, u1, state, u2) = struct.unpack(fmt, data[:expected]) + print prefix + " RX Bytes: %d" % rxb + print prefix + " TX Bytes: %d" % txb + print prefix + " State: %d (%s)" % (state, state_to_string (state)) + else: + raise ValueError("Unexpected Connection Info command response len (got %d expected %d)" % (len(data), expected)) + + fmt3 = "<" + fmt3 = fmt3 + "4s" # unknown3 + fmt3 = fmt3 + "16s" # ip4_address + fmt3 = fmt3 + "8s" # netmask? + fmt3 = fmt3 + "40s" # ip6_address + expected3 = struct.calcsize(fmt3) + if len(data) >= expected + expected3: + (u3, ip4addr, netmask, ip6addr) = struct.unpack(fmt3, data[expected:]) + print prefix + " IP4 Addr: %s" % ip4addr + print prefix + " IP6 Addr: %s" % ip6addr def get_signal(item): if item == 0x7D: @@ -136,6 +181,28 @@ def get_signal(item): else: return (item * -1, "") +def service_to_string(service): + services = { 0: "none", + 1: "AMPS", + 2: "IS95-A", + 3: "IS95-B", + 4: "GSM", + 5: "GPRS", + 6: "1xRTT", + 7: "EVDO r0", + 8: "UMTS", + 9: "EVDO rA", + 10: "EDGE", + 11: "HSDPA", + 12: "HSUPA", + 13: "HSPA", + 14: "LTE", + 15: "EVDO rA eHRPD" } + try: + return services[service] + except KeyError: + return "unknown" + def show_network_info(data, prefix, direction): if direction != defs.TO_HOST: return @@ -143,41 +210,63 @@ def show_network_info(data, prefix, direction): fmt = "<" fmt = fmt + "B" # unknown1 fmt = fmt + "3s" # unknown2 - fmt = fmt + "B" # unknown3 + fmt = fmt + "B" # service fmt = fmt + "B" # unknown4 fmt = fmt + "10s" # magic fmt = fmt + "H" # counter1 fmt = fmt + "H" # counter2 fmt = fmt + "B" # unknown5 fmt = fmt + "3s" # unknown6 - fmt = fmt + "B" # cdma1x_dbm + fmt = fmt + "B" # 2g_dbm fmt = fmt + "3s" # unknown7 fmt = fmt + "16s" # cdma_opname fmt = fmt + "18s" # unknown8 - fmt = fmt + "B" # hdr_dbm + fmt = fmt + "B" # 3g_dbm fmt = fmt + "3s" # unknown9 fmt = fmt + "B" # unknown10 fmt = fmt + "3s" # unknown11 fmt = fmt + "B" # unknown12 - fmt = fmt + "8s" # lte_opname - fmt = fmt + "60s" # unknown13 - fmt = fmt + "B" # lte_dbm - fmt = fmt + "3s" # unknown14 - fmt = fmt + "4s" # unknown15 + fmt = fmt + "8s" # 3gpp_opname + fmt = fmt + "4s" # unknown13 + fmt = fmt + "I" # unknown14 + fmt = fmt + "I" # unknown15 + fmt = fmt + "44s" # unknown16 + fmt = fmt + "I" # mcc/mnc expected = struct.calcsize(fmt) - if len(data) != expected: - raise ValueError("Unexpected Status command response len (got %d expected %d)" % (len(data), expected)) - (u1, u2, u3, u4, magic, counter1, counter2, u5, u6, cdma_dbm, u7, cdma_opname, \ - u8, hdr_dbm, u9, u10, u11, u12, lte_opname, u13, lte_dbm, u14, u15) = struct.unpack(fmt, data) - - print prefix + " Counter1: %s" % counter1 - print prefix + " Counter2: %s" % counter2 - print prefix + " CDMA dBm: %d dBm %s" % get_signal(cdma_dbm) - print prefix + " CDMA Op: %s" % cdma_opname - print prefix + " HDR dBm: %d dBm %s" % get_signal(hdr_dbm) - print prefix + " LTE Op: %s" % lte_opname - print prefix + " LTE dBm: %d dBm %s" % get_signal(lte_dbm) + if len(data) >= expected: + (u1, u2, service, u4, magic, counter1, counter2, u5, u6, two_g_dbm, u7, \ + cdma_opname, u8, three_g_dbm, u9, u10, u11, u12, tgpp_opname, u13, u14, \ + u15, u16, mccmnc) = struct.unpack(fmt, data[:expected]) + print prefix + " Counter1: %s" % counter1 + print prefix + " Counter2: %s" % counter2 + print prefix + " Service: %d (%s)" % (service, service_to_string (service)) + print prefix + " 2G dBm: %d dBm %s" % get_signal(two_g_dbm) + print prefix + " 3G dBm: %d dBm %s" % get_signal(three_g_dbm) + print prefix + " CDMA Op: %s" % cdma_opname + print prefix + " 3GPP Op: %s" % tgpp_opname + + # handle 2-digit MNC + if mccmnc < 100000: + mccmnc *= 10; + + mcc = mccmnc / 1000 + mnc = mccmnc - (mcc * 1000) + if mcc > 100: + print prefix + " MCC/MNC: %u-%u" % (mcc, mnc) + + else: + raise ValueError("Unexpected Network Info command response len (got %d expected %d)" % (len(data), expected)) + + fmt3 = "<" + fmt3 = fmt3 + "B" # lte_dbm + fmt3 = fmt3 + "3s" # unknown17 + fmt3 = fmt3 + "4s" # unknown18 + expected3 = struct.calcsize(fmt3) + if len(data) >= expected + expected3: + (lte_dbm, u17, u18) = struct.unpack(fmt3, data[expected:]) + print prefix + " LTE dBm: %d dBm %s" % get_signal(lte_dbm) + def show_init(data, prefix, direction): show_data(data, prefix) @@ -264,5 +353,5 @@ def show(data, prefix, direction): print "" def get_funcs(): - return (unpack, show) + return (complete, unpack, show) |