Skip to content

Instantly share code, notes, and snippets.

@cerdman
Created October 21, 2016 05:14
Show Gist options
  • Save cerdman/95f811d85917608cc39ac8c372e1c3a2 to your computer and use it in GitHub Desktop.
Save cerdman/95f811d85917608cc39ac8c372e1c3a2 to your computer and use it in GitHub Desktop.
BOTO: threaded SQS connections

BOTO threaded SQS consumer

This will create the configured number of CONSUMERS and PRODUCERS then start pushing messages as fast as it can.

you will have to use the command sudo killall python -9 to stop the process.

#!/usr/bin/env python
from boto import sqs
from boto.sqs.message import Message
import threading, logging, time
CONSUMERS = 30
PRODUCERS = 5
MAX_MESAGES = 10
class Counter(threading.Thread):
daemon = True
name = None
interval = 1
count = 0
total = 0
lock = threading.Lock()
def __init__(self, *args, **kwargs):
self.name = kwargs.get('name', None)
super(Counter, self).__init__(*args, **kwargs)
def increment(self):
with self.lock:
self.count += 1
self.total += 1
def value(self):
with self.lock:
return self.count
def run(self):
start = time.time()
time.clock()
elapsed = 0
while elapsed < self.interval:
elapsed = time.time() - start
print('timed:%s:%s'% (self.name, self.count, self.total))
self.count = 0
time.sleep(1)
self.run()
class Producer(threading.Thread):
daemon = True
count = 0
start_time = time.time()
def __init__(self, *args, **kwargs):
# keys should be in your boto config
self.conn = sqs.connect_to_region("us-west-2")
self.queue = self.conn.create_queue('test_test_test_deleteme')
super(Producer, self).__init__(*args, **kwargs)
def message(self, x):
self.count = self.count + 1
message = Message()
message.set_body('message:%s:%s' % (self.count, x))
return message
def bulkMessage(self, x):
self.count = self.count + 1
return ('%s%s'%(self.count, x), 'message:%s:%s'%(self.count, x), 0)
def run(self):
messages = [(self.bulkMessage(i)) for i in range(0,MAX_MESAGES)]
self.queue.write_batch(messages)
end_time = time.time()
self.run()
class Consumer(threading.Thread):
daemon = True
start_time = time.time()
def __init__(self, counter, *args, **kwargs):
# keys should be in your boto config
self.counter = counter
self.conn = sqs.connect_to_region("us-west-2")
self.queue = self.conn.create_queue('test_test_test_deleteme')
super(Consumer, self).__init__(*args, **kwargs)
def run(self):
messages = self.queue.get_messages(
num_messages=MAX_MESAGES,
visibility_timeout=10
)
for message in messages or []:
if not message:
return
print(message.get_body())
self.counter.increment()
if messages:
self.queue.delete_message_batch(messages)
print(self.counter.value())
self.run();
def main():
counter = Counter()
threads = [counter]
[threads.append(Producer()) for i in range(0,PRODUCERS)]
[threads.append(Consumer(counter)) for i in range(0,CONSUMERS)]
for t in threads: t.start()
time.sleep(100)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment