zc_utils.py: optionally enforce the MAX_SIZE limit (enforced by default).
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
parent
efee9dcb03
commit
bc6bc8a375
69
zc_utils.py
69
zc_utils.py
|
@ -3,7 +3,10 @@ import sys; assert sys.version_info[0] >= 3, "Python 3 required."
|
||||||
|
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
def write_compact_size(n):
|
MAX_SIZE = 0x2000000
|
||||||
|
|
||||||
|
def write_compact_size(n, allow_u64=False):
|
||||||
|
assert allow_u64 or n <= MAX_SIZE
|
||||||
if n < 253:
|
if n < 253:
|
||||||
return struct.pack('B', n)
|
return struct.pack('B', n)
|
||||||
elif n <= 0xFFFF:
|
elif n <= 0xFFFF:
|
||||||
|
@ -13,7 +16,12 @@ def write_compact_size(n):
|
||||||
else:
|
else:
|
||||||
return struct.pack('B', 255) + struct.pack('<Q', n)
|
return struct.pack('B', 255) + struct.pack('<Q', n)
|
||||||
|
|
||||||
def parse_compact_size(rest):
|
def parse_compact_size(rest, allow_u64=False):
|
||||||
|
(n, rest) = parse_compact_u64(rest)
|
||||||
|
assert allow_u64 or n <= MAX_SIZE
|
||||||
|
return (n, rest)
|
||||||
|
|
||||||
|
def parse_compact_u64(rest):
|
||||||
assert len(rest) >= 1
|
assert len(rest) >= 1
|
||||||
b = rest[0]
|
b = rest[0]
|
||||||
if b < 253:
|
if b < 253:
|
||||||
|
@ -35,36 +43,43 @@ def parse_compact_size(rest):
|
||||||
return (n, rest[9:])
|
return (n, rest[9:])
|
||||||
|
|
||||||
|
|
||||||
def assert_parse_fails(encoding):
|
def assert_parse_fails(encoding, allow_u64):
|
||||||
try:
|
try:
|
||||||
parse_compact_size(encoding)
|
parse_compact_size(encoding, allow_u64)
|
||||||
except AssertionError:
|
except AssertionError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
raise AssertionError("parse_compact_size(%r) failed to raise AssertionError" % (encoding,))
|
raise AssertionError("parse_compact_size(%r) failed to raise AssertionError" % (encoding,))
|
||||||
|
|
||||||
def test_round_trip(n, encoding):
|
def test_round_trip(n, encoding, allow_u64):
|
||||||
assert write_compact_size(n) == encoding
|
assert write_compact_size(n, allow_u64) == encoding
|
||||||
assert parse_compact_size(encoding) == (n, b'')
|
assert parse_compact_size(encoding, allow_u64) == (n, b'')
|
||||||
assert parse_compact_size(encoding + b'*') == (n, b'*')
|
assert parse_compact_size(encoding + b'*', allow_u64) == (n, b'*')
|
||||||
assert_parse_fails(encoding[:-1])
|
assert_parse_fails(encoding[:-1], allow_u64)
|
||||||
|
|
||||||
test_round_trip(0, b'\x00')
|
for allow_u64 in (False, True):
|
||||||
test_round_trip(1, b'\x01')
|
test_round_trip(0, b'\x00', allow_u64)
|
||||||
test_round_trip(252, b'\xFC')
|
test_round_trip(1, b'\x01', allow_u64)
|
||||||
test_round_trip(253, b'\xFD\xFD\x00')
|
test_round_trip(252, b'\xFC', allow_u64)
|
||||||
test_round_trip(254, b'\xFD\xFE\x00')
|
test_round_trip(253, b'\xFD\xFD\x00', allow_u64)
|
||||||
test_round_trip(255, b'\xFD\xFF\x00')
|
test_round_trip(254, b'\xFD\xFE\x00', allow_u64)
|
||||||
test_round_trip(256, b'\xFD\x00\x01')
|
test_round_trip(255, b'\xFD\xFF\x00', allow_u64)
|
||||||
test_round_trip(0xFFFE, b'\xFD\xFE\xFF')
|
test_round_trip(256, b'\xFD\x00\x01', allow_u64)
|
||||||
test_round_trip(0xFFFF, b'\xFD\xFF\xFF')
|
test_round_trip(0xFFFE, b'\xFD\xFE\xFF', allow_u64)
|
||||||
test_round_trip(0x010000, b'\xFE\x00\x00\x01\x00')
|
test_round_trip(0xFFFF, b'\xFD\xFF\xFF', allow_u64)
|
||||||
test_round_trip(0x010001, b'\xFE\x01\x00\x01\x00')
|
test_round_trip(0x010000, b'\xFE\x00\x00\x01\x00', allow_u64)
|
||||||
test_round_trip(0xFFFFFFFE, b'\xFE\xFE\xFF\xFF\xFF')
|
test_round_trip(0x010001, b'\xFE\x01\x00\x01\x00', allow_u64)
|
||||||
test_round_trip(0xFFFFFFFF, b'\xFE\xFF\xFF\xFF\xFF')
|
test_round_trip(0x02000000, b'\xFE\x00\x00\x00\x02', allow_u64)
|
||||||
test_round_trip(0x0100000000, b'\xFF\x00\x00\x00\x00\x01\x00\x00\x00')
|
|
||||||
test_round_trip(0xFFFFFFFFFFFFFFFF, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
|
|
||||||
|
|
||||||
assert_parse_fails(b'\xFD\xFC\x00')
|
assert_parse_fails(b'\xFD\xFC\x00', allow_u64)
|
||||||
assert_parse_fails(b'\xFE\xFF\xFF\x00\x00')
|
assert_parse_fails(b'\xFE\xFF\xFF\x00\x00', allow_u64)
|
||||||
assert_parse_fails(b'\xFF\xFF\xFF\xFF\xFF\x00\x00\x00\x00')
|
assert_parse_fails(b'\xFF\xFF\xFF\xFF\xFF\x00\x00\x00\x00', allow_u64)
|
||||||
|
|
||||||
|
assert_parse_fails(b'\xFE\x01\x00\x00\x02', False)
|
||||||
|
assert_parse_fails(b'\xFF\x00\x00\x00\x00\x01\x00\x00\x00', False)
|
||||||
|
assert_parse_fails(b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF', False)
|
||||||
|
|
||||||
|
test_round_trip(0xFFFFFFFE, b'\xFE\xFE\xFF\xFF\xFF', True)
|
||||||
|
test_round_trip(0xFFFFFFFF, b'\xFE\xFF\xFF\xFF\xFF', True)
|
||||||
|
test_round_trip(0x0100000000, b'\xFF\x00\x00\x00\x00\x01\x00\x00\x00', True)
|
||||||
|
test_round_trip(0xFFFFFFFFFFFFFFFF, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF', True)
|
||||||
|
|
Loading…
Reference in New Issue