Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active July 25, 2024 10:04
Show Gist options
  • Save jhw/4618ef04563d248d57f41eae18a55626 to your computer and use it in GitHub Desktop.
Save jhw/4618ef04563d248d57f41eae18a55626 to your computer and use it in GitHub Desktop.
DynamoDB to EventBridge
env
*.pyc
__pycache__
tmp
AppName=dynamodb-eventbridge-pipes
from boto3.dynamodb.conditions import Key
import boto3, sys
def load_props():
    """Parse ``app.props`` in the current directory into a dict.

    The file holds one ``NAME=value`` pair per line (the shell scripts in
    this project source the same file). Blank lines and ``#`` comment lines
    are ignored, and only the first ``=`` separates name from value, so a
    value may itself contain ``=``.

    Returns:
        dict mapping each property name to its string value.

    Raises:
        OSError: if ``app.props`` cannot be opened.
        ValueError: if a non-blank, non-comment line has no ``=``.
    """
    props = {}
    # Context manager so the handle is closed even if parsing fails.
    with open("app.props") as f:
        for line in f.read().splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            name, sep, value = line.partition("=")
            if not sep:
                raise ValueError("malformed line in app.props: %r" % line)
            props[name] = value
    return props
def fetch_outputs(cf, stackname):
    """Return the outputs of the CloudFormation stack named *stackname*.

    Args:
        cf: CloudFormation client exposing a boto3-style ``describe_stacks``.
        stackname: name of the stack whose outputs are wanted.

    Returns:
        dict mapping ``OutputKey`` to ``OutputValue``; empty when the stack
        is not listed or declares no outputs.
    """
    outputs, token = {}, None
    while True:
        # describe_stacks is paginated; follow NextToken so a stack on a
        # later page is not silently missed.
        kwargs = {"NextToken": token} if token else {}
        resp = cf.describe_stacks(**kwargs)
        for stack in resp["Stacks"]:
            if stack["StackName"] == stackname:
                for output in stack.get("Outputs", []):
                    outputs[output["OutputKey"]] = output["OutputValue"]
        token = resp.get("NextToken")
        if not token:
            break
    return outputs
def fetch_results(table, leaguename):
    """Return every RESULT item stored for *leaguename*.

    Queries partition ``LEAGUE#<leaguename>`` for sort keys beginning with
    ``RESULT``.

    Args:
        table: DynamoDB table resource exposing a boto3-style ``query``.
        leaguename: league identifier used to build the partition key.

    Returns:
        list of matching items (empty if there are none).
    """
    pkq = Key("pk").eq(f"LEAGUE#{leaguename}")
    skq = Key("sk").begins_with("RESULT")
    kwargs = {"KeyConditionExpression": pkq & skq}
    items = []
    while True:
        resp = table.query(**kwargs)
        items += resp.get("Items", [])
        # A single Query response can be truncated; LastEvaluatedKey marks
        # where to resume, and its absence means the result set is complete.
        last_key = resp.get("LastEvaluatedKey")
        if not last_key:
            break
        kwargs["ExclusiveStartKey"] = last_key
    return items
if __name__ == "__main__":
    # Delete every RESULT item of the league named on the command line from
    # the table exported as the stack's "AppTable" output.
    try:
        cli_args = sys.argv[1:]
        if not cli_args:
            raise RuntimeError("please enter league name")
        league = cli_args[0]
        settings = load_props()
        cloudformation = boto3.client("cloudformation")
        stack_outputs = fetch_outputs(cloudformation, settings["AppName"])
        app_table = boto3.resource("dynamodb").Table(stack_outputs["AppTable"])
        for row in fetch_results(app_table, league):
            item_key = {"pk": row["pk"], "sk": row["sk"]}
            response = app_table.delete_item(Key=item_key)
            # Echo the raw service response for each deletion.
            print(response)
    except RuntimeError as problem:
        print("Error: %s" % str(problem))
#!/usr/bin/env bash
# Delete the CloudFormation stack whose name app.props assigns to AppName.
set -euo pipefail
# "./" makes the shell read the file from the current directory instead of
# searching PATH for it.
. ./app.props
aws cloudformation delete-stack --stack-name "${AppName:?AppName is not set in app.props}"
#!/usr/bin/env bash
# Deploy stack.json as the CloudFormation stack whose name app.props assigns
# to AppName.
set -euo pipefail
# "./" makes the shell read the file from the current directory instead of
# searching PATH for it.
. ./app.props
aws cloudformation deploy \
  --stack-name "${AppName:?AppName is not set in app.props}" \
  --template-file stack.json \
  --capabilities CAPABILITY_NAMED_IAM
from botocore.exceptions import ClientError
import boto3, os, re, sys, time
def fetch_functions(cf, stack_name):
    """Map logical id to physical id for each Lambda function in a stack.

    Args:
        cf: CloudFormation client exposing ``list_stack_resources``.
        stack_name: name of the stack to inspect.

    Returns:
        dict of ``LogicalResourceId`` -> ``PhysicalResourceId`` for every
        resource whose type is ``AWS::Lambda::Function``.
    """
    lambdas = {}
    next_token = None
    while True:
        request = {"StackName": stack_name}
        if next_token:
            request["NextToken"] = next_token
        page = cf.list_stack_resources(**request)
        lambdas.update(
            (summary["LogicalResourceId"], summary["PhysicalResourceId"])
            for summary in page["StackResourceSummaries"]
            if summary["ResourceType"] == "AWS::Lambda::Function"
        )
        # No NextToken in the response means this was the last page.
        if "NextToken" not in page:
            return lambdas
        next_token = page["NextToken"]
def fetch_log_events(logs, kwargs):
    """Fetch all log events matching *kwargs*, oldest first.

    Args:
        logs: CloudWatch Logs client exposing ``filter_log_events``.
        kwargs: keyword arguments for ``filter_log_events``; the caller's
            dict is not modified.

    Returns:
        list of event dicts from every page, sorted by ``timestamp``.
    """
    # Work on a copy so the pagination token never leaks into the caller's
    # dict (which would corrupt a later call that reuses it).
    request = dict(kwargs)
    events = []
    while True:
        resp = logs.filter_log_events(**request)
        events += resp["events"]
        token = resp.get("nextToken")
        if not token:
            break
        request["nextToken"] = token
    return sorted(events, key=lambda x: x["timestamp"])
if __name__ == "__main__":
    # Print recent log lines of one Lambda function in the stack.
    # Arguments: <function logical id> <window>, where window is the number
    # of seconds to look back from now.
    try:
        props = {}
        # Context manager closes the file; partition() splits on the first
        # "=" only, so values containing "=" no longer break parsing.
        with open("app.props") as f:
            for row in f.read().split("\n"):
                if row != '':
                    name, _, value = row.partition("=")
                    props[name] = value
        stack_name = props["AppName"]
        if len(sys.argv) < 3:
            raise RuntimeError("please enter function logical id, window")
        logical_id, window = sys.argv[1:3]
        if not re.search(r"^\d+$", window):
            raise RuntimeError("window is invalid")
        window = int(window)
        cf, logs = boto3.client("cloudformation"), boto3.client("logs")
        functions = fetch_functions(cf, stack_name)
        # time.time() is in seconds; startTime is passed in milliseconds.
        start_time = int(1000 * (time.time() - window))
        if logical_id not in functions:
            raise RuntimeError("logical id not found")
        physical_id = functions[logical_id]
        log_group_name = "/aws/lambda/%s" % physical_id
        kwargs = {"logGroupName": log_group_name,
                  "startTime": start_time,
                  "interleaved": True}
        events = fetch_log_events(logs, kwargs)
        for event in events:
            # Strip CR/LF so each event prints on a single line.
            msg = re.sub("\\r|\\n", "", event["message"])
            print(msg)
    except (RuntimeError, ClientError) as error:
        print("Error: %s" % str(error))
import json
if __name__ == "__main__":
    # Embed the text of enrichment.py into stack.json as the inline code
    # (Code.ZipFile) of the AppEnrichmentFunction resource, then rewrite
    # stack.json in place. Context managers close every handle, including
    # the read handle on stack.json before it is reopened for writing.
    with open("stack.json") as f:
        struct = json.load(f)
    with open("enrichment.py") as f:
        code = f.read()
    struct["Resources"]["AppEnrichmentFunction"]["Properties"]["Code"]["ZipFile"] = code
    with open("stack.json", 'w') as f:
        f.write(json.dumps(struct, indent=2))
import json
class Key:
    """Value holder for pk, sk, event_name and diff_keys.

    ``str(key)`` renders the four fields as ``pk/sk/event_name/diff_keys``
    with the diff keys joined by ``|``; batch_records uses that string to
    group records.
    """

    def __init__(self, pk, sk, event_name, diff_keys):
        self.pk, self.sk = pk, sk
        self.event_name, self.diff_keys = event_name, diff_keys

    def __str__(self):
        changed = "|".join(self.diff_keys)
        fields = (self.pk, self.sk, self.event_name, changed)
        return "%s/%s/%s/%s" % fields
"""
- EventBridge required fields are Source, DetailType, Detail
- record["eventName"] is used as DetailType
- eventName could be INSERT, MODIFY, DELETE
"""
class Entry:
    """Pairs a grouping key with its records and an event source name."""

    def __init__(self, key, records, source):
        self.key, self.records, self.source = key, records, source

    @property
    def entry(self):
        """Return a dict with Source, DetailType and Detail.

        DetailType is the key's event name; Detail is the JSON text of the
        key fields plus the grouped records.
        """
        key = self.key
        payload = {
            "pk": key.pk,
            "sk": key.sk,
            "eventName": key.event_name,
            "diffKeys": key.diff_keys,
            "records": self.records,
        }
        return {
            "Source": self.source,
            "DetailType": key.event_name,
            "Detail": json.dumps(payload),
        }
"""
- keys are diffed because it is useful to match on the attributes that changed: when a record is updated one attribute at a time, a change to one attribute can trigger a change to another
"""
def diff_keys(record):
    """Return the sorted names of attributes that differ between images.

    An attribute counts as changed when it was added, removed, or its typed
    value differs between ``OldImage`` and ``NewImage``. Values are compared
    together with their type descriptor, so ``{"S": "1"}`` and ``{"N": "1"}``
    are different.

    Args:
        record: stream record with a ``dynamodb`` mapping.

    Returns:
        sorted list of changed attribute names; ``[]`` unless the record
        carries both ``NewImage`` and ``OldImage``.
    """
    images = record["dynamodb"]
    if not ("NewImage" in images and "OldImage" in images):
        return []
    new_image, old_image = images["NewImage"], images["OldImage"]
    # Walk the union of names so attributes that were removed are reported
    # as well as those added or modified.
    changed = {name for name in new_image.keys() | old_image.keys()
               if new_image.get(name) != old_image.get(name)}
    return sorted(changed)  # NB sorted so the result is deterministic
