Merge branch 'staging'
RuslanBergenov committed Nov 9, 2024
2 parents ad406c9 + cdb22d1 commit 6faf287
Showing 16 changed files with 914 additions and 337 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -75,7 +75,7 @@ jobs:
# check for the config file
gsutil stat gs://ga4-flattener-deployment-464892960897-adswerve-ga-flat-config/config_datasets.json || echo "ERROR: GCS config file not found"
- name: Test with pytest
# - name: Test with pytest
run: |
# test
pytest --verbose
1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@ __pycache__/
.pycache

#Python virtual environment
venv_ga_flattener
venv/
env/
local/
224 changes: 174 additions & 50 deletions cf/main.py
@@ -1,5 +1,4 @@
#TODO: (optional for now) - intraday
#TODO: (optional for now) - remaining unit tests other than the most important ones
#TODO: (optional for now) - intraday flattening
import base64
import json
from google.cloud import bigquery
@@ -39,7 +38,7 @@ def __init__(self, event):
logging.critical(f"flattener configuration error: {e}")

def extract_values(self, string):
pattern = re.compile(r'projects\/(.*?)\/datasets\/(.*?)\/tables\/(events|pseudonymous_users.*?)_(20\d\d\d\d\d\d)$')
pattern = re.compile(r'projects\/(.*?)\/datasets\/(.*?)\/tables\/(events|pseudonymous_users|users.*?)_(20\d\d\d\d\d\d)$')
match = pattern.search(string)
if match:
project, dataset, table_type, shard = match.groups()
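
As an aside, a minimal sketch (not part of this diff) of what the widened pattern now captures for a daily users shard; the project and dataset names below are made up:

import re

resource = "projects/my-project/datasets/analytics_123456789/tables/users_20241109"  # hypothetical resource name
pattern = re.compile(r'projects\/(.*?)\/datasets\/(.*?)\/tables\/(events|pseudonymous_users|users.*?)_(20\d\d\d\d\d\d)$')
match = pattern.search(resource)
if match:
    # groups() -> ('my-project', 'analytics_123456789', 'users', '20241109')
    project, dataset, table_type, shard = match.groups()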
@@ -61,13 +60,19 @@ def flatten_nested_tables(self):
tables = list(set(tables_config) & set(["pseudo_users",
"pseudo_user_properties",
"pseudo_user_audiences"]))
else:
elif self.table_type == "events":

tables = list(set(tables_config) & set(["events",
"event_params",
"user_properties",
"items"]))

elif self.table_type == "users":

tables = list(set(tables_config) & set(["users",
"users_user_properties",
"users_user_audiences"]))

return tables

def get_output_configuration(self):
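
For clarity, a small illustration (not from the commit) of the intersection in the new users branch above; the configured table list here is made up:

# Hypothetical config: only two of the three users outputs are enabled.
tables_config = ["events", "event_params", "users", "users_user_properties"]
wanted = ["users", "users_user_properties", "users_user_audiences"]
tables = list(set(tables_config) & set(wanted))
# e.g. ['users', 'users_user_properties']; set intersection does not preserve order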
@@ -92,9 +97,9 @@ def __init__(self, gcp_project, dataset, table_type, date_shard):
self.table_type = table_type
self.source_table_type = "'intraday'" if self.source_table_is_intraday() else "'daily'"

if self.table_type == "pseudonymous_users":
if self.table_type == "pseudonymous_users" or self.table_type=="users":
self.date_field_name = "`date`"
else:
elif self.table_type == "events" or self.table_type == "events_intraday":
self.date_field_name = "event_date"

def source_table_is_intraday(self):
@@ -118,46 +123,67 @@ def source_table_exists(self, table_type_param):

def get_temp_table_query(self):
"""
build unique event id
build unique primary key
"""
qry = f"""
CREATE OR REPLACE TEMP TABLE temp_events AS (
SELECT
CONCAT(
stream_id,
"_",
IFNULL(user_pseudo_id, ""),
"_",
event_name,
"_",
event_timestamp,
"_",
IFNULL(CAST(batch_event_index AS STRING), ""),
"_",
IFNULL(CAST(batch_page_id AS STRING), ""),
"_",
IFNULL(CAST(batch_ordering_id AS STRING), ""),
"_",

qry = ""

if self.table_type in ("events", "events_intraday" ):

qry = f"""
CREATE OR REPLACE TEMP TABLE temp_events AS (
SELECT
CONCAT(
stream_id,
"_",
IFNULL(user_pseudo_id, ""),
"_",
event_name,
"_",
event_timestamp,
"_",
IFNULL(CAST(batch_event_index AS STRING), ""),
"_",
IFNULL(CAST(batch_page_id AS STRING), ""),
"_",
IFNULL(CAST(batch_ordering_id AS STRING), ""),
"_",
ROW_NUMBER() OVER (PARTITION BY CONCAT(
stream_id,
IFNULL(user_pseudo_id, ""),
event_name,
event_timestamp,
IFNULL(CAST(batch_event_index AS STRING), ""),
IFNULL(CAST(batch_page_id AS STRING), ""),
IFNULL(CAST(batch_ordering_id AS STRING), "")
))) event_id,
ROW_NUMBER() OVER (PARTITION BY CONCAT(
stream_id,
IFNULL(user_pseudo_id, ""),
event_name,
event_timestamp,
IFNULL(CAST(batch_event_index AS STRING), ""),
IFNULL(CAST(batch_page_id AS STRING), ""),
IFNULL(CAST(batch_ordering_id AS STRING), "")
))) event_id,
*
FROM
`{self.gcp_project}.{self.dataset}.{self.table_type}_*`
WHERE _TABLE_SUFFIX = "{self.date_shard}"
)
;
"""
*
FROM
`{self.gcp_project}.{self.dataset}.{self.table_type}_*`
WHERE _TABLE_SUFFIX = "{self.date_shard}"
)
;
"""
elif self.table_type == "users":
qry = f"""
CREATE OR REPLACE TEMP TABLE temp_users AS (
SELECT
GENERATE_UUID() row_id,
PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`,
*
FROM
`{self.gcp_project}.{self.dataset}.{self.table_type}_*`
WHERE _TABLE_SUFFIX = "{self.date_shard}"
)
;
"""
elif self.table_type == "pseudonymous_users":
qry = ""
return qry


def get_events_query_select_statement(self):
qry = f"""SELECT
PARSE_DATE('%Y%m%d', event_date) AS event_date,
@@ -448,6 +474,88 @@ def get_pseudo_user_audiences_select_statement(self):

return qry


def get_users_select_statement(self):
qry = f"""
SELECT
row_id,
`date`,
user_id,
user_info.last_active_timestamp_micros as user_info_last_active_timestamp_micros,
user_info.user_first_touch_timestamp_micros as user_info_user_first_touch_timestamp_micros,
user_info.first_purchase_date as user_info_first_purchase_date,
device.operating_system as device_operating_system,
device.category as device_category,
device.mobile_brand_name as device_mobile_brand_name,
device.mobile_model_name as device_mobile_model_name,
device.unified_screen_name as device_unified_screen_name,
geo.city as geo_city,
geo.country as geo_country,
geo.continent as geo_continent,
geo.region as geo_region,
user_ltv.revenue_in_usd as user_ltv_revenue_in_usd,
user_ltv.sessions as user_ltv_sessions,
user_ltv.engagement_time_millis as user_ltv_engagement_time_millis,
user_ltv.purchases as user_ltv_purchases,
user_ltv.engaged_sessions as user_ltv_engaged_sessions,
user_ltv.session_duration_micros as user_ltv_session_duration_micros,
predictions.in_app_purchase_score_7d as predictions_in_app_purchase_score_7d,
predictions.purchase_score_7d as predictions_purchase_score_7d,
predictions.churn_score_7d as predictions_churn_score_7d,
predictions.revenue_28d_in_usd as predictions_revenue_28d_in_usd,
privacy_info.is_limited_ad_tracking as privacy_info_is_limited_ad_tracking,
privacy_info.is_ads_personalization_allowed as privacy_info_is_ads_personalization_allowed,
occurrence_date,
last_updated_date
FROM
temp_users
;
"""
return qry

def get_users_user_properties_select_statement(self):
qry = f"""
SELECT
row_id,
`date`,
user_id,
up.key user_property_key,
up.value.string_value user_property_value,
up.value.set_timestamp_micros user_property_set_timestamp_micros,
up.value.user_property_name
FROM
temp_users
,UNNEST(user_properties) up
;
"""
return qry

def get_users_user_audiences_select_statement(self):
qry = f"""
SELECT
row_id,
`date`,
user_id,
a.id audience_id,
a.name audience_name,
a.membership_start_timestamp_micros audience_membership_start_timestamp_micros,
a.membership_expiry_timestamp_micros audience_membership_expiry_timestamp_micros,
a.npa audience_npa
FROM
temp_users
,UNNEST(audiences) a
;
"""
return qry
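
The GENERATE_UUID() row_id selected above gives each flattened child row a key back to its parent flat_users row. A rough sketch (not from the repository) of how a downstream query could join them; the project, dataset, and sharded table names are placeholders and assume sharded output tables suffixed with the date shard:

join_sql = """
SELECT
  u.user_id,
  up.user_property_key,
  up.user_property_value
FROM `my-project.analytics_123456789.flat_users_20241109` u
LEFT JOIN `my-project.analytics_123456789.flat_users_user_properties_20241109` up
  ON up.row_id = u.row_id
"""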

def get_flat_table_update_query(self, select_statement, flat_table, sharded_output_required=True, partitioned_output_required=False):

assert flat_table in ["flat_events",
@@ -456,7 +564,10 @@ def get_flat_table_update_query(self, select_statement, flat_table, sharded_outp
"flat_items",
"flat_pseudo_users",
"flat_pseudo_user_properties",
"flat_pseudo_user_audiences"]
"flat_pseudo_user_audiences",
"flat_users",
"flat_users_user_properties",
"flat_users_user_audiences"]

assert "flat" in flat_table

@@ -497,7 +608,11 @@ def get_select_statement(self, flat_table):
"flat_items",
"flat_pseudo_users",
"flat_pseudo_user_properties",
"flat_pseudo_user_audiences"]
"flat_pseudo_user_audiences",
"flat_users",
"flat_users_user_properties",
"flat_users_user_audiences"
]

query = ""

@@ -522,6 +637,15 @@ def get_select_statement(self, flat_table):
elif flat_table == "flat_pseudo_user_audiences":
query += self.get_pseudo_user_audiences_select_statement()

elif flat_table == "flat_users":
query += self.get_users_select_statement()

elif flat_table == "flat_users_user_properties":
query += self.get_users_user_properties_select_statement()

elif flat_table == "flat_users_user_audiences":
query += self.get_users_user_audiences_select_statement()

return query

def build_full_query(self, sharded_output_required=True, partitioned_output_required=False,
@@ -531,16 +655,16 @@ def build_full_query(self, sharded_output_required=True, partitioned_output_requ
"flat_items",
"flat_pseudo_users",
"flat_pseudo_user_properties",
"flat_pseudo_user_audiences"]):
"flat_pseudo_user_audiences",
"flat_users",
"flat_users_user_properties",
"flat_users_user_audiences"
]):

assert len(list_of_flat_tables) >= 1, "At least 1 flat table needs to be included in the config file"
query = ""


if ("flat_events" in list_of_flat_tables
or "flat_event_params" in list_of_flat_tables
or "flat_user_properties" in list_of_flat_tables
or "flat_items" in list_of_flat_tables):
if self.table_type in ("events", "events_intraday", "users"):

query += self.get_temp_table_query()

3 changes: 3 additions & 0 deletions cfconfigbuilder/main.py
@@ -82,6 +82,9 @@ def get_ga_datasets(self):
, os.environ["PSEUDO_USERS"]
, os.environ["PSEUDO_USER_PROPERTIES"]
, os.environ["PSEUDO_USER_AUDIENCES"]
, os.environ["USERS"]
, os.environ["USERS_USER_PROPERTIES"]
, os.environ["USERS_USER_AUDIENCES"]
]
return ret_val

4 changes: 3 additions & 1 deletion dm_helper.py
@@ -57,7 +57,6 @@ def __init__(self, context_environment_vars):
protoPayload.authenticationInfo.principalEmail="[email protected]"
severity: "NOTICE"
NOT "events_intraday_"
NOT "tables/users_"
'''
self.FILTER_INTRADAY = '''
resource.type="bigquery_resource"
@@ -100,6 +99,9 @@ def __init__(self, context_environment_vars):
"PSEUDO_USERS": "pseudo_users",
"PSEUDO_USER_PROPERTIES": "pseudo_user_properties",
"PSEUDO_USER_AUDIENCES": "pseudo_user_audiences",
"USERS": "users",
"USERS_USER_PROPERTIES": "users_user_properties",
"USERS_USER_AUDIENCES": "users_user_audiences",
"LOCATION_ID": "us-central1",
"TOPIC_NAME": self.get_topic_id()
}