Skip to content

Instantly share code, notes, and snippets.

@Melraidin
Created December 20, 2016 19:05
Show Gist options
  • Save Melraidin/202fc2097159c677fa21376414567cee to your computer and use it in GitHub Desktop.
Save Melraidin/202fc2097159c677fa21376414567cee to your computer and use it in GitHub Desktop.
Query Fastly data in Luigi tasks
#!/usr/bin/python
"""
ETL for Fastly views.
"""
import datetime
import re
import luigi
import athena
# S3 prefix template for one day of Fastly log data; the "%s" placeholder is
# filled with the date when listing objects and when building partition paths.
s3_table_data_path = "s3://fastly-logs/date=%s/"
class UpdateFastlyPartitions(athena.AthenaQuery):
    """
    Repair Fastly partitions.

    Lists the S3 objects stored under the ``date=...`` prefixes for
    ``self.date`` and for the day before it, and registers every
    ``date=.../hour=...`` shard found there as a partition of the ``fastly``
    table via ``alter table ... add if not exists partition``.
    """

    table_name = "fastly"
    results_path_template = "repair-table-fastly/%s"

    def run(self):
        """
        Add one partition per (date, hour) shard present in S3.

        Each shard is attempted at most once per run. A failing statement is
        logged and skipped so that one bad shard does not prevent the
        remaining partitions from being registered.
        """
        # Local import keeps this block self-contained; "luigi-interface" is
        # the logger name Luigi itself configures handlers for.
        import logging
        logger = logging.getLogger("luigi-interface")

        # An alternative approach would be to simply use an
        # "msck repair table fastly" statement but this is very slow at Athena.
        client = luigi.s3.S3Client()
        shard_pattern = re.compile(r"date=([^/]+)/hour=([0-9]+)")
        statement = """
alter table {table} add if not exists partition ( service='drscdn.500px.org', date='{date}', hour={hour} ) location '{path}';
"""

        completed = set()
        for day in (self.date, self.date - datetime.timedelta(days=1)):
            prefix = s3_table_data_path % day
            # The listed keys are joined onto the queried prefix, so the
            # pattern is matched against the full S3 path.
            for key in client.list(prefix):
                matches = shard_pattern.findall(prefix + key)
                if len(matches) != 1:
                    # Not an hourly shard object (or an ambiguous path).
                    continue

                shard_date, hour = matches[0]
                if (shard_date, hour) in completed:
                    continue

                shard_path = (s3_table_data_path % shard_date) + ("hour=%s" % hour)
                try:
                    self.query_store(statement.format(
                        table=self.table_name,
                        date=shard_date,
                        hour=hour,
                        path=shard_path,
                    ))
                except Exception:
                    # Best effort: the exception types raised by query_store
                    # are not visible here, so catch broadly but leave a
                    # trace instead of swallowing the failure silently.
                    logger.warning(
                        "Failed to add partition date=%s hour=%s at %s",
                        shard_date, hour, shard_path, exc_info=True)

                # Mark the shard as handled even on failure so the same
                # statement is not retried for every object in that hour.
                completed.add((shard_date, hour))
class QueryViewsByLocation(athena.AthenaQuery):
    """
    Query Athena for photo views by location.

    The query counts rows of the ``fastly`` table per photo id, date, hour and
    latitude/longitude rounded to 4 decimal places, for the day before
    ``self.date``. Only rows with response code 200 or 304 whose extracted
    size parameter is greater than 600 are counted.
    """

    results_path_template = "fastly-views-by-location/%s"

    def requires(self):
        """Depend on the partition repair task for ``self.date``."""
        partition_task = UpdateFastlyPartitions(date=self.date)
        return [partition_task]

    def run(self):
        """Format the views-by-location query for the previous day and submit it."""
        previous_day = self.date - datetime.timedelta(days=1)

        # NOTE(review): str.format() leaves "%%3D" as two percent signs; if
        # query_store does not apply %-interpolation afterwards, the pattern
        # will not match a URL-encoded "=" ("%3D") - confirm intended.
        template = (
            "\n"
            "select date_format(date, '%Y-%m-%d') || ' ' || cast(hour as varchar) || ':00' as date, "
            "cast(regexp_extract(path, 'photo/([0-9]+)', 1) as integer) as photo_id, "
            "cast(count(*) as integer) as views, "
            "round(latitude, 4) as latitude, "
            "round(longitude, 4) as longitude "
            "from fastly "
            "where cast(regexp_extract(path, 'photo/[0-9]+/[^/]*[mhw](?:%%3D|=)([0-9]+)', 1) as bigint) > 600 "
            "and response_code in ( 200, 304 ) "
            "and date = date '{date}' "
            "group by regexp_extract(path, 'photo/([0-9]+)', 1), date, hour, round(latitude, 4), round(longitude, 4)\n"
        )
        self.query_store(template.format(date=previous_day))
class LoadViewsByLocation(athena.AthenaLoad):
    """
    Load photo views by location.

    Consumes the output of QueryViewsByLocation for ``self.date`` and defines
    the schema of the ``fastly_views_by_location`` destination table.
    """

    table = "fastly_views_by_location"

    def requires(self):
        """Depend on the views-by-location query for the same date."""
        return QueryViewsByLocation(date=self.date)

    def create_table(self, connection):
        """
        Create the destination table using a cursor from ``connection``.

        NOTE(review): latitude/longitude are declared decimal(6, 3) while the
        upstream query rounds them to 4 decimal places - confirm the loss of
        the fourth decimal is intended.
        """
        ddl = (
            "\n"
            "create table {table} (\n"
            "date datetime encode lzo,\n"
            "photo_id int encode lzo,\n"
            "views int encode lzo,\n"
            "latitude decimal(6, 3) encode lzo,\n"
            "longitude decimal(6, 3) encode lzo\n"
            ");\n"
        ).format(table=self.table)

        cursor = connection.cursor()
        cursor.execute(ddl)
# Script entry point: hand control to Luigi's command-line runner when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    luigi.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment