Created
July 13, 2017 23:53
-
-
Save jmelloy/bb01ac422f947a88d06c9983077a0008 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_handler(self):
    """Invoke ``job_check.handler`` with the environment and Google service patched.

    ``os.environ["jobs"]` is set to ``self.queue_name`` and
    ``clumpy.google.service`` is replaced by ``service`` for the duration
    of the call.
    """
    # NOTE(review): `service` is not defined in the visible lines; presumably
    # it is created in the setup elided by the `...` below -- confirm.
    ...
    with patch.dict("os.environ", {"jobs": self.queue_name}), \
            patch("clumpy.google.service", service):
        job_check.handler({}, FakeContext(10))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_handler(self):
    """Send one job message to SQS, then run ``job_check.handler`` on mocked Google clients.

    The handler is invoked with ``os.environ["jobs"]`` set to
    ``self.queue_name`` and ``google.service`` replaced by a local factory
    that builds API clients on top of ``HttpMockSequence`` objects.
    """
    # Put the JSON-encoded payload on the queue named by self.queue_name.
    queue = boto3.resource("sqs", "us-west-2").get_queue_by_name(
        QueueName=self.queue_name
    )
    queue.send_message(MessageBody=json.dumps(self.data))

    # Canned HTTP replies, consumed in order by each mocked client.
    # NOTE(review): the first entry of each sequence looks like the API
    # discovery document fetched by build() -- confirm against the fixtures.
    bigquery_http = HttpMockSequence([
        ({'status': '200'}, self.bigquery_discovery),
        ({'status': '200'}, self.bigquery_done),
    ])
    storage_http = HttpMockSequence([
        ({'status': '200'}, self.storage_discovery),
        ({'status': '200'}, ''),
    ])

    def service(name, scope=None, v=None):
        """Replacement for ``google.service``; returns a client for *name* or None."""
        print(name)
        if name == "bigquery":
            return build("bigquery", "v2", http=bigquery_http)
        if name == "storage":
            return build("storage", "v1", http=storage_http)
        # Any other name yields None, as in the original if/elif with no else.
        return None

    with patch.dict("os.environ", {"jobs": self.queue_name}), \
            patch("google.service", service):
        job_check.handler({}, FakeContext(10))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment