Last active
May 9, 2019 19:57
-
-
Save s-fujimoto/c97ff3d098d0d280aefcfe2252a7b23e to your computer and use it in GitHub Desktop.
Transfer s3 to Elasticsearch log from by Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import os | |
import gzip | |
from datetime import datetime | |
from botocore.awsrequest import AWSRequest | |
from botocore.auth import SigV4Auth | |
from botocore.endpoint import BotocoreHTTPSession | |
from botocore.credentials import Credentials | |
def lambda_handler(event, context): | |
print('Started') | |
es_host = os.environ['ES_HOST'] | |
es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d") | |
credentials = Credentials( | |
os.environ["AWS_ACCESS_KEY_ID"], | |
os.environ["AWS_SECRET_ACCESS_KEY"], | |
os.environ["AWS_SESSION_TOKEN"]) | |
bucket = event["Records"][0]["s3"]["bucket"]["name"] | |
key = event["Records"][0]["s3"]["object"]["key"] | |
s3 = boto3.resource('s3') | |
s3.Bucket(bucket).download_file(key, '/tmp/log.gz') | |
with gzip.open('/tmp/log.gz') as f: | |
data = "" | |
for line in f: | |
data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index | |
data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"') | |
if len(data) > 3000000: | |
_bulk(es_host, data, credentials) | |
data = "" | |
if data != "": | |
_bulk(es_host, data, credentials) | |
return 'Completed' | |
def _bulk(host, doc, credentials): | |
pipeline = os.environ['PIPELINE_NAME'] | |
url = 'https://%s/_bulk?pipeline=%s' % (host, pipeline) | |
response = request(url, "POST", credentials, 'es', data=doc) | |
if not response.ok: | |
print(response.text) | |
def request(url, method, credentials, service_name, region=None, headers=None, data=None): | |
if not region: | |
region = os.environ["AWS_REGION"] | |
aws_request = AWSRequest(url=url, method=method, headers=headers, data=data) | |
SigV4Auth(credentials, service_name, region).add_auth(aws_request) | |
return BotocoreHTTPSession().send(aws_request.prepare()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment