interface ssl: besides TLS 1.1, also allow later versions

This commit is contained in:
SomberNight 2017-11-10 21:39:20 +01:00
parent c46f219d25
commit 07e9415c2d
1 changed files with 19 additions and 7 deletions

View File

@ -124,6 +124,18 @@ class TcpConnection(threading.Thread, util.PrintError):
else: else:
self.print_error("failed to connect", str(e)) self.print_error("failed to connect", str(e))
@staticmethod
def get_ssl_context(cert_reqs, ca_certs):
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs)
context.check_hostname = False
context.verify_mode = cert_reqs
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
return context
def get_socket(self): def get_socket(self):
if self.use_ssl: if self.use_ssl:
cert_path = os.path.join(self.config_path, 'certs', self.host) cert_path = os.path.join(self.config_path, 'certs', self.host)
@ -134,7 +146,8 @@ class TcpConnection(threading.Thread, util.PrintError):
return return
# try with CA first # try with CA first
try: try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True) context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path)
s = context.wrap_socket(s, do_handshake_on_connect=True)
except ssl.SSLError as e: except ssl.SSLError as e:
print_error(e) print_error(e)
s = None s = None
@ -150,7 +163,8 @@ class TcpConnection(threading.Thread, util.PrintError):
if s is None: if s is None:
return return
try: try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_NONE, ca_certs=None) context = self.get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None)
s = context.wrap_socket(s)
except ssl.SSLError as e: except ssl.SSLError as e:
self.print_error("SSL error retrieving SSL certificate:", e) self.print_error("SSL error retrieving SSL certificate:", e)
return return
@ -174,11 +188,9 @@ class TcpConnection(threading.Thread, util.PrintError):
if self.use_ssl: if self.use_ssl:
try: try:
s = ssl.wrap_socket(s, context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLSv1_1, ca_certs=(temporary_path if is_new else cert_path))
cert_reqs=ssl.CERT_REQUIRED, s = context.wrap_socket(s, do_handshake_on_connect=True)
ca_certs=(temporary_path if is_new else cert_path),
do_handshake_on_connect=True)
except socket.timeout: except socket.timeout:
self.print_error('timeout') self.print_error('timeout')
return return