#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Copyright (c) 2019-2022 The Zcash developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or https://www.opensource.org/licenses/mit-license.php .

# Linux network utilities

import array
import os
import socket
import struct
import sys
from binascii import hexlify, unhexlify

# Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com:80/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
# TCP connection state codes as two-character hex strings.
# get_bind_addrs() compares these directly against the state field that
# netstat() extracts from each /proc/net/<typ> row, so they must not carry
# any surrounding whitespace.
STATE_ESTABLISHED = '01'
STATE_SYN_SENT = '02'
STATE_SYN_RECV = '03'
STATE_FIN_WAIT1 = '04'
STATE_FIN_WAIT2 = '05'
STATE_TIME_WAIT = '06'
STATE_CLOSE = '07'
STATE_CLOSE_WAIT = '08'
STATE_LAST_ACK = '09'
STATE_LISTEN = '0A'
STATE_CLOSING = '0B'
def get_socket_inodes(pid):
    '''
    Get list of socket inodes for process pid.

    Walks the symlinks in /proc/<pid>/fd and keeps those whose target has
    the form "socket:[<inode>]".

    pid -- numeric process id.

    Returns a list of ints, one per socket descriptor found.
    Raises OSError if /proc/<pid>/fd itself cannot be listed.
    '''
    prefix = 'socket:['
    base = '/proc/%i/fd' % pid
    inodes = []
    for item in os.listdir(base):
        try:
            target = os.readlink(os.path.join(base, item))
        except FileNotFoundError:
            # The descriptor was closed between listdir() and readlink().
            continue
        if target.startswith(prefix):
            # Strip the "socket:[" prefix and the closing "]".
            inodes.append(int(target[len(prefix):-1]))
    return inodes
def _remove_empty ( array ) :
return [ x for x in array if x != ' ' ]
def _convert_ip_port ( array ) :
host , port = array . split ( ' : ' )
# convert host from mangled-per-four-bytes form as used by kernel
2019-12-04 07:52:02 -08:00
host = unhexlify ( host )
2014-04-07 08:29:36 -07:00
host_out = ' '
2019-11-23 18:40:49 -08:00
for x in range ( 0 , len ( host ) / / 4 ) :
2014-04-07 08:29:36 -07:00
( val , ) = struct . unpack ( ' =I ' , host [ x * 4 : ( x + 1 ) * 4 ] )
host_out + = ' %08x ' % val
return host_out , int ( port , 16 )
def netstat(typ='tcp'):
    '''
    Return the rows of /proc/net/<typ> on a Linux system.

    typ -- name of the table under /proc/net to read, e.g. 'tcp' or 'tcp6'.

    Returns a list with one entry per row after the header line. Each entry
    is a list [tcp_id, l_addr, r_addr, state, inode] where l_addr and r_addr
    are the (host, port) tuples produced by _convert_ip_port, state is the
    raw state field as a string (compare against the STATE_* constants) and
    inode is an int that can be matched against a process's socket inodes.
    Raises OSError if the table cannot be opened.
    '''
    result = []
    with open('/proc/net/' + typ, 'r', encoding='utf8') as f:
        next(f, None)  # skip the column-header line (tolerates an empty file)
        for line in f:
            # Split on runs of whitespace, which also discards the padding.
            line_array = line.split()
            if not line_array:
                continue
            tcp_id = line_array[0]
            l_addr = _convert_ip_port(line_array[1])
            r_addr = _convert_ip_port(line_array[2])
            state = line_array[3]
            inode = int(line_array[9])  # Need the inode to match with process pid.
            result.append([tcp_id, l_addr, r_addr, state, inode])
    return result
