Fixing a bug for Piggybacked packets and updating layer's name

This commit is contained in:
FlUxIuS 2020-03-18 17:32:31 +01:00
parent b371de0a2e
commit 0f513c0403
3 changed files with 108 additions and 89 deletions

View File

@ -6,7 +6,7 @@ from __future__ import print_function
# Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz # Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz
import binascii import binascii
from layers.loraphy import * from layers.loraphy2wan import *
import argparse import argparse
def decodePHY(pkt): def decodePHY(pkt):

View File

@ -61,7 +61,7 @@ $ sudo python LoRa_PHYDecode.py
To generate packets, you can instantiate a Scapy packet as follows: To generate packets, you can instantiate a Scapy packet as follows:
```python ```python
>>> from layers.loraphy import * >>> from layers.loraphy2wan import *
>>> pkt = LoRa() >>> pkt = LoRa()
>>> pkt >>> pkt
<LoRa Join_Request_Field=[''] |> <LoRa Join_Request_Field=[''] |>
@ -96,7 +96,7 @@ Few helpers have been implemented to calculate MIC field, encrypt and decrypt pa
As an example, to check if the key `000102030405060708090A0B0C0D0E0F` is used to compute MIC on the following Join-request, we can write a little script as follows: As an example, to check if the key `000102030405060708090A0B0C0D0E0F` is used to compute MIC on the following Join-request, we can write a little script as follows:
```python ```python
>>> from layers.loraphy import * >>> from layers.loraphy2wan import *
>>> from lutil.crypto import * >>> from lutil.crypto import *
>>> key = "000102030405060708090A0B0C0D0E0F" >>> key = "000102030405060708090A0B0C0D0E0F"
>>> p = '000000006c6f7665636166656d656565746f6f00696953024c49' >>> p = '000000006c6f7665636166656d656565746f6f00696953024c49'

View File

