ThreadPoolingMixIn

pavel.uvarov

Hi, everybody!

I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.

Is it worth including in SocketServer.py?


from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue


class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.

    For instance, a threading UDP server class is created as
    follows:

    class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer):
        pass

    """
    __author__ = 'Pavel Uvarov <[email protected]>'

    def init_thread_pool(self, min_workers=5,
                         max_workers=100, min_spare_workers=5):
        """Initialize thread pool."""
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers."""
        for i in xrange(n):
            t = threading.Thread(target=self.worker)
            t.setDaemon(True)
            t.start()

    def worker(self):
        """A function of a worker thread.

        It gets a request from the queue (blocking if there
        are no requests) and processes it.

        After processing it checks how many spare workers
        there are now, and if this value is greater than
        self.min_spare_workers the worker exits.
        Otherwise it loops forever.

        """
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                if self.num_workers - self.num_busy_workers > \
                        self.min_spare_workers:
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        """Put a request into the queue.

        If the queue size is too large, add an extra worker.

        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if (self.q.qsize() > 3 and
                    self.num_workers < self.max_workers):
                self.start_workers(1)

    def join(self):
        """Wait for all busy threads."""
        self.q.join()
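For readers on current Python, the same mix-in idea can be sketched against Python 3's socketserver (queue and socketserver replace the Python 2 Queue and SocketServer modules; the echo handler and the PooledTCPServer name are illustrative, not part of the post):

```python
import queue
import socket
import socketserver
import threading

class ThreadPoolingMixIn(socketserver.ThreadingMixIn):
    """Sketch of the posted mix-in, ported to Python 3 names."""

    def init_thread_pool(self, min_workers=5, max_workers=100,
                         min_spare_workers=5):
        self.q = queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        for _ in range(n):
            t = threading.Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            request, client_address = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            # process_request_thread is inherited from ThreadingMixIn
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                if (self.num_workers - self.num_busy_workers
                        > self.min_spare_workers):
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        # Hand the request to the pool instead of spawning a thread.
        self.q.put((request, client_address))
        with self.workers_mutex:
            if self.q.qsize() > 3 and self.num_workers < self.max_workers:
                self.start_workers(1)

class EchoHandler(socketserver.StreamRequestHandler):
    def handle(self):
        self.wfile.write(self.rfile.readline().upper())

class PooledTCPServer(ThreadPoolingMixIn, socketserver.TCPServer):
    allow_reuse_address = True

srv = PooledTCPServer(("127.0.0.1", 0), EchoHandler)  # ephemeral port
srv.init_thread_pool(min_workers=2, max_workers=10, min_spare_workers=2)
threading.Thread(target=srv.serve_forever, daemon=True).start()

with socket.create_connection(srv.server_address) as s:
    s.sendall(b"hello\n")
    reply = s.makefile("rb").readline()
print(reply)
srv.shutdown()
```

Note that init_thread_pool must be called explicitly before the server starts serving, exactly as in the original class.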
 
Rhamphoryncus

Hi, everybody!

I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.

Do you have any benchmarks demonstrating the performance difference?

t.setDaemon(True)

Unfortunately, shutdown with daemon threads is fairly buggy. :/
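A common workaround for that shutdown problem (a sketch, not part of the posted patch) is to stop workers with sentinel values on the queue instead of relying on daemon threads being killed at interpreter exit:

```python
import queue
import threading

def worker(q, results):
    """Pull jobs until a None sentinel arrives, then exit cleanly."""
    while True:
        job = q.get()
        if job is None:          # sentinel: time to shut down
            q.task_done()
            return
        results.append(job * 2)  # stand-in for real request handling
        q.task_done()

q = queue.Queue()
results = []
workers = [threading.Thread(target=worker, args=(q, results))
           for _ in range(4)]
for t in workers:
    t.start()

for job in range(10):
    q.put(job)
for _ in workers:            # one sentinel per worker
    q.put(None)
q.join()                     # all jobs and sentinels processed
for t in workers:
    t.join()                 # clean, non-daemon shutdown

print(sorted(results))
```

Because every worker is joined, no thread is alive when the process exits, so none of the daemon-thread teardown bugs can trigger.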
 
Giampaolo Rodola'

Hi, everybody!

I wrote a useful class ThreadPoolingMixIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.

Is it worth including in SocketServer.py?

[quoted code snipped]

This is not the right place to discuss such a thing.
Post this same message on the python-dev mailing list or, even better, open a
new ticket on the bug tracker, attaching the patch and, most importantly, a
benchmark demonstrating the speed improvement.

--- Giampaolo
http://code.google.com/p/pyftpdlib/
 
Rhamphoryncus

This is not the right place to discuss such a thing.
Post this same message on the python-dev mailing list or, even better, open a
new ticket on the bug tracker, attaching the patch and, most importantly, a
benchmark demonstrating the speed improvement.

It's perfectly acceptable to discuss ideas here before bringing them
up on python-ideas, and then python-dev.
 
pavel.uvarov

Do you have any benchmarks demonstrating the performance difference?

To benchmark this I used a simple TCP server which writes a small (16 KB)
string to the client and closes the connection.

I started 100 remote clients and got 500 replies/s for ThreadingMixIn
and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
FreeBSD 6.2 amd64.

I'm very curious about the exactness of the number 500 for
ThreadingMixIn. It seems to be the same for various packet sizes.
I suspect there is some OS limit on the thread creation rate.
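That suspicion can be probed directly (a rough sketch; the absolute numbers are machine-dependent) by timing per-request thread creation, the ThreadingMixIn model, against handing the same work to one pre-started worker, the pool model:

```python
import queue
import threading
import time

N = 500

# Model 1: spawn and join a fresh thread per "request".
def noop():
    pass

t0 = time.time()
for _ in range(N):
    t = threading.Thread(target=noop)
    t.start()
    t.join()
spawn_rate = N / (time.time() - t0)

# Model 2: push the same work through a single pre-started worker.
q = queue.Queue()

def worker():
    while True:
        item = q.get()
        q.task_done()
        if item is None:     # sentinel ends the worker
            return

w = threading.Thread(target=worker)
w.start()
t0 = time.time()
for _ in range(N):
    q.put(1)
q.join()
handoff_rate = N / (time.time() - t0)
q.put(None)
w.join()

print("spawn: %.0f/s  handoff: %.0f/s" % (spawn_rate, handoff_rate))
```

On most systems the queue handoff rate is one or two orders of magnitude higher than the spawn rate, which is consistent with the flat 500 replies/s ceiling being dominated by thread creation cost.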

Below I include a bugfixed ThreadPoolingMixIn and the benchmarking
utility. The utility can be used to start clients on localhost, though
the reply rate will be lower (around 1000 replies/s).

To start the benchmarking server with localhost clients, use:
python ./TestServer.py --server=threading --n-clients=100
or
python ./TestServer.py --server=threadpooling --n-clients=100


#------- ThreadPoolingMixIn.py
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue


class ThreadPoolingMixIn(ThreadingMixIn):
    """Mix-in class to handle requests in a thread pool.

    The pool grows and shrinks depending on load.

    For instance, a thread-pooling TCP server class is created as
    follows:

    class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer):
        pass

    """
    __author__ = 'Pavel Uvarov <[email protected]>'

    def init_thread_pool(self, min_workers=5,
                         max_workers=100, min_spare_workers=5):
        """Initialize thread pool."""
        self.q = Queue.Queue()
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.min_spare_workers = min_spare_workers
        self.num_workers = 0
        self.num_busy_workers = 0
        self.workers_mutex = threading.Lock()
        self.start_workers(self.min_workers)

    def start_workers(self, n):
        """Start n workers."""
        for i in xrange(n):
            t = threading.Thread(target=self.worker)
            t.setDaemon(True)
            t.start()

    def worker(self):
        """A function of a worker thread.

        It gets a request from the queue (blocking if there
        are no requests) and processes it.

        After processing it checks how many spare workers
        there are now, and if this value is greater than
        self.min_spare_workers the worker exits.
        Otherwise it loops forever.

        """
        with self.workers_mutex:
            self.num_workers += 1
        while True:
            (request, client_address) = self.q.get()
            with self.workers_mutex:
                self.num_busy_workers += 1
            self.process_request_thread(request, client_address)
            self.q.task_done()
            with self.workers_mutex:
                self.num_busy_workers -= 1
                if (self.num_workers > self.min_workers and
                        self.num_workers - self.num_busy_workers >
                        self.min_spare_workers):
                    self.num_workers -= 1
                    return

    def process_request(self, request, client_address):
        """Put a request into the queue.

        If the queue size is too large, add an extra worker.

        """
        self.q.put((request, client_address))
        with self.workers_mutex:
            if (self.q.qsize() > 3 and
                    self.num_workers < self.max_workers):
                self.start_workers(1)

    def join(self):
        """Wait for all busy threads."""
        self.q.join()

#------- TestServer.py
from __future__ import with_statement
from SocketServer import *
import socket
import sys
import threading
import time
import os
from ThreadPoolingMixIn import *


class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer):
    pass


class TestServer(ThreadingTCPServer):

    allow_reuse_address = True
    request_queue_size = 128

    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.packet_size = packet_size
        self.sum_t = 0
        self.total_num_requests = 0
        self.num_requests = 0
        self.t0 = time.time()
        self.lock = threading.Lock()

    def reset_stats(self):
        with self.lock:
            self.total_num_requests += self.num_requests
            self.num_requests = 0
            self.sum_t = 0
            self.t0 = time.time()

    def update_stats(self, t0, t1):
        with self.lock:
            self.num_requests += 1
            self.sum_t += t1 - t0
            n = self.num_requests
            sum_t = self.sum_t
            avg_t = sum_t / n
            rate = n / (t1 - self.t0)
            return (n, avg_t, rate)

    def handle_request(self):
        """Handle one request, possibly blocking."""
        try:
            request, client_address = self.get_request()
        except KeyboardInterrupt:
            raise
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except KeyboardInterrupt:
                raise
            except:
                self.handle_error(request, client_address)
                self.close_request(request)


class TestServerThreadPool(ThreadPoolingMixIn, TestServer):
    def __init__(self, server_address, RequestHandlerClass,
                 packet_size):
        TestServer.__init__(self, server_address, RequestHandlerClass,
                            packet_size)
        self.init_thread_pool(2, 200, 20)


class TestRequestHandler(StreamRequestHandler):

    def __init__(self, request, client_address, server):
        self.t0 = time.time()
        StreamRequestHandler.__init__(self, request,
                                      client_address, server)

    def handle(self):
        self.wfile.write('a' * (self.server.packet_size))

        t1 = time.time()
        (n, avg_t, rate) = self.server.update_stats(self.t0, t1)
        if n % 10000 == 0:
            print('rate=%.2f ' % rate)
            self.server.reset_stats()


from optparse import OptionParser


def server(o):
    HandlerClass = TestRequestHandler

    if o.server == "threading":
        ServerClass = TestServer
    elif o.server == "threadpooling":
        ServerClass = TestServerThreadPool
    else:
        return

    server_address = ('', o.port)
    try:
        srv = ServerClass(server_address, HandlerClass,
                          o.packet_size)

        sa = srv.socket.getsockname()
        print "Serving on", sa[0], "port", sa[1], "..."
        srv.serve_forever()
    except Exception, val:
        print "Exception: %s" % str(val)
        raise


def client(o):
    for f in xrange(0, o.n_clients):
        if os.fork():
            while True:
                try:
                    sock = socket.socket(socket.AF_INET,
                                         socket.SOCK_STREAM)
                    sock.connect(("localhost", o.port))
                    while len(sock.recv(4096)):
                        pass
                    sock.close()
                except Exception, val:
                    print val
                    time.sleep(1)


if __name__ == '__main__':
    args = sys.argv[1:]
    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option("-p", "--port", help="Server port",
                      type="int", default=8123)
    parser.add_option("", "--n-clients", help="Number of client forks",
                      type="int", default=0)
    parser.add_option("", "--server",
                      help="Type of the server (threading or threadpooling)",
                      type="string", default="")
    parser.add_option("", "--packet-size", help="Packet size",
                      type="int", default=16 * 1024)
    (o, a) = parser.parse_args(args)

    if os.fork() == 0:
        server(o)
    else:
        client(o)
 
Michael Ströder

To benchmark this I used a simple TCP server which writes a small (16 KB)
string to the client and closes the connection.

Just a general note: When benchmarking such a network service it would
be valuable to see benchmark results for several data sizes. I'd expect
better numbers for a ThreadPoolingMixIn when there are more requests
with smaller data size.

Ciao, Michael.
 
pavel.uvarov

To benchmark this I used a simple TCP server which writes a small (16 KB)
string to the client and closes the connection.

I started 100 remote clients and got 500 replies/s for ThreadingMixIn
and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
FreeBSD 6.2 amd64.

I'm very curious about the exactness of the number 500 for
ThreadingMixIn. It seems to be the same for various packet sizes.
I suspect there is some OS limit on thread creating rate.

Below I include a bugfixed ThreadPoolingMixIn and the benchmarking
utility. The utility can be used to start clients on localhost, though
the reply rate will be slower (around 1000 replies/s).

To start benchmarking server with localhost clients use:
python ./TestServer.py --server=threading --n-clients=100
or
python ./TestServer.py --server=threadpooling --n-clients=100

I've just tested it on a Linux box and got 240 replies/s vs 2000
replies/s, i.e. an 8x performance improvement.
 
P

pavel.uvarov

Just a general note: When benchmarking such a network service it would
be valuable to see benchmark results for several data sizes. I'd expect
better numbers for a ThreadPoolingMixIn when there are more requests
with smaller data size.

Ciao, Michael.

Here are benchmarks for FreeBSD 6.2, amd64:

packet_size         x         y
          0    499.57   1114.54
       1024    499.29   1130.02
       3072    500.09   1119.14
       7168    498.20   1111.76
      15360    499.29   1086.73
      31744    500.04   1036.46
      64512    499.43    939.60
     130048    499.28    737.44
     261120    498.04    499.03
     523264    307.54    312.04
    1047552    173.57    185.32
    2096128     93.61     94.39

x = ThreadingMixIn replies/s
y = ThreadPoolingMixIn replies/s
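One way to read the table (a back-of-the-envelope check, not from the original post): at the largest packet size both servers converge to nearly the same rate, which is consistent with data copying or the network path, rather than thread management, becoming the bottleneck:

```python
# Approximate throughput implied by the last table row.
packet_size = 2096128          # bytes per reply
threading_rate = 93.61         # ThreadingMixIn replies/s
pooling_rate = 94.39           # ThreadPoolingMixIn replies/s

mb = 1024.0 * 1024.0
threading_mbps = packet_size * threading_rate / mb
pooling_mbps = packet_size * pooling_rate / mb
print("%.0f MB/s vs %.0f MB/s" % (threading_mbps, pooling_mbps))
```

Both servers are pushing roughly the same number of megabytes per second at that size, so the per-request threading overhead no longer dominates; the pool's advantage shows up only where requests are small and frequent, matching Michael's expectation above.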
 
miller.paul.w

On Jun 2, 7:15 pm, Michael Ströder <[email protected]> wrote:
Here are benchmarks for FreeBSD 6.2, amd64

packet_size         x         y
          0    499.57   1114.54
       1024    499.29   1130.02
       3072    500.09   1119.14
       7168    498.20   1111.76
      15360    499.29   1086.73
      31744    500.04   1036.46
      64512    499.43    939.60
     130048    499.28    737.44
     261120    498.04    499.03
     523264    307.54    312.04
    1047552    173.57    185.32
    2096128     93.61     94.39

x = ThreadingMixIn replies/s
y = ThreadPoolingMixIn replies/s

Well, I'd say you've got yourself a winner. Performance (at least on
FreeBSD) seems as good as or better for your ThreadPoolingMixIn than for
ThreadingMixIn. Is this with the default values of min=5 and max=5
worker threads?
 
pavel.uvarov

Well, I'd say you've got yourself a winner. Performance (at least on
FreeBSD) seems as good as or better for your ThreadPoolingMixIn than for
ThreadingMixIn. Is this with the default values of min=5 and max=5
worker threads?

No, I initialized the thread pool with min_workers=2, max_workers=200 and
min_spare_workers=20.

For Linux (2.6.22, amd64) I got an even more dramatic improvement:

packet_size         x         y
          0    249.97   2014.63
       1024    249.98   1782.83
       3072    240.09   1859.00
       7168    249.98   1900.61
      15360    249.98   1787.30
      31744    238.96   1808.17
      64512    249.85   1561.47
     130048    237.26   1213.26
     261120    249.98    841.96
     523264    249.97    595.40
    1047552    236.40    351.96
    2096128    216.26    218.15

x = ThreadingMixIn replies/s
y = ThreadPoolingMixIn replies/s
 
