Skip to content

Instantly share code, notes, and snippets.

@feluelle
Last active August 14, 2024 19:58
Show Gist options
  • Save feluelle/ff7d14617c983a3230d91b180d206ca2 to your computer and use it in GitHub Desktop.
Save feluelle/ff7d14617c983a3230d91b180d206ca2 to your computer and use it in GitHub Desktop.
Google API (and S3) integration in Airflow
#
# -------------------- authorize script --------------------
#
# -*- coding: utf-8 -*-
"""
Just adjust the global variables accordingly and then run this script from the command line.
Make sure the correct client_secret_file is selected,
and that it has the necessary right configured.
Create a new connection in the Airflow server where:
host: SCOPES
schema: refresh_token
login: client_id
password: client_secret
"""
from google_auth_oauthlib.flow import InstalledAppFlow
CLIENT_SECRETS_FILE = '../client_secret.json'
SCOPES = [
'https://www.googleapis.com/auth/analytics.readonly', # Google Analytics API
'https://www.googleapis.com/auth/yt-analytics.readonly', # YouTube Analytics API
'https://www.googleapis.com/auth/yt-analytics-monetary.readonly', # YouTube Reporting API
'https://www.googleapis.com/auth/youtube', # YouTube Data API
'https://www.googleapis.com/auth/spreadsheets.readonly', # Google Sheets API
]
def authorize():
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
credentials = flow.run_console()
print('Refresh Token: ', credentials.refresh_token)
print('Client ID: ', credentials.client_id)
print('Client Secret: ', credentials.client_secret)
if __name__ == "__main__":
authorize()
#
# -------------------- GoogleApiClientHook --------------------
#
from airflow.hooks.base_hook import BaseHook
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
class GoogleApiClientHook(BaseHook):
"""
A hook to use the Google API Client library.
:param google_api_client_conn_id: Holds the connection information needed for
an google api client to be authenticated.
See `this <../../utils/authorize_google_api.py>`_
for more information.
:type google_api_client_conn_id: str
:param api_service_name: The name of the api service that is needed to get the data
for example 'youtube'.
:type api_service_name: str
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
"""
def __init__(self, google_api_client_conn_id, api_service_name, api_version):
super(GoogleApiClientHook, self).__init__(google_api_client_conn_id)
self.google_api_client_conn_id = google_api_client_conn_id
self.api_service_name = api_service_name
self.api_version = api_version
def get_conn(self):
"""
Creates an authenticated api client for the given api service name and credentials.
:return: the authenticated api service.
"""
self.log.info("Authenticating Google API Client")
credentials = self._get_credentials_from_connection()
api_service = build(
serviceName=self.api_service_name,
version=self.api_version,
credentials=credentials,
cache_discovery=False
)
return api_service
def _get_credentials_from_connection(self):
conn = self.get_connection(self.google_api_client_conn_id)
info = {
'refresh_token': conn.schema,
'client_id': conn.login,
'client_secret': conn.password
}
scopes = conn.host.split(',')
return Credentials.from_authorized_user_info(info=info, scopes=scopes)
#
# -------------------- GoogleApiToS3Transfer --------------------
#
# -*- coding: utf-8 -*-
import json
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.google_api_client_hook import GoogleApiClientHook
from hooks.s3_hook import S3Hook
class GoogleApiToS3Transfer(BaseOperator):
"""
Basic class for transferring data from an Google APIs endpoint into a S3 Bucket.
:param google_api_service_name: The specific API service that is being requested.
:type google_api_service_name: str
:param google_api_service_version: The version of the API that is being requested.
:type google_api_service_version: str
:param google_api_endpoint_path: The client libraries path to the api call's executing method.
For example: 'analyticsreporting.reports.batchGet'
NOTE: See https://developers.google.com/apis-explorer
for more information on what methods are available.
:type google_api_endpoint_path: str
:param google_api_endpoint_params: The params to control the corresponding endpoint result.
:type google_api_endpoint_params: dict
:param s3_destination_key: The url where to put the data retrieved
from the endpoint in S3.
:type s3_destination_key: str
:param google_api_response_via_xcom: Can be set to expose the google api response
to xcom.
The default value is None.
:type google_api_response_via_xcom: str
:param google_api_endpoint_params_via_xcom: If set to a value this value will be used as a key
for pulling from xcom and updating the google api
endpoint params.
The default value is None.
:type google_api_endpoint_params_via_xcom: str
:param google_api_endpoint_params_via_xcom_task_ids: Task ids to filter xcom by.
The default value is None.
:type google_api_endpoint_params_via_xcom_task_ids: str or list of str
:param google_api_pagination: If set to True Pagination will be enabled for this request
to retrieve all data.
NOTE: This means the response will be a list of responses.
:type google_api_pagination: bool
:param s3_overwrite: Specifies whether the s3 file will be overwritten if exists.
The default value is True.
:type s3_overwrite: bool
:param google_api_client_conn_id: The connection id specifying the authentication information
to Google's API.
The default value is 'google_api_default'.
:type google_api_client_conn_id: str
:param s3_conn_id: The connection id specifying the authentication information
for the S3 Bucket.
The default value is 's3_default'.
:type s3_conn_id: str
"""
template_fields = ('google_api_endpoint_params', 's3_destination_key',)
template_ext = ()
ui_color = '#cc181e'
@apply_defaults
def __init__(self,
google_api_service_name,
google_api_service_version,
google_api_endpoint_path,
google_api_endpoint_params,
s3_destination_key,
*args,
google_api_response_via_xcom=None,
google_api_endpoint_params_via_xcom=None,
google_api_endpoint_params_via_xcom_task_ids=None,
google_api_pagination=False,
s3_overwrite=True,
google_api_client_conn_id='google_api_default',
s3_conn_id='s3_default',
**kwargs):
super(GoogleApiToS3Transfer, self).__init__(*args, **kwargs)
self.google_api_service_name = google_api_service_name
self.google_api_service_version = google_api_service_version
self.google_api_endpoint_path = google_api_endpoint_path
self.google_api_endpoint_params = google_api_endpoint_params
self.s3_destination_key = s3_destination_key
self.google_api_response_via_xcom = google_api_response_via_xcom
self.google_api_endpoint_params_via_xcom = google_api_endpoint_params_via_xcom
self.google_api_endpoint_params_via_xcom_task_ids = \
google_api_endpoint_params_via_xcom_task_ids
self.google_api_pagination = google_api_pagination
self.s3_overwrite = s3_overwrite
self.google_api_client_conn_id = google_api_client_conn_id
self.s3_conn_id = s3_conn_id
def execute(self, context):
"""
Transfers Google APIs json data to S3.
:param context: The context that is being provided when executing.
:type context: dict
"""
self.log.info('Transferring data from %s to s3', self.google_api_service_name)
if self.google_api_endpoint_params_via_xcom:
self._update_google_api_endpoint_params_via_xcom(context['task_instance'])
data = self._retrieve_data_from_google_api()
if self.google_api_response_via_xcom:
self._expose_google_api_response_via_xcom(context['task_instance'], data)
self._load_data_to_s3(data)
def _retrieve_data_from_google_api(self):
google_api_client = GoogleApiClientHook(
google_api_client_conn_id=self.google_api_client_conn_id,
api_service_name=self.google_api_service_name,
api_version=self.google_api_service_version
)
google_api_conn_client = google_api_client.get_conn()
google_api_response = self._call_google_api_request(google_api_conn_client)
return google_api_response
def _load_data_to_s3(self, data):
s3_hook = S3Hook(s3_conn_id=self.s3_conn_id)
s3_hook.load_string(
string_data=json.dumps(data),
key=self.s3_destination_key,
replace=self.s3_overwrite
)
def _build_google_api_request(self, google_api_conn_client, google_api_sub_functions):
for sub_function in google_api_sub_functions:
google_api_conn_client = getattr(google_api_conn_client, sub_function)
if sub_function != google_api_sub_functions[-1]:
google_api_conn_client = google_api_conn_client()
else:
google_api_conn_client = google_api_conn_client(**self.google_api_endpoint_params)
return google_api_conn_client
def _call_google_api_request(self, google_api_conn_client):
google_api_endpoint_parts = self.google_api_endpoint_path.split('.')
google_api_endpoint_instance = self._build_google_api_request(
google_api_conn_client,
google_api_endpoint_parts[1:]
)
if self.google_api_pagination:
return self._paginate_google_api(
google_api_endpoint_instance,
google_api_conn_client,
google_api_endpoint_parts
)
return google_api_endpoint_instance.execute()
def _update_google_api_endpoint_params_via_xcom(self, task_instance):
google_api_endpoint_params = task_instance.xcom_pull(
task_ids=self.google_api_endpoint_params_via_xcom_task_ids,
key=self.google_api_endpoint_params_via_xcom
)
self.google_api_endpoint_params.update(google_api_endpoint_params)
def _expose_google_api_response_via_xcom(self, task_instance, data):
task_instance.xcom_push(key=self.google_api_response_via_xcom, value=data)
def _paginate_google_api(self,
google_api_endpoint_instance,
google_api_conn_client,
google_api_endpoint_parts):
google_api_responses = []
while google_api_endpoint_instance:
google_api_response = google_api_endpoint_instance.execute()
google_api_responses.append(google_api_response)
google_api_endpoint_instance = self._build_next_google_api_request(
google_api_conn_client,
google_api_endpoint_parts[1:],
google_api_endpoint_instance,
google_api_response
)
return google_api_responses
def _build_next_google_api_request(self,
google_api_conn_client,
google_api_sub_functions,
google_api_endpoint_instance,
google_api_response):
for sub_function in google_api_sub_functions:
if sub_function != google_api_sub_functions[-1]:
google_api_conn_client = getattr(google_api_conn_client, sub_function)
google_api_conn_client = google_api_conn_client()
else:
google_api_conn_client = getattr(google_api_conn_client, sub_function + '_next')
google_api_conn_client = google_api_conn_client(
google_api_endpoint_instance,
google_api_response
)
return google_api_conn_client
#
# -------------------- Google API DAG examples --------------------
#
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.talos import GoogleApiToS3Transfer
default_args = {
'owner': 'Felix Uellendall',
'start_date': datetime.utcnow(),
}
dag = DAG(
dag_id='google_api_to_s3_example',
default_args=default_args,
schedule_interval=timedelta(minutes=10)
)
t1 = GoogleApiToS3Transfer(
google_api_service_name='analytics',
google_api_service_version='v4',
google_api_endpoint_path='analyticsreporting.reports.batchGet',
google_api_endpoint_params={
'body': {
'reportRequests': [{
'viewId': 'id',
'dateRanges': [{'startDate': '7daysAgo', 'endDate': 'today'}],
'metrics': [{'expression': 'ga:sessions'}],
'dimensions': [{'name': 'ga:country'}]
}]
}
},
s3_destination_key='internal/google_analytics_reporting_example.json',
task_id='google_analytics_reporting_to_s3',
dag=dag
)
t2 = GoogleApiToS3Transfer(
google_api_service_name='youtubeAnalytics',
google_api_service_version='v2',
google_api_endpoint_path='youtubeAnalytics.reports.query',
google_api_endpoint_params={
'ids': 'channel==MINE',
'startDate': '2017-01-01',
'endDate': '2017-12-31',
'metrics': 'estimatedMinutesWatched,views,likes,subscribersGained',
'dimensions': 'day',
'sort': 'day'
},
s3_destination_key='internal/youtube_analytics_example.json',
task_id='youtube_analytics_to_s3',
dag=dag
)
t3 = GoogleApiToS3Transfer(
google_api_service_name='youtubereporting',
google_api_service_version='v1',
google_api_endpoint_path='youtubereporting.jobs.list',
google_api_endpoint_params={},
s3_destination_key='internal/youtube_reporting_example.json',
task_id='youtube_reporting_to_s3',
dag=dag
)
t4 = GoogleApiToS3Transfer(
google_api_service_name='youtube',
google_api_service_version='v3',
google_api_endpoint_path='youtube.channels.list',
google_api_endpoint_params={
'part': 'brandingSettings',
'mine': True
},
s3_destination_key='internal/youtube_data_example.json',
task_id='youtube_data_to_s3',
dag=dag
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment