Skip to content

Instantly share code, notes, and snippets.

@PeskyPotato
Last active June 14, 2024 07:47
Show Gist options
  • Save PeskyPotato/249f5b97037ba501794d3a706580abd6 to your computer and use it in GitHub Desktop.
Save PeskyPotato/249f5b97037ba501794d3a706580abd6 to your computer and use it in GitHub Desktop.
Repartition objects through Kinesis
import boto3
import json
from time import sleep
import os
# Runtime configuration: each value comes from the named environment
# variable, falling back to the literal default when the variable is unset.
src_bucket = os.environ.get('SRC_BUCKET', 'src-bkt')  # bucket whose objects are replayed
dst_bucket = os.environ.get('DST_BUCKET', 'dst-bkt')  # only printed at start-up in this script
region = os.environ.get('REGION', 'eu-central-1')  # region given to the Kinesis client
stream_name = os.environ.get('STREAM_NAME', 'kinesis-datastream')  # target Kinesis stream
def keys(bucket_name, prefix='/', delimiter='/'):
    """Return a generator over the keys of objects in *bucket_name* under *prefix*.

    Leading *delimiter* characters are removed from *prefix* before it is
    used as the S3 ``Prefix`` filter, so the default ``'/'`` becomes ``''``
    and matches every object in the bucket. Keys are produced as the
    returned generator is iterated.
    """
    # str.lstrip treats *delimiter* as a set of characters, not a literal prefix.
    search_prefix = prefix.lstrip(delimiter)
    s3_bucket = boto3.resource('s3').Bucket(bucket_name)
    listing = s3_bucket.objects.filter(Prefix=search_prefix)
    return (summary.key for summary in listing)
def check_type(data):
    """Split an object's raw bytes into individual JSON records.

    Handled layouts:

    * a single JSON document -> returned unchanged as a one-element list;
    * newline-delimited JSON (one document per line, blank lines allowed);
    * JSON documents concatenated back-to-back (``{...}{...}``), including
      pretty-printed documents separated only by whitespace.

    For the multi-record layouts each returned record is the exact source
    text of one JSON value, re-encoded as UTF-8. Whitespace between records
    (including trailing newlines) is skipped, so no empty records are
    produced, and a body containing only whitespace yields ``[]``.

    :param data: raw object body as ``bytes``.
    :returns: list of ``bytes`` records.
    :raises ValueError: ``json.JSONDecodeError`` if *data* is not a sequence
        of valid JSON values, or ``UnicodeDecodeError`` if it is not UTF-8.
    """
    try:
        json.loads(data)
    except ValueError:
        pass
    else:
        print("is JSON")
        return [data]

    # 'utf-8-sig' behaves like 'utf-8' but drops a leading BOM, which would
    # otherwise be glued onto (and invalidate) the first record.
    text = data.decode('utf-8-sig')
    # Keep the original diagnostics: newline-separated vs. back-to-back documents.
    print("is JSON - NLD" if "\n" in text else "is JSON - 1L")

    # raw_decode parses one value and reports where it ended, so records are
    # delimited by the JSON grammar itself. Splitting on "\n" or "}{" broke on
    # pretty-printed documents, braces inside strings and trailing newlines.
    decoder = json.JSONDecoder()
    records = []
    pos, end = 0, len(text)
    while True:
        # raw_decode does not skip leading whitespace, so consume the JSON
        # whitespace characters (RFC 8259: space, tab, CR, LF) between records.
        while pos < end and text[pos] in ' \t\r\n':
            pos += 1
        if pos == end:
            return records
        _, stop = decoder.raw_decode(text, pos)
        records.append(text[pos:stop].encode('utf-8'))
        pos = stop
# Replay every object in src_bucket into the Kinesis stream, one record per
# JSON document found in the object (as split by check_type).
s3_client = boto3.client('s3')
kinesis_client = boto3.client('kinesis', region_name=region)
print(src_bucket, dst_bucket, region, stream_name)

# Overridable from the environment; the defaults reproduce the original
# hard-coded values. Note that one constant partition key routes every
# record to the same shard, capping throughput at a single shard's limit.
partition_key = os.getenv('PARTITION_KEY', 'partition-key')
record_delay = float(os.getenv('RECORD_DELAY', '1'))  # seconds after each put_record
object_delay = float(os.getenv('OBJECT_DELAY', '2'))  # seconds after each S3 object

for key in keys(src_bucket):
    response = s3_client.get_object(Bucket=src_bucket, Key=key)
    body = response['Body'].read()
    # Zero-byte or whitespace-only objects (e.g. S3 "folder" placeholder keys)
    # contain no records; skip them rather than handing them to the parser.
    if not body.strip():
        continue
    for record in check_type(body):
        # Never publish empty payloads (e.g. blank lines in NDJSON input).
        if not record.strip():
            continue
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=record,
            PartitionKey=partition_key,
        )
        sleep(record_delay)
    sleep(object_delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment