Cleaning code
This commit is contained in:
parent
0f513c0403
commit
a5f0a9d65c
|
@ -5,22 +5,27 @@ from __future__ import print_function
|
||||||
# LoRa PHYdecoder - parse LoRa PHY decoded by gr-lora
|
# LoRa PHYdecoder - parse LoRa PHY decoded by gr-lora
|
||||||
# Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz
|
# Copyright (C) 2020 Sebastien Dudek (@FlUxIuS) at @PentHertz
|
||||||
|
|
||||||
import binascii
|
from layers import LoRa
|
||||||
from layers.loraphy2wan import *
|
from scapy.layers.inet import UDP
|
||||||
|
from scapy.sendrecv import sniff
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
|
|
||||||
def decodePHY(pkt):
|
def decodePHY(pkt):
|
||||||
decoded = LoRa(pkt[UDP].load)
|
decoded = LoRa(pkt[UDP].load)
|
||||||
print (repr(decoded))
|
print (repr(decoded))
|
||||||
|
|
||||||
|
|
||||||
def filterpkt(pkt, port):
|
def filterpkt(pkt, port):
|
||||||
if pkt.haslayer(UDP):
|
if pkt.haslayer(UDP):
|
||||||
if pkt[UDP].dport == port:
|
if pkt[UDP].dport == port:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description='Monitor and decode MAC PHY packets.')
|
parser = argparse.ArgumentParser(
|
||||||
|
description='Monitor and decode MAC PHY packets.')
|
||||||
parser.add_argument('-p', '--port', dest='port', default=40868,
|
parser.add_argument('-p', '--port', dest='port', default=40868,
|
||||||
help='TAP PORT to listen on (default: UDP 40868)')
|
help='TAP PORT to listen on (default: UDP 40868)')
|
||||||
parser.add_argument('-i', '--iface', dest='iface', default='lo',
|
parser.add_argument('-i', '--iface', dest='iface', default='lo',
|
||||||
|
@ -30,6 +35,6 @@ if __name__ == "__main__":
|
||||||
iface = args.iface
|
iface = args.iface
|
||||||
port = int(args.port)
|
port = int(args.port)
|
||||||
|
|
||||||
sniff(prn=decodePHY,
|
sniff(prn=decodePHY,
|
||||||
lfilter=lambda pkt:filterpkt(pkt, port),
|
lfilter=lambda pkt: filterpkt(pkt, port),
|
||||||
iface=iface)
|
iface=iface)
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
from layers.loraphy2wan import *
|
|
@ -5,16 +5,18 @@ from Crypto.Cipher import AES
|
||||||
from Crypto.Hash import CMAC
|
from Crypto.Hash import CMAC
|
||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
|
|
||||||
def JoinAcceptPayload_decrypt(key, hexpkt):
|
def JoinAcceptPayload_decrypt(key, hexpkt):
|
||||||
"""
|
"""
|
||||||
Decrypt Join Accept payloads
|
Decrypt Join Accept payloads
|
||||||
In(1): String 128 bits key
|
In(1): String 128 bits key
|
||||||
In(2): String packet
|
In(2): String packet
|
||||||
Out: String decrypted Join accept packet
|
Out: String decrypted Join accept packet
|
||||||
"""
|
"""
|
||||||
payload = hexpkt[4:]
|
payload = hexpkt[4:]
|
||||||
cipher = AES.new(key, AES.MODE_ECB)
|
cipher = AES.new(key, AES.MODE_ECB)
|
||||||
return cipher.encrypt(payload) # logic right? :D
|
return cipher.encrypt(payload) # logic right? :D
|
||||||
|
|
||||||
|
|
||||||
def JoinAcceptPayload_encrypt(key, hexpkt):
|
def JoinAcceptPayload_encrypt(key, hexpkt):
|
||||||
"""
|
"""
|
||||||
|
@ -27,22 +29,24 @@ def JoinAcceptPayload_encrypt(key, hexpkt):
|
||||||
cipher = AES.new(key, AES.MODE_ECB)
|
cipher = AES.new(key, AES.MODE_ECB)
|
||||||
return cipher.decrypt(payload)
|
return cipher.decrypt(payload)
|
||||||
|
|
||||||
|
|
||||||
def getPHY_CMAC(key, hexpkt, direction=1):
|
def getPHY_CMAC(key, hexpkt, direction=1):
|
||||||
"""
|
"""
|
||||||
Compute MIC with AES CMAC
|
Compute MIC with AES CMAC
|
||||||
In(1): String 128 bits key for CMAC
|
In(1): String 128 bits key for CMAC
|
||||||
In(2): hexstring of the packet
|
In(2): hexstring of the packet
|
||||||
In(3): Direction (1: network, 0: end device)
|
In(3): Direction (1: network, 0: end device)
|
||||||
Out: Hexdigest of computed MIC
|
Out: Hexdigest of computed MIC
|
||||||
"""
|
"""
|
||||||
lowoff = -4
|
lowoff = -4
|
||||||
if direction == 0:
|
if direction == 0:
|
||||||
lowoff = -6 # skip the CRC
|
lowoff = -6 # skip the CRC
|
||||||
payload = hexpkt[3:lowoff]
|
payload = hexpkt[3:lowoff]
|
||||||
cobj = CMAC.new(key, ciphermod=AES)
|
cobj = CMAC.new(key, ciphermod=AES)
|
||||||
toret = cobj.update(payload).hexdigest()
|
toret = cobj.update(payload).hexdigest()
|
||||||
return toret[:8]
|
return toret[:8]
|
||||||
|
|
||||||
|
|
||||||
def checkMIC(key, hexpkt, direction=1):
|
def checkMIC(key, hexpkt, direction=1):
|
||||||
"""
|
"""
|
||||||
Check MIC in the packet
|
Check MIC in the packet
|
||||||
|
@ -53,12 +57,12 @@ def checkMIC(key, hexpkt, direction=1):
|
||||||
"""
|
"""
|
||||||
mic = hexpkt[-4:]
|
mic = hexpkt[-4:]
|
||||||
if direction == 0:
|
if direction == 0:
|
||||||
mic = hexpkt[-6:-2] # skip the CRC
|
mic = hexpkt[-6:-2] # skip the CRC
|
||||||
try:
|
try:
|
||||||
binascii.unhexlify(mic)
|
binascii.unhexlify(mic)
|
||||||
except:
|
except Exception:
|
||||||
mic = binascii.hexlify(mic)
|
mic = binascii.hexlify(mic)
|
||||||
cmic = getPHY_CMAC(key, hexpkt)
|
cmic = getPHY_CMAC(key, hexpkt)
|
||||||
return (mic == cmic)
|
return (mic == cmic)
|
||||||
|
|
||||||
#TODO: more helpers
|
# TODO: more helpers
|
||||||
|
|
Loading…
Reference in New Issue