This script creates the configured number of CONSUMERS
and PRODUCERS threads,
then starts pushing messages as fast as it can.
You will have to use the command `sudo killall python -9`
to stop the process.
#!/usr/bin/env python | |
from boto import sqs | |
from boto.sqs.message import Message | |
import threading, logging, time | |
# Number of Consumer threads started by main().
CONSUMERS = 30
# Number of Producer threads started by main().
PRODUCERS = 5
# Batch size: how many messages a producer writes per batch and how many a
# consumer requests per poll. The (misspelled) name is kept as-is because
# the Producer and Consumer classes refer to it.
MAX_MESAGES = 10
class Counter(threading.Thread):
    """Thread-safe message counter that periodically prints its throughput.

    ``count`` is the number of increments since the last report and is reset
    after every report; ``total`` is the number of increments since the
    counter was created and is never reset.
    """

    daemon = True
    name = None
    interval = 1
    count = 0
    total = 0
    lock = threading.Lock()

    def __init__(self, *args, **kwargs):
        """Create the counter.

        Accepts the same arguments as ``threading.Thread``; the optional
        ``name`` keyword is also used as the label in the report line.
        """
        self.name = kwargs.get('name', None)
        super(Counter, self).__init__(*args, **kwargs)

    def increment(self):
        """Record one processed message in both ``count`` and ``total``."""
        with self.lock:
            self.count += 1
            self.total += 1

    def value(self):
        """Return the number of increments since the last report."""
        with self.lock:
            return self.count

    def _snapshot_and_reset(self):
        """Build the report line and zero ``count`` in one locked step.

        Returns:
            A string of the form ``timed:<name>:<count>:<total>``.
        """
        # Formatting and resetting under the lock keeps an increment() from
        # landing between the read and the reset and being lost.
        with self.lock:
            line = 'timed:%s:%s:%s' % (self.name, self.count, self.total)
            self.count = 0
        return line

    def run(self):
        """Report forever: wait ``interval`` seconds, print, pause one second.

        Written as a loop rather than a self-call so the thread does not
        exhaust the interpreter's recursion limit.
        """
        while True:
            # Sleep for the measurement window instead of spinning on the
            # clock, which would burn CPU for the whole interval.
            time.sleep(self.interval)
            print(self._snapshot_and_reset())
            time.sleep(1)
class Producer(threading.Thread):
    """Thread that writes batches of test messages to the SQS queue forever."""

    daemon = True
    count = 0
    # Evaluated once, when the class body is executed.
    start_time = time.time()

    def __init__(self, *args, **kwargs):
        """Connect to SQS and open (or create) the test queue.

        Accepts the same arguments as ``threading.Thread``.
        """
        # keys should be in your boto config
        self.conn = sqs.connect_to_region("us-west-2")
        self.queue = self.conn.create_queue('test_test_test_deleteme')
        super(Producer, self).__init__(*args, **kwargs)

    def message(self, x):
        """Return a boto ``Message`` whose body is ``message:<count>:<x>``.

        Increments this producer's ``count`` as a side effect.
        """
        self.count = self.count + 1
        message = Message()
        message.set_body('message:%s:%s' % (self.count, x))
        return message

    def bulkMessage(self, x):
        """Return one batch entry for ``x`` as an ``(id, body, 0)`` tuple.

        Increments this producer's ``count`` as a side effect. The tuple is
        what run() passes to ``queue.write_batch``; the trailing 0 is
        presumably the per-message delay in seconds (confirm against the
        boto ``write_batch`` documentation).
        """
        self.count = self.count + 1
        return ('%s%s' % (self.count, x), 'message:%s:%s' % (self.count, x), 0)

    def run(self):
        """Write batches of MAX_MESAGES messages to the queue without pause.

        Written as a loop rather than a self-call so the thread does not
        exhaust the interpreter's recursion limit after many batches.
        """
        while True:
            messages = [self.bulkMessage(i) for i in range(MAX_MESAGES)]
            self.queue.write_batch(messages)
class Consumer(threading.Thread):
    """Thread that polls the SQS test queue, prints and deletes messages."""

    daemon = True
    # Evaluated once, when the class body is executed.
    start_time = time.time()

    def __init__(self, counter, *args, **kwargs):
        """Connect to SQS and open (or create) the test queue.

        Args:
            counter: object whose ``increment()`` is called once per received
                message and whose ``value()`` is printed after each poll.
            *args, **kwargs: forwarded to ``threading.Thread``.
        """
        # keys should be in your boto config
        self.counter = counter
        self.conn = sqs.connect_to_region("us-west-2")
        self.queue = self.conn.create_queue('test_test_test_deleteme')
        super(Consumer, self).__init__(*args, **kwargs)

    def run(self):
        """Poll the queue until a falsy message is received.

        Each pass requests up to MAX_MESAGES messages with a visibility
        timeout of 10, prints every body, counts it, deletes the batch and
        prints the counter's current value. Written as a loop rather than a
        self-call so the thread does not exhaust the interpreter's recursion
        limit after many polls.
        """
        while True:
            messages = self.queue.get_messages(
                num_messages=MAX_MESAGES,
                visibility_timeout=10
            )
            for message in messages or []:
                if not message:
                    # NOTE(review): a falsy message ends this consumer
                    # without deleting the batch -- confirm this is intended.
                    return
                print(message.get_body())
                self.counter.increment()
            if messages:
                self.queue.delete_message_batch(messages)
            print(self.counter.value())
def main():
    """Start one Counter, PRODUCERS producers and CONSUMERS consumers.

    All threads are constructed first (counter, then producers, then
    consumers) and only then started, in that same order. The main thread
    then sleeps for 100 seconds before returning.
    """
    counter = Counter()
    threads = [counter]
    # extend() with a generator builds the workers in order without creating
    # throwaway lists of None the way a side-effect comprehension does.
    threads.extend(Producer() for _ in range(PRODUCERS))
    threads.extend(Consumer(counter) for _ in range(CONSUMERS))
    for t in threads:
        t.start()
    time.sleep(100)


if __name__ == "__main__":
    main()