def get_bind_addrs(pid):
    '''
    Get bind addresses as (host, port) tuples for process pid.

    Scans the 'tcp' and 'tcp6' tables from netstat() and keeps the local
    address of every row that is in the listening state and whose inode
    belongs to one of the process's socket descriptors.

    pid -- numeric process id.

    Returns a list of (host, port) tuples in table order.
    '''
    # A set makes the per-row membership test constant time.
    inodes = set(get_socket_inodes(pid))
    return [
        conn[1]
        for conn in netstat('tcp') + netstat('tcp6')
        if conn[3] == STATE_LISTEN and conn[4] in inodes
    ]
# from: https://code.activestate.com/recipes/439093/
def all_interfaces():
    '''
    Return all interfaces that are up

    Queries the kernel with the SIOCGIFCONF ioctl on a throwaway UDP socket.

    Returns a list of (name, address) tuples, where name is the interface
    name as bytes (cut at the first NUL byte) and address is the dotted-quad
    IPv4 address string for that entry.
    Raises OSError if the socket or the ioctl fails.
    '''
    import fcntl  # imported locally, as in the original, rather than at module level

    is_64bits = sys.maxsize > 2**32
    # Size in bytes of one record in the buffer filled by the ioctl.
    struct_size = 40 if is_64bits else 32
    max_possible = 8  # initial value
    # The socket only provides a descriptor for the ioctl; close it when done.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        while True:
            buf_len = max_possible * struct_size
            names = array.array('B', b'\0' * buf_len)
            outbytes = struct.unpack('iL', fcntl.ioctl(
                s.fileno(),
                0x8912,  # SIOCGIFCONF
                struct.pack('iL', buf_len, names.buffer_info()[0])
            ))[0]
            if outbytes == buf_len:
                # Buffer was filled completely, so the list may have been
                # truncated: retry with twice the room.
                max_possible *= 2
            else:
                break
    # array.tostring() was removed in Python 3.9; tobytes() is the replacement.
    namestr = names.tobytes()
    return [(namestr[i:i + 16].split(b'\0', 1)[0],
             socket.inet_ntoa(namestr[i + 20:i + 24]))
            for i in range(0, outbytes, struct_size)]
def addr_to_hex(addr):
    '''
    Convert string IPv4 or IPv6 address to binary address as returned by
    get_bind_addrs.
    Very naive implementation that certainly doesn't work for all IPv6 variants.

    addr -- address string: dotted decimal for IPv4, or colon-separated hex
            groups for IPv6 with at most one '::' abbreviation.

    Returns the address bytes as a lowercase hex string.
    Raises ValueError if the address cannot be parsed.
    '''
    if '.' in addr:  # IPv4
        octets = [int(x) for x in addr.split('.')]
    elif ':' in addr:  # IPv6
        sub = [[], []]  # prefix, suffix
        x = 0
        comps = addr.split(':')
        for i, comp in enumerate(comps):
            if comp == '':
                if i == 0 or i == (len(comps) - 1):  # skip empty component at beginning or end
                    continue
                x += 1  # :: skips to suffix
                # Explicit check instead of assert so it survives python -O.
                if x >= 2:
                    raise ValueError('Could not parse address %s' % addr)
            else:  # two bytes per component
                val = int(comp, 16)
                sub[x].append(val >> 8)
                sub[x].append(val & 0xff)
        nullbytes = 16 - len(sub[0]) - len(sub[1])
        # Without '::' all 16 bytes must be given; with '::' it must stand
        # for at least one zero byte.
        if not ((x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)):
            raise ValueError('Could not parse address %s' % addr)
        octets = sub[0] + ([0] * nullbytes) + sub[1]
    else:
        raise ValueError('Could not parse address %s' % addr)
    # bytes() rejects values outside 0..255 with ValueError.
    return bytes(octets).hex()
def test_ipv6_local():
    '''
    Check for (local) IPv6 support.

    Returns True if an IPv6 UDP socket can be created and connected to ::1,
    False if either step raises OSError.
    '''
    # By using SOCK_DGRAM this will not actually make a connection, but it will
    # fail if there is no route to IPv6 localhost.
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.connect(('::1', 0))
    except OSError:  # socket.error is an alias of OSError
        return False
    return True