Created
April 27, 2022 22:02
-
-
Save diva-D/5445e03b207671c3878093984d237e0c to your computer and use it in GitHub Desktop.
This example also shows how to use it with the "Flow" framework, replacing the automatically created database instance with the mocked one.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os

import boto3
import pytest
from moto import mock_dynamodb2
from flow import Flow, FlowDynamoDB

# Module-level flow instance.
# NOTE(review): this is constructed at import time, before any moto mock is
# active; create_table() builds a fresh Flow inside the mocked context.
flow = Flow(name="test", database=FlowDynamoDB())
def create_table():
    """Point the flow at the mocked database and create the DynamoDB test table.

    Must be called from inside a function decorated with the moto mock, so
    that the ``FlowDynamoDB`` instance and the boto3 resource are both
    created while the mock is set up.

    The table name is read from the ``DYNAMODB_TABLE_NAME`` environment
    variable (``os.getenv`` yields ``None`` when it is unset).

    Returns:
        The object returned by ``dynamodb.create_table``.
    """
    # Set up flow so that the database is the mocked version. This has to
    # happen inside the decorated function so the mock is already set up.
    flow = Flow(name="test", database=FlowDynamoDB())

    # Install this new flow on the module under test.
    # NOTE(review): `product_flow` is not imported in the visible snippet --
    # presumably it is the module that owns the real flow; confirm and import.
    product_flow.flow = flow

    # Same capacity for the table and its index; the index needs its own
    # throughput settings because the table uses provisioned capacity.
    throughput = {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}

    dynamodb = boto3.resource("dynamodb", region_name="ap-southeast-2")
    table = dynamodb.create_table(
        TableName=os.getenv("DYNAMODB_TABLE_NAME"),
        KeySchema=[
            {"AttributeName": "_id", "KeyType": "HASH"},
            {"AttributeName": "_sort", "KeyType": "RANGE"},
        ],
        AttributeDefinitions=[
            {"AttributeName": "_id", "AttributeType": "S"},
            {"AttributeName": "_sort", "AttributeType": "S"},
            {"AttributeName": "_flowSortIndex", "AttributeType": "S"},
        ],
        GlobalSecondaryIndexes=[
            {
                "IndexName": "_flowSortIndex",
                "KeySchema": [
                    {"AttributeName": "_flowSortIndex", "KeyType": "HASH"},
                ],
                "Projection": {"ProjectionType": "ALL"},
                "ProvisionedThroughput": dict(throughput),
            }
        ],
        ProvisionedThroughput=throughput,
    )
    return table
# This decorator makes sure DynamoDB calls are mocked.
# NOTE(review): newer moto releases may expose this decorator under a
# different name -- confirm against the installed moto version.
@mock_dynamodb2
def test_dynamodb():
    """Create the mocked table; the rest of the test is elided in this example."""
    table = create_table()
    ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment