From 07e9415c2d020e5b67bee4c2e1c2520fadb43614 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 10 Nov 2017 21:39:20 +0100 Subject: [PATCH] interface ssl: besides TLS 1.1, also allow later versions --- lib/interface.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/lib/interface.py b/lib/interface.py index e269e957..ef4fcfcd 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -124,6 +124,18 @@ class TcpConnection(threading.Thread, util.PrintError): else: self.print_error("failed to connect", str(e)) + @staticmethod + def get_ssl_context(cert_reqs, ca_certs): + context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs) + context.check_hostname = False + context.verify_mode = cert_reqs + + context.options |= ssl.OP_NO_SSLv2 + context.options |= ssl.OP_NO_SSLv3 + context.options |= ssl.OP_NO_TLSv1 + + return context + def get_socket(self): if self.use_ssl: cert_path = os.path.join(self.config_path, 'certs', self.host) @@ -134,7 +146,8 @@ class TcpConnection(threading.Thread, util.PrintError): return # try with CA first try: - s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True) + context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path) + s = context.wrap_socket(s, do_handshake_on_connect=True) except ssl.SSLError as e: print_error(e) s = None @@ -150,7 +163,8 @@ class TcpConnection(threading.Thread, util.PrintError): if s is None: return try: - s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1_1, cert_reqs=ssl.CERT_NONE, ca_certs=None) + context = self.get_ssl_context(cert_reqs=ssl.CERT_NONE, ca_certs=None) + s = context.wrap_socket(s) except ssl.SSLError as e: self.print_error("SSL error retrieving SSL certificate:", e) return @@ -174,11 +188,9 @@ class TcpConnection(threading.Thread, util.PrintError): if self.use_ssl: try: - s = ssl.wrap_socket(s, - ssl_version=ssl.PROTOCOL_TLSv1_1, - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=(temporary_path if is_new else cert_path), - do_handshake_on_connect=True) + context = self.get_ssl_context(cert_reqs=ssl.CERT_REQUIRED, + ca_certs=(temporary_path if is_new else cert_path)) + s = context.wrap_socket(s, do_handshake_on_connect=True) except socket.timeout: self.print_error('timeout') return