[Inteproxy-commits] r279 - in trunk: . inteproxy test

scm-commit@wald.intevation.org scm-commit at wald.intevation.org
Fri Sep 24 12:18:13 CEST 2010


Author: iweinzierl
Date: 2010-09-24 12:18:10 +0200 (Fri, 24 Sep 2010)
New Revision: 279

Added:
   trunk/test/test_sslvalidation.py
Modified:
   trunk/ChangeLog
   trunk/inteproxy/httpconnection.py
Log:
Introduced some new functions to verify that a remote url matches a hostname in a certificate. Added testcases for these functions.

Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog	2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/ChangeLog	2010-09-24 10:18:10 UTC (rev 279)
@@ -1,3 +1,17 @@
+2010-09-22  Ingo Weinzierl <ingo.weinzierl at intevation.de>
+
+	* inteproxy/httpconnection.py(extract_identities_from_certificate):
+	Extract identities from certificate based on rfc2818.
+	(create_url_regex): Creates regular expressions that can be used to
+	match domains that might contain wildcards.
+	(is_remote_domain_valid): Function that helps to validate remote urls
+	based on the identities in a certificate.
+	(connect_ssl): New parameters that are required to verify remote server
+	certificates.
+
+	* test/test_sslvalidation.py: New. Some testcases for ssl hostname
+	validation.
+
 2010-09-21  Ingo Weinzierl <ingo.weinzierl at intevation.de>
 
 	* inteproxy.cfg: Introduced a new parameter that defines the path to a

Modified: trunk/inteproxy/httpconnection.py
===================================================================
--- trunk/inteproxy/httpconnection.py	2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/inteproxy/httpconnection.py	2010-09-24 10:18:10 UTC (rev 279)
@@ -22,6 +22,7 @@
 import socket
 import httplib
 import urllib
+import re
 
 # the ssl modules was introduced in Python 2.6.  If available, we use
 # that, otherwise the older ssl support from the socket module
@@ -114,14 +115,113 @@
     return sock
 
 
-def connect_ssl(sock, keyfile=None, certfile=None, log_debug=log_debug):
+def extract_identities_from_certificate(certificate):
+    """Extracts the peer's identity from subjectAltName or subject field that
+    is contained in a peer certificate.
+
+    Both fields contain a tuple that contains a sequence of relative
+    distinguished names (RDN) stored in the certificates data structure (see
+    http://docs.python.org/library/ssl.html for more details about this
+    structure). If there are dNSNames defined in subjectAltName, these
+    identities are returned and the search for commonNames is skiped. In some
+    cases there are iPAddress stored in subjectAltname. For this reason, the
+    return value of this function is a tuple of valid domains and ips."""
+    domains = []
+    ips = []
+    subjectAltName = certificate.get('subjectAltName', ())
+    for key, value in subjectAltName:
+        if key == 'DNS':
+            domains.append(value)
+        if key == 'IP':
+            # XXX Not sure if the name of the field storing IP adresses is
+            # really iPAddress
+            ips.append(value)
+    if len(domains) > 0:
+        # It is not necessary to continue searching for 'commonName' fields in
+        # the certificate, because there is at least one dNSName defined that
+        # MUST be used as the identity of the peer (see
+        # http://www.ietf.org/rfc/rfc2818.txt for further details) 
+        return domains, ips
+
+    subject = certificate.get('subject', ())
+    for rdn in subject:
+        for key, value in rdn:
+            if key == 'commonName' or key == 'dNSName':
+                domains.append(value)
+    return domains, ips
+
+
+def create_url_regex(domain):
+    """This function creates a regular expression that might be used to match
+    certain domains.
+
+    If domain contains wildcard characters '*', these characters are replaced by
+    a regular expression that matches a sequence of valid hostname characters
+    (letters, digits and hyphers). If domain is an invalid hostname, None is
+    returned."""
+    wildcard_char ="*"
+    wildcard_regex = "[a-zA-Z0-9-]*"
+
+    regex = ""
+    first = True
+    parts = domain.split(".")
+    for part in parts:
+        if not part or part[0] != "*" and not part[0].isalnum() or \
+            part[-1] != "*" and not part[-1].isalnum():
+            return None
+        if not first:
+            regex += "\\."
+        first = False
+
+        if wildcard_char in part:
+            regex += part.replace(wildcard_char, wildcard_regex)
+        else:
+            regex += re.escape(part)
+    return regex
+
+
+def is_remote_domain_valid(peercert, hostname, log_debug=log_debug):
+    """This function might be used to verify that hostname matches a valid
+    domain in peercert."""
+    hostname = hostname.lower()
+    parts = hostname.split(".")
+    domains, ips = extract_identities_from_certificate(peercert)
+    for domain in domains:
+        if len(domain.split(".")) != len(parts):
+            continue
+
+        regex = create_url_regex(domain.lower())
+        if regex and re.match(regex, hostname):
+            return True
+    for ip in ips:
+        if hostname == ip:
+            return True
+    log_debug("Server domain %r is not valid." % hostname)
+    return False
+
+
+def connect_ssl(sock, keyfile=None, certfile=None, log_debug=log_debug,
+                ca_certs=None, cert_required=True, remote_url=None):
     """Establish an SSL connection on the socket-like object sock.
     If successful, the function returns a new socket like object that
     uses SSL to encrypt the data.
     """
     log_debug("initiate ssl on %r", sock)
     if ssl is not None:
