[Inteproxy-commits] r51 - in trunk: . test
scm-commit@wald.intevation.org
scm-commit at wald.intevation.org
Wed Apr 18 17:43:19 CEST 2007
Author: bh
Date: 2007-04-18 17:43:19 +0200 (Wed, 18 Apr 2007)
New Revision: 51
Added:
trunk/test/test_threadpool.py
trunk/threadpool.py
Modified:
trunk/ChangeLog
trunk/InteProxy.py
Log:
* threadpool.py: New. Simple thread pool class to replace the
global queue and worker thread function in InteProxy.py
* test/test_threadpool.py: New. Tests for the thread pool
* InteProxy.py (the_queue, worker_thread): Removed. The
functionality is now in threadpool.py
(MasterWorkerServer.__init__): New. Extend base class method to
initialize a thread pool
(MasterWorkerServer.process_request): Handle requests with the
thread pool
(run_server): Adapt to MasterWorkerServer and thread pool changes.
Modified: trunk/ChangeLog
===================================================================
--- trunk/ChangeLog 2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/ChangeLog 2007-04-18 15:43:19 UTC (rev 51)
@@ -1,5 +1,20 @@
2007-04-18 Bernhard Herzog <bh at intevation.de>
+ * threadpool.py: New. Simple thread pool class to replace the
+ global queue and worker thread function in InteProxy.py
+
+ * test/test_threadpool.py: New. Tests for the thread pool
+
+ * InteProxy.py (the_queue, worker_thread): Removed. The
+ functionality is now in threadpool.py
+ (MasterWorkerServer.__init__): New. Extend base class method to
+ initialize a thread pool
+ (MasterWorkerServer.process_request): Handle requests with the
+ thread pool
+ (run_server): Adapt to MasterWorkerServer and thread pool changes.
+
+2007-04-18 Bernhard Herzog <bh at intevation.de>
+
* InteProxy.py (InteProxyHTTPRequestHandler): Fix debug level
comment.
Modified: trunk/InteProxy.py
===================================================================
--- trunk/InteProxy.py 2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/InteProxy.py 2007-04-18 15:43:19 UTC (rev 51)
@@ -21,31 +21,15 @@
import urlparse
import urllib2
import BaseHTTPServer
-import threading
-import Queue
import socket
import proxyconnection
from transcoder import transcoder_map
from getpassword import getpassword
+from threadpool import ThreadPool
inteproxy_version = "0.1.2"
-# the central queue for the communication between master and worker
-# thread.
-the_queue = Queue.Queue(0)
-
-
-def worker_thread():
- """The worker thread
-
- This function takes requests out of the queue and handles them.
- """
- while 1:
- request = the_queue.get()
- request()
-
-
class InteProxyHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# Whether the SHUTDOWN method is allowed. If allowed, anybody who can
@@ -291,6 +275,14 @@
do_shutdown = False
+ def __init__(self, server_address, RequestHandlerClass, num_workers):
+ BaseHTTPServer.HTTPServer.__init__(self, server_address,
+ RequestHandlerClass)
+ self.thread_pool = ThreadPool(num_workers, lambda f: f())
+ sys.stderr.write("[%s] starting %d worker threads\n" \
+ % (log_date_time_string(), num_workers))
+ self.thread_pool.start()
+
def serve_forever(self):
"""Handle requests until self.do_shutdown is True."""
while not self.do_shutdown:
@@ -320,8 +312,9 @@
self.handle_error(request, client_address)
self.close_request(request)
sys.stderr.write("[%s] queue contains %d items\n" %
- (log_date_time_string(), the_queue.qsize()))
- the_queue.put(process_in_worker)
+ (log_date_time_string(),
+ self.thread_pool.queue.qsize()))
+ self.thread_pool.put(process_in_worker)
def handle_error(self, request, client_address):
"""Override to integrate the error reporting better with the other logs
@@ -401,15 +394,8 @@
sys.stderr.write("InteProxy Version %s\n" % inteproxy_version)
sys.stderr.write("[%s] server starting up\n" % log_date_time_string())
- httpd = ServerClass(server_address, HandlerClass)
+ httpd = ServerClass(server_address, HandlerClass, opts.workers)
- sys.stderr.write("[%s] starting %d worker threads\n" \
- % (log_date_time_string(), opts.workers))
- for i in range(opts.workers):
- worker = threading.Thread(target = worker_thread)
- worker.setDaemon(1)
- worker.start()
-
print "Serving HTTP on port", opts.port, "..."
sys.stderr.write("[%s] serving HTTP on port %d\n"
% (log_date_time_string(), opts.port))
Added: trunk/test/test_threadpool.py
===================================================================
--- trunk/test/test_threadpool.py 2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/test/test_threadpool.py 2007-04-18 15:43:19 UTC (rev 51)
@@ -0,0 +1,55 @@
+# Copyright (C) 2007 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Tests for ThreadPool"""
+
+import threading
+import unittest
+
+import threadpool
+
+def currentThreadSet():
+ """Returns the currently running threads as a set"""
+ return set(threading.enumerate())
+
+
+class TestThreadPool(unittest.TestCase):
+
+ def test_thread_pool(self):
+
+ # handler function for the thread pool that simply records the
+ # items it processes.
+ processed_items = []
+ def handler(item):
+ processed_items.append(item)
+
+ # snapshot of currently running threads so that we can keep
+ # track of new threads.
+ initial_threads = currentThreadSet()
+
+ # Create a thread pool and start its threads
+ pool = threadpool.ThreadPool(5, handler)
+ pool.start()
+
+ # check that 5 new threads have been started
+ new_threads = currentThreadSet() - initial_threads
+ self.assertEquals(len(new_threads), 5)
+
+ # put some things into the queue. We cannot check immediately
+ # whether they've been processed, because that happens in a
+ # different thread. processed
+ pool.put("abc")
+ pool.put("def")
+
+ # stop the threads and check that they're gone
+ pool.stop()
+ remaining_threads = currentThreadSet() - new_threads - initial_threads
+ self.assertEquals(len(remaining_threads), 0)
+
+ # check whether all items put into the queue have been
+ # processed.
+ self.assertEquals(sorted(processed_items), ["abc", "def"])
Property changes on: trunk/test/test_threadpool.py
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ native
Added: trunk/threadpool.py
===================================================================
--- trunk/threadpool.py 2007-04-18 12:38:04 UTC (rev 50)
+++ trunk/threadpool.py 2007-04-18 15:43:19 UTC (rev 51)
@@ -0,0 +1,72 @@
+# Copyright (C) 2007 by Intevation GmbH
+# Authors:
+# Bernhard Herzog <bh at intevation.de>
+#
+# This program is free software under the GPL (>=v2)
+# Read the file COPYING coming with the software for details.
+
+"""Simple pool of worker threads to process tasks like e.g. HTTP requests"""
+
+
+import threading
+import Queue
+
+
+class ThreadPool(object):
+
+ """A class to manage a set of threads that read tasks from a queue.
+
+ The threads managed by the class are started as deamon threads so
+ that it's not required to terminate them if the program using them
+ wants to exit. Still, they can be explicitly stopped with the stop
+ method.
+
+ The threads read tasks from a Queue object managed by the ThreadPool
+ instance using the queue's blocking get() method. Tasks taken from
+ the queue are then passed to the handler_function which was
+ specified then the ThreadPool instance was created. The
+ handler_function is called with one parameter, the task object
+ (which can be any object except None), and should return when the
+ task is finished. Repeatedly getting new tasks from the queue is
+ handled by the thread pool itself.
+
+ The program using the thread pool puts tasks into the queue with the
+ thread pool's put method.
+ """
+
+ def __init__(self, num_threads, handler_function):
+ """Initialize the thread pool with a number of threads and a handler"""
+ self.queue = Queue.Queue(0)
+ self.num_threads = num_threads
+ self.handler_function = handler_function
+ self.threads = []
+
+ def start(self):
+ """Starts the threads"""
+ self.threads = []
+ for i in range(self.num_threads):
+ thread = threading.Thread(target = self._thread)
+ thread.setDaemon(1)
+ thread.start()
+ self.threads.append(thread)
+
+ def _thread(self):
+ """Takes tasks repeatedly the queue and calls the handler_function
+ This is an internal method.
+ """
+ while 1:
+ item = self.queue.get()
+ if item is None:
+ break
+ self.handler_function(item)
+
+ def stop(self):
+ """Stops the worker threads"""
+ for i in range(len(self.threads)):
+ self.put(None)
+ for thread in self.threads:
+ thread.join()
+
+ def put(self, item):
+ """Puts item into the queue to be processed by a worker thread"""
+ self.queue.put(item)
Property changes on: trunk/threadpool.py
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ native
More information about the Inteproxy-commits
mailing list