[Inteproxy-commits] r355 - in branches/compression: . inteproxy

scm-commit at wald.intevation.org scm-commit at wald.intevation.org
Wed Feb 29 10:57:32 CET 2012


Author: aheinecke
Date: 2012-02-29 10:57:31 +0100 (Wed, 29 Feb 2012)
New Revision: 355

Added:
   branches/compression/inteproxy/decompressstream.py
Modified:
   branches/compression/ChangeLog
   branches/compression/inteproxy/httpmessage.py
   branches/compression/inteproxy/proxycore.py
Log:
Refactor compression, HTTPMessage now reads from an
internal _body_stream which is a DecompressStream
in the case that do_decompress is called before reading.

do_decompress also now modifies the header to remove
the Content-Encoding and lenght.

The body stream in httpmessage is now also set
when the body is set.



Modified: branches/compression/ChangeLog
===================================================================
--- branches/compression/ChangeLog	2012-02-24 16:04:28 UTC (rev 354)
+++ branches/compression/ChangeLog	2012-02-29 09:57:31 UTC (rev 355)
@@ -1,3 +1,16 @@
+2012-02-29	Andre Heinecke	<aheinecke at intevation.de>
+	* M inteproxy/proxycore.py:
+	  Remove decompressed_read method.
+	  Remove method get_decompress_object
+	* M inteproxy/httpmessage.py:
+	  Added do_decompress method to httpresponse to select decompression 
+	  of the response.
+	  Read does now create a decompression object if necessary to decompress
+	  the input stream of the response.
+	* A inteproxy/decompressstream.py:
+	  Add decompress stream class to wrap around an input stream for
+	  decompressed reading.
+
 2012-02-24	Andre Heinecke	<aheinecke at intevation.de>
 	* M inteproxy/proxycore.py:
 	  Add method decompressed_read to read decompressed

Added: branches/compression/inteproxy/decompressstream.py
===================================================================
--- branches/compression/inteproxy/decompressstream.py	                        (rev 0)
+++ branches/compression/inteproxy/decompressstream.py	2012-02-29 09:57:31 UTC (rev 355)
@@ -0,0 +1,41 @@
+# Copyright (C) 2012 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+# Andre Heinecke <aheinecke at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+""" On the fly decompression of a data stream """
+
+class DecompressStream(object):
+    """A class to wrap around a data stream that contains
+    compressed data.
+
+    The decompression object can be given at construction time.
+    """
+
+    def __init__(self, infile, decompressobj):
+        """
+        Initialize the DecompressStream with a input stream and a decompressor
+        """
+        self.infile = infile
+        self.decompressor = decompressobj
+
+    def read(self, amount):
+        if amount == -1:
+            return self.decompressor.decompress(self.infile.read())
+        decompressed_chunks = []
+        count = 0
+        while count < amount:
+            max_read = amount - count
+            compressed = self.decompressor.unconsumed_tail
+            if not compressed:
+                compressed = self.infile.read(amount)
+                if not compressed:
+                    break
+            deflated = self.decompressor.decompress(compressed, max_read)
+            count += len(deflated)
+            decompressed_chunks.append(deflated)
+
+        return "".join(decompressed_chunks)

Modified: branches/compression/inteproxy/httpmessage.py
===================================================================
--- branches/compression/inteproxy/httpmessage.py	2012-02-24 16:04:28 UTC (rev 354)
+++ branches/compression/inteproxy/httpmessage.py	2012-02-29 09:57:31 UTC (rev 355)
@@ -8,6 +8,8 @@
 """Abstractions for http request and response messages"""
 
 from StringIO import StringIO
+from inteproxy.decompressstream import DecompressStream
+from zlib import decompressobj, MAX_WBITS
 
 class HTTPMessage(object):
 
@@ -66,6 +68,7 @@
         if content_type is not None:
             self.headers["Content-type"] = content_type
         self._body = body
+        self._body_stream = StringIO(self.body)
 
     def get_body(self):
         self.read_entire_message()
@@ -77,8 +80,6 @@
         raise NotImplementedError
 
     def read(self, amount):
-        if self._body_stream is None and self.body_has_been_read():
-            self._body_stream = StringIO(self.body)
         if self._body_stream is not None:
             data = self._body_stream.read(amount)
         else:
@@ -145,28 +146,87 @@
         self.version = version
         self.status = status
         self.reason = reason
+        self.__decompress = False
+        self.started_reading = False
 
     def debug_log_message(self, log_function):
         log_function("HTTPResponseMessage: %s %s %s",
                      self.version, self.status, self.reason)
         super(HTTPResponseMessage, self).debug_log_message(log_function)
 
-    def read_entire_message(self, decompressor = None):
+    def read_entire_message(self):
         """
         Read the entire message and set the messages body.
         If the optional decompressor parameter is given the
         body will be decompressed.
         """
-        if self.body_has_been_read():
+        if not self.body_has_been_read():
+            self.set_body(self.read())
+
+    def do_decompress(self):
+        """
+        Decompress the input stream on read if possible.
+
+        If the content-encoding of the message is either gzip
+        or deflate the Content-Encoding and Content-Length
+        headers will also be removed.
+        """
+        if self.__decompress:
             return
-        length = int(self.headers.get("Content-Length", "0"))
-        if length:
-            # Not using chunked so we read everything at once
-            if decompressor:
-                self.set_body(decompressor.decompress(self.read(length)))
+
+        if not self.headers.get("Content-Encoding"):
+            return
+
+        if self.headers.get("Content-Encoding") == "deflate":
+            self.__decompress = "deflate"
+        elif self.headers.get("Content-Encoding") == "gzip":
+            self.__decompress = "gzip"
+
+        if self.__decompress:
+            # Can decompress the input
+            if self.started_reading:
+                self.__decompress = False
+                raise Exception("do_decompress called after first read")
+
+            del self.headers["Content-Encoding"]
+            if self.headers.get("Content-Length"):
+                del self.headers["Content-Length"]
+
+    def read(self, amount = -1):
+        """
+        Read the message up to amount bytes and return a data string of
+        length amount.
+        If do_decompress was called before this will return the data in
+        a decompressed form if possible.
+        """
+
+        if self.started_reading == False and amount != 0:
+            self.started_reading = True
+
+        if self._body_stream is None:
+            if self.__decompress:
+                # Create the decompression object
+                #
+                # On defate decompression -zlib.MAX_WBITS is given to ensure
+                # that non RFC confirming responses as they are sent by most
+                # http servers are decompressed correctly by ignoring a
+                # possibly invlaid header.
+
+                # To decompress gzip with zlib 16 needs to be added to the
+                # wbits parameter
+
+                # See the documention of inflateInit2 at
+                # http://zlib.net/manual.html
+
+                if self.__decompress == "deflate":
+                    self._body_stream = DecompressStream(self.infile,
+                            decompressobj(-MAX_WBITS))
+                elif self.__decompress == "gzip":
+                    self._body_stream = DecompressStream(self.infile,
+                            decompressobj(16 + MAX_WBITS))
+                else:
+                    raise Exception("Invalid decompress method"
+                            " in HTTPResponse")
             else:
-                self.set_body(self.read(length))
-        elif not self.headers.get("Content-Length"):
-            # Can't read the entire message because we are chunked
-            # FIXME how to handle this?
-            pass
+                self._body_stream = self.infile
+        return self._body_stream.read(amount)

Modified: branches/compression/inteproxy/proxycore.py
===================================================================
--- branches/compression/inteproxy/proxycore.py	2012-02-24 16:04:28 UTC (rev 354)
+++ branches/compression/inteproxy/proxycore.py	2012-02-29 09:57:31 UTC (rev 355)
@@ -330,36 +330,6 @@
             self.send_header(header, value)
         self.end_headers()
 
-    def get_decompress_object(self, response):
-        # Set up the response for decompression
-
-        # On defate decompression -zlib.MAX_WBITS is given to ensure
-        # that non RFC confirming responses as they are sent by most
-        # http servers are decompressed correctly by ignoring a
-        # possibly invlaid header.
-
-        # To decompress gzip with zlib 16 needs to be added to the wbits
-        # parameter
-
-        # See the documention of inflateInit2 at http://zlib.net/manual.html
-        if response.headers.get("Content-Encoding") == "deflate":
-            return zlib.decompressobj(-zlib.MAX_WBITS)
-        elif response.headers.get("Content-Encoding") == "gzip":
-            return zlib.decompressobj(16 + zlib.MAX_WBITS)
-
-    def decompressed_read(self, orig_read, response, amount):
-        # Read <amount> bytes with the orig_read function from 
-        # response and return it decompressed
-        # if self.decompressor is set.
-        # The Return value can be larger then amount
-        raw_data = orig_read(amount)
-        if self.decompressor:
-            if not response.body_has_been_read():
-                return self.decompressor.decompress(raw_data)
-            else:
-                return raw_data
-        return raw_data
-
     def handle_response(self, response):
         # The HTTP version in the reply generated by send_response is
         # taken from self.protocol_version.  We simply set it to
@@ -373,12 +343,7 @@
         do_chunked = response.headers.get("Transfer-encoding") == "chunked"
 
         if do_rewrite or self.should_decompress_response:
-            self.decompressor = self.get_decompress_object(response)
-            if self.decompressor:
-                # Reflect the decompression in the headers
-                del response.headers["Content-Encoding"]
-                if int(response.headers.get("Content-Length", "0")):
-                    response.read_entire_message(self.decompressor)
+            response.do_decompress()
 
         if do_chunked and do_rewrite:
             self.send_headers(response)
@@ -441,7 +406,7 @@
         append = data.append
 
         while True:
-            chunk = self.decompressed_read(read, response, length)
+            chunk = response.read(length)
             if not chunk:
                 break
 
@@ -490,7 +455,7 @@
                 chunk_size = min(length, max_chunk_size)
             else:
                 chunk_size = max_chunk_size
-            chunk = self.decompressed_read(read, response, chunk_size)
+            chunk = response.read(chunk_size)
             if not chunk:
                 break
             if length is not None:



More information about the Inteproxy-commits mailing list