[Inteproxy-commits] r51 - in trunk: . test

scm-commit@wald.intevation.org scm-commit at wald.intevation.org
Wed Apr 18 17:43:19 CEST 2007


Author: bh
Date: 2007-04-18 17:43:19 +0200 (Wed, 18 Apr 2007)
New Revision: 51

Added:
   trunk/test/test_threadpool.py
   trunk/threadpool.py
Modified:
   trunk/ChangeLog
   trunk/InteProxy.py
Log:
* threadpool.py: New.  Simple thread pool class to replace the
global queue and worker thread function in InteProxy.py

* test/test_threadpool.py: New.  Tests for the thread pool

* InteProxy.py (the_queue, worker_thread): Removed.  The
functionality is now in threadpool.py
(MasterWorkerServer.__init__): New.  Extend base class method to
initialize a thread pool
(MasterWorkerServer.process_request): Handle requests with the
thread pool
(run_server): Adapt to MasterWorkerServer and thread pool changes.


Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog	2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/ChangeLog	2007-04-18 15:43:19 UTC (rev 51)
@@ -1,5 +1,20 @@
 2007-04-18  Bernhard Herzog  <bh at intevation.de>
 
+	* threadpool.py: New.  Simple thread pool class to replace the
+	global queue and worker thread function in InteProxy.py
+
+	* test/test_threadpool.py: New.  Tests for the thread pool
+
+	* InteProxy.py (the_queue, worker_thread): Removed.  The
+	functionality is now in threadpool.py
+	(MasterWorkerServer.__init__): New.  Extend base class method to
+	initialize a thread pool
+	(MasterWorkerServer.process_request): Handle requests with the
+	thread pool
+	(run_server): Adapt to MasterWorkerServer and thread pool changes.
+
+2007-04-18  Bernhard Herzog  <bh at intevation.de>
+
 	* InteProxy.py (InteProxyHTTPRequestHandler): Fix debug level
 	comment.
 

Modified: trunk/InteProxy.py
===================================================================
--- trunk/InteProxy.py	2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/InteProxy.py	2007-04-18 15:43:19 UTC (rev 51)
@@ -21,31 +21,15 @@
 import urlparse
 import urllib2
 import BaseHTTPServer
-import threading
-import Queue
 import socket
 import proxyconnection
 from transcoder import transcoder_map
 from getpassword import getpassword
+from threadpool import ThreadPool
 
 inteproxy_version = "0.1.2"
 
 
-# the central queue for the communication between master and worker
-# thread.
-the_queue = Queue.Queue(0)
-
-
-def worker_thread():
-    """The worker thread
-
-    This function takes requests out of the queue and handles them.
-    """
-    while 1:
-        request = the_queue.get()
-        request()
-
-
 class InteProxyHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
 
     # Whether the SHUTDOWN method is allowed.  If allowed, anybody who can
@@ -291,6 +275,14 @@
 
     do_shutdown = False
 
+    def __init__(self, server_address, RequestHandlerClass, num_workers):
+        BaseHTTPServer.HTTPServer.__init__(self, server_address,
+                                           RequestHandlerClass)
+        self.thread_pool = ThreadPool(num_workers, lambda f: f())
+        sys.stderr.write("[%s] starting %d worker threads\n" \
+                         % (log_date_time_string(), num_workers))
+        self.thread_pool.start()
+
     def serve_forever(self):
         """Handle requests until self.do_shutdown is True."""
         while not self.do_shutdown:
