From bf79ea3e396acb00bb335f67961aac4c32da5ac0 Mon Sep 17 00:00:00 2001 From: Tobias Cadee Date: Thu, 19 Sep 2024 14:56:38 +0200 Subject: [PATCH] feat: :sparkles: introduce report streams introducing report streams, for now without group by. --- .gitignore | 2 +- meltano.yml | 10 +- tap_apple_search_ads/client.py | 35 +++--- tap_apple_search_ads/schemas.py | 114 +++++++++++++++++++ tap_apple_search_ads/schemas/__init__.py | 1 - tap_apple_search_ads/streams.py | 133 ++++++++++++++--------- tap_apple_search_ads/tap.py | 19 +++- tests/test_core.py | 1 - 8 files changed, 234 insertions(+), 81 deletions(-) create mode 100644 tap_apple_search_ads/schemas.py delete mode 100644 tap_apple_search_ads/schemas/__init__.py diff --git a/.gitignore b/.gitignore index 475019c..e4770d5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ # Secrets and internal config files **/.secrets/* - +config.json # Ignore meltano internal cache and sqlite systemdb .meltano/ diff --git a/meltano.yml b/meltano.yml index 3761496..4840b3e 100644 --- a/meltano.yml +++ b/meltano.yml @@ -35,13 +35,15 @@ plugins: kind: string description: The organisation id in your apple search ads - # TODO: Declare required settings here: - settings_group_validation: - - [username, password] - # TODO: Declare default configuration values here: config: org_id: "1227480" + client_id: $TAP_APPLE_SEARCH_ADS_CLIENT_ID + client_secret: $TAP_APPLE_SEARCH_ADS_CLIENT_SECRET + + select: + - campaign_reports.* + - campaigns.* loaders: - name: target-jsonl diff --git a/tap_apple_search_ads/client.py b/tap_apple_search_ads/client.py index 2f5d285..025a805 100644 --- a/tap_apple_search_ads/client.py +++ b/tap_apple_search_ads/client.py @@ -2,39 +2,41 @@ from __future__ import annotations -import sys from functools import cached_property -from typing import TYPE_CHECKING, Any, Callable, Iterable +from typing import TYPE_CHECKING, Any, Callable import requests -from singer_sdk.helpers.jsonpath import extract_jsonpath -from singer_sdk.pagination import BaseOffsetPaginator # noqa: TCH002 +from singer_sdk.pagination import BaseOffsetPaginator from singer_sdk.streams import RESTStream from tap_apple_search_ads.auth import AppleSearchAdsAuthenticator -if sys.version_info >= (3, 9): - import importlib.resources as importlib_resources -else: - import importlib_resources - if TYPE_CHECKING: from singer_sdk.helpers.types import Context _Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest] + class AppleSearchAdsPaginator(BaseOffsetPaginator): - def __init__(self, start_value: int, page_size: int, *args: Any, **kwargs: Any) -> None: - super().__init__(start_value, page_size, *args, **kwargs) + """Paginator for Apple Search Ads tap.""" def has_more(self, response: requests.Response) -> bool: + """Override this method to check if the endpoint has any pages left. + + Args: + response: API response object. + + Returns: + Boolean flag used to indicate if the endpoint has more pages. + """ pagination = response.json().get("pagination", {}) total_results = pagination.get("totalResults", 0) start_index = pagination.get("startIndex", 0) items_per_page = pagination.get("itemsPerPage", 0) return start_index + items_per_page < total_results + class AppleSearchAdsStream(RESTStream): """AppleSearchAds stream class.""" @@ -104,14 +106,3 @@ def get_url_params( params["field"] = "asc" params["order_by"] = self.replication_key return params - - def parse_response(self, response: requests.Response) -> Iterable[dict]: - """Parse the response and return an iterator of result records. - - Args: - response: The HTTP ``requests.Response`` object. - - Yields: - Each record from the source. - """ - yield from extract_jsonpath(self.records_jsonpath, input=response.json()) diff --git a/tap_apple_search_ads/schemas.py b/tap_apple_search_ads/schemas.py new file mode 100644 index 0000000..e677f7d --- /dev/null +++ b/tap_apple_search_ads/schemas.py @@ -0,0 +1,114 @@ +"""Schemas for streams.""" + +from singer_sdk.typing import ( # JSON Schema typing helpers + ArrayType, + BooleanType, + DateTimeType, + IntegerType, + NumberType, + ObjectType, + PropertiesList, + Property, + StringType, +) + +campaigns_schema = PropertiesList( + Property("id", IntegerType), + Property("orgId", IntegerType), + Property("name", StringType), + Property("name", StringType), + Property( + "budgetAmount", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property( + "dailyBudgetAmount", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property("adamId", IntegerType), + Property("paymentModel", StringType), + Property( + "locInvoiceDetails", + ObjectType( + Property("clientName", StringType), + Property("orderNumber", StringType), + Property("buyerName", StringType), + Property("buyerEmail", StringType), + Property("billingContactEmail", StringType), + ), + ), + Property("budgetOrders", ArrayType(IntegerType)), + Property("startTime", DateTimeType), + Property("endTime", DateTimeType), + Property("status", StringType), + Property("servingStatus", StringType), + Property("creationTime", DateTimeType), + Property("servingStateReasons", ArrayType(StringType)), + Property("modificationTime", DateTimeType), + Property("deleted", BooleanType), + Property("sapinLawResponse", StringType), + Property("countriesOrRegions", ArrayType(StringType)), + Property("countryOrRegionServingStateReasons", ObjectType()), + Property("supplySources", ArrayType(StringType)), + Property("adChannelType", StringType), + Property("billingEvent", StringType), + Property("displayStatus", StringType), +).to_dict() + +reports_schema = PropertiesList( + Property("impressions", IntegerType), + Property("taps", IntegerType), + Property("ttr", NumberType), + Property( + "avgCPT", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property( + "avgCPM", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property( + "localSpend", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property("totalInstalls", IntegerType), + Property("totalNewDownloads", IntegerType), + Property("totalRedownloads", IntegerType), + Property("viewInstalls", IntegerType), + Property("tapInstalls", IntegerType), + Property("tapNewDownloads", IntegerType), + Property("tapRedownloads", IntegerType), + Property("viewNewDownloads", IntegerType), + Property("viewRedownloads", IntegerType), + Property( + "totalAvgCPI", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property("totalInstallRate", NumberType), + Property( + "tapInstallCPI", + ObjectType( + Property("amount", StringType), + Property("currency", StringType), + ), + ), + Property("tapInstallRate", NumberType), +).to_dict() diff --git a/tap_apple_search_ads/schemas/__init__.py b/tap_apple_search_ads/schemas/__init__.py deleted file mode 100644 index 06c0a19..0000000 --- a/tap_apple_search_ads/schemas/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""JSON schema files for the REST API.""" diff --git a/tap_apple_search_ads/streams.py b/tap_apple_search_ads/streams.py index ec4c02e..7e7f701 100644 --- a/tap_apple_search_ads/streams.py +++ b/tap_apple_search_ads/streams.py @@ -2,17 +2,18 @@ from __future__ import annotations -import sys import typing as t - -from singer_sdk.typing import PropertiesList, Property, DateTimeType, StringType, BooleanType, IntegerType, ObjectType, ArrayType # JSON Schema typing helpers +from datetime import datetime, timezone from tap_apple_search_ads.client import AppleSearchAdsStream -if sys.version_info >= (3, 9): - import importlib.resources as importlib_resources -else: - import importlib_resources +from .schemas import campaigns_schema, reports_schema + +if t.TYPE_CHECKING: + from singer_sdk.helpers.types import Context, Record + +_TToken = t.TypeVar("_TToken") + class CampaignsStream(AppleSearchAdsStream): """Define custom stream.""" @@ -20,47 +21,79 @@ class CampaignsStream(AppleSearchAdsStream): name = "campaigns" path = "/campaigns" primary_keys: t.ClassVar[list[str]] = ["id"] - replication_key = None - - schema = PropertiesList( - Property("id", IntegerType), - Property("orgId", IntegerType), - Property("name", StringType), - Property("name", StringType), - Property("budgetAmount", ObjectType( - Property("amount", StringType), - Property("currency", StringType), - ), - ), - Property("dailyBudgetAmount", ObjectType( - Property("amount", StringType), - Property("currency", StringType), - ), - ), - Property("adamId", IntegerType), - Property("paymentModel", StringType), - Property("locInvoiceDetails", ObjectType( - Property("clientName", StringType), - Property("orderNumber", StringType), - Property("buyerName", StringType), - Property("buyerEmail", StringType), - Property("billingContactEmail", StringType), + schema = campaigns_schema + + +class ReportStream(AppleSearchAdsStream): + """Base class for report streams. + + For now report streams only return totals and not grouped by. + """ + + rest_method = "POST" + records_jsonpath = "$.data.reportingDataResponse.row[*]" + + @property + def schema(self) -> dict: + """Return schema with primary key added.""" + schema = reports_schema + schema["properties"][self.primary_keys[0]] = { + "type": ["integer", "null"], + } + return schema + + def prepare_request_payload( + self, + context: Context | None, # noqa: ARG002 + next_page_token: _TToken | None, # noqa: ARG002 + ) -> dict | None: + """Prepare the data payload for the REST API request. + + Args: + context: Stream partition or context dictionary. + next_page_token: Token, page number or any request argument to request the + next page of data. + """ + return { + "startTime": self.config.get("start_date", "2016-01-01"), + "endTime": self.config.get( + "start_date", + datetime.now(tz=timezone.utc).strftime("%Y-%m-%d"), ), - ), - Property("budgetOrders", ArrayType(IntegerType)), - Property("startTime", DateTimeType), - Property("endTime", DateTimeType), - Property("status", StringType), - Property("servingStatus", StringType), - Property("creationTime", DateTimeType), - Property("servingStateReasons", ArrayType(StringType)), - Property("modificationTime", DateTimeType), - Property("deleted", BooleanType), - Property("sapinLawResponse", StringType), - Property("countriesOrRegions", ArrayType(StringType)), - Property("countryOrRegionServingStateReasons", ObjectType()), - Property("supplySources", ArrayType(StringType)), - Property("adChannelType", StringType), - Property("billingEvent", StringType), - Property("displayStatus", StringType), - ).to_dict() \ No newline at end of file + "selector": { + "orderBy": [{"field": self.primary_keys[0], "sortOrder": "ASCENDING"}], + "pagination": {"offset": 0, "limit": 1000}, + }, + "timeZone": "UTC", + "returnRecordsWithNoMetrics": True, + "returnRowTotals": True, + "returnGrandTotals": True, + } + + def post_process( + self, + row: Record, + context: Context | None = None, # noqa: ARG002 + ) -> dict | None: + """As needed, append or transform raw data to match expected structure. + + Args: + row: Individual record in the stream. + context: Stream partition or context dictionary. + + Returns: + The resulting record dict, or `None` if the record should be excluded. + """ + new_row = dict(row["total"].items()) + new_row[self.primary_keys[0]] = row["metadata"][self.primary_keys[0]] + return new_row + + +class CampaignReportsStream(ReportStream): + """Campaign reports stream.""" + + path = "/reports/campaigns" + primary_keys: t.ClassVar[list[str]] = [ + "campaignId", + ] # make sure this is just one key for report streams. + name = "campaign_reports" diff --git a/tap_apple_search_ads/tap.py b/tap_apple_search_ads/tap.py index 0f93098..9422626 100644 --- a/tap_apple_search_ads/tap.py +++ b/tap_apple_search_ads/tap.py @@ -3,9 +3,13 @@ from __future__ import annotations from singer_sdk import Tap -from singer_sdk.typing import PropertiesList, Property, StringType, DateTimeType # JSON schema typing helpers +from singer_sdk.typing import ( + DateType, + PropertiesList, + Property, + StringType, +) # JSON schema typing helpers -# TODO: Import your custom stream types here: from tap_apple_search_ads import streams @@ -35,6 +39,16 @@ class TapAppleSearchAds(Tap): required=True, description="The organisation id in your apple search ads.", ), + Property( + "start_date", + DateType, + description="Start date for reporting streams, format in YYYY-MM-DD.", + ), + Property( + "end_date", + DateType, + description="End date for reporting streams, format in YYYY-MM-DD.", + ), ).to_dict() def discover_streams(self) -> list[streams.AppleSearchAdsStream]: @@ -45,6 +59,7 @@ def discover_streams(self) -> list[streams.AppleSearchAdsStream]: """ return [ streams.CampaignsStream(self), + streams.CampaignReportsStream(self), ] diff --git a/tests/test_core.py b/tests/test_core.py index 447e62f..9e5fecc 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -8,7 +8,6 @@ SAMPLE_CONFIG = { "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), - # TODO: Initialize minimal tap config }