def batch_records(records):
    """Group stream records that share the same Key.

    Each record is keyed by its ``pk``, the part of its ``sk`` before the
    first ``#``, its ``eventName`` and its diffed attribute names.

    Returns:
        list of ``(Key, [records])`` tuples, in order of first appearance.
    """
    grouped = {}
    for record in records:
        item_keys = record["dynamodb"]["Keys"]
        key = Key(pk=item_keys["pk"]["S"],
                  sk=item_keys["sk"]["S"].split("#")[0],
                  event_name=record["eventName"],
                  diff_keys=diff_keys(record))
        # The first Key seen for a given string form represents the group.
        _, members = grouped.setdefault(str(key), (key, []))
        members.append(record)
    return list(grouped.values())
def handler(event, context=None):
    """Turn a batch of stream records into a list of event entries.

    Args:
        event: iterable of stream records. It is handed to batch_records
            as-is, not unwrapped from a ``"Records"`` key.
        context: Lambda context; its ``function_name`` becomes each entry's
            ``Source``. May be omitted (see below).

    Returns:
        list of entry dicts, one per group of records.
    """
    print("IN: %s" % json.dumps(event))
    # The signature allows context=None, so don't dereference it blindly.
    source = getattr(context, "function_name", None)
    if not source:
        import os
        # AWS_LAMBDA_FUNCTION_NAME is set by the Lambda runtime; "unknown"
        # only applies when running outside Lambda without a context.
        source = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "unknown")
    groups = batch_records(event)
    entries = [Entry(k, v, source).entry for k, v in groups]
    print("OUT: %s" % json.dumps(entries))
    return entries
#!/usr/bin/env bash
# List the events of the CloudFormation stack whose name app.props assigns
# to AppName.
set -euo pipefail
# "./" makes the shell read the file from the current directory instead of
# searching PATH for it.
. ./app.props
aws cloudformation describe-stack-events --stack-name "${AppName:?AppName is not set in app.props}" --query "StackEvents[].{\"1.Timestamp\":Timestamp,\"2.Id\":LogicalResourceId,\"3.Type\":ResourceType,\"4.Status\":ResourceStatus,\"5.Reason\":ResourceStatusReason}"
#!/usr/bin/env bash
# Show, as a table, the outputs of the CloudFormation stack whose name
# app.props assigns to AppName.
set -euo pipefail
# "./" makes the shell read the file from the current directory instead of
# searching PATH for it.
. ./app.props
aws cloudformation describe-stacks --stack-name "${AppName:?AppName is not set in app.props}" --query 'Stacks[0].Outputs' --output table
#!/usr/bin/env bash
# List the resources of the CloudFormation stack whose name app.props
# assigns to AppName.
set -euo pipefail
# "./" makes the shell read the file from the current directory instead of
# searching PATH for it.
. ./app.props
aws cloudformation describe-stack-resources --stack-name "${AppName:?AppName is not set in app.props}" --query "StackResources[].{\"1.Timestamp\":Timestamp,\"2.LogicalId\":LogicalResourceId,\"3.PhysicalId\":PhysicalResourceId,\"4.Type\":ResourceType,\"5.Status\":ResourceStatus}"
#!/usr/bin/env bash
# List the name and status of every stack returned by describe-stacks.
aws cloudformation describe-stacks \
  --query "Stacks[].{\"1.Name\":StackName,\"2.Status\":StackStatus}"

demo

  • deploy stack
  • push results
  • python dump_logs.py DemoFunction 60
  • put events
  • python dump_logs.py DemoFunction 60
import boto3, yaml
def load_props():
    """Parse ``app.props`` in the current directory into a dict.

    The file holds one ``NAME=value`` pair per line (the shell scripts in
    this project source the same file). Blank lines and ``#`` comment lines
    are ignored, and only the first ``=`` separates name from value, so a
    value may itself contain ``=``.

    Returns:
        dict mapping each property name to its string value.

    Raises:
        OSError: if ``app.props`` cannot be opened.
        ValueError: if a non-blank, non-comment line has no ``=``.
    """
    props = {}
    # Context manager so the handle is closed even if parsing fails.
    with open("app.props") as f:
        for line in f.read().splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            name, sep, value = line.partition("=")
            if not sep:
                raise ValueError("malformed line in app.props: %r" % line)
            props[name] = value
    return props
def fetch_outputs(cf, stackname):
    """Return the outputs of the CloudFormation stack named *stackname*.

    Args:
        cf: CloudFormation client exposing a boto3-style ``describe_stacks``.
        stackname: name of the stack whose outputs are wanted.

    Returns:
        dict mapping ``OutputKey`` to ``OutputValue``; empty when the stack
        is not listed or declares no outputs.
    """
    outputs, token = {}, None
    while True:
        # describe_stacks is paginated; follow NextToken so a stack on a
        # later page is not silently missed.
        kwargs = {"NextToken": token} if token else {}
        resp = cf.describe_stacks(**kwargs)
        for stack in resp["Stacks"]:
            if stack["StackName"] == stackname:
                for output in stack.get("Outputs", []):
                    outputs[output["OutputKey"]] = output["OutputValue"]
        token = resp.get("NextToken")
        if not token:
            break
    return outputs
def group_results(results, filterfn = lambda x: True):
    """Bucket results by league, each bucket sorted by date.

    Args:
        results: iterable of dicts with at least ``league`` and ``date``.
        filterfn: predicate on the league name; leagues for which it is
            falsy are dropped. Defaults to keeping everything.

    Returns:
        dict of league name -> list of that league's results ordered by
        ``date``; leagues appear in order of first occurrence.
    """
    by_league = {}
    for row in results:
        league = row["league"]
        if not filterfn(league):
            continue
        if league not in by_league:
            by_league[league] = []
        by_league[league].append(row)
    ordered = {}
    for league, rows in by_league.items():
        ordered[league] = sorted(rows, key=lambda r: r["date"])
    return ordered
def push_results(table, leaguename, results):
    """Write one league's results to *table* through its batch writer.

    Prints the league name, then puts one item per result with
    ``pk = LEAGUE#<leaguename>``, ``sk = RESULT#<date>/<name>`` and the
    result's ``score``.
    """
    print(leaguename)
    with table.batch_writer() as writer:
        for match in results:
            record = {
                "pk": f"LEAGUE#{leaguename}",
                "sk": f"RESULT#{match['date']}/{match['name']}",
                "score": match["score"],
            }
            writer.put_item(Item=record)
if __name__ == "__main__":
    # Load results.yaml and push the results of every league whose name
    # starts with "ENG" into the table exported as the stack's "AppTable"
    # output.
    props = load_props()
    cf = boto3.client("cloudformation")
    outputs = fetch_outputs(cf, props["AppName"])
    tablename = outputs["AppTable"]
    table = boto3.resource("dynamodb").Table(tablename)
    # Context manager so the YAML file handle is closed after parsing.
    with open("results.yaml") as f:
        raw_results = yaml.safe_load(f)
    # Filter through group_results' own predicate rather than re-checking
    # each league name in the loop below.
    results = group_results(raw_results,
                            filterfn=lambda name: name.startswith("ENG"))
    for leaguename in sorted(results.keys()):
        push_results(table, leaguename, results[leaguename])
(env) jhw@Justins-Air 4618ef04563d248d57f41eae18a55626 % python put_events.py
{'FailedEntryCount': 0, 'Entries': [{'EventId': '5ceaf5af-597d-7982-7f81-219bb9991367'}], 'ResponseMetadata': {'RequestId': '3c9fc216-115a-4efe-92d2-bb5c4597fb31', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3c9fc216-115a-4efe-92d2-bb5c4597fb31', 'content-type': 'application/x-amz-json-1.1', 'content-length': '85', 'date': 'Thu, 04 Jul 2024 05:25:47 GMT'}, 'RetryAttempts': 0}}
(env) jhw@Justins-Air 4618ef04563d248d57f41eae18a55626 % python dump_logs.py DemoFunction 60
INIT_START Runtime Version: python:3.10.v36 Runtime Version ARN: arn:aws:lambda:eu-west-1::runtime:bbd47e5ef4020932b9374e2ab9f9ed3bac502f27e17a031c35d9fb8935cf1f8c
START RequestId: 8bc06f0b-2471-410e-b6d6-f52844c6ac02 Version: $LATEST
[WARNING] 2024-07-04T05:25:48.057Z 8bc06f0b-2471-410e-b6d6-f52844c6ac02 {'version': '0', 'id': '5ceaf5af-597d-7982-7f81-219bb9991367', 'detail-type': 'INSERT', 'source': 'dynamodb-eventbridge-pipes-AppEnrichmentFunction-aEGR7w1BP6sw', 'account': '119552584133', 'time': '2024-07-04T05:25:47Z', 'region': 'eu-west-1', 'resources': [], 'detail': {'pk': 'LEAGUE#ENG4', 'sk': 'RESULT', 'eventName': 'INSERT', 'diffKeys': [], 'records': [{'eventID': '207b09e0f5e99984157ed7f8b2622a99', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'eu-west-1', 'dynamodb': {'ApproximateCreationDateTime': 1720068148, 'Keys': {'sk': {'S': 'RESULT#2024-02-24/Stockport County vs Swindon Town'}, 'pk': {'S': 'LEAGUE#ENG4'}}, 'NewImage': {'score': {'S': '0-0'}, 'sk': {'S': 'RESULT#2024-02-24/Stockport County vs Swindon Town'}, 'pk': {'S': 'LEAGUE#ENG4'}}, 'SequenceNumber': '3100000000084663692281', 'SizeBytes': 138, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}, 'eventSourceARN': 'arn:aws:dynamodb:eu-west-1:119552584133:table/dynamodb-eventbridge-pipes-AppTable-1VFFZ5B8CHPNR/stream/2024-07-04T04:40:03.017'}]}}
import boto3, json
Entries=[{"Source": "dynamodb-eventbridge-pipes-AppEnrichmentFunction-aEGR7w1BP6sw", "DetailType": "INSERT", "Detail": "{\"pk\": \"LEAGUE#ENG4\", \"sk\": \"RESULT\", \"eventName\": \"INSERT\", \"diffKeys\": [], \"records\": [{\"eventID\": \"207b09e0f5e99984157ed7f8b2622a99\", \"eventName\": \"INSERT\", \"eventVersion\": \"1.1\", \"eventSource\": \"aws:dynamodb\", \"awsRegion\": \"eu-west-1\", \"dynamodb\": {\"ApproximateCreationDateTime\": 1720068148, \"Keys\": {\"sk\": {\"S\": \"RESULT#2024-02-24/Stockport County vs Swindon Town\"}, \"pk\": {\"S\": \"LEAGUE#ENG4\"}}, \"NewImage\": {\"score\": {\"S\": \"0-0\"}, \"sk\": {\"S\": \"RESULT#2024-02-24/Stockport County vs Swindon Town\"}, \"pk\": {\"S\": \"LEAGUE#ENG4\"}}, \"SequenceNumber\": \"3100000000084663692281\", \"SizeBytes\": 138, \"StreamViewType\": \"NEW_AND_OLD_IMAGES\"}, \"eventSourceARN\": \"arn:aws:dynamodb:eu-west-1:119552584133:table/dynamodb-eventbridge-pipes-AppTable-1VFFZ5B8CHPNR/stream/2024-07-04T04:40:03.017\"}]}"}]
if __name__ == "__main__":
    # Send the canned Entries list above through put_events and show the
    # raw response.
    client = boto3.client("events")
    response = client.put_events(Entries=Entries)
    print(response)
awscli
boto3
botocore
pyyaml
- date: '2024-02-19'
league: ENG1
name: Everton vs Crystal Palace
score: 1-1
- date: '2024-02-19'
league: SPA1
name: Athletic Bilbao vs Girona
score: 3-2
- date: '2024-02-20'
league: ENG1
name: Manchester City vs Brentford
score: 1-0
- date: '2024-02-20'
league: ENG2
name: Cardiff City vs Blackburn Rovers
score: 0-0
- date: '2024-02-20'
league: ENG2
name: Ipswich Town vs Rotherham United
score: 4-3
- date: '2024-02-20'
league: ENG2
name: Plymouth Argyle vs West Bromwich Albion
score: 0-3
- date: '2024-02-20'
league: ENG2
name: Southampton vs Hull City
score: 1-2
- date: '2024-02-20'
league: ENG3
name: Cambridge United vs Bolton Wanderers
score: 1-2
- date: '2024-02-20'
league: ENG3
name: Oxford United vs Northampton Town
score: 2-2
- date: '2024-02-20'
league: ENG3
name: Reading vs Port Vale
score: 2-0
- date: '2024-02-20'
league: ENG4
name: AFC Wimbledon vs Crawley Town
score: 0-1
- date: '2024-02-20'
league: ENG4
name: Gillingham vs Stockport County
score: 0-0
- date: '2024-02-20'
league: ENG4
name: Milton Keynes Dons vs Wrexham
score: 1-1
- date: '2024-02-20'
league: ENG4
name: Walsall vs Morecambe
score: 3-0
- date: '2024-02-20'
league: SCO3
name: Falkirk vs Montrose
score: 3-0
- date: '2024-02-20'
league: SCO4
name: Clyde vs Forfar Athletic
score: 0-2
- date: '2024-02-20'
league: SCO4
name: Peterhead vs Bonnyrigg Rose
score: 0-0
- date: '2024-02-21'
league: ENG1
name: Liverpool vs Luton Town
score: 4-1
- date: '2024-02-22'
league: ITA1
name: Torino vs Lazio
score: 0-2
- date: '2024-02-23'
league: ENG2
name: Coventry City vs Preston North End
score: 0-3
- date: '2024-02-23'
league: ENG2
name: Leeds United vs Leicester City
score: 3-1
- date: '2024-02-23'
league: ENG3
name: Wigan Athletic vs Cheltenham Town
score: 1-1
- date: '2024-02-23'
league: FRA1
name: Metz vs Lyon
score: 1-2
- date: '2024-02-23'
league: GER1
name: Bayer 04 Leverkusen vs Mainz 05
score: 2-1
- date: '2024-02-23'
league: ITA1
name: Bologna vs Hellas Verona
score: 2-0
- date: '2024-02-23'
league: NED1
name: FC Utrecht vs Heracles Almelo
score: 1-0
- date: '2024-02-23'
league: SCO2
name: Partick Thistle vs Dunfermline
score: 1-3
- date: '2024-02-23'
league: SPA1
name: Real Sociedad vs Villarreal
score: 1-3
- date: '2024-02-24'
league: ENG1
name: AFC Bournemouth vs Manchester City
score: 0-1
- date: '2024-02-24'
league: ENG1
name: Arsenal vs Newcastle United
score: 4-1
- date: '2024-02-24'
league: ENG1
name: Aston Villa vs Nottingham Forest
score: 4-2
- date: '2024-02-24'
league: ENG1
name: Brighton & Hove Albion vs Everton
score: 1-1
- date: '2024-02-24'
league: ENG1
name: Crystal Palace vs Burnley
score: 3-0
- date: '2024-02-24'
league: ENG1
name: Manchester United vs Fulham
score: 1-2
- date: '2024-02-24'
league: ENG2
name: Blackburn Rovers vs Norwich City
score: 1-1
- date: '2024-02-24'
league: ENG2
name: Cardiff City vs Stoke City
score: 2-1
- date: '2024-02-24'
league: ENG2
name: Hull City vs West Bromwich Albion
score: 1-1
- date: '2024-02-24'
league: ENG2
name: Ipswich Town vs Birmingham City
score: 3-1
- date: '2024-02-24'
league: ENG2
name: Middlesbrough vs Plymouth Argyle
score: 0-2
- date: '2024-02-24'
league: ENG2
name: Queens Park Rangers vs Rotherham United
score: 2-1
- date: '2024-02-24'
league: ENG2
name: Sheffield Wednesday vs Bristol City
score: 2-1
- date: '2024-02-24'
league: ENG2
name: Southampton vs Millwall
score: 1-2
- date: '2024-02-24'
league: ENG2
name: Sunderland vs Swansea City
score: 1-2
- date: '2024-02-24'
league: ENG2
name: Watford vs Huddersfield Town
score: 1-2
- date: '2024-02-24'
league: ENG3
name: Barnsley vs Derby County
score: 2-1
- date: '2024-02-24'
league: ENG3
name: Blackpool vs Bolton Wanderers
score: 4-1
- date: '2024-02-24'
league: ENG3
name: Bristol Rovers vs Carlisle United
score: 2-1
- date: '2024-02-24'
league: ENG3
name: Burton Albion vs Northampton Town
score: 0-2
- date: '2024-02-24'
league: ENG3
name: Cambridge United vs Peterborough United
score: 0-1
- date: '2024-02-24'
league: ENG3
name: Charlton Athletic vs Portsmouth
score: 0-0
- date: '2024-02-24'
league: ENG3
name: Exeter City vs Fleetwood Town
score: 1-1
- date: '2024-02-24'
league: ENG3
name: Oxford United vs Leyton Orient
score: 1-2
- date: '2024-02-24'
league: ENG3
name: Port Vale vs Lincoln City
score: 0-2
- date: '2024-02-24'
league: ENG3
name: Reading vs Shrewsbury Town
score: 2-3
- date: '2024-02-24'
league: ENG3
name: Stevenage vs Wycombe Wanderers
score: 1-0
- date: '2024-02-24'
league: ENG4
name: Accrington Stanley vs Crawley Town
score: 0-1
- date: '2024-02-24'
league: ENG4
name: Doncaster Rovers vs AFC Wimbledon
score: 1-0
- date: '2024-02-24'
league: ENG4
name: Forest Green Rovers vs Tranmere Rovers
score: 1-0
- date: '2024-02-24'
league: ENG4
name: Gillingham vs Wrexham
score: 1-0
- date: '2024-02-24'
league: ENG4
name: Harrogate Town vs Walsall
score: 0-2
- date: '2024-02-24'
league: ENG4
name: Mansfield Town vs Salford City
score: 5-1
- date: '2024-02-24'
league: ENG4
name: Milton Keynes Dons vs Newport County
score: 3-0
- date: '2024-02-24'
league: ENG4
name: Morecambe vs Grimsby Town
score: 1-1
- date: '2024-02-24'
league: ENG4
name: Notts County vs Crewe Alexandra
score: 1-3
- date: '2024-02-24'
league: ENG4
name: Stockport County vs Swindon Town
score: 0-0
- date: '2024-02-24'
league: ENG4
name: Sutton United vs Colchester United
score: 1-1
- date: '2024-02-24'
league: FRA1
name: Lorient vs Nantes
score: 0-1
- date: '2024-02-24'
league: FRA1
name: Strasbourg vs Brest
score: 0-3
- date: '2024-02-24'
league: GER1
name: 1. FC Union Berlin vs 1. FC Heidenheim 1846
score: 2-2
- date: '2024-02-24'
league: GER1
name: Bayern Munich vs RB Leipzig
score: 2-1
- date: '2024-02-24'
league: GER1
name: "Borussia M\xF6nchengladbach vs VfL Bochum 1848"
score: 5-2
- date: '2024-02-24'
league: GER1
name: "VfB Stuttgart vs 1. FC K\xF6ln"
score: 1-1
- date: '2024-02-24'
league: GER1
name: Werder Bremen vs Darmstadt 98
score: 1-1
- date: '2024-02-24'
league: ITA1
name: Genoa vs Udinese
score: 2-0
- date: '2024-02-24'
league: ITA1
name: Salernitana vs Monza
score: 0-2
- date: '2024-02-24'
league: ITA1
name: Sassuolo vs Empoli
score: 2-3
- date: '2024-02-24'
league: NED1
name: N.E.C. vs Sparta Rotterdam
score: 2-0
- date: '2024-02-24'
league: NED1
name: PEC Zwolle vs PSV Eindhoven
score: 1-7
- date: '2024-02-24'
league: NED1
name: RKC Waalwijk vs Fortuna Sittard
score: 0-1
- date: '2024-02-24'
league: SCO2
name: Arbroath vs Inverness Caledonian Thistle
score: 1-1
- date: '2024-02-24'
league: SCO2
name: Ayr United vs Raith Rovers
score: 1-2
- date: '2024-02-24'
league: SCO2
name: Dundee United vs Queen's Park
score: 3-1
- date: '2024-02-24'
league: SCO2
name: Greenock Morton vs Airdrieonians
score: 2-1
- date: '2024-02-24'
league: SCO3
name: Cove Rangers vs Annan Athletic
score: 2-1
- date: '2024-02-24'
league: SCO3
name: Hamilton Academical vs Alloa Athletic
score: 1-2
- date: '2024-02-24'
league: SCO3
name: Kelty Hearts vs Falkirk
score: 0-1
- date: '2024-02-24'
league: SCO3
name: Queen of the South vs Montrose
score: 2-3
- date: '2024-02-24'
league: SCO3
name: Stirling Albion vs Edinburgh City
score: 4-0
- date: '2024-02-24'
league: SCO4
name: Dumbarton vs Stranraer
score: 2-1
- date: '2024-02-24'
league: SCO4
name: East Fife vs Forfar Athletic
score: 1-1
- date: '2024-02-24'
league: SCO4
name: Peterhead vs Elgin City
score: 1-1
- date: '2024-02-24'
league: SCO4
name: Stenhousemuir vs Clyde
score: 1-6
- date: '2024-02-24'
league: SCO4
name: The Spartans vs Bonnyrigg Rose
score: 2-1
- date: '2024-02-24'
league: SPA1
name: "Alav\xE9s vs Mallorca"
score: 1-1
- date: '2024-02-24'
league: SPA1
name: "Almer\xEDa vs Atl\xE9tico Madrid"
score: 2-2
- date: '2024-02-24'
league: SPA1
name: Barcelona vs Getafe
score: 4-0
- date: '2024-02-25'
league: ENG1
name: Wolverhampton Wanderers vs Sheffield United
score: 1-0
- date: '2024-02-25'
league: FRA1
name: Le Havre vs Reims
score: 1-2
- date: '2024-02-25'
league: FRA1
name: Lens vs Monaco
score: 2-3
- date: '2024-02-25'
league: FRA1
name: Marseille vs Montpellier
score: 4-1
- date: '2024-02-25'
league: FRA1
name: Nice vs Clermont
score: 0-0
- date: '2024-02-25'
league: FRA1
name: Paris Saint Germain vs Rennes
score: 1-1
- date: '2024-02-25'
league: FRA1
name: Toulouse vs Lille
score: 3-1
- date: '2024-02-25'
league: GER1
name: Borussia Dortmund vs 1899 Hoffenheim
score: 2-3
- date: '2024-02-25'
league: GER1
name: Eintracht Frankfurt vs VfL Wolfsburg
score: 2-2
- date: '2024-02-25'
league: GER1
name: FC Augsburg vs SC Freiburg
score: 2-1
- date: '2024-02-25'
league: ITA1
name: AC Milan vs Atalanta
score: 1-1
- date: '2024-02-25'
league: ITA1
name: Cagliari vs Napoli
score: 1-1
- date: '2024-02-25'
league: ITA1
name: Juventus vs Frosinone
score: 3-2
- date: '2024-02-25'
league: ITA1
name: Lecce vs Inter Milan
score: 0-4
- date: '2024-02-25'
league: NED1
name: AZ Alkmaar vs Ajax
score: 2-0
- date: '2024-02-25'
league: NED1
name: Almere City FC vs Feyenoord
score: 0-2
- date: '2024-02-25'
league: NED1
name: Excelsior vs Vitesse
score: 1-2
- date: '2024-02-25'
league: NED1
name: FC Twente vs Go Ahead Eagles
score: 3-0
- date: '2024-02-25'
league: NED1
name: FC Volendam vs sc Heerenveen
score: 0-4
- date: '2024-02-25'
league: SPA1
name: "C\xE1diz vs Celta Vigo"
score: 2-2
- date: '2024-02-25'
league: SPA1
name: Las Palmas vs Osasuna
score: 1-1
- date: '2024-02-25'
league: SPA1
name: Real Betis vs Athletic Bilbao
score: 3-1
- date: '2024-02-25'
league: SPA1
name: Real Madrid vs Sevilla
score: 1-0
#!/usr/bin/env bash
# Export the AWS and application settings used by the other scripts.
export AWS_DEFAULT_OUTPUT=table
export AWS_PROFILE=woldeploy
export AWS_REGION=eu-west-1
export DOMAIN_NAME=polyreader.net
export GOOGLE_CLIENT_ID=363643387528-gcr335jr2l0rqcca7n33cjgqds44ts8e.apps.googleusercontent.com
# SECURITY: the OAuth client secret must not live in a shared or versioned
# file. Supply it from the caller's environment or a secret store instead.
# The value that used to be hard-coded here has been published and should
# be rotated.
if [ -z "${GOOGLE_CLIENT_SECRET:-}" ]; then
  echo "GOOGLE_CLIENT_SECRET is not set; export it before using this file" >&2
fi
export GOOGLE_CLIENT_SECRET
{
"Outputs": {
"AppTable": {
"Value": {
"Ref": "AppTable"
}
}
},
"Resources": {
"AppTable": {
"Properties": {
"AttributeDefinitions": [
{
"AttributeName": "pk",
"AttributeType": "S"
},
{
"AttributeName": "sk",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST",
"KeySchema": [
{
"AttributeName": "pk",
"KeyType": "HASH"
},
{
"AttributeName": "sk",
"KeyType": "RANGE"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_AND_OLD_IMAGES"
}
},
"Type": "AWS::DynamoDB::Table"
},
"AppEnrichmentFunction": {
"Properties": {
"Code": {
"ZipFile": "import json\n\nclass Key:\n\n def __init__(self, pk, sk, event_name, diff_keys):\n self.pk = pk\n self.sk = sk\n self.event_name = event_name\n self.diff_keys = diff_keys\n\n def __str__(self):\n return \"%s/%s/%s/%s\" % (self.pk,\n self.sk,\n self.event_name,\n \"|\".join(self.diff_keys))\n \n\"\"\"\n- EventBridge required fields are Source, DetailType, Detail\n- record[\"eventName\"] is used as DetailType\n- eventName could be INSERT, MODIFY, DELETE\n\"\"\"\n \nclass Entry:\n\n def __init__(self, key, records, source):\n self.key = key\n self.records = records\n self.source = source\n\n @property\n def entry(self): \n detail = {\"pk\": self.key.pk,\n \"sk\": self.key.sk,\n \"eventName\": self.key.event_name,\n \"diffKeys\": self.key.diff_keys,\n \"records\": self.records}\n detail_type = self.key.event_name\n return {\"Source\": self.source,\n \"DetailType\": detail_type,\n \"Detail\": json.dumps(detail)}\n\n\"\"\"\n- keys are diff'ed because it's useful to be able to match on changed attributes if a record is being updated one attr at a time, and change on one attr triggers change in another\n\"\"\"\n \ndef diff_keys(record):\n if not (\"NewImage\" in record[\"dynamodb\"] and\n \"OldImage\" in record[\"dynamodb\"]):\n return [] \n new_image={k: list(v.values())[0]\n for k, v in record[\"dynamodb\"][\"NewImage\"].items()}\n old_image={k: list(v.values())[0]\n for k, v in record[\"dynamodb\"][\"OldImage\"].items()}\n diff_keys=[]\n for k in new_image:\n if (k not in old_image or\n new_image[k] != old_image[k]):\n diff_keys.append(k)\n return sorted(diff_keys) # NB sort\n \ndef batch_records(records):\n keys, groups = {}, {}\n for record in records:\n pk = record[\"dynamodb\"][\"Keys\"][\"pk\"][\"S\"]\n sk = record[\"dynamodb\"][\"Keys\"][\"sk\"][\"S\"].split(\"#\")[0]\n event_name = record[\"eventName\"]\n diffed_keys = diff_keys(record)\n key = Key(pk = pk,\n sk = sk,\n event_name = event_name,\n diff_keys = diffed_keys)\n strkey = str(key)\n if 
strkey not in keys:\n keys[strkey] = key\n groups.setdefault(strkey, [])\n groups[strkey].append(record)\n return [(key, groups[strkey])\n for strkey, key in keys.items()]\n\ndef handler(event, context=None):\n print (\"IN: %s\" % json.dumps(event))\n source = context.function_name\n # groups = batch_records(event[\"Records\"])\n groups = batch_records(event)\n entries = [Entry(k, v, source).entry for k, v in groups]\n print (\"OUT: %s\" % json.dumps(entries))\n return entries\n"
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"AppEnrichmentRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"AppEnrichmentRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"AppEnrichmentPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "app-enrichment-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "AppEnrichmentRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"AppPipeRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"AppPipePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"AppTable",
"StreamArn"
]
}
},
{
"Action": [
"lambda:InvokeFunction"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"events:PutEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "app-pipe-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "AppPipeRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"AppPipe": {
"Properties": {
"Source": {
"Fn::GetAtt": [
"AppTable",
"StreamArn"
]
},
"Enrichment": {
"Fn::GetAtt": [
"AppEnrichmentFunction",
"Arn"
]
},
"RoleArn": {
"Fn::GetAtt": [
"AppPipeRole",
"Arn"
]
},
"Target": {
"Fn::Sub": "arn:aws:events:${AWS::Region}:${AWS::AccountId}:event-bus/default"
},
"SourceParameters": {
"DynamoDBStreamParameters": {
"MaximumRetryAttempts": 0,
"MaximumBatchingWindowInSeconds": 1,
"StartingPosition": "LATEST"
}
}
},
"Type": "AWS::Pipes::Pipe",
"DependsOn": [
"AppTable",
"AppEnrichmentFunction"
]
},
"AppEnrichmentFunctionPermission": {
"Type": "AWS::Lambda::Permission",
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "AppEnrichmentFunction"
},
"Principal": "pipes.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"AppPipe",
"Arn"
]
}
}
},
"DemoEventInvokeConfig": {
"Properties": {
"FunctionName": {
"Ref": "DemoFunction"
},
"MaximumRetryAttempts": 0,
"Qualifier": "$LATEST"
},
"Type": "AWS::Lambda::EventInvokeConfig"
},
"DemoFunction": {
"Properties": {
"Code": {
"ZipFile": "\nimport logging\n\nlogger=logging.getLogger()\nlogger.setLevel(logging.INFO)\n\ndef handler(event, context=None):\n logger.warning(str(event))\n"
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"DemoRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"DemoPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "DemoFunction"
},
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"DemoRule",
"Arn"
]
}
},
"Type": "AWS::Lambda::Permission"
},
"DemoPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "demo-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "DemoRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"DemoRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"DemoRule": {
"Properties": {
"EventPattern": {
"detail": {
"eventName": [
"INSERT"
],
"pk": [
{
"prefix": "LEAGUE"
}
]
}
},
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"DemoFunction",
"Arn"
]
},
"Id": {
"Fn::Sub": "demo-rule"
}
}
]
},
"Type": "AWS::Events::Rule"
}
}
}
from enrichment import handler
class Context:
    """Bare stand-in for a handler context; only carries function_name."""

    def __init__(self):
        placeholder = "whatevs"
        self.function_name = placeholder
