From 0f513c0403e749d39af4203dc931c6792d35486d Mon Sep 17 00:00:00 2001 From: FlUxIuS Date: Wed, 18 Mar 2020 17:32:31 +0100 Subject: [PATCH] Fixing a bug for Piggybacked packets and updating layer's name --- LoRa_PHYDecode.py | 2 +- README.md | 4 +- layers/{loraphy.py => loraphy2wan.py} | 191 ++++++++++++++------------ 3 files changed, 108 insertions(+), 89 deletions(-) rename layers/{loraphy.py => loraphy2wan.py} (83%) diff --git a/LoRa_PHYDecode.py b/LoRa_PHYDecode.py index 8cb936e..055d0c3 100644 --- a/LoRa_PHYDecode.py +++ b/LoRa_PHYDecode.py @@ -6,7 +6,7 @@ from __future__ import print_function # Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz import binascii -from layers.loraphy import * +from layers.loraphy2wan import * import argparse def decodePHY(pkt): diff --git a/README.md b/README.md index 6061819..236626c 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ $ sudo python LoRa_PHYDecode.py To generate packets, you can instantiate a Scapy packet as follows: ```python ->>> from layers.loraphy import * +>>> from layers.loraphy2wan import * >>> pkt = LoRa() >>> pkt @@ -96,7 +96,7 @@ Few helpers have been implemented to calculate MIC field, encrypt and decrypt pa As an example, to check if the key `000102030405060708090A0B0C0D0E0F` is used to compute MIC on the following Join-request, we can write a little script as follows: ```python ->>> from layers.loraphy import * +>>> from layers.loraphy2wan import * >>> from lutil.crypto import * >>> key = "000102030405060708090A0B0C0D0E0F" >>> p = '000000006c6f7665636166656d656565746f6f00696953024c49' diff --git a/layers/loraphy.py b/layers/loraphy2wan.py similarity index 83% rename from layers/loraphy.py rename to layers/loraphy2wan.py index 008f9e8..6f683c8 100644 --- a/layers/loraphy.py +++ b/layers/loraphy2wan.py @@ -3,7 +3,13 @@ # LoRa Scapy layers # Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz -from scapy.all import * +from scapy.packet import Packet +from scapy.fields import BitField, ByteEnumField, ByteField, \ + ConditionalField, IntField, LEShortField, PacketListField, \ + StrFixedLenField, X3BytesField, XByteField, XIntField, \ + XShortField, BitFieldLenField, LEX3BytesField, XBitField, \ + BitEnumField, XLEIntField, StrField, PacketField + class FCtrl_DownLink(Packet): name = "FCtrl_DownLink" @@ -13,6 +19,7 @@ class FCtrl_DownLink(Packet): BitField("FPending", 0, 1), BitFieldLenField("FOptsLen", 0, 4)] + # pylint: disable=R0201 def extract_padding(self, p): return "", p @@ -25,6 +32,7 @@ class FCtrl_UpLink(Packet): BitField("ClassB", 0, 1), BitFieldLenField("FOptsLen", 0, 4)] + # pylint: disable=R0201 def extract_padding(self, p): return "", p @@ -32,10 +40,7 @@ class FCtrl_UpLink(Packet): class DevAddrElem(Packet): name = "DevAddrElem" fields_desc = [XByteField("NwkID", 0x0), - LEX3BytesField("NwkAddr", b"\x00"*3)] - - def extract_padding(self, p): - return "", p + LEX3BytesField("NwkAddr", b"\x00" * 3)] CIDs_up = {0x01: "ResetInd", @@ -46,13 +51,13 @@ CIDs_up = {0x01: "ResetInd", 0x06: "DevStatusReq", 0x07: "NewChannelReq", 0x08: "RXTimingSetupReq", - 0x09: "TxParamSetupReq", # LoRa 1.1 specs from here + 0x09: "TxParamSetupReq", # LoRa 1.1 specs 0x0A: "DlChannelReq", 0x0B: "RekeyInd", 0x0C: "ADRParamSetupReq", 0x0D: "DeviceTimeReq", 0x0E: "ForceRejoinReq", - 0x0F: "RejoinParamSetupReq"} # end of LoRa 1.1 specs + 0x0F: "RejoinParamSetupReq"} # end of LoRa 1.1 specs CIDs_down = {0x01: "ResetConf", @@ -63,12 +68,12 @@ CIDs_down = {0x01: "ResetConf", 0x06: "DevStatusAns", 0x07: "NewChannelAns", 0x08: "RXTimingSetupAns", - 0x09: "TxParamSetupAns", # LoRa 1.1 specs from here + 0x09: "TxParamSetupAns", # LoRa 1.1 specs here 0x0A: "DlChannelAns", 0x0B: "RekeyConf", 0x0C: "ADRParamSetupAns", 0x0D: "DeviceTimeAns", - 0x0F: "RejoinParamSetupAns"} # end of LoRa 1.1 specs + 0x0F: "RejoinParamSetupAns"} # end of LoRa 1.1 specs class ResetInd(Packet): @@ -83,7 +88,6 @@ class ResetConf(Packet): class LinkCheckReq(Packet): name = "LinkCheckReq" - fields_desc = [] class LinkCheckAns(Packet): @@ -116,12 +120,15 @@ class LinkADRAns_Status(Packet): name = "LinkADRAns_Status" fields_desc = [BitField("RFU", 0, 5), BitField("PowerACK", 0, 1), + BitField("DataRate", 0, 1), BitField("ChannelMaskACK", 0, 1)] class LinkADRAns(Packet): name = "LinkADRAns" - fields_desc = [LinkADRAns_Status] + fields_desc = [PacketField("status", + LinkADRAns_Status(), + LinkADRAns_Status)] class DutyCyclePL(Packet): @@ -141,9 +148,9 @@ class DutyCycleAns(Packet): class DLsettings(Packet): name = "DLsettings" - fields_desc = [BitField("RFU", 0, 1), - BitField("RX1DRoffset", 0, 3), - BitField("RX2DataRate", 0, 4)] + fields_desc = [BitField("OptNeg", 0, 1), + XBitField("RX1DRoffset", 0, 3), + XBitField("RX2_Data_rate", 0, 4)] class RXParamSetupReq(Packet): @@ -164,6 +171,7 @@ class RXParamSetupAns(Packet): name = "RXParamSetupAns" fields_desc = [RXParamSetupAns_Status] + Battery_state = {0: "End-device connected to external source", 255: "Battery level unknown"} @@ -226,8 +234,7 @@ class RXTimingSetupAns(Packet): fields_desc = [] -# Specific commands for LoRa 1.1 here - +# Specific commands for LoRa 1.1 here MaxEIRPs = {0: "8 dbm", 1: "10 dbm", @@ -289,7 +296,7 @@ class DevLoraWANversion(Packet): class RekeyInd(Packet): name = "RekeyInd" fields_desc = [PacketListField("LoRaWANversion", b"", - DevLoraWANversion, length_from=lambda pkt:1)] + DevLoraWANversion, length_from=lambda pkt:1)] class RekeyConf(Packet): @@ -325,7 +332,7 @@ class DeviceTimeAns(Packet): class ForceRejoinReq(Packet): - name ="ForceRejoinReq" + name = "ForceRejoinReq" fields_desc = [BitField("RFU", 0, 2), BitField("Period", 0, 3), BitField("Max_Retries", 0, 3), @@ -384,7 +391,8 @@ class MACCommand_up(Packet): RXTimingSetupReq, length_from=lambda pkt:1), lambda pkt:(pkt.CID == 0x08)), - ConditionalField(PacketListField("TxParamSetup", b"", # specific to 1.1 from here + # specific to 1.1 from here + ConditionalField(PacketListField("TxParamSetup", b"", TxParamSetupReq, length_from=lambda pkt:1), lambda pkt:(pkt.CID == 0x09)), @@ -413,6 +421,7 @@ class MACCommand_up(Packet): length_from=lambda pkt:1), lambda pkt:(pkt.CID == 0x0F))] + # pylint: disable=R0201 def extract_padding(self, p): return "", p @@ -477,64 +486,73 @@ class MACCommand_down(Packet): length_from=lambda pkt:1), lambda pkt:(pkt.CID == 0x0F))] - def extract_padding(self, p): - return "", p class FOpts(Packet): name = "FOpts" fields_desc = [ConditionalField(PacketListField("FOpts_up", b"", - MACCommand_up, # piggybacked MAC Command for uplink - length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), - lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 - and pkt.MType & 0b1 == 0 - and pkt.MType >= 0b010)), + # UL piggy MAC Command + MACCommand_up, + length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501 + lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 and + pkt.MType & 0b1 == 0 and + pkt.MType >= 0b010)), ConditionalField(PacketListField("FOpts_down", b"", - MACCommand_down, # piggybacked MAC Command for downlink - length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), - lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 - and pkt.MType & 0b1 == 1 - and pkt.MType <= 0b101))] + # DL piggy MAC Command + MACCommand_down, + length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501 + lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 and + pkt.MType & 0b1 == 1 and + pkt.MType <= 0b101))] -def FOptsShow(pkt): + +def FOptsDownShow(pkt): try: - if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 0 and pkt.MType >= 0b010: + if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 1 and pkt.MType <= 0b101: # noqa: E501 return True - elif pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 1 and pkt.MType <= 0b101: - return True return False - except: + except Exception: return False + +def FOptsUpShow(pkt): + try: + if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 0 and pkt.MType >= 0b010: # noqa: E501 + return True + return False + except Exception: + return False + + class FHDR(Packet): name = "FHDR" - fields_desc = [ConditionalField(PacketListField("DevAddr", b"", DevAddrElem, + fields_desc = [ConditionalField(PacketListField("DevAddr", b"", DevAddrElem, # noqa: E501 length_from=lambda pkt:4), - lambda pkt:(pkt.MType >= 0b010 - and pkt.MType <= 0b101)), + lambda pkt:(pkt.MType >= 0b010 and + pkt.MType <= 0b101)), ConditionalField(PacketListField("FCtrl", b"", FCtrl_DownLink, length_from=lambda pkt:1), - lambda pkt:(pkt.MType & 0b1 == 1 - and pkt.MType <= 0b101)), - ConditionalField(PacketListField("FCtrl", b"", + lambda pkt:(pkt.MType & 0b1 == 1 and + pkt.MType <= 0b101)), + ConditionalField(PacketListField("FCtrl", b"", FCtrl_UpLink, length_from=lambda pkt:1), - lambda pkt:(pkt.MType & 0b1 == 0 - and pkt.MType >= 0b010)), + lambda pkt:(pkt.MType & 0b1 == 0 and + pkt.MType >= 0b010)), ConditionalField(LEShortField("FCnt", 0), - lambda pkt:(pkt.MType >= 0b010 - and pkt.MType <= 0b101)), + lambda pkt:(pkt.MType >= 0b010 and + pkt.MType <= 0b101)), ConditionalField(PacketListField("FOpts_up", b"", MACCommand_up, - length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), - lambda pkt:FOptsShow(pkt)), - ConditionalField(PacketListField("FOpts_down", b"", - MACCommand_down, - length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), - lambda pkt:FOptsShow(pkt))] + length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501 + FOptsUpShow), + ConditionalField(PacketListField("FOpts_down", b"", + MACCommand_down, + length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501 + FOptsDownShow)] -FPorts = {0: "NwkSKey"} # anything else is AppSKey +FPorts = {0: "NwkSKey"} # anything else is AppSKey JoinReqTypes = {0xFF: "Join-request", @@ -550,31 +568,25 @@ class Join_Request(Packet): LEShortField("DevNonce", 0x0000)] -class DLsettings(Packet): - name = "DLsettings" - fields_desc = [BitField("OptNeg", 0, 1), - XBitField("RX1DRoffset", 0, 3), - XBitField("RX2_Data_rate", 0, 4)] - - class Join_Accept(Packet): name = "Join_Accept" dcflist = False fields_desc = [LEX3BytesField("JoinAppNonce", 0), - LEX3BytesField("NetID", 0), - XLEIntField("DevAddr", 0), - DLsettings, - XByteField("RxDelay", 0), - ConditionalField(StrFixedLenField("CFList", b"\x00" * 16 , 16), - lambda pkt:(Join_Accept.dcflist is True))] + LEX3BytesField("NetID", 0), + XLEIntField("DevAddr", 0), + DLsettings, + XByteField("RxDelay", 0), + ConditionalField(StrFixedLenField("CFList", b"\x00" * 16, 16), # noqa: E501 + lambda pkt:(Join_Accept.dcflist is True))] + # pylint: disable=R0201 def extract_padding(self, p): return "", p - def __init__(self, packet=""): # CFlist calculated with on rest packet len + def __init__(self, packet=""): # CFList calculated with rest of packet len if len(packet) > 18: Join_Accept.dcflist = True - return super(Join_Accept, self).__init__(packet) + super(Join_Accept, self).__init__(packet) RejoinType = {0: "NetID+DevEUI", @@ -582,7 +594,7 @@ RejoinType = {0: "NetID+DevEUI", 2: "NetID+DevEUI"} -def RejoinReq(Packet): # LoRa 1.1 specs +class RejoinReq(Packet): # LoRa 1.1 specs name = "RejoinReq" fields_desc = [ByteField("Type", 0), X3BytesField("NetID", 0), @@ -592,12 +604,12 @@ def RejoinReq(Packet): # LoRa 1.1 specs class FRMPayload(Packet): name = "FRMPayload" - fields_desc = [ConditionalField(StrField("DataPayload", 0, remain=4), # Downlink - lambda pkt:(pkt.MType == 0b101 - or pkt.MType == 0b011)), - ConditionalField(StrField("DataPayload", 0, remain=6), # Uplink - lambda pkt:(pkt.MType == 0b100 - or pkt.MType == 0b010)), + fields_desc = [ConditionalField(StrField("DataPayload", "", remain=4), # Downlink # noqa: E501 + lambda pkt:(pkt.MType == 0b101 or + pkt.MType == 0b011)), + ConditionalField(StrField("DataPayload", "", remain=6), # Uplink # noqa: E501 + lambda pkt:(pkt.MType == 0b100 or + pkt.MType == 0b010)), ConditionalField(PacketListField("Join_Request_Field", b"", Join_Request, length_from=lambda pkt:18), @@ -605,19 +617,26 @@ class FRMPayload(Packet): ConditionalField(PacketListField("Join_Accept_Field", b"", Join_Accept, count_from=lambda pkt:1), - lambda pkt:(pkt.MType == 0b001 - and LoRa.encrypted is False)), + lambda pkt:(pkt.MType == 0b001 and + LoRa.encrypted is False)), ConditionalField(StrField("Join_Accept_Encrypted", 0), - lambda pkt:(pkt.MType == 0b001 and LoRa.encrypted is True))] + lambda pkt:(pkt.MType == 0b001 and LoRa.encrypted is True)), # noqa: E501 + ConditionalField(PacketListField("ReJoin_Request_Field", b"", # noqa: E501 + RejoinReq, + length_from=lambda pkt:14), + lambda pkt:(pkt.MType == 0b111))] class MACPayload(Packet): name = "MACPayload" + eFPort = False fields_desc = [FHDR, ConditionalField(ByteEnumField("FPort", 0, FPorts), - lambda pkt:(pkt.MType >= 0b010 and pkt.MType <= 0b101)), + lambda pkt:(pkt.MType >= 0b010 and + pkt.MType <= 0b101 and + pkt.FCtrl[0].FOptsLen == 0)), FRMPayload] - + MTypes = {0b000: "Join-request", 0b001: "Join-accept", @@ -625,11 +644,11 @@ MTypes = {0b000: "Join-request", 0b011: "Unconfirmed Data Down", 0b100: "Confirmed Data Up", 0b101: "Confirmed Data Down", - 0b110: "Rejoin-request", # Only in LoRa 1.1 specs + 0b110: "Rejoin-request", # Only in LoRa 1.1 specs 0b111: "Proprietary"} -class MHDR(Packet): # same for 1.0 and 1.1 +class MHDR(Packet): # Same for 1.0 as for 1.1 name = "MHDR" fields_desc = [BitEnumField("MType", 0b000, 3, MTypes), BitField("RFU", 0b000, 3), @@ -641,13 +660,13 @@ class PHYPayload(Packet): fields_desc = [MHDR, MACPayload, ConditionalField(XIntField("MIC", 0), - lambda pkt:(pkt.MType != 0b001 - or LoRa.encrypted is False))] + lambda pkt:(pkt.MType != 0b001 or + LoRa.encrypted is False))] -class LoRa(Packet): # default frame (unclear specs => taken from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5677147/) +class LoRa(Packet): # default frame (unclear specs => taken from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5677147/) # noqa: E501 name = "LoRa" - version = "1.1" # default version to parse + version = "1.1" # default version to parse encrypted = True fields_desc = [XBitField("Preamble", 0, 4), XBitField("PHDR", 0, 16),