[Inteproxy-commits] r279 - in trunk: . inteproxy test
scm-commit@wald.intevation.org
scm-commit at wald.intevation.org
Fri Sep 24 12:18:13 CEST 2010
Author: iweinzierl
Date: 2010-09-24 12:18:10 +0200 (Fri, 24 Sep 2010)
New Revision: 279
Added:
trunk/test/test_sslvalidation.py
Modified:
trunk/ChangeLog
trunk/inteproxy/httpconnection.py
Log:
Introduced some new functions to verify that a remote url matches a hostname in a certificate. Added testcases for these functions.
Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog 2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/ChangeLog 2010-09-24 10:18:10 UTC (rev 279)
@@ -1,3 +1,17 @@
+2010-09-22 Ingo Weinzierl <ingo.weinzierl at intevation.de>
+
+ * inteproxy/httpconnection.py(extract_identities_from_certificate):
+ Extract identities from certificate based on rfc2818.
+ (create_url_regex): Creates regular expressions that can be used to
+ match domains that might contain wildcards.
+ (is_remote_domain_valid): Function that helps to validate remote urls
+ based on the identities in a certificate.
+ (connect_ssl): New parameters that are required to verify remote server
+ certificates.
+
+ * test/test_sslvalidation.py: New. Some testcases for ssl hostname
+ validation.
+
2010-09-21 Ingo Weinzierl <ingo.weinzierl at intevation.de>
* inteproxy.cfg: Introduced a new parameter that defines the path to a
Modified: trunk/inteproxy/httpconnection.py
===================================================================
--- trunk/inteproxy/httpconnection.py 2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/inteproxy/httpconnection.py 2010-09-24 10:18:10 UTC (rev 279)
@@ -22,6 +22,7 @@
import socket
import httplib
import urllib
+import re
# the ssl modules was introduced in Python 2.6. If available, we use
# that, otherwise the older ssl support from the socket module
@@ -114,14 +115,113 @@
return sock
-def connect_ssl(sock, keyfile=None, certfile=None, log_debug=log_debug):
+def extract_identities_from_certificate(certificate):
+ """Extracts the peer's identity from subjectAltName or subject field that
+ is contained in a peer certificate.
+
+ Both fields contain a tuple that contains a sequence of relative
+ distinguished names (RDN) stored in the certificates data structure (see
+ http://docs.python.org/library/ssl.html for more details about this
+ structure). If there are dNSNames defined in subjectAltName, these
+ identities are returned and the search for commonNames is skipped. In some
+ cases there are iPAddress stored in subjectAltname. For this reason, the
+ return value of this function is a tuple of valid domains and ips."""
+ domains = []
+ ips = []
+ subjectAltName = certificate.get('subjectAltName', ())
+ for key, value in subjectAltName:
+ if key == 'DNS':
+ domains.append(value)
+ if key == 'IP':
+ # XXX Not sure if the name of the field storing IP addresses is
+ # really iPAddress
+ ips.append(value)
+ if len(domains) > 0:
+ # It is not necessary to continue searching for 'commonName' fields in
+ # the certificate, because there is at least one dNSName defined that
+ # MUST be used as the identity of the peer (see
+ # http://www.ietf.org/rfc/rfc2818.txt for further details)
+ return domains, ips
+
+ subject = certificate.get('subject', ())
+ for rdn in subject:
+ for key, value in rdn:
+ if key == 'commonName' or key == 'dNSName':
+ domains.append(value)
+ return domains, ips
+
+
+def create_url_regex(domain):
+ """This function creates a regular expression that might be used to match
+ certain domains.
+
+ If domain contains wildcard characters '*', these characters are replaced by
+ a regular expression that matches a sequence of valid hostname characters
+ (letters, digits and hyphens). If domain is an invalid hostname, None is
+ returned."""
+ wildcard_char ="*"
+ wildcard_regex = "[a-zA-Z0-9-]*"
+
+ regex = ""
+ first = True
+ parts = domain.split(".")
+ for part in parts:
+ if not part or part[0] != "*" and not part[0].isalnum() or \
+ part[-1] != "*" and not part[-1].isalnum():
+ return None
+ if not first:
+ regex += "\\."
+ first = False
+
+ if wildcard_char in part:
+ regex += part.replace(wildcard_char, wildcard_regex)
+ else:
+ regex += re.escape(part)
+ return regex
+
+
+def is_remote_domain_valid(peercert, hostname, log_debug=log_debug):
+ """This function might be used to verify that hostname matches a valid
+ domain in peercert."""
+ hostname = hostname.lower()
+ parts = hostname.split(".")
+ domains, ips = extract_identities_from_certificate(peercert)
+ for domain in domains:
+ if len(domain.split(".")) != len(parts):
+ continue
+
+ regex = create_url_regex(domain.lower())
+ if regex and re.match(regex, hostname):
+ return True
+ for ip in ips:
+ if hostname == ip:
+ return True
+ log_debug("Server domain %r is not valid." % hostname)
+ return False
+
+
+def connect_ssl(sock, keyfile=None, certfile=None, log_debug=log_debug,
+ ca_certs=None, cert_required=True, remote_url=None):
"""Establish an SSL connection on the socket-like object sock.
If successful, the function returns a new socket like object that
uses SSL to encrypt the data.
"""
log_debug("initiate ssl on %r", sock)
if ssl is not None:
- wrapped_sock = ssl.wrap_socket(sock, keyfile, certfile)
+ cert_reqs = ssl.CERT_REQUIRED
+ if not cert_required:
+ cert_reqs = ssl.CERT_NONE
+ wrapped_sock = ssl.wrap_socket(sock, keyfile, certfile,
+ ca_certs=ca_certs,
+ cert_reqs=cert_reqs)
+ if not cert_required:
+ return wrapped_sock
+
+ peercert = wrapped_sock.getpeercert()
+ if not peercert or not is_remote_domain_valid(peercert, remote_url,
+ log_debug):
+ raise ssl.SSLError("Server domain does not match any identity "
+ "specified in the certificate.")
else:
sslsock = socket.ssl(sock, keyfile, certfile)
wrapped_sock = httplib.FakeSocket(sock, sslsock)
Added: trunk/test/test_sslvalidation.py
===================================================================
--- trunk/test/test_sslvalidation.py 2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/test/test_sslvalidation.py 2010-09-24 10:18:10 UTC (rev 279)
@@ -0,0 +1,106 @@
+# Copyright (C) 2010 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Tests for InteProxy's SSL validation"""
+
+import unittest
+
+from inteproxy.httpconnection import is_remote_domain_valid
+
+
+class TestHostNameChecks(unittest.TestCase):
+
+ def match(self, hostname, **cert_dict):
+ self.failUnless(is_remote_domain_valid(cert_dict, hostname))
+
+ def nomatch(self, hostname, **cert_dict):
+ self.failIf(is_remote_domain_valid(cert_dict, hostname))
+
+ #
+ # Test for hostnames that should match
+ #
+
+ def test_cn_exact(self):
+ self.match("intevation.de",
+ subject=((("commonName", "intevation.de"),),))
+
+ def test_cn_any_single_subdomain(self):
+ self.match("www.intevation.de",
+ subject=((("commonName", "*.intevation.de"),),))
+
+ def test_cn_partial_single_subdomain(self):
+ self.match("ssl.intevation.de",
+ subject=((("commonName", "s*.intevation.de"),),))
+
+ def test_sn_exact(self):
+ self.match("intevation.de",
+ subjectAltName=(("DNS", "*.intevation.de"),
+ ("DNS", "intevation.de")))
+
+ def test_sn_any_single_subdomain(self):
+ self.match("inteproxy-demo.intevation.de",
+ subjectAltName=(("DNS", "*.intevation.de"),
+ ("DNS", "intevation.de")))
+
+ def test_hostname_lower_case(self):
+ self.match("intevation.de",
+ subject=((("commonName", "INtevATioN.DE"),),))
+
+ def test_remote_domain_lower_case(self):
+ self.match("inTEvaTIon.de",
+ subject=((("commonName", "INTEVATION.DE"),),))
+
+ def test_both_mixed_case(self):
+ self.match("inTEvaTIon.de",
+ subject=((("commonName", "INtevATioN.DE"),),))
+
+ def test_hostname_contains_alphanum(self):
+ self.match("intevation1999.de",
+ subject=((("commonName", "intevation1999.de"),),))
+
+ def test_hostname_contains_hypher(self):
+ self.match("intevation-1999.de",
+ subject=((("commonName", "intevation-1999.de"),),))
+
+ #
+ # Test for hostnames that should NOT match
+ #
+
+ def test_cn_too_many_subdomains(self):
+ self.nomatch("foo.www.intevation.de",
+ subject=((("commonName", "*.intevation.de"),),))
+
+ def test_cn_matches_only_subdomain_of_hostname(self):
+ self.nomatch("www.intevation.de.evil.example.com",
+ subject=((("commonName", "*.intevation.de"),),))
+
+
+ def test_sn_cn_but_only_cn_matches(self):
+ self.nomatch("www.intevation.de.evil.example.com",
+ subject=((("commonName", "www.intevation.de"),),),
+ subjectAltName=(("DNS", "ssl.intevation.de"),))
+
+ def test_hostname_start_with_wrong_char(self):
+ self.nomatch("-intevation.de",
+ subjectAltName=(("DNS", "-intevation.de"),))
+
+ def test_hostname_ends_with_wrong_char(self):
+ self.nomatch("intevation-.de",
+ subjectAltName=(("DNS", "intevation-.de"),))
+
+ def test_hostname_ends_with_wrong_newline(self):
+ self.nomatch("intevation\n.de",
+ subjectAltName=(("DNS", "intevation\n.de"),))
+
+ def test_hostname_contains_invalid_char(self):
+ self.nomatch("de-intevation.de",
+ subjectAltName=(("DNS", "dem?-intevation.de"),))
+
+ def test_hostname_contains_empty_component(self):
+ self.nomatch("..de",
+ subjectAltName=(("DNS", "..de"),))
+
More information about the Inteproxy-commits
mailing list