-        wrapped_sock = ssl.wrap_socket(sock, keyfile, certfile)
+        cert_reqs = ssl.CERT_REQUIRED
+        if not cert_required:
+            cert_reqs = ssl.CERT_NONE
+        wrapped_sock = ssl.wrap_socket(sock, keyfile, certfile,
+                                       ca_certs=ca_certs,
+                                       cert_reqs=cert_reqs)
+        if not cert_required:
+            return wrapped_sock
+
+        peercert = wrapped_sock.getpeercert()
+        if not peercert or not is_remote_domain_valid(peercert, remote_url,
+                                                      log_debug):
+            raise ssl.SSLError("Server domain does not match any identity "
+                               "specified in the certificate.")
     else:
         sslsock = socket.ssl(sock, keyfile, certfile)
         wrapped_sock = httplib.FakeSocket(sock, sslsock)

Added: trunk/test/test_sslvalidation.py
===================================================================
--- trunk/test/test_sslvalidation.py	2010-09-21 18:23:10 UTC (rev 278)
+++ trunk/test/test_sslvalidation.py	2010-09-24 10:18:10 UTC (rev 279)
@@ -0,0 +1,106 @@
+# Copyright (C) 2010 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Tests for InteProxy's SSL validation"""
+
+import unittest
+
+from inteproxy.httpconnection import is_remote_domain_valid
+
+
+class TestHostNameChecks(unittest.TestCase):
+
+    def match(self, hostname, **cert_dict):
+        self.failUnless(is_remote_domain_valid(cert_dict, hostname))
+
+    def nomatch(self, hostname, **cert_dict):
+        self.failIf(is_remote_domain_valid(cert_dict, hostname))
+
+    #
+    # Test for hostnames that should match
+    #
+
+    def test_cn_exact(self):
+        self.match("intevation.de",
+                   subject=((("commonName", "intevation.de"),),))
+
+    def test_cn_any_single_subdomain(self):
+        self.match("www.intevation.de",
+                   subject=((("commonName", "*.intevation.de"),),))
+
+    def test_cn_partial_single_subdomain(self):
+        self.match("ssl.intevation.de",
+                   subject=((("commonName", "s*.intevation.de"),),))
+
+    def test_sn_exact(self):
+        self.match("intevation.de",
+                   subjectAltName=(("DNS", "*.intevation.de"),
+                                   ("DNS", "intevation.de")))
+
+    def test_sn_any_single_subdomain(self):
+        self.match("inteproxy-demo.intevation.de",
+                   subjectAltName=(("DNS", "*.intevation.de"),
+                                   ("DNS", "intevation.de")))
+
+    def test_hostname_lower_case(self):
+        self.match("intevation.de",
+                   subject=((("commonName", "INtevATioN.DE"),),))
+
+    def test_remote_domain_lower_case(self):
+        self.match("inTEvaTIon.de",
+                   subject=((("commonName", "INTEVATION.DE"),),))
+
+    def test_both_mixed_case(self):
+        self.match("inTEvaTIon.de",
+                   subject=((("commonName", "INtevATioN.DE"),),))
+
+    def test_hostname_contains_alphanum(self):
+        self.match("intevation1999.de",
+                   subject=((("commonName", "intevation1999.de"),),))
+
+    def test_hostname_contains_hypher(self):
+        self.match("intevation-1999.de",
+                   subject=((("commonName", "intevation-1999.de"),),))
+
+    #
+    # Test for hostnames that should NOT match
+    #
+
+    def test_cn_too_many_subdomains(self):
+        self.nomatch("foo.www.intevation.de",
+                     subject=((("commonName", "*.intevation.de"),),))
+
+    def test_cn_matches_only_subdomain_of_hostname(self):
+        self.nomatch("www.intevation.de.evil.example.com",
+                     subject=((("commonName", "*.intevation.de"),),))
+
+
+    def test_sn_cn_but_only_cn_matches(self):
+        self.nomatch("www.intevation.de.evil.example.com",
+                     subject=((("commonName", "www.intevation.de"),),),
+                     subjectAltName=(("DNS", "ssl.intevation.de"),))
+
+    def test_hostname_start_with_wrong_char(self):
+        self.nomatch("-intevation.de",
+                     subjectAltName=(("DNS", "-intevation.de"),))
+
+    def test_hostname_ends_with_wrong_char(self):
+        self.nomatch("intevation-.de",
+                     subjectAltName=(("DNS", "intevation-.de"),))
+
+    def test_hostname_ends_with_wrong_newline(self):
+        self.nomatch("intevation\n.de",
+                     subjectAltName=(("DNS", "intevation\n.de"),))
+
+    def test_hostname_contains_invalid_char(self):
+        self.nomatch("de-intevation.de",
+                     subjectAltName=(("DNS", "dem?-intevation.de"),))
+
+    def test_hostname_contains_empty_component(self):
+        self.nomatch("..de",
+                     subjectAltName=(("DNS", "..de"),))
+



More information about the Inteproxy-commits mailing list