if __name__ == "__main__":
    # Run the handler locally against one captured INSERT stream record.
    sort_key = 'RESULT#2024-02-24/Morecambe vs Grimsby Town'
    sample_record = {
        'eventID': 'ef3a95fd09bc49aa9fc73eb97df6cd44',
        'eventName': 'INSERT',
        'eventVersion': '1.1',
        'eventSource': 'aws:dynamodb',
        'awsRegion': 'eu-west-1',
        'dynamodb': {
            'ApproximateCreationDateTime': 1720023912,
            'Keys': {
                'sk': {'S': sort_key},
                'pk': {'S': 'LEAGUE#ENG4'},
            },
            'NewImage': {
                'score': {'S': '1-1'},
                'sk': {'S': sort_key},
                'pk': {'S': 'LEAGUE#ENG4'},
            },
            'SequenceNumber': '3100000000098083738482',
            'SizeBytes': 124,
            'StreamViewType': 'NEW_AND_OLD_IMAGES',
        },
        'eventSourceARN': 'arn:aws:dynamodb:eu-west-1:119552584133:table/dynamodb-eventbridge-pipes-AppTable-1UJAPOHAXHWW2/stream/2024-07-03T16:23:38.945',
    }
    handler([sample_record], Context())

short

  • does the pipe need both a lambda:InvokeFunction Permission resource and an IAM policy entry?

medium

  • group results by date
  • large message error handling

done

  • script to insert enrichment.py into template
  • move code inline
  • script to dump demo function logs
  • script to push dynamodb item
  • enrichment.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment