Thursday, December 31, 2015

[Python] From Threads to ThreadPool to Gevent

A system test I wrote makes many requests against a local server. The responses take anywhere from a few to many seconds to arrive: the work is IO-bound. So I thought I should use a thread for each request. All requests could start at the same time, and the results could be evaluated as they came in.
It worked. Lots of threads were created, did their work and died.
Too many threads, one might say.
A threadpool should be a better way...


The first thing I learned was: there is no threadpool in the Python 2.7 standard library.
That surprised me, because I had used threadpools in Python before. But those were the ones in Python 3 (concurrent.futures). There is a pool in multiprocessing, but have fun exchanging data that way, or sharing code and state. A short try resulted in errors saying that some classes could not be found. Why? Because they were defined after the worker processes were created. So I would have needed to create the process pool much later. Possible, but not what I wanted. I didn't even want processes. Why should I use processes to run a few small requests in parallel?
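One common cause of those "class not found" errors is pickling: multiprocessing serializes the work it ships to the worker processes, so anything that can't be pickled by reference fails. A minimal sketch of the underlying limitation, using plain pickle (the name `double` is just for illustration):

```python
import pickle

# multiprocessing pickles the function it sends to worker processes.
# Functions are pickled by reference (module name + attribute name),
# so a lambda, which has no importable name, cannot be serialized.
double = lambda x: x * 2

try:
    pickle.dumps(double)
    picklable = True
except (pickle.PicklingError, AttributeError):
    picklable = False  # a lambda can't be shipped to a process pool
```

The same reference-based lookup is why a class has to exist, importable under its name, before the workers try to use it.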

A threadpool it should be. And because there wasn't one in standard Python, I thought of building one myself.
A threadpool creates a number of threads at the start and destroys them at the end. Work is handed to the threadpool, which feeds it to the threads. In my case there is a queue into which work, in the form of a function, is put. The worker threads wait for work via a 'get' on the queue. The queue itself is thread-safe, so exactly one worker will pick up each piece of work.
Results, and the information that a piece of work is done, travel back through a second queue together with an Event object. In a threadpool a worker thread doesn't finish, so it can't return its result by itself; the second queue is needed for that. Since no one knows when a piece of work will finish, results can come back in a different order than the work was handed out. A simple counter therefore assigns an id to each piece of work. The id is given to the worker thread and comes back with the result, so the threadpool can match the result to the original request and hand it to the caller.
The threading.Event object is extended to help with that. When work arrives at the threadpool, a new Event is stored in a dict under a unique number; that number identifies incoming work and outgoing results. The Event is returned to the caller, and its wait method is changed so that it waits for the result and returns it.
To handle the result events there is one more thread: it watches the result queue and sets the corresponding Event.
Because all worker threads block on queue gets, each thread needs to be notified on shutdown of the threadpool. This is done with a special 'end' object that is put into the queue once per thread.
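Boiled down, the queue-fed worker loop with that per-thread shutdown object looks like this (a minimal sketch; the import fallback keeps it running on both Python 2 and 3):

```python
import threading

try:
    import Queue as queue  # Python 2, as in the full pool
except ImportError:
    import queue           # Python 3

work_queue = queue.Queue()
results = []

class PoisonPill(object):
    """Sentinel object telling a worker to exit its loop."""

def worker():
    while True:
        item = work_queue.get()          # blocks until work arrives
        if isinstance(item, PoisonPill):
            return                       # shutdown notification
        results.append(item())           # work is an argument-less callable

thread = threading.Thread(target=worker)
thread.start()

work_queue.put(lambda: 1 + 1)
work_queue.put(lambda: 2 * 2)
work_queue.put(PoisonPill())             # one pill per worker thread
thread.join()
```

Because the queue is thread-safe, the same loop works unchanged with any number of worker threads, as long as one pill is put per thread.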


The code looks like this:
import threading
import Queue
import itertools


class ThreadEnd(object):
    pass

tid = itertools.count()

class Thread(threading.Thread):

    def __init__(self, threadpool_id, queue_in, queue_out):
        self.tpid = threadpool_id
        self.tid = next(tid)
        print("[{}] ({}) New thread".format(self.tpid, self.tid))
        self.queue_in = queue_in
        self.queue_out = queue_out
        super(Thread, self).__init__()

    def run(self):
        print("[{}] ({}) Run thread".format(self.tpid, self.tid))
        while True:
            print("[{}] ({}) Get thread in".format(self.tpid, self.tid))
            request_id, item = self.queue_in.get()
            print("[{}] ({}) Get thread out".format(self.tpid, self.tid))
            if isinstance(item, ThreadEnd):
                print("[{}] ({}) Put thread in 1".format(self.tpid, self.tid))
                self.queue_out.put((request_id, None))
                print("[{}] ({}) Put thread out 1".format(self.tpid, self.tid))
                return
            else:
                print("[{}] ({}) In Thread: {}".format(self.tpid, self.tid, item))
                result = item()
                print("[{}] ({}) Put thread in 2".format(self.tpid, self.tid))
                self.queue_out.put((request_id, result))
                print("[{}] ({}) Put thread out 2".format(self.tpid, self.tid))

eid = itertools.count()

class Event(object):

    def __init__(self, tpid):
        self.tpid = tpid
        self.eid = next(eid)
        print("[{}] {{{}}} New Event".format(self.tpid, self.eid))
        self.result = None
        self.event = threading.Event()

    def set(self, result):
        print("[{}] {{{}}} Set Event: {}".format(self.tpid, self.eid, result))
        self.result = result
        self.event.set()

    def wait(self):
        print("[{}] {{{}}} Wait Event".format(self.tpid, self.eid))
        self.event.wait()
        return self.result


tpid = itertools.count()

class ThreadPool(object):

    def __init__(self, num_threads):
        self.tpid = next(tpid)
        self.num_threads = num_threads
        print("[{}] ThreadPool with {} threads.".format(self.tpid, self.num_threads))
        self.queue_in = Queue.LifoQueue()  # newest work first
        self.queue_out = Queue.Queue()     # carries (request_id, result) pairs
        self.ids = itertools.count()
        self.results = {}
        self.stopping = False
        self.threads = [Thread(self.tpid, self.queue_in, self.queue_out) for _ in range(self.num_threads)]
        [thread.start() for thread in self.threads]

        self.fetcher = threading.Thread(target=self.fetch_results)
        self.fetcher.start()

    def stop(self):
        print("[{}] Stopping threadpool...".format(self.tpid))
        self.stopping = True
        [self.queue_in.put((-1, ThreadEnd())) for _ in self.threads]
        [thread.join() for thread in self.threads]
        self.fetcher.join()

    def workon(self, function):
        print("[{}] Threadpool: workon {}".format(self.tpid, function))
        request_id = next(self.ids)
        self.results[request_id] = Event(self.tpid)
        self.queue_in.put((request_id, function))
        return self.results[request_id]

    def fetch_results(self):
        print("[{}] Fetching in threadpool...".format(self.tpid))
        while not self.stopping or not self.queue_out.empty():
            print("[{}] Fetching in".format(self.tpid))
            request_id, result = self.queue_out.get()
            print("[{}] Fetching out {}".format(self.tpid, request_id))
            print("[{}] stopping: {} ; empty: {}".format(self.tpid, self.stopping, self.queue_out.empty()))
            event = self.results.pop(request_id, None)
            if event is not None:  # shutdown markers (id -1) have no event
                event.set(result)


# 'functions' is a list of argument-less callables, one per request
threadpool = ThreadPool(8)
threads = [threadpool.workon(f) for f in functions]
[thread.wait() for thread in threads]
threadpool.stop()

It worked!

Kind of.

When I tried to use one threadpool for all the work, I overlooked that some work was created inside the worker threads themselves. When those threads then waited for the results of that nested work, they blocked the very workers that were supposed to run it, and nothing happened anymore. With a separate threadpool per layer that problem was solved. But it was not as nice as I would have liked.
For example:
  • using multiple threadpools
  • having to define pool sizes
  • having a maximum number of threads
  • the GIL
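The first of those problems, workers waiting on work that only workers can run, can be reproduced with any fixed-size pool. A sketch using Python 3's concurrent.futures as a stand-in for the homegrown pool (a pool size of 1 makes the effect immediate):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

# A pool with a single worker: 'outer' occupies it, then submits
# 'inner' to the same pool and waits for it. 'inner' can never start,
# because the only worker is busy waiting for it: a deadlock.
pool = ThreadPoolExecutor(max_workers=1)

def inner():
    return 42

def outer():
    return pool.submit(inner).result(timeout=0.5)

future = pool.submit(outer)
try:
    future.result(timeout=2)
    deadlocked = False
except FutureTimeout:
    deadlocked = True  # 'inner' never got a worker

pool.shutdown()  # the worker is free now, so the pending 'inner' still runs
```

Without the timeouts, both waits would block forever, which is exactly the silent hang described above.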
Stackless, greenlet, "microthreads": those should work much better in this situation, I had heard and read. Gevent seemed to be the one.

Installing it via pip was easy. 
Using it, too:
import gevent
import gevent.monkey

gevent.monkey.patch_all()

jobs = [gevent.spawn(f) for f in functions]
gevent.wait(jobs)
# replaces the threadpool version:
# threads = [threadpool.workon(f) for f in functions]
# [thread.wait() for thread in threads]
Easy.
Right?

It's not a solution if you need real threads and the horsepower of all cores, but it is nice and easy if you just don't want to wait on each IO-bound operation.