From 2b0a91db51d320470df13ba2fce1dbe09d319687 Mon Sep 17 00:00:00 2001 From: Benjamin Gregory Date: Tue, 14 Nov 2017 01:01:50 -0500 Subject: [PATCH] ic --- .gitignore | 1 + README.md | 1 + __init__.py | 14 ++++ hooks/__init__.py | 0 hooks/facebook_ads_hook.py | 48 ++++++++++++ operators/__init__.py | 0 operators/facebook_ads_to_s3_operator.py | 98 ++++++++++++++++++++++++ 7 files changed, 162 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 __init__.py create mode 100644 hooks/__init__.py create mode 100644 hooks/facebook_ads_hook.py create mode 100644 operators/__init__.py create mode 100644 operators/facebook_ads_to_s3_operator.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e43b0f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba02655 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Airflow Plugin - Facebook Ads diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..5c8d1e5 --- /dev/null +++ b/__init__.py @@ -0,0 +1,14 @@ +from airflow.plugins_manager import AirflowPlugin +from facebook_ads_plugin.hooks.facebook_ads_hook import FacebookAdsHook +from facebook_ads_plugin.operators.facebook_ads_to_s3_operator import FacebookAdsInsightsToS3Operator + + +class PGFacebookAdsPlugin(AirflowPlugin): + name = "PGFacebookAdsPlugin" + hooks = [FacebookAdsHook] + operators = [FacebookAdsInsightsToS3Operator] + executors = [] + macros = [] + admin_views = [] + flask_blueprints = [] + menu_links = [] diff --git a/hooks/__init__.py b/hooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hooks/facebook_ads_hook.py b/hooks/facebook_ads_hook.py new file mode 100644 index 0000000..e4cc39d --- /dev/null +++ b/hooks/facebook_ads_hook.py @@ -0,0 +1,48 @@ +from airflow.hooks.base_hook import BaseHook + +from urllib.parse import urlencode +import requests +import time + + +class FacebookAdsHook(BaseHook): + def __init__(self, facebook_ads_conn_id='facebook_ads_default'): + self.facebook_ads_conn_id = facebook_ads_conn_id + self.connection = self.get_connection(facebook_ads_conn_id) + + self.base_uri = 'https://graph.facebook.com' + self.api_version = self.connection.extra_dejson['apiVersion'] or '2.10' + self.access_token = self.connection.extra_dejson['accessToken'] or self.connection.password + + def get_insights_for_account_id(self, account_id, insight_fields, breakdowns, time_range, time_increment='all_days', level='ad', limit=100): + payload = urlencode({ + 'access_token': self.access_token, + 'breakdowns': ','.join(breakdowns), + 'fields': ','.join(insight_fields), + 'time_range': time_range, + 'time_increment': time_increment, + 'level': level, + 'limit': limit + }) + + response = requests.get('{base_uri}/v{api_version}/act_{account_id}/insights?{payload}'.format( + base_uri=self.base_uri, + api_version=self.api_version, + account_id=account_id, + payload=payload + )) + + response.raise_for_status() + response_body = response.json() + insights = [] + + while 'next' in response_body.get('paging', {}): + time.sleep(1) + insights.extend(response_body['data']) + response = requests.get(response_body['paging']['next']) + response.raise_for_status() + response_body = response.json() + + insights.extend(response_body['data']) + + return insights diff --git a/operators/__init__.py b/operators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/operators/facebook_ads_to_s3_operator.py b/operators/facebook_ads_to_s3_operator.py new file mode 100644 index 0000000..32af91b --- /dev/null +++ b/operators/facebook_ads_to_s3_operator.py @@ -0,0 +1,98 @@ +from facebook_ads_plugin.hooks.facebook_ads_hook import FacebookAdsHook +from airflow.hooks.S3_hook import S3Hook +from airflow.models import BaseOperator + +import json +import os +from datetime import datetime + + +class FacebookAdsInsightsToS3Operator(BaseOperator): + """ + Facebook Ads Insights To S3 Operator + :param facebook_conn_id: The source facebook connection id. + :type s3_conn_id: string + :param s3_conn_id: The destination s3 connection id. + :type s3_conn_id: string + :param s3_bucket: The destination s3 bucket. + :type s3_bucket: string + :param s3_key: The destination s3 key. + :type s3_key: string + :param account_ids: An array of Facebook Ad Account Ids strings which + own campaigns, ad_sets, and ads. + :type account_ids: array + :param insight_fields: An array of insight field strings to get back from + the API. Defaults to an empty array. + :type insight_fields: array + :param breakdowns: An array of breakdown strings for which to group insights.abs + Defaults to an empty array. + :type breakdowns: array + :param since: A datetime representing the start time to get Facebook data. + Can use Airflow template for execution_date + :type since: datetime + :param until: A datetime representing the end time to get Facebook data. + Can use Airflow template for next_execution_date + :type until: datetime + :param time_increment: A string representing the time increment for which to get data, + described by the Facebook Ads API. Defaults to 'all_days'. + :type time_increment: string + :param level: A string representing the level for which to get Facebook Ads data, + can be campaign, ad_set, or ad level. Defaults to 'ad'. + :type level: string + :param limit: The number of records to fetch in each request. Defaults to 100. + :type limit: integer + """ + + template_fields = ('s3_key', 'since', 'until') + + def __init__(self, + facebook_conn_id, + s3_conn_id, + s3_bucket, + s3_key, + account_ids, + insight_fields, + breakdowns, + since, + until, + time_increment='all_days', + level='ad', + limit=100, + *args, + **kwargs): + super().__init__(*args, **kwargs) + + self.facebook_conn_id = facebook_conn_id + self.s3_conn_id = s3_conn_id + self.s3_bucket = s3_bucket + self.s3_key = s3_key + self.account_ids = account_ids + self.insight_fields = insight_fields + self.breakdowns = breakdowns + self.since = since + self.until = until + self.time_increment = time_increment + self.level = level + self.limit = limit + + def execute(self, context): + facebook_conn = FacebookAdsHook(self.facebook_conn_id) + s3_conn = S3Hook(self.s3_conn_id) + + time_range = { + 'since': datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d'), + 'until': datetime.strptime(self.until, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + } + + file_name = '/tmp/{key}.jsonl'.format(key=self.s3_key) + with open(file_name, 'w') as insight_file: + for account_id in self.account_ids: + insights = facebook_conn.get_insights_for_account_id(account_id, self.insight_fields, self.breakdowns, time_range, self.time_increment, self.level, self.limit) + + if len(insights) > 0: + for insight in insights[:-1]: + insight_file.write(json.dumps(insight) + '\n') + insight_file.write(json.dumps(insights[-1:][0])) + + s3_conn.load_file(file_name, self.s3_key, self.s3_bucket, True) + os.remove(file_name)