-
-
Save topiaruss/264015387b69f867c4bf to your computer and use it in GitHub Desktop.
[pytest] | |
python_paths = src/bolts src/spouts | |
testpaths = src/tests |
pytest==2.8.7 | |
pytest-mock==0.10.1 | |
pytest-pythonpath==0.7 |
import json | |
from io import BytesIO | |
from mock import call | |
from streamparse.storm import Tuple, Bolt | |
from wordcount import WordCounter | |
def prep_tup(id, comp, stream, task, tuple):
    """
    Build a streamparse ``Tuple`` together with its serialised wire form.

    The five arguments are passed positionally, in the order given, to
    ``Tuple``. The same values are also dumped as a JSON object, followed
    by an ``end`` line, and encoded as UTF-8.

    Returns a ``(tup, tup_json)`` pair: the ``Tuple`` instance and the bytes.
    """
    fields = dict(id=id, comp=comp, stream=stream, task=task, tuple=tuple)
    serialised = json.dumps(fields)
    wire_bytes = "{}\nend\n".format(serialised).encode('utf-8')
    # Tuple is given positional arguments, so the order here is significant.
    storm_tuple = Tuple(id, comp, stream, task, tuple)
    return storm_tuple, wire_bytes
def test_wordcount_basic(mocker):
    """Run one word through ``WordCounter`` and check the messages it sends."""
    # Swap these Bolt methods for autospec mocks so calls can be inspected.
    for method_name in ('ack', 'read_task_ids', 'send_message'):
        mocker.patch.object(Bolt, method_name, autospec=True)

    tup, wire_bytes = prep_tup(
        14, 'word_spout', 'words', 'some_bolt', ['test_word'])

    # The bolt reads the single serialised tuple from an in-memory stream.
    bolt = WordCounter(input_stream=BytesIO(wire_bytes),
                       output_stream=BytesIO())
    bolt.initialize({}, {})
    bolt._run()

    sent = Bolt.send_message.call_args_list

    # First message: the emit, anchored to the incoming tuple's id.
    expected_emit = call(bolt, {u'command': u'emit',
                                u'anchors': [tup.id],
                                u'tuple': ['test_word', 1]})
    assert sent[0] == expected_emit

    # Second message: the log line reporting the running count.
    expected_log = call(bolt, {u'msg': 'test_word: 1',
                               u'command': u'log',
                               u'level': 2})
    assert sent[1] == expected_log
import json | |
from io import BytesIO | |
from mock import call | |
from streamparse.storm import Spout | |
from words import WordSpout | |
def prep_spout_tup(command):
    """
    Serialise a spout command message as terminated JSON bytes.

    ``command`` is stored under the ``"command"`` key of a JSON object.
    The object is followed by a newline and an ``end`` line, and the whole
    text is encoded as UTF-8.

    Returns only the encoded bytes; no tuple object is built here.
    """
    message = dict(command=command)
    return "{}\nend\n".format(json.dumps(message)).encode('utf-8')
def test_words_spout_basic(mocker):
    """Send one ``next`` command to ``WordSpout`` and check its responses."""
    # Swap these Spout methods for autospec mocks so calls can be inspected.
    for method_name in ('ack', 'emit', 'emit_many', 'send_message'):
        mocker.patch.object(Spout, method_name, autospec=True)

    next_command = prep_spout_tup('next')

    # The spout reads the single serialised command from an in-memory stream.
    spout = WordSpout(input_stream=BytesIO(next_command),
                      output_stream=BytesIO())
    spout.initialize({}, {})
    spout._run()

    # The first message sent is expected to be a sync.
    expected_sync = call(spout, {u'command': u'sync'})
    assert Spout.send_message.call_args_list[0] == expected_sync

    # The first emit is expected to carry the word 'dog'.
    expected_emit = call(spout, [u'dog'])
    assert Spout.emit.call_args_list[0] == expected_emit
This is a really nice idea. Thanks for sharing, @topiaruss
@amontalenti Most welcome. I encourage exchange of ideas about testing patterns for SP. Can you suggest a better location for that?
Added example for testing a spout
nice,
Won't Tuple(**tup_dict)
streamline that generator a bit, though?
@mathster, not that helpful. Tuple does not want kv pairs, and order is significant, so .values() breaks.
For anyone still watching this: I'm creating a pytest-based test module for each Component (Bolt or Spout), and emphasising testing of emits rather than send_message calls, because assertions on send_message take a lot of time to maintain if logging changes.
I'm finding it really useful for testing stream names, and also exercising different branches of process() methods, when multiple streams are configured in a Bolt. I can produce 100% test coverage really easily, and pdb right into logic that I'm developing.
Adding @kbourgoin and @dan-blanchard here so they can see what you're up to.
This is really nice. I've been wanting to put something together to help people with testing but just hadn't gotten around to it yet. We'll probably want to add this to pystorm instead of streamparse in the end.
@topiaruss Have you done more on this? I'm interested!
pip install -r requirements-test.txt
py.test