# A simple generator wrapper, not sure if it's good for anything at all.
# With basic python threading
from threading import Thread

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

# ... or use multiprocessing versions
# WARNING: use sentinel based on value, not identity
from multiprocessing import Process, Queue as MpQueue


class ThreadedGenerator(object):
    """
    Generator that runs on a separate thread, returning values to calling
    thread. Care must be taken that the iterator does not mutate any shared
    variables referenced in the calling thread.
    """

    def __init__(self, iterator,
                 sentinel=object(),
                 queue_maxsize=0,
                 daemon=False,
                 Thread=Thread,
                 Queue=Queue):
        self._iterator = iterator
        self._sentinel = sentinel
        self._queue = Queue(maxsize=queue_maxsize)
        self._thread = Thread(
            name=repr(iterator),
            target=self._run
        )
        self._thread.daemon = daemon

    def __repr__(self):
        return 'ThreadedGenerator({!r})'.format(self._iterator)

    def _run(self):
        try:
            for value in self._iterator:
                self._queue.put(value)
        finally:
            self._queue.put(self._sentinel)

    def __iter__(self):
        self._thread.start()
        for value in iter(self._queue.get, self._sentinel):
            yield value
        self._thread.join()
Hi, I'm new to Python multiprocessing. Please, can you provide some example of how this code can be used?
Thank you!
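One possible usage sketch, not from the gist author: the class is copied in condensed form so the snippet runs on its own, and `slow_source`, `process`, and `run_example` are hypothetical names invented for illustration. The idea is that a producer that spends its time waiting (simulated here with `time.sleep`) can run in the worker thread while the calling thread processes values already produced.

```python
import time
from queue import Queue
from threading import Thread


class ThreadedGenerator:
    """Condensed copy of the gist's class so this example is self-contained."""

    def __init__(self, iterator, sentinel=object(), queue_maxsize=0, daemon=False):
        self._iterator = iterator
        self._sentinel = sentinel
        self._queue = Queue(maxsize=queue_maxsize)
        self._thread = Thread(name=repr(iterator), target=self._run)
        self._thread.daemon = daemon

    def _run(self):
        try:
            for value in self._iterator:
                self._queue.put(value)
        finally:
            # Always signal the end, even if the iterator raised.
            self._queue.put(self._sentinel)

    def __iter__(self):
        self._thread.start()
        for value in iter(self._queue.get, self._sentinel):
            yield value
        self._thread.join()


def slow_source(n, delay=0.01):
    """Hypothetical producer; the sleep stands in for blocking I/O."""
    for i in range(n):
        time.sleep(delay)
        yield i


def process(value, delay=0.01):
    """Hypothetical consumer-side work, also simulated with a sleep."""
    time.sleep(delay)
    return value * value


def run_example(n=5):
    # The producer sleeps in the worker thread while this thread processes
    # earlier values, so the two delays can overlap.
    source = ThreadedGenerator(slow_source(n), queue_maxsize=4)
    return [process(value) for value in source]


if __name__ == "__main__":
    print(run_example())
```

Note that the loop consumes every value, so the worker thread finishes and is joined. The gain comes only from overlapping waits; CPU-bound pure-Python work in both threads would still contend for the GIL.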
This is so beautiful ;-;
This is super useful :)
This is useful, but it has problems with e.g. infinite iterators, queue_maxsize>0, or any other scenario in which you don't exhaust the iterator.
For example, for this code:
print(list(zip(range(6), ThreadedGenerator(range(1000000), queue_maxsize=1))))
This never halts: the worker thread is still running, blocked on the full queue, and because it is not a daemon thread the interpreter waits for it at exit. Something similar (sort of) happens if you set queue_maxsize=0: the thread keeps going until it has exhausted the iterator.
The main problem is that when we stop iterating partway through, the thread is never shut down. I use this code to zip two generators, so if they don't have exactly the same length I'm in trouble, and in my use case that is awkward to deal with.
I don't think there is a simple solution for this: in the code above, __del__ won't be called, so you can't tell that this scenario has occurred. A stoppable thread is another piece of implementation that takes (copy & paste) effort.
So TL;DR:
- Don't use this on an infinite iterator. The thread will never join, and with queue_maxsize=0 you may run into memory problems.
- Try to guarantee that you will eventually exhaust the iterator, or the thread will never join.
- Try to make sure you use every item in the iterator. With queue_maxsize=0 the queue fills up with items you never use, and even with queue_maxsize>0 the thread never joins, which by itself is a big enough problem in some scenarios.
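The stuck-worker case can be reproduced without the class at all. The sketch below strips the pattern down to a producer thread and a bounded queue; `leaked_worker_demo` is a hypothetical name, and the thread is marked as a daemon here (an assumption made only for this demo) so that the interpreter can still exit afterwards.

```python
from queue import Queue
from threading import Thread


def leaked_worker_demo():
    """Consume only a few items and report whether the producer is still alive."""
    queue = Queue(maxsize=1)
    sentinel = object()

    def produce():
        try:
            for value in range(1000000):
                queue.put(value)  # blocks whenever the queue is full
        finally:
            queue.put(sentinel)

    # daemon=True is only for this demo: a non-daemon thread stuck on
    # put() would keep the interpreter from exiting.
    worker = Thread(target=produce, daemon=True)
    worker.start()

    consumed = [queue.get() for _ in range(6)]
    # Nobody reads the queue any more, so the worker stays blocked on put().
    return consumed, worker.is_alive()


if __name__ == "__main__":
    print(leaked_worker_demo())
```

With a non-daemon thread, as in the gist's default `daemon=False`, the same situation shows up as a program that prints its result and then never exits.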
Hi @everilae - I know this gist is quite a few years old, but I still wanted to thank you for sharing it! I was working on something similar when I found this gist, and it was helpful to see what you had done. In case you are interested I have just posted mine here.
@Kaiserouo, I found your comments very useful. I was already keen to make things as watertight as possible, especially when consuming from the generator in a way that might not exhaust it. Because the code cannot know how many values will be consumed, the only way is to provide a close method, preferably with the class acting as a context manager to invoke it.
My version would therefore write your example as
with ThreadedGenerator(range(1000000), maxq=1) as tg:
print(list(zip(range(6), tg)))
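For readers without the linked version at hand, here is a minimal sketch of what a close method plus context manager could look like. It is not the implementation referred to above: `ClosableThreadedGenerator`, `poll_interval`, and the use of a `threading.Event` with timed `put`/`get` calls are all assumptions made for illustration.

```python
from queue import Empty, Full, Queue
from threading import Event, Thread


class ClosableThreadedGenerator:
    """Hypothetical variant of the gist's class that can be stopped early."""

    _DONE = object()  # identity sentinel; fine for threads, not for processes

    def __init__(self, iterable, queue_maxsize=1, poll_interval=0.05):
        self._iterable = iterable
        self._queue = Queue(maxsize=queue_maxsize)
        self._stop = Event()
        self._poll = poll_interval
        self._thread = Thread(target=self._run, daemon=True)

    def _put(self, item):
        # Retry with a timeout so a full queue cannot block the worker forever.
        while not self._stop.is_set():
            try:
                self._queue.put(item, timeout=self._poll)
                return True
            except Full:
                continue
        return False

    def _run(self):
        try:
            for value in self._iterable:
                if not self._put(value):
                    return  # close() was called; stop producing
        finally:
            self._put(self._DONE)  # skipped automatically once stopped

    def __iter__(self):
        self._thread.start()
        while not self._stop.is_set():
            try:
                item = self._queue.get(timeout=self._poll)
            except Empty:
                continue
            if item is self._DONE:
                break
            yield item

    def close(self):
        # Ask the worker to stop, then wait for it. This can still hang if
        # the wrapped iterable itself blocks forever inside next().
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False


if __name__ == "__main__":
    with ClosableThreadedGenerator(range(1000000), queue_maxsize=1) as tg:
        print(list(zip(range(6), tg)))
```

The polling timeout trades a little latency for simplicity; a design that wakes the worker directly would avoid the polling but needs more machinery.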
Yes, extremely useful. This nullifies my customer's performance questions on day one!
Nice work!