@ -3,7 +3,13 @@
# LoRa Scapy layers # LoRa Scapy layers
# Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz # Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz
from scapy.all import * from scapy.packet import Packet
from scapy.fields import BitField, ByteEnumField, ByteField, \
ConditionalField, IntField, LEShortField, PacketListField, \
StrFixedLenField, X3BytesField, XByteField, XIntField, \
XShortField, BitFieldLenField, LEX3BytesField, XBitField, \
BitEnumField, XLEIntField, StrField, PacketField
class FCtrl_DownLink(Packet): class FCtrl_DownLink(Packet):
name = "FCtrl_DownLink" name = "FCtrl_DownLink"
@ -13,6 +19,7 @@ class FCtrl_DownLink(Packet):
BitField("FPending", 0, 1), BitField("FPending", 0, 1),
BitFieldLenField("FOptsLen", 0, 4)] BitFieldLenField("FOptsLen", 0, 4)]
# pylint: disable=R0201
def extract_padding(self, p): def extract_padding(self, p):
return "", p return "", p
@ -25,6 +32,7 @@ class FCtrl_UpLink(Packet):
BitField("ClassB", 0, 1), BitField("ClassB", 0, 1),
BitFieldLenField("FOptsLen", 0, 4)] BitFieldLenField("FOptsLen", 0, 4)]
# pylint: disable=R0201
def extract_padding(self, p): def extract_padding(self, p):
return "", p return "", p
@ -32,10 +40,7 @@ class FCtrl_UpLink(Packet):
class DevAddrElem(Packet): class DevAddrElem(Packet):
name = "DevAddrElem" name = "DevAddrElem"
fields_desc = [XByteField("NwkID", 0x0), fields_desc = [XByteField("NwkID", 0x0),
LEX3BytesField("NwkAddr", b"\x00"*3)] LEX3BytesField("NwkAddr", b"\x00" * 3)]
def extract_padding(self, p):
return "", p
CIDs_up = {0x01: "ResetInd", CIDs_up = {0x01: "ResetInd",
@ -46,13 +51,13 @@ CIDs_up = {0x01: "ResetInd",
0x06: "DevStatusReq", 0x06: "DevStatusReq",
0x07: "NewChannelReq", 0x07: "NewChannelReq",
0x08: "RXTimingSetupReq", 0x08: "RXTimingSetupReq",
0x09: "TxParamSetupReq", # LoRa 1.1 specs from here 0x09: "TxParamSetupReq", # LoRa 1.1 specs
0x0A: "DlChannelReq", 0x0A: "DlChannelReq",
0x0B: "RekeyInd", 0x0B: "RekeyInd",
0x0C: "ADRParamSetupReq", 0x0C: "ADRParamSetupReq",
0x0D: "DeviceTimeReq", 0x0D: "DeviceTimeReq",
0x0E: "ForceRejoinReq", 0x0E: "ForceRejoinReq",
0x0F: "RejoinParamSetupReq"} # end of LoRa 1.1 specs 0x0F: "RejoinParamSetupReq"} # end of LoRa 1.1 specs
CIDs_down = {0x01: "ResetConf", CIDs_down = {0x01: "ResetConf",
@ -63,12 +68,12 @@ CIDs_down = {0x01: "ResetConf",
0x06: "DevStatusAns", 0x06: "DevStatusAns",
0x07: "NewChannelAns", 0x07: "NewChannelAns",
0x08: "RXTimingSetupAns", 0x08: "RXTimingSetupAns",
0x09: "TxParamSetupAns", # LoRa 1.1 specs from here 0x09: "TxParamSetupAns", # LoRa 1.1 specs here
0x0A: "DlChannelAns", 0x0A: "DlChannelAns",
0x0B: "RekeyConf", 0x0B: "RekeyConf",
0x0C: "ADRParamSetupAns", 0x0C: "ADRParamSetupAns",
0x0D: "DeviceTimeAns", 0x0D: "DeviceTimeAns",
0x0F: "RejoinParamSetupAns"} # end of LoRa 1.1 specs 0x0F: "RejoinParamSetupAns"} # end of LoRa 1.1 specs
class ResetInd(Packet): class ResetInd(Packet):
@ -83,7 +88,6 @@ class ResetConf(Packet):
class LinkCheckReq(Packet): class LinkCheckReq(Packet):
name = "LinkCheckReq" name = "LinkCheckReq"
fields_desc = []
class LinkCheckAns(Packet): class LinkCheckAns(Packet):
@ -116,12 +120,15 @@ class LinkADRAns_Status(Packet):
name = "LinkADRAns_Status" name = "LinkADRAns_Status"
fields_desc = [BitField("RFU", 0, 5), fields_desc = [BitField("RFU", 0, 5),
BitField("PowerACK", 0, 1), BitField("PowerACK", 0, 1),
BitField("DataRate", 0, 1),
BitField("ChannelMaskACK", 0, 1)] BitField("ChannelMaskACK", 0, 1)]
class LinkADRAns(Packet): class LinkADRAns(Packet):
name = "LinkADRAns" name = "LinkADRAns"
fields_desc = [LinkADRAns_Status] fields_desc = [PacketField("status",
LinkADRAns_Status(),
LinkADRAns_Status)]
class DutyCyclePL(Packet): class DutyCyclePL(Packet):
@ -141,9 +148,9 @@ class DutyCycleAns(Packet):
class DLsettings(Packet): class DLsettings(Packet):
name = "DLsettings" name = "DLsettings"
fields_desc = [BitField("RFU", 0, 1), fields_desc = [BitField("OptNeg", 0, 1),
BitField("RX1DRoffset", 0, 3), XBitField("RX1DRoffset", 0, 3),
BitField("RX2DataRate", 0, 4)] XBitField("RX2_Data_rate", 0, 4)]
class RXParamSetupReq(Packet): class RXParamSetupReq(Packet):
@ -164,6 +171,7 @@ class RXParamSetupAns(Packet):
name = "RXParamSetupAns" name = "RXParamSetupAns"
fields_desc = [RXParamSetupAns_Status] fields_desc = [RXParamSetupAns_Status]
Battery_state = {0: "End-device connected to external source", Battery_state = {0: "End-device connected to external source",
255: "Battery level unknown"} 255: "Battery level unknown"}
@ -226,8 +234,7 @@ class RXTimingSetupAns(Packet):
fields_desc = [] fields_desc = []
# Specific commands for LoRa 1.1 here # Specific commands for LoRa 1.1 here
MaxEIRPs = {0: "8 dbm", MaxEIRPs = {0: "8 dbm",
1: "10 dbm", 1: "10 dbm",
@ -289,7 +296,7 @@ class DevLoraWANversion(Packet):
class RekeyInd(Packet): class RekeyInd(Packet):
name = "RekeyInd" name = "RekeyInd"
fields_desc = [PacketListField("LoRaWANversion", b"", fields_desc = [PacketListField("LoRaWANversion", b"",
DevLoraWANversion, length_from=lambda pkt:1)] DevLoraWANversion, length_from=lambda pkt:1)]
class RekeyConf(Packet): class RekeyConf(Packet):
@ -325,7 +332,7 @@ class DeviceTimeAns(Packet):
class ForceRejoinReq(Packet): class ForceRejoinReq(Packet):
name ="ForceRejoinReq" name = "ForceRejoinReq"
fields_desc = [BitField("RFU", 0, 2), fields_desc = [BitField("RFU", 0, 2),
BitField("Period", 0, 3), BitField("Period", 0, 3),
BitField("Max_Retries", 0, 3), BitField("Max_Retries", 0, 3),
@ -384,7 +391,8 @@ class MACCommand_up(Packet):
RXTimingSetupReq, RXTimingSetupReq,
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.CID == 0x08)), lambda pkt:(pkt.CID == 0x08)),
ConditionalField(PacketListField("TxParamSetup", b"", # specific to 1.1 from here # specific to 1.1 from here
ConditionalField(PacketListField("TxParamSetup", b"",
TxParamSetupReq, TxParamSetupReq,
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.CID == 0x09)), lambda pkt:(pkt.CID == 0x09)),
@ -413,6 +421,7 @@ class MACCommand_up(Packet):
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.CID == 0x0F))] lambda pkt:(pkt.CID == 0x0F))]
# pylint: disable=R0201
def extract_padding(self, p): def extract_padding(self, p):
return "", p return "", p
@ -477,64 +486,73 @@ class MACCommand_down(Packet):
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.CID == 0x0F))] lambda pkt:(pkt.CID == 0x0F))]
def extract_padding(self, p):
return "", p
class FOpts(Packet): class FOpts(Packet):
name = "FOpts" name = "FOpts"
fields_desc = [ConditionalField(PacketListField("FOpts_up", b"", fields_desc = [ConditionalField(PacketListField("FOpts_up", b"",
MACCommand_up, # piggybacked MAC Command for uplink # UL piggy MAC Command
length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), MACCommand_up,
lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501
and pkt.MType & 0b1 == 0 lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 and
and pkt.MType >= 0b010)), pkt.MType & 0b1 == 0 and
pkt.MType >= 0b010)),
ConditionalField(PacketListField("FOpts_down", b"", ConditionalField(PacketListField("FOpts_down", b"",
MACCommand_down, # piggybacked MAC Command for downlink # DL piggy MAC Command
length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), MACCommand_down,
lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501
and pkt.MType & 0b1 == 1 lambda pkt:(pkt.FCtrl[0].FOptsLen > 0 and
and pkt.MType <= 0b101))] pkt.MType & 0b1 == 1 and
pkt.MType <= 0b101))]
def FOptsShow(pkt):
def FOptsDownShow(pkt):
try: try:
if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 0 and pkt.MType >= 0b010: if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 1 and pkt.MType <= 0b101: # noqa: E501
return True
elif pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 1 and pkt.MType <= 0b101:
return True return True
return False return False
except: except Exception:
return False return False
def FOptsUpShow(pkt):
try:
if pkt.FCtrl[0].FOptsLen > 0 and pkt.MType & 0b1 == 0 and pkt.MType >= 0b010: # noqa: E501
return True
return False
except Exception:
return False
class FHDR(Packet): class FHDR(Packet):
name = "FHDR" name = "FHDR"
fields_desc = [ConditionalField(PacketListField("DevAddr", b"", DevAddrElem, fields_desc = [ConditionalField(PacketListField("DevAddr", b"", DevAddrElem, # noqa: E501
length_from=lambda pkt:4), length_from=lambda pkt:4),
lambda pkt:(pkt.MType >= 0b010 lambda pkt:(pkt.MType >= 0b010 and
and pkt.MType <= 0b101)), pkt.MType <= 0b101)),
ConditionalField(PacketListField("FCtrl", b"", ConditionalField(PacketListField("FCtrl", b"",
FCtrl_DownLink, FCtrl_DownLink,
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.MType & 0b1 == 1 lambda pkt:(pkt.MType & 0b1 == 1 and
and pkt.MType <= 0b101)), pkt.MType <= 0b101)),
ConditionalField(PacketListField("FCtrl", b"", ConditionalField(PacketListField("FCtrl", b"",
FCtrl_UpLink, FCtrl_UpLink,
length_from=lambda pkt:1), length_from=lambda pkt:1),
lambda pkt:(pkt.MType & 0b1 == 0 lambda pkt:(pkt.MType & 0b1 == 0 and
and pkt.MType >= 0b010)), pkt.MType >= 0b010)),
ConditionalField(LEShortField("FCnt", 0), ConditionalField(LEShortField("FCnt", 0),
lambda pkt:(pkt.MType >= 0b010 lambda pkt:(pkt.MType >= 0b010 and
and pkt.MType <= 0b101)), pkt.MType <= 0b101)),
ConditionalField(PacketListField("FOpts_up", b"", ConditionalField(PacketListField("FOpts_up", b"",
MACCommand_up, MACCommand_up,
length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501
lambda pkt:FOptsShow(pkt)), FOptsUpShow),
ConditionalField(PacketListField("FOpts_down", b"", ConditionalField(PacketListField("FOpts_down", b"",
MACCommand_down, MACCommand_down,
length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), length_from=lambda pkt:pkt.FCtrl[0].FOptsLen), # noqa: E501
lambda pkt:FOptsShow(pkt))] FOptsDownShow)]
FPorts = {0: "NwkSKey"} # anything else is AppSKey FPorts = {0: "NwkSKey"} # anything else is AppSKey
JoinReqTypes = {0xFF: "Join-request", JoinReqTypes = {0xFF: "Join-request",
@ -550,31 +568,25 @@ class Join_Request(Packet):
LEShortField("DevNonce", 0x0000)] LEShortField("DevNonce", 0x0000)]
class DLsettings(Packet):
name = "DLsettings"
fields_desc = [BitField("OptNeg", 0, 1),
XBitField("RX1DRoffset", 0, 3),
XBitField("RX2_Data_rate", 0, 4)]
class Join_Accept(Packet): class Join_Accept(Packet):
name = "Join_Accept" name = "Join_Accept"
dcflist = False dcflist = False
fields_desc = [LEX3BytesField("JoinAppNonce", 0), fields_desc = [LEX3BytesField("JoinAppNonce", 0),
LEX3BytesField("NetID", 0), LEX3BytesField("NetID", 0),
XLEIntField("DevAddr", 0), XLEIntField("DevAddr", 0),
DLsettings, DLsettings,
XByteField("RxDelay", 0), XByteField("RxDelay", 0),
ConditionalField(StrFixedLenField("CFList", b"\x00" * 16 , 16), ConditionalField(StrFixedLenField("CFList", b"\x00" * 16, 16), # noqa: E501
lambda pkt:(Join_Accept.dcflist is True))] lambda pkt:(Join_Accept.dcflist is True))]
# pylint: disable=R0201
def extract_padding(self, p): def extract_padding(self, p):
return "", p return "", p
def __init__(self, packet=""): # CFlist calculated with on rest packet len def __init__(self, packet=""): # CFList calculated with rest of packet len
if len(packet) > 18: if len(packet) > 18:
Join_Accept.dcflist = True Join_Accept.dcflist = True
return super(Join_Accept, self).__init__(packet) super(Join_Accept, self).__init__(packet)
RejoinType = {0: "NetID+DevEUI", RejoinType = {0: "NetID+DevEUI",
@ -582,7 +594,7 @@ RejoinType = {0: "NetID+DevEUI",
2: "NetID+DevEUI"} 2: "NetID+DevEUI"}
def RejoinReq(Packet): # LoRa 1.1 specs class RejoinReq(Packet): # LoRa 1.1 specs
name = "RejoinReq" name = "RejoinReq"
fields_desc = [ByteField("Type", 0), fields_desc = [ByteField("Type", 0),
X3BytesField("NetID", 0), X3BytesField("NetID", 0),
@ -592,12 +604,12 @@ def RejoinReq(Packet): # LoRa 1.1 specs
class FRMPayload(Packet): class FRMPayload(Packet):
name = "FRMPayload" name = "FRMPayload"
fields_desc = [ConditionalField(StrField("DataPayload", 0, remain=4), # Downlink fields_desc = [ConditionalField(StrField("DataPayload", "", remain=4), # Downlink # noqa: E501
lambda pkt:(pkt.MType == 0b101 lambda pkt:(pkt.MType == 0b101 or
or pkt.MType == 0b011)), pkt.MType == 0b011)),
ConditionalField(StrField("DataPayload", 0, remain=6), # Uplink ConditionalField(StrField("DataPayload", "", remain=6), # Uplink # noqa: E501
lambda pkt:(pkt.MType == 0b100 lambda pkt:(pkt.MType == 0b100 or
or pkt.MType == 0b010)), pkt.MType == 0b010)),
ConditionalField(PacketListField("Join_Request_Field", b"", ConditionalField(PacketListField("Join_Request_Field", b"",
Join_Request, Join_Request,
length_from=lambda pkt:18), length_from=lambda pkt:18),
@ -605,17 +617,24 @@ class FRMPayload(Packet):
ConditionalField(PacketListField("Join_Accept_Field", b"", ConditionalField(PacketListField("Join_Accept_Field", b"",
Join_Accept, Join_Accept,
count_from=lambda pkt:1), count_from=lambda pkt:1),
lambda pkt:(pkt.MType == 0b001 lambda pkt:(pkt.MType == 0b001 and
and LoRa.encrypted is False)), LoRa.encrypted is False)),
ConditionalField(StrField("Join_Accept_Encrypted", 0), ConditionalField(StrField("Join_Accept_Encrypted", 0),
lambda pkt:(pkt.MType == 0b001 and LoRa.encrypted is True))] lambda pkt:(pkt.MType == 0b001 and LoRa.encrypted is True)), # noqa: E501
ConditionalField(PacketListField("ReJoin_Request_Field", b"", # noqa: E501
RejoinReq,
length_from=lambda pkt:14),
lambda pkt:(pkt.MType == 0b111))]
class MACPayload(Packet): class MACPayload(Packet):
name = "MACPayload" name = "MACPayload"
eFPort = False
fields_desc = [FHDR, fields_desc = [FHDR,
ConditionalField(ByteEnumField("FPort", 0, FPorts), ConditionalField(ByteEnumField("FPort", 0, FPorts),
lambda pkt:(pkt.MType >= 0b010 and pkt.MType <= 0b101)), lambda pkt:(pkt.MType >= 0b010 and
pkt.MType <= 0b101 and
pkt.FCtrl[0].FOptsLen == 0)),
FRMPayload] FRMPayload]
@ -625,11 +644,11 @@ MTypes = {0b000: "Join-request",
0b011: "Unconfirmed Data Down", 0b011: "Unconfirmed Data Down",
0b100: "Confirmed Data Up", 0b100: "Confirmed Data Up",
0b101: "Confirmed Data Down", 0b101: "Confirmed Data Down",
0b110: "Rejoin-request", # Only in LoRa 1.1 specs 0b110: "Rejoin-request", # Only in LoRa 1.1 specs
0b111: "Proprietary"} 0b111: "Proprietary"}
class MHDR(Packet): # same for 1.0 and 1.1 class MHDR(Packet): # Same for 1.0 as for 1.1
name = "MHDR" name = "MHDR"
fields_desc = [BitEnumField("MType", 0b000, 3, MTypes), fields_desc = [BitEnumField("MType", 0b000, 3, MTypes),
BitField("RFU", 0b000, 3), BitField("RFU", 0b000, 3),
@ -641,13 +660,13 @@ class PHYPayload(Packet):
fields_desc = [MHDR, fields_desc = [MHDR,
MACPayload, MACPayload,
ConditionalField(XIntField("MIC", 0), ConditionalField(XIntField("MIC", 0),
lambda pkt:(pkt.MType != 0b001 lambda pkt:(pkt.MType != 0b001 or
or LoRa.encrypted is False))] LoRa.encrypted is False))]
class LoRa(Packet): # default frame (unclear specs => taken from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5677147/) class LoRa(Packet): # default frame (unclear specs => taken from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5677147/) # noqa: E501
name = "LoRa" name = "LoRa"
version = "1.1" # default version to parse version = "1.1" # default version to parse
encrypted = True encrypted = True
fields_desc = [XBitField("Preamble", 0, 4), fields_desc = [XBitField("Preamble", 0, 4),
XBitField("PHDR", 0, 16), XBitField("PHDR", 0, 16),