aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2012-01-21 12:55:16 -0600
committerDan Williams <dcbw@redhat.com>2012-01-21 12:55:16 -0600
commitc37fdf5f94711ffbdf966f2ab366bbd047dfc147 (patch)
tree6fdac73a3c57f1415dddb7002d018e7f27a06e25
parenta9d6b5a8b62fc0fc07d7c3fab99b25c67474c68d (diff)
decode: updates all around
Rewrite packet handling so packets can span multiple USB URBs (which sometimes happens with WMC) and also add a bunch more WMC decoding stuff.
-rwxr-xr-xdecode/decode.py34
-rw-r--r--decode/packet.py130
-rw-r--r--decode/qmux.py6
-rw-r--r--decode/wmc.py215
4 files changed, 244 insertions, 141 deletions
diff --git a/decode/decode.py b/decode/decode.py
index 5be72cdc..7196f9de 100755
--- a/decode/decode.py
+++ b/decode/decode.py
@@ -41,31 +41,17 @@ if __name__ == "__main__":
lines = f.readlines()
f.close()
- in_packet = False
- finish_packet = False
- pkt_lines = []
+ packet = None
for l in lines:
- if l[0] == '[':
- # Start of a packet
- if "] >>> URB" in l or "] <<< URB" in l:
- if in_packet == True:
- in_packet = False
- finish_packet = True
- else:
- in_packet = True
- elif "] UsbSnoop - " in l:
- # Packet done?
- if in_packet == True:
- in_packet = False
- finish_packet = True
-
- if finish_packet == True:
- packets.append(Packet(pkt_lines, control, transfer))
- pkt_lines = []
- finish_packet = False
-
- if in_packet == True:
- pkt_lines.append(l)
+ if packet:
+ done = packet.add_line(l)
+ if done:
+ packets.append(packet)
+ packet = None
+ else:
+ packet = Packet(l, control, transfer)
+ if packet.direction == defs.TO_UNKNOWN:
+ packet = None
for p in packets:
p.show()
diff --git a/decode/packet.py b/decode/packet.py
index a139021a..bc702faa 100644
--- a/decode/packet.py
+++ b/decode/packet.py
@@ -53,8 +53,9 @@ def get_urb_info(l):
if idx >= 0:
direction = defs.TO_HOST
else:
- raise Exception("Invalid packet start line")
+ return (defs.TO_UNKNOWN, -1)
+ # Yay, valid packet, grab URB number
numstr = ""
for c in l[idx + 9:]:
if c.isdigit():
@@ -68,66 +69,90 @@ def get_urb_info(l):
return (direction, int(numstr))
class Packet:
- def __init__(self, lines, control_prot, transfer_prot):
+ def __init__(self, line, control_prot, transfer_prot):
self.direction = defs.TO_UNKNOWN
self.func = URBF_UNKNOWN
+ self.control_prot = control_prot
+ self.transfer_prot = transfer_prot
self.extra = []
self.data = None
self.urbnum = 0
self.protocol = None
self.has_data = False
self.typecode = None
-
- # Parse the packet
-
- (self.direction, self.urbnum) = get_urb_info(lines[0])
-
- try:
- (self.func, self.has_data, self.typecode) = funcs[lines[1].strip()]
- except KeyError:
- raise KeyError("URB function %s not handled" % lines[1].strip())
-
- if self.func == URBF_TRANSFER:
- self.protocol = transfer_prot
- elif self.func == URBF_CONTROL:
- self.protocol = control_prot
-
- # Parse transfer buffer data
- in_data = False
- data = ""
- for i in range(2, len(lines)):
- l = lines[i].strip()
- if self.has_data:
- if l.startswith("TransferBufferMDL"):
- if in_data == True:
- raise Exception("Already in data")
- in_data = True
- elif l.startswith("UrbLink"):
- in_data = False
- elif in_data and len(l) and not "no data supplied" in l:
- d = l[l.index(": ") + 2:] # get data alone
- data += d.replace(" ", "")
+ self.lines = []
+ self.in_data = False
+ self.data_complete = False
+ self.tmpdata = ""
+ self.fcomplete = None
+ self.funpack = None
+ self.fshow = None
+
+ # Check if this is actually a packet
+ self.lines.append(line)
+ (self.direction, self.urbnum) = get_urb_info(line)
+
+ def add_line(self, line):
+ line = line.strip()
+ if not len(line):
+ return
+ self.lines.append(line)
+
+ if line[0] == '[':
+ # Usually the end of a packet, but if we need data from the next
+ # packet keep going
+ if self.has_data and not self.data_complete:
+ return False
+ return True
+
+ if not self.typecode:
+ # haven't gotten our "-- URB_FUNCTION_xxxx" line yet
+ if line.find("-- URB_FUNCTION_") >= 0:
+ try:
+ (self.func, self.has_data, self.typecode) = funcs[line]
+ except KeyError:
+ raise KeyError("URB function %s not handled" % line)
+
+ if self.func == URBF_TRANSFER:
+ self.protocol = self.transfer_prot
+ elif self.func == URBF_CONTROL:
+ self.protocol = self.control_prot
+
+ if self.protocol:
+ exec "from %s import get_funcs" % self.protocol
+ (self.fcomplete, self.funpack, self.fshow) = get_funcs()
else:
- self.extra.append(l)
-
- if len(data) > 0:
- self.parse_data(data)
-
- def get_funcs(self):
- if self.protocol:
- exec "from %s import get_funcs" % self.protocol
- return get_funcs()
- return (None, None)
+ return False # not done; need more lines
+
+ if line.find("TransferBufferMDL = ") >= 0 and self.has_data:
+ self.in_data = True
+ return False # not done; need more lines
+
+ if line.find("UrbLink = ") >= 0:
+ if self.in_data:
+ self.in_data = False
+
+ # special case: zero-length data means complete
+ if len(self.tmpdata) == 0:
+ self.data_complete = True
+ return True
+
+ if self.fcomplete:
+ self.data_complete = self.fcomplete(self.tmpdata, self.direction)
+ if self.data_complete:
+ self.data = self.funpack(self.tmpdata, self.direction)
+ return self.data_complete
+ else:
+ self.data = binascii.unhexlify(self.tmpdata)
+ self.data_complete = True
+ return False # not done; need more lines
- def parse_data(self, data):
- if not self.has_data:
- raise Exception("Data only valid for URBF_TRANSFER or URBF_CONTROL")
+ if self.in_data:
+ if len(line) and not "no data supplied" in line:
+ d = line[line.index(": ") + 2:] # get data alone
+ self.tmpdata += d.replace(" ", "")
- (unpack, show) = self.get_funcs()
- if unpack:
- self.data = unpack(data, self.direction)
- else:
- self.data = binascii.unhexlify(data)
+ return False # not done; need more lines
def add_ascii(self, line, items):
if len(line) < 53:
@@ -188,7 +213,6 @@ class Packet:
print prefix + output
print ""
- (unpack, show) = self.get_funcs()
- if show:
- show(self.data, " " * 8, self.direction)
+ if self.fshow:
+ self.fshow(self.data, " " * 8, self.direction)
diff --git a/decode/qmux.py b/decode/qmux.py
index 36a2f302..4ff5cad7 100644
--- a/decode/qmux.py
+++ b/decode/qmux.py
@@ -24,6 +24,10 @@ TP_REQUEST = 0x00
TP_RESPONSE = 0x02
TP_INDICATION = 0x04
+def complete(data, direction):
+ # We don't handle QMUX frames spanning packets yet
+ return True
+
def unpack(data, direction):
return binascii.unhexlify(data)
@@ -183,5 +187,5 @@ def show(data, prefix, direction):
print ""
def get_funcs():
- return (unpack, show)
+ return (complete, unpack, show)
diff --git a/decode/wmc.py b/decode/wmc.py
index a0759d67..1a64c80f 100644
--- a/decode/wmc.py
+++ b/decode/wmc.py
@@ -18,6 +18,18 @@ import binascii
import struct
import defs
+def complete(data, direction):
+ if direction == defs.TO_MODEM:
+ if data[len(data) - 2:] == "0d":
+ return True
+ elif direction == defs.TO_HOST:
+ if data[len(data) - 6:] == "30307e":
+ return True
+ else:
+ raise ValueError("No data direction")
+ return False
+
+
def unpack(data, direction):
# unpack the data
if direction == defs.TO_MODEM:
@@ -74,37 +86,63 @@ def show_device_info(data, prefix, direction):
fmt = fmt + "H" # eri_ver?
fmt = fmt + "3s" # unknown6
fmt = fmt + "64s" # unknown7
- fmt = fmt + "20s" # meid
- fmt = fmt + "22s" # imei
- fmt = fmt + "16s" # unknown9
- fmt = fmt + "22s" # iccid
- fmt = fmt + "4s" # unknown10
- fmt = fmt + "16s" # MCC
- fmt = fmt + "16s" # MNC
- fmt = fmt + "4s" # unknown11
- fmt = fmt + "4s" # unknown12
- fmt = fmt + "4s" # unknown13
- fmt = fmt + "1s"
+ fmt = fmt + "s" # unknown8
+ fmt = fmt + "14s" # meid
+ fmt = fmt + "6s" # unknown9
+ fmt = fmt + "16s" # imei
+ fmt = fmt + "6s" # unknown10
+ fmt = fmt + "16s" # unknown11
+ fmt = fmt + "20s" # iccid
+ fmt = fmt + "6s" # unknown12
expected = struct.calcsize(fmt)
- if len(data) != expected:
+ if len(data) >= expected:
+ (u1, manf, model, fwrev, hwrev, u2, u3, cdmamin, u4, homesid, u5, eriver, \
+ u6, u7, u8, meid, u9, imei, u10, u11, iccid, u12) = struct.unpack(fmt, data[:expected])
+ print prefix + " Manf: %s" % manf
+ print prefix + " Model: %s" % model
+ print prefix + " FW Rev: %s" % fwrev
+ print prefix + " HW Rev: %s" % hwrev
+ print prefix + " MIN: %s" % cdmamin
+ print prefix + " Home SID: %d" % homesid
+ print prefix + " ERI Ver: %d" % eriver
+ print prefix + " MEID: %s" % meid
+ print prefix + " IMEI: %s" % imei
+ print prefix + " U11 : %s" % u11
+ print prefix + " ICCID: %s" % iccid
+ else:
raise ValueError("Unexpected Info command response len (got %d expected %d)" % (len(data), expected))
- (u1, manf, model, fwrev, hwrev, u2, u3, cdmamin, u4, homesid, u5, eriver, \
- u6, u7, meid, imei, u9, iccid, u10, mcc, mnc, u11, u12, u13, u14) = struct.unpack(fmt, data)
-
- print prefix + " Manf: %s" % manf
- print prefix + " Model: %s" % model
- print prefix + " FW Rev: %s" % fwrev
- print prefix + " HW Rev: %s" % hwrev
- print prefix + " MIN: %s" % cdmamin
- print prefix + " Home SID: %d" % homesid
- print prefix + " ERI Ver: %d" % eriver
- print prefix + " MEID: %s" % meid
- print prefix + " IMEI: %s" % imei
- print prefix + " Unk9: %s" % u9
- print prefix + " ICCID: %s" % iccid
- print prefix + " MCC: %s" % mcc
- print prefix + " MNC: %s" % mnc
+
+ fmt3 = "<"
+ fmt3 = fmt3 + "16s" # MCC
+ fmt3 = fmt3 + "16s" # MNC
+ fmt3 = fmt3 + "4s" # unknown11
+ fmt3 = fmt3 + "4s" # unknown12
+ fmt3 = fmt3 + "4s" # unknown13
+ expected3 = struct.calcsize(fmt3)
+ if len(data) >= expected + expected3:
+ (mcc, mnc, u11, u12, u13) = struct.unpack(fmt3, data[expected:])
+ print prefix + " MCC: %s" % mcc
+ print prefix + " MNC: %s" % mnc
+
+
+def state_to_string(state):
+ states = { 0: "unknown",
+ 1: "idle",
+ 2: "connecting",
+ 3: "authenticating",
+ 4: "connected",
+ 5: "dormant",
+ 6: "updating NAM",
+ 7: "updating PRL",
+ 8: "disconnecting",
+ 9: "error",
+ 10: "updating UICC",
+ 11: "updating PLMN" }
+ try:
+ return states[state]
+ except KeyError:
+ return "unknown"
def show_connection_info(data, prefix, direction):
if direction != defs.TO_HOST:
@@ -113,22 +151,29 @@ def show_connection_info(data, prefix, direction):
fmt = "<"
fmt = fmt + "I" # rx_bytes
fmt = fmt + "I" # tx_bytes
- fmt = fmt + "8s" # unknown3
- fmt = fmt + "B" # unknown4
- fmt = fmt + "7s" # unknown7
- fmt = fmt + "16s" # ip4_address
- fmt = fmt + "8s" # netmask?
- fmt = fmt + "40s" # ip6_address
+ fmt = fmt + "8s" # unknown1
+ fmt = fmt + "B" # state
+ fmt = fmt + "3s" # unknown2
expected = struct.calcsize(fmt)
- if len(data) != expected:
- raise ValueError("Unexpected IP Info command response len (got %d expected %d)" % (len(data), expected))
- (rxb, txb, u3, u4, u7, ip4addr, netmask, ip6addr) = struct.unpack(fmt, data)
-
- print prefix + " RX Bytes: %d" % rxb
- print prefix + " TX Bytes: %d" % txb
- print prefix + " IP4 Addr: %s" % ip4addr
- print prefix + " IP6 Addr: %s" % ip6addr
+ if len(data) >= expected:
+ (rxb, txb, u1, state, u2) = struct.unpack(fmt, data[:expected])
+ print prefix + " RX Bytes: %d" % rxb
+ print prefix + " TX Bytes: %d" % txb
+ print prefix + " State: %d (%s)" % (state, state_to_string (state))
+ else:
+ raise ValueError("Unexpected Connection Info command response len (got %d expected %d)" % (len(data), expected))
+
+ fmt3 = "<"
+ fmt3 = fmt3 + "4s" # unknown3
+ fmt3 = fmt3 + "16s" # ip4_address
+ fmt3 = fmt3 + "8s" # netmask?
+ fmt3 = fmt3 + "40s" # ip6_address
+ expected3 = struct.calcsize(fmt3)
+ if len(data) >= expected + expected3:
+ (u3, ip4addr, netmask, ip6addr) = struct.unpack(fmt3, data[expected:])
+ print prefix + " IP4 Addr: %s" % ip4addr
+ print prefix + " IP6 Addr: %s" % ip6addr
def get_signal(item):
if item == 0x7D:
@@ -136,6 +181,28 @@ def get_signal(item):
else:
return (item * -1, "")
+def service_to_string(service):
+ services = { 0: "none",
+ 1: "AMPS",
+ 2: "IS95-A",
+ 3: "IS95-B",
+ 4: "GSM",
+ 5: "GPRS",
+ 6: "1xRTT",
+ 7: "EVDO r0",
+ 8: "UMTS",
+ 9: "EVDO rA",
+ 10: "EDGE",
+ 11: "HSDPA",
+ 12: "HSUPA",
+ 13: "HSPA",
+ 14: "LTE",
+ 15: "EVDO rA eHRPD" }
+ try:
+ return services[service]
+ except KeyError:
+ return "unknown"
+
def show_network_info(data, prefix, direction):
if direction != defs.TO_HOST:
return
@@ -143,41 +210,63 @@ def show_network_info(data, prefix, direction):
fmt = "<"
fmt = fmt + "B" # unknown1
fmt = fmt + "3s" # unknown2
- fmt = fmt + "B" # unknown3
+ fmt = fmt + "B" # service
fmt = fmt + "B" # unknown4
fmt = fmt + "10s" # magic
fmt = fmt + "H" # counter1
fmt = fmt + "H" # counter2
fmt = fmt + "B" # unknown5
fmt = fmt + "3s" # unknown6
- fmt = fmt + "B" # cdma1x_dbm
+ fmt = fmt + "B" # 2g_dbm
fmt = fmt + "3s" # unknown7
fmt = fmt + "16s" # cdma_opname
fmt = fmt + "18s" # unknown8
- fmt = fmt + "B" # hdr_dbm
+ fmt = fmt + "B" # 3g_dbm
fmt = fmt + "3s" # unknown9
fmt = fmt + "B" # unknown10
fmt = fmt + "3s" # unknown11
fmt = fmt + "B" # unknown12
- fmt = fmt + "8s" # lte_opname
- fmt = fmt + "60s" # unknown13
- fmt = fmt + "B" # lte_dbm
- fmt = fmt + "3s" # unknown14
- fmt = fmt + "4s" # unknown15
+ fmt = fmt + "8s" # 3gpp_opname
+ fmt = fmt + "4s" # unknown13
+ fmt = fmt + "I" # unknown14
+ fmt = fmt + "I" # unknown15
+ fmt = fmt + "44s" # unknown16
+ fmt = fmt + "I" # mcc/mnc
expected = struct.calcsize(fmt)
- if len(data) != expected:
- raise ValueError("Unexpected Status command response len (got %d expected %d)" % (len(data), expected))
- (u1, u2, u3, u4, magic, counter1, counter2, u5, u6, cdma_dbm, u7, cdma_opname, \
- u8, hdr_dbm, u9, u10, u11, u12, lte_opname, u13, lte_dbm, u14, u15) = struct.unpack(fmt, data)
-
- print prefix + " Counter1: %s" % counter1
- print prefix + " Counter2: %s" % counter2
- print prefix + " CDMA dBm: %d dBm %s" % get_signal(cdma_dbm)
- print prefix + " CDMA Op: %s" % cdma_opname
- print prefix + " HDR dBm: %d dBm %s" % get_signal(hdr_dbm)
- print prefix + " LTE Op: %s" % lte_opname
- print prefix + " LTE dBm: %d dBm %s" % get_signal(lte_dbm)
+ if len(data) >= expected:
+ (u1, u2, service, u4, magic, counter1, counter2, u5, u6, two_g_dbm, u7, \
+ cdma_opname, u8, three_g_dbm, u9, u10, u11, u12, tgpp_opname, u13, u14, \
+ u15, u16, mccmnc) = struct.unpack(fmt, data[:expected])
+ print prefix + " Counter1: %s" % counter1
+ print prefix + " Counter2: %s" % counter2
+ print prefix + " Service: %d (%s)" % (service, service_to_string (service))
+ print prefix + " 2G dBm: %d dBm %s" % get_signal(two_g_dbm)
+ print prefix + " 3G dBm: %d dBm %s" % get_signal(three_g_dbm)
+ print prefix + " CDMA Op: %s" % cdma_opname
+ print prefix + " 3GPP Op: %s" % tgpp_opname
+
+ # handle 2-digit MNC
+ if mccmnc < 100000:
+ mccmnc *= 10;
+
+ mcc = mccmnc / 1000
+ mnc = mccmnc - (mcc * 1000)
+ if mcc > 100:
+ print prefix + " MCC/MNC: %u-%u" % (mcc, mnc)
+
+ else:
+ raise ValueError("Unexpected Network Info command response len (got %d expected %d)" % (len(data), expected))
+
+ fmt3 = "<"
+ fmt3 = fmt3 + "B" # lte_dbm
+ fmt3 = fmt3 + "3s" # unknown17
+ fmt3 = fmt3 + "4s" # unknown18
+ expected3 = struct.calcsize(fmt3)
+ if len(data) >= expected + expected3:
+ (lte_dbm, u17, u18) = struct.unpack(fmt3, data[expected:])
+ print prefix + " LTE dBm: %d dBm %s" % get_signal(lte_dbm)
+
def show_init(data, prefix, direction):
show_data(data, prefix)
@@ -264,5 +353,5 @@ def show(data, prefix, direction):
print ""
def get_funcs():
- return (unpack, show)
+ return (complete, unpack, show)