@@ -320,8 +312,9 @@
                 self.handle_error(request, client_address)
                 self.close_request(request)
         sys.stderr.write("[%s] queue contains %d items\n" %
-                         (log_date_time_string(), the_queue.qsize()))
-        the_queue.put(process_in_worker)
+                         (log_date_time_string(),
+                          self.thread_pool.queue.qsize()))
+        self.thread_pool.put(process_in_worker)
 
     def handle_error(self, request, client_address):
         """Override to integrate the error reporting better with the other logs
@@ -401,15 +394,8 @@
     sys.stderr.write("InteProxy Version %s\n" % inteproxy_version)
     sys.stderr.write("[%s] server starting up\n" % log_date_time_string())
 
-    httpd = ServerClass(server_address, HandlerClass)
+    httpd = ServerClass(server_address, HandlerClass, opts.workers)
 
-    sys.stderr.write("[%s] starting %d worker threads\n" \
-                     % (log_date_time_string(), opts.workers))
-    for i in range(opts.workers):
-        worker = threading.Thread(target = worker_thread)
-        worker.setDaemon(1)
-        worker.start()
-
     print "Serving HTTP on port", opts.port, "..."
     sys.stderr.write("[%s] serving HTTP on port %d\n"
                      % (log_date_time_string(), opts.port))

Added: trunk/test/test_threadpool.py
===================================================================
--- trunk/test/test_threadpool.py	2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/test/test_threadpool.py	2007-04-18 15:43:19 UTC (rev 51)
@@ -0,0 +1,55 @@
+# Copyright (C) 2007 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Tests for ThreadPool"""
+
+import threading
+import unittest
+
+import threadpool
+
+def currentThreadSet():
+    """Returns the currently running threads as a set"""
+    return set(threading.enumerate())
+
+
+class TestThreadPool(unittest.TestCase):
+
+    def test_thread_pool(self):
+
+        # handler function for the thread pool that simply records the
+        # items it processes.
+        processed_items = []
+        def handler(item):
+            processed_items.append(item)
+
+        # snapshot of currently running threads so that we can keep
+        # track of new threads.
+        initial_threads = currentThreadSet()
+
+        # Create a thread pool and start its threads
+        pool = threadpool.ThreadPool(5, handler)
+        pool.start()
+
+        # check that 5 new threads have been started
+        new_threads = currentThreadSet() - initial_threads
+        self.assertEquals(len(new_threads), 5)
+
+        # put some things into the queue.  We cannot check immediately
+        # whether they've been processed, because that happens in a
+        # different thread.
+        pool.put("abc")
+        pool.put("def")
+
+        # stop the threads and check that they're gone
+        pool.stop()
+        remaining_threads = currentThreadSet() - new_threads - initial_threads
+        self.assertEquals(len(remaining_threads), 0)
+
+        # check whether all items put into the queue have been
+        # processed.
+        self.assertEquals(sorted(processed_items), ["abc", "def"])


Property changes on: trunk/test/test_threadpool.py
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + native

Added: trunk/threadpool.py
===================================================================
--- trunk/threadpool.py	2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/threadpool.py	2007-04-18 15:43:19 UTC (rev 51)
@@ -0,0 +1,72 @@
+# Copyright (C) 2007 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Simple pool of worker threads to process tasks like e.g. HTTP requests"""
+
+
+import threading
+import Queue
+
+
+class ThreadPool(object):
+
+    """A class to manage a set of threads that read tasks from a queue.
+
+    The threads managed by the class are started as daemon threads so
+    that it's not required to terminate them if the program using them
+    wants to exit.  Still, they can be explicitly stopped with the stop
+    method.
+
+    The threads read tasks from a Queue object managed by the ThreadPool
+    instance using the queue's blocking get() method.  Tasks taken from
+    the queue are then passed to the handler_function which was
+    specified when the ThreadPool instance was created.  The
+    handler_function is called with one parameter, the task object
+    (which can be any object except None), and should return when the
+    task is finished.  Repeatedly getting new tasks from the queue is
+    handled by the thread pool itself.
+
+    The program using the thread pool puts tasks into the queue with the
+    thread pool's put method.
+    """
+
+    def __init__(self, num_threads, handler_function):
+        """Initialize the thread pool with a number of threads and a handler"""
+        self.queue = Queue.Queue(0)
+        self.num_threads = num_threads
+        self.handler_function = handler_function
+        self.threads = []
+
+    def start(self):
+        """Starts the threads"""
+        self.threads = []
+        for i in range(self.num_threads):
+            thread = threading.Thread(target = self._thread)
+            thread.setDaemon(1)
+            thread.start()
+            self.threads.append(thread)
+
+    def _thread(self):
+        """Takes tasks repeatedly from the queue and calls the handler_function
+        This is an internal method.
+        """
+        while 1:
+            item = self.queue.get()
+            if item is None:
+                break
+            self.handler_function(item)
+
+    def stop(self):
+        """Stops the worker threads"""
+        for i in range(len(self.threads)):
+            self.put(None)
+        for thread in self.threads:
+            thread.join()
+
+    def put(self, item):
+        """Puts item into the queue to be processed by a worker thread"""
+        self.queue.put(item)


Property changes on: trunk/threadpool.py
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + native



More information about the Inteproxy-commits mailing list