Skip to content

Commit

Permalink
feat: ✨ introduce report streams
Browse files Browse the repository at this point in the history
introducing report streams, for now without group by.
  • Loading branch information
tobiascadee committed Sep 19, 2024
1 parent 88c2e2d commit d1d3f3d
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Secrets and internal config files
**/.secrets/*

config.json
# Ignore meltano internal cache and sqlite systemdb

.meltano/
Expand Down
10 changes: 6 additions & 4 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ plugins:
kind: string
description: The organisation id in your apple search ads

# TODO: Declare required settings here:
settings_group_validation:
- [username, password]

# TODO: Declare default configuration values here:
config:
org_id: "1227480"
client_id: $TAP_APPLE_SEARCH_ADS_CLIENT_ID
client_secret: $TAP_APPLE_SEARCH_ADS_CLIENT_SECRET

select:
- campaign_reports.*
- campaigns.*

loaders:
- name: target-jsonl
Expand Down
37 changes: 14 additions & 23 deletions tap_apple_search_ads/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,41 @@

from __future__ import annotations

import sys
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable
from typing import TYPE_CHECKING, Any, Callable

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseOffsetPaginator # noqa: TCH002
from singer_sdk.pagination import BaseOffsetPaginator
from singer_sdk.streams import RESTStream

from tap_apple_search_ads.auth import AppleSearchAdsAuthenticator

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
else:
import importlib_resources

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context


_Auth = Callable[[requests.PreparedRequest], requests.PreparedRequest]


class AppleSearchAdsPaginator(BaseOffsetPaginator):
    """Offset paginator for the Apple Search Ads tap.

    Construction (``start_value``, ``page_size``) is inherited unchanged from
    ``BaseOffsetPaginator``; only the end-of-data check is customised.
    """

    def has_more(self, response: requests.Response) -> bool:
        """Check whether the endpoint has any pages left.

        Args:
            response: API response object.

        Returns:
            ``True`` while ``startIndex + itemsPerPage`` is still below
            ``totalResults`` in the response's ``pagination`` object,
            ``False`` otherwise (including when no pagination info is present).
        """
        # ``or`` covers both a missing key and an explicit JSON ``null``;
        # ``.get(key, default)`` alone would hand back ``None`` for the latter.
        pagination = response.json().get("pagination") or {}
        total_results = pagination.get("totalResults") or 0
        start_index = pagination.get("startIndex") or 0
        items_per_page = pagination.get("itemsPerPage") or 0
        return start_index + items_per_page < total_results


class AppleSearchAdsStream(RESTStream):
"""AppleSearchAds stream class."""

Expand Down Expand Up @@ -81,7 +83,7 @@ def get_new_paginator(self) -> BaseOffsetPaginator:
Returns:
A pagination helper instance.
"""
return AppleSearchAdsPaginator(0, 1000)
return AppleSearchAdsPaginator(0, 4)

def get_url_params(
self,
Expand All @@ -104,14 +106,3 @@ def get_url_params(
params["field"] = "asc"
params["order_by"] = self.replication_key
return params

def parse_response(self, response: requests.Response) -> Iterable[dict]:
    """Yield every record contained in an API response.

    Args:
        response: The HTTP ``requests.Response`` object.

    Yields:
        Each record matched by ``self.records_jsonpath`` in the JSON body.
    """
    jsonpath = self.records_jsonpath
    payload = response.json()
    matches = extract_jsonpath(jsonpath, input=payload)
    yield from matches
114 changes: 114 additions & 0 deletions tap_apple_search_ads/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Schemas for streams."""

from singer_sdk.typing import ( # JSON Schema typing helpers
ArrayType,
BooleanType,
DateTimeType,
IntegerType,
NumberType,
ObjectType,
PropertiesList,
Property,
StringType,
)

# JSON schema for records of the campaigns stream.
# Budget amounts are declared as objects holding a string "amount" and a
# string "currency".
campaigns_schema = PropertiesList(
    Property("id", IntegerType),
    Property("orgId", IntegerType),
    # "name" was previously declared twice; one declaration is sufficient.
    Property("name", StringType),
    Property(
        "budgetAmount",
        ObjectType(
            Property("amount", StringType),
            Property("currency", StringType),
        ),
    ),
    Property(
        "dailyBudgetAmount",
        ObjectType(
            Property("amount", StringType),
            Property("currency", StringType),
        ),
    ),
    Property("adamId", IntegerType),
    Property("paymentModel", StringType),
    Property(
        "locInvoiceDetails",
        ObjectType(
            Property("clientName", StringType),
            Property("orderNumber", StringType),
            Property("buyerName", StringType),
            Property("buyerEmail", StringType),
            Property("billingContactEmail", StringType),
        ),
    ),
    Property("budgetOrders", ArrayType(IntegerType)),
    Property("startTime", DateTimeType),
    Property("endTime", DateTimeType),
    Property("status", StringType),
    Property("servingStatus", StringType),
    Property("creationTime", DateTimeType),
    Property("servingStateReasons", ArrayType(StringType)),
    Property("modificationTime", DateTimeType),
    Property("deleted", BooleanType),
    Property("sapinLawResponse", StringType),
    Property("countriesOrRegions", ArrayType(StringType)),
    # Declared as a free-form object: no fixed set of keys is listed.
    Property("countryOrRegionServingStateReasons", ObjectType()),
    Property("supplySources", ArrayType(StringType)),
    Property("adChannelType", StringType),
    Property("billingEvent", StringType),
    Property("displayStatus", StringType),
).to_dict()

# JSON schema for the metric fields shared by report streams.
# The table below lists (property name, type) pairs in emission order; money
# metrics are objects holding a string "amount" and a string "currency".
reports_schema = PropertiesList(
    *(
        Property(metric_name, metric_type)
        for metric_name, metric_type in (
            ("impressions", IntegerType),
            ("taps", IntegerType),
            ("ttr", NumberType),
            (
                "avgCPT",
                ObjectType(
                    Property("amount", StringType),
                    Property("currency", StringType),
                ),
            ),
            (
                "avgCPM",
                ObjectType(
                    Property("amount", StringType),
                    Property("currency", StringType),
                ),
            ),
            (
                "localSpend",
                ObjectType(
                    Property("amount", StringType),
                    Property("currency", StringType),
                ),
            ),
            ("totalInstalls", IntegerType),
            ("totalNewDownloads", IntegerType),
            ("totalRedownloads", IntegerType),
            ("viewInstalls", IntegerType),
            ("tapInstalls", IntegerType),
            ("tapNewDownloads", IntegerType),
            ("tapRedownloads", IntegerType),
            ("viewNewDownloads", IntegerType),
            ("viewRedownloads", IntegerType),
            (
                "totalAvgCPI",
                ObjectType(
                    Property("amount", StringType),
                    Property("currency", StringType),
                ),
            ),
            ("totalInstallRate", NumberType),
            (
                "tapInstallCPI",
                ObjectType(
                    Property("amount", StringType),
                    Property("currency", StringType),
                ),
            ),
            ("tapInstallRate", NumberType),
        )
    ),
).to_dict()
1 change: 0 additions & 1 deletion tap_apple_search_ads/schemas/__init__.py

This file was deleted.

133 changes: 83 additions & 50 deletions tap_apple_search_ads/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,98 @@

from __future__ import annotations

import sys
import typing as t

from singer_sdk.typing import PropertiesList, Property, DateTimeType, StringType, BooleanType, IntegerType, ObjectType, ArrayType # JSON Schema typing helpers
from datetime import datetime, timezone

from tap_apple_search_ads.client import AppleSearchAdsStream

if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
else:
import importlib_resources
from .schemas import campaigns_schema, reports_schema

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record

_TToken = t.TypeVar("_TToken")


class CampaignsStream(AppleSearchAdsStream):
    """Campaigns stream, read from the ``/campaigns`` endpoint."""

    name = "campaigns"
    path = "/campaigns"
    primary_keys: t.ClassVar[list[str]] = ["id"]
    # No replication key: the stream is not synced incrementally.
    replication_key = None

    # The property list is defined once in the schemas module (imported as
    # ``campaigns_schema``) instead of being repeated inline here.
    schema = campaigns_schema


class ReportStream(AppleSearchAdsStream):
    """Base class for report streams.

    For now report streams only return row totals; grouping is not supported.
    Subclasses must declare exactly one primary key: it is used as the sort
    field of the request and is copied from each row's ``metadata`` into the
    emitted record.
    """

    rest_method = "POST"
    records_jsonpath = "$.data.reportingDataResponse.row[*]"

    @property
    def schema(self) -> dict:
        """Return the shared report schema with this stream's primary key added.

        Returns:
            A new schema dict. The module-level ``reports_schema`` is left
            untouched so that one report stream's key never leaks into the
            schema of another.
        """
        key = self.primary_keys[0]
        return {
            **reports_schema,
            "properties": {
                **reports_schema["properties"],
                key: {"type": ["integer", "null"]},
            },
        }

    def prepare_request_payload(
        self,
        context: Context | None,  # noqa: ARG002
        next_page_token: _TToken | None,
    ) -> dict | None:
        """Prepare the data payload for the REST API request.

        Args:
            context: Stream partition or context dictionary.
            next_page_token: Offset of the first row to request, as produced by
                the paginator; ``None`` for the first page.

        Returns:
            The JSON body of the report request.
        """
        today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
        return {
            "startTime": self.config.get("start_date", "2016-01-01"),
            # The end of the window comes from ``end_date`` (default: today),
            # not from ``start_date``, which would collapse the window.
            "endTime": self.config.get("end_date", today),
            "selector": {
                "orderBy": [
                    {"field": self.primary_keys[0], "sortOrder": "ASCENDING"},
                ],
                # Advance with the paginator; a fixed offset of 0 would fetch
                # the first page again on every request.
                # NOTE(review): "limit" should equal the page size of the
                # paginator returned by ``get_new_paginator`` so that pages
                # neither overlap nor leave gaps - confirm.
                "pagination": {"offset": next_page_token or 0, "limit": 1000},
            },
            "timeZone": "UTC",
            "returnRecordsWithNoMetrics": True,
            "returnRowTotals": True,
            "returnGrandTotals": True,
        }

    def post_process(
        self,
        row: Record,
        context: Context | None = None,  # noqa: ARG002
    ) -> dict | None:
        """Flatten a report row into a single record.

        Args:
            row: Individual record in the stream; its ``total`` and
                ``metadata`` entries are read.
            context: Stream partition or context dictionary.

        Returns:
            A copy of the row's ``total`` metrics plus the primary key value
            taken from the row's ``metadata``.
        """
        key = self.primary_keys[0]
        return {**row["total"], key: row["metadata"][key]}


class CampaignReportsStream(ReportStream):
    """Campaign reports stream."""

    name = "campaign_reports"
    path = "/reports/campaigns"
    # Report streams only ever read ``primary_keys[0]``, so this list must
    # contain exactly one key.
    primary_keys: t.ClassVar[list[str]] = ["campaignId"]
Loading

0 comments on commit d1d3f3d

Please sign in to comment.