From be1556f36720213686736ca75ff6f6e0543bc841 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 10:32:43 -0700 Subject: [PATCH 01/23] refactor: flat tables clean up in the test project and dataset --- tools/bigquery_delete_all_flat_tables.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bigquery_delete_all_flat_tables.py b/tools/bigquery_delete_all_flat_tables.py index 28b88f6..d8ec54c 100644 --- a/tools/bigquery_delete_all_flat_tables.py +++ b/tools/bigquery_delete_all_flat_tables.py @@ -14,7 +14,7 @@ ''' Configuration Section Start ''' '''*****************************''' my_project_id = "as-dev-ga4-flattener-320623" -my_dataset_id = 'analytics_222460912' +my_dataset_id = 'analytics_123456789' table_prefix = "flat_" # will perform the deletion only if the table has the desired prefix delete = True '''*****************************''' From e60b21f64332cb99cb14bbe7c3708cef01063326 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 10:32:57 -0700 Subject: [PATCH 02/23] refactor: .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 5886f95..0608c10 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ pycache/ .pycache #Python virtual environment +venv_ga_flattener venv/ env/ local/ From daf794944850988a8347dd7352c8b186859e9e2f Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 10:41:33 -0700 Subject: [PATCH 03/23] feat: log sink catches users table creation logs --- dm_helper.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dm_helper.py b/dm_helper.py index 6c2fc9b..ce00f64 100644 --- a/dm_helper.py +++ b/dm_helper.py @@ -57,7 +57,6 @@ def __init__(self, context_environment_vars): protoPayload.authenticationInfo.principalEmail="firebase-measurement@system.gserviceaccount.com" severity: "NOTICE" NOT "events_intraday_" - NOT "tables/users_" ''' self.FILTER_INTRADAY = ''' resource.type="bigquery_resource" @@ -101,7 +100,7 @@ def __init__(self, context_environment_vars): "PSEUDO_USER_PROPERTIES": "pseudo_user_properties", "PSEUDO_USER_AUDIENCES": "pseudo_user_audiences", "LOCATION_ID": "us-central1", - "TOPIC_NAME": self.get_topic_id() + "TOPIC_NAME": self.get_topic_id() #TODO: add users } def get_project(self): From e6890315254e4fa86a056358304b4cc98ffc89ad Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 10:52:20 -0700 Subject: [PATCH 04/23] test: sample desired queries --- tests/rsc/sample_desired_queries.py | 75 +++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/rsc/sample_desired_queries.py b/tests/rsc/sample_desired_queries.py index 3a595f1..d18114f 100644 --- a/tests/rsc/sample_desired_queries.py +++ b/tests/rsc/sample_desired_queries.py @@ -271,3 +271,78 @@ WHERE _TABLE_SUFFIX = "date_shard" ; """ + +sample_users_query = """ +SELECT + row_id, + PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + + user_id, + + user_info.last_active_timestamp_micros as user_info_last_active_timestamp_micros, + user_info.user_first_touch_timestamp_micros as user_info_user_first_touch_timestamp_micros, + user_info.first_purchase_date as user_info_first_purchase_date, + + device.operating_system as device_operating_system, + device.category as device_category, + device.mobile_brand_name as device_mobile_brand_name, + device.mobile_model_name as device_mobile_model_name, + device.unified_screen_name as device_unified_screen_name, + + geo.city as geo_city, + geo.country as geo_country, + geo.continent as geo_continent, + geo.region as geo_region, + + user_ltv.revenue_in_usd as user_ltv_revenue_in_usd, + user_ltv.sessions as user_ltv_sessions, + user_ltv.engagement_time_millis as user_ltv_engagement_time_millis, + user_ltv.purchases as user_ltv_purchases, + user_ltv.engaged_sessions as user_ltv_engaged_sessions, + user_ltv.session_duration_micros as user_ltv_session_duration_micros, + + predictions.in_app_purchase_score_7d as predictions_in_app_purchase_score_7d, + predictions.purchase_score_7d as predictions_purchase_score_7d, + predictions.churn_score_7d as predictions_churn_score_7d, + predictions.revenue_28d_in_usd as predictions_revenue_28d_in_usd, + + privacy_info.is_limited_ad_tracking as privacy_info_is_limited_ad_tracking, + privacy_info.is_ads_personalization_allowed as privacy_info_is_ads_personalization_allowed, + + occurrence_date, + last_updated_date +FROM + temp_users + ; +""" + +sample_users_user_properties_query = """ +SELECT + row_id, + PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + user_id, + up.key user_property_key, + up.value.string_value user_property_value, + up.value.set_timestamp_micros user_property_set_timestamp_micros, + up.value.user_property_name +FROM + temp_users + ,UNNEST(user_properties) up + ; +""" + +sample_users_user_audiences_query = """ +SELECT + row_id, + PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + user_id, + a.id audience_id, + a.name audience_name, + a.membership_start_timestamp_micros audience_membership_start_timestamp_micros, + a.membership_expiry_timestamp_micros audience_membership_expiry_timestamp_micros, + a.npa audience_npa +FROM + temp_users + ,UNNEST(audiences) a + ; +""" From 449379eaa3d9ff82bc9be38bf477daf988e25660 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 11:18:13 -0700 Subject: [PATCH 05/23] feat: flatten users --- cf/main.py | 221 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 173 insertions(+), 48 deletions(-) diff --git a/cf/main.py b/cf/main.py index 9ff2a0d..ce497cb 100644 --- a/cf/main.py +++ b/cf/main.py @@ -39,7 +39,7 @@ def __init__(self, event): logging.critical(f"flattener configuration error: {e}") def extract_values(self, string): - pattern = re.compile(r'projects\/(.*?)\/datasets\/(.*?)\/tables\/(events|pseudonymous_users.*?)_(20\d\d\d\d\d\d)$') + pattern = re.compile(r'projects\/(.*?)\/datasets\/(.*?)\/tables\/(events|pseudonymous_users|users.*?)_(20\d\d\d\d\d\d)$') match = pattern.search(string) if match: project, dataset, table_type, shard = match.groups() @@ -61,13 +61,19 @@ def flatten_nested_tables(self): tables = list(set(tables_config) & set(["pseudo_users", "pseudo_user_properties", "pseudo_user_audiences"])) - else: + elif self.table_type == "events": tables = list(set(tables_config) & set(["events", "event_params", "user_properties", "items"])) + elif self.table_type == "users": + + tables = list(set(tables_config) & set(["users", + "users_user_properties", + "users_audiences"])) + return tables def get_output_configuration(self): @@ -92,9 +98,9 @@ def __init__(self, gcp_project, dataset, table_type, date_shard): self.table_type = table_type self.source_table_type = "'intraday'" if self.source_table_is_intraday() else "'daily'" - if self.table_type == "pseudonymous_users": + if self.table_type == "pseudonymous_users" or self.table_type=="users": self.date_field_name = "`date`" - else: + elif self.table_type == "events": self.date_field_name = "event_date" def source_table_is_intraday(self): @@ -118,46 +124,67 @@ def source_table_exists(self, table_type_param): def get_temp_table_query(self): """ - build unique event id + build unique primary key """ - qry = f""" - CREATE OR REPLACE TEMP TABLE temp_events AS ( - SELECT - CONCAT( - stream_id, - "_", - IFNULL(user_pseudo_id, ""), - "_", - event_name, - "_", - event_timestamp, - "_", - IFNULL(CAST(batch_event_index AS STRING), ""), - "_", - IFNULL(CAST(batch_page_id AS STRING), ""), - "_", - IFNULL(CAST(batch_ordering_id AS STRING), ""), - "_", + + qry = "" + + if self.table_type == "events": + + qry = f""" + CREATE OR REPLACE TEMP TABLE temp_events AS ( + SELECT + CONCAT( + stream_id, + "_", + IFNULL(user_pseudo_id, ""), + "_", + event_name, + "_", + event_timestamp, + "_", + IFNULL(CAST(batch_event_index AS STRING), ""), + "_", + IFNULL(CAST(batch_page_id AS STRING), ""), + "_", + IFNULL(CAST(batch_ordering_id AS STRING), ""), + "_", + + ROW_NUMBER() OVER (PARTITION BY CONCAT( + stream_id, + IFNULL(user_pseudo_id, ""), + event_name, + event_timestamp, + IFNULL(CAST(batch_event_index AS STRING), ""), + IFNULL(CAST(batch_page_id AS STRING), ""), + IFNULL(CAST(batch_ordering_id AS STRING), "") + ))) event_id, - ROW_NUMBER() OVER (PARTITION BY CONCAT( - stream_id, - IFNULL(user_pseudo_id, ""), - event_name, - event_timestamp, - IFNULL(CAST(batch_event_index AS STRING), ""), - IFNULL(CAST(batch_page_id AS STRING), ""), - IFNULL(CAST(batch_ordering_id AS STRING), "") - ))) event_id, - - * - FROM - `{self.gcp_project}.{self.dataset}.{self.table_type}_*` - WHERE _TABLE_SUFFIX = "{self.date_shard}" - ) - ; - """ + * + FROM + `{self.gcp_project}.{self.dataset}.{self.table_type}_*` + WHERE _TABLE_SUFFIX = "{self.date_shard}" + ) + ; + """ + elif self.table_type == "users": + qry = f""" + CREATE OR REPLACE TEMP TABLE temp_users AS ( + SELECT + GENERATE_UUID() row_id, + PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + * + FROM + `{self.gcp_project}.{self.dataset}.{self.table_type}_*` + WHERE _TABLE_SUFFIX = "{self.date_shard}" + ) + ; + """ + elif self.table_type == "pseudonymous_users": + qry = "" return qry + def get_events_query_select_statement(self): qry = f"""SELECT PARSE_DATE('%Y%m%d', event_date) AS event_date, @@ -448,6 +475,88 @@ def get_pseudo_user_audiences_select_statement(self): return qry + + def get_users_select_statement(self): + qry = f""" + SELECT + row_id, + `date`, + + user_id, + + user_info.last_active_timestamp_micros as user_info_last_active_timestamp_micros, + user_info.user_first_touch_timestamp_micros as user_info_user_first_touch_timestamp_micros, + user_info.first_purchase_date as user_info_first_purchase_date, + + device.operating_system as device_operating_system, + device.category as device_category, + device.mobile_brand_name as device_mobile_brand_name, + device.mobile_model_name as device_mobile_model_name, + device.unified_screen_name as device_unified_screen_name, + + geo.city as geo_city, + geo.country as geo_country, + geo.continent as geo_continent, + geo.region as geo_region, + + user_ltv.revenue_in_usd as user_ltv_revenue_in_usd, + user_ltv.sessions as user_ltv_sessions, + user_ltv.engagement_time_millis as user_ltv_engagement_time_millis, + user_ltv.purchases as user_ltv_purchases, + user_ltv.engaged_sessions as user_ltv_engaged_sessions, + user_ltv.session_duration_micros as user_ltv_session_duration_micros, + + predictions.in_app_purchase_score_7d as predictions_in_app_purchase_score_7d, + predictions.purchase_score_7d as predictions_purchase_score_7d, + predictions.churn_score_7d as predictions_churn_score_7d, + predictions.revenue_28d_in_usd as predictions_revenue_28d_in_usd, + + privacy_info.is_limited_ad_tracking as privacy_info_is_limited_ad_tracking, + privacy_info.is_ads_personalization_allowed as privacy_info_is_ads_personalization_allowed, + + occurrence_date, + last_updated_date + FROM + temp_users + ; + """ + return qry + + def get_users_user_properties_select_statement(self): + qry = f""" + SELECT + row_id, + `date`, + user_id, + up.key user_property_key, + up.value.string_value user_property_value, + up.value.set_timestamp_micros user_property_set_timestamp_micros, + up.value.user_property_name + FROM + temp_users + ,UNNEST(user_properties) up + ; + """ + return qry + + def get_users_user_audiences_select_statement(self): + qry = f""" + SELECT + row_id, + `date`, + user_id, + a.id audience_id, + a.name audience_name, + a.membership_start_timestamp_micros audience_membership_start_timestamp_micros, + a.membership_expiry_timestamp_micros audience_membership_expiry_timestamp_micros, + a.npa audience_npa + FROM + temp_users + ,UNNEST(audiences) a + ; + """ + return qry + def get_flat_table_update_query(self, select_statement, flat_table, sharded_output_required=True, partitioned_output_required=False): assert flat_table in ["flat_events", @@ -456,7 +565,10 @@ def get_flat_table_update_query(self, select_statement, flat_table, sharded_outp "flat_items", "flat_pseudo_users", "flat_pseudo_user_properties", - "flat_pseudo_user_audiences"] + "flat_pseudo_user_audiences", + "flat_users", + "flat_users_user_properties", + "flat_users_user_audiences"] assert "flat" in flat_table @@ -497,7 +609,11 @@ def get_select_statement(self, flat_table): "flat_items", "flat_pseudo_users", "flat_pseudo_user_properties", - "flat_pseudo_user_audiences"] + "flat_pseudo_user_audiences", + "flat_users", + "flat_users_user_properties", + "flat_users_user_audiences" + ] query = "" @@ -522,6 +638,15 @@ def get_select_statement(self, flat_table): elif flat_table == "flat_pseudo_user_audiences": query += self.get_pseudo_user_audiences_select_statement() + elif flat_table == "flat_users": + query += self.get_users_select_statement() + + elif flat_table == "flat_users_user_properties": + query += self.get_users_user_properties_select_statement() + + elif flat_table == "flat_users_user_audiences": + query += self.get_users_user_audiences_select_statement() + return query def build_full_query(self, sharded_output_required=True, partitioned_output_required=False, @@ -531,16 +656,16 @@ def build_full_query(self, sharded_output_required=True, partitioned_output_requ "flat_items", "flat_pseudo_users", "flat_pseudo_user_properties", - "flat_pseudo_user_audiences"]): + "flat_pseudo_user_audiences", + "flat_users", + "flat_users_user_properties", + "flat_users_user_audiences" + ]): assert len(list_of_flat_tables) >= 1, "At least 1 flat table needs to be included in the config file" query = "" - - if ("flat_events" in list_of_flat_tables - or "flat_event_params" in list_of_flat_tables - or "flat_user_properties" in list_of_flat_tables - or "flat_items" in list_of_flat_tables): + if self.table_type in ("events", "users"): query += self.get_temp_table_query() From 2b2d500c3f1343203477413f0355fc776d025c14 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 11:18:34 -0700 Subject: [PATCH 06/23] test: date in users table --- tests/rsc/sample_desired_queries.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/rsc/sample_desired_queries.py b/tests/rsc/sample_desired_queries.py index d18114f..e96f5ca 100644 --- a/tests/rsc/sample_desired_queries.py +++ b/tests/rsc/sample_desired_queries.py @@ -275,7 +275,7 @@ sample_users_query = """ SELECT row_id, - PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + `date`, user_id, @@ -319,7 +319,7 @@ sample_users_user_properties_query = """ SELECT row_id, - PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + `date`, user_id, up.key user_property_key, up.value.string_value user_property_value, @@ -334,7 +334,7 @@ sample_users_user_audiences_query = """ SELECT row_id, - PARSE_DATE('%Y%m%d', _TABLE_SUFFIX) `date`, + `date`, user_id, a.id audience_id, a.name audience_name, From f10aae4ef1882b3dd138dd0cdf97ece4cf9caaf9 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 11:21:57 -0700 Subject: [PATCH 07/23] feat: users tables in the config --- cfconfigbuilder/main.py | 3 +++ dm_helper.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/cfconfigbuilder/main.py b/cfconfigbuilder/main.py index 4853904..ce273a7 100644 --- a/cfconfigbuilder/main.py +++ b/cfconfigbuilder/main.py @@ -82,6 +82,9 @@ def get_ga_datasets(self): , os.environ["PSEUDO_USERS"] , os.environ["PSEUDO_USER_PROPERTIES"] , os.environ["PSEUDO_USER_AUDIENCES"] + , os.environ["USERS"] + , os.environ["USERS_USER_PROPERTIES"] + , os.environ["USERS_USER_AUDIENCES"] ] return ret_val diff --git a/dm_helper.py b/dm_helper.py index ce00f64..410e1d0 100644 --- a/dm_helper.py +++ b/dm_helper.py @@ -99,6 +99,9 @@ def __init__(self, context_environment_vars): "PSEUDO_USERS": "pseudo_users", "PSEUDO_USER_PROPERTIES": "pseudo_user_properties", "PSEUDO_USER_AUDIENCES": "pseudo_user_audiences", + "USERS": "users", + "USERS_USER_PROPERTIES": "users_user_properties", + "USERS_USER_AUDIENCES": "users_user_audiences", "LOCATION_ID": "us-central1", "TOPIC_NAME": self.get_topic_id() #TODO: add users } From 878d76af38bd0ebef78c7cb55a1251362059eef7 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:08:49 -0700 Subject: [PATCH 08/23] test: verify users queries --- tests/test_verify_queries_users.py | 195 +++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 tests/test_verify_queries_users.py diff --git a/tests/test_verify_queries_users.py b/tests/test_verify_queries_users.py new file mode 100644 index 0000000..87da15a --- /dev/null +++ b/tests/test_verify_queries_users.py @@ -0,0 +1,195 @@ +from tests.test_base import BaseUnitTest +from tests.test_base import Context +from cf.main import GaExportedNestedDataStorage +from tests.rsc import sample_desired_queries + + +class TestGenerateQuerySourceTableUsers(BaseUnitTest): + c = Context() + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset="analytics_123456789", + table_type="users", + date_shard="20241107", + ) + + def helper_clean_up_query(self, query): + """Cleans up a sample hardcoded query , so this query can be compared to a dynamically generated query + for testing purposes""" + + query_cleaned_up = query.upper().replace(" ", "").replace("\n", "").replace("\t", "").replace("%%Y%%M%%D", + "%Y%M%D") + + return query_cleaned_up + + def helper_clean_up_dynamically_generated_query(self, query, ga_source): + """Cleans up a dynamically generated query, so this query can be compared to a sample hardcoded query + for testing purposes""" + + query_cleaned_up = query.replace(ga_source.gcp_project, "gcp-project").replace( + ga_source.dataset, "dataset").replace(ga_source.date_shard, "date_shard") + + query_cleaned_up = self.helper_clean_up_query(query_cleaned_up) + + return query_cleaned_up + + # Compare dynamically generated queries to hardcoded expected baseline examples + # Compare sample hardcoded vs. dynamically generated query. They should be the same + def test_check_sql_query_users(self): + sample_hardcoded_query = sample_desired_queries.sample_users_query + test_dynamic_query = self.ga_source.get_users_select_statement() + + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith( + 'FROMTEMP_USERS;') + + assert sample_hardcoded_query == test_dynamic_query + + def test_check_sql_query_users_user_properties(self): + sample_hardcoded_query = sample_desired_queries.sample_users_user_properties_query + test_dynamic_query = self.ga_source.get_users_user_properties_select_statement() + + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith( + 'FROMTEMP_USERS,UNNEST(USER_PROPERTIES)UP;') + + assert sample_hardcoded_query == test_dynamic_query + + def test_check_sql_query_users_user_audiences(self): + sample_hardcoded_query = sample_desired_queries.sample_users_user_audiences_query + + test_dynamic_query = self.ga_source.get_users_user_audiences_select_statement() + + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith( + 'FROMTEMP_USERS,UNNEST(AUDIENCES)A;') + + assert sample_hardcoded_query == test_dynamic_query + + def test_get_select_statement(self): + users = self.ga_source.get_select_statement(flat_table="flat_users") + user_properties = self.ga_source.get_select_statement(flat_table="flat_users_user_properties") + user_audiences = self.ga_source.get_select_statement(flat_table="flat_users_user_audiences") + + assert "user_id" in users and "temp_users" in users + assert "user_properties" in user_properties and "user_id" in user_properties and "temp_users" in user_properties + assert "audiences" in user_audiences and "user_id" in user_audiences and "temp_users" in user_audiences + + def test_get_flat_table_update_query_sharded_output_required(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_users", + sharded_output_required=True, + partitioned_output_required=False) + + expected_query = f"""CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users_{self.ga_source.date_shard}` + AS + {select_statement}""" + + self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), + expected_query.replace(" ", "").replace("\n", "").upper()) + + def test_get_flat_table_update_query_partitioned_output_required(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_users", + sharded_output_required=False, + partitioned_output_required=True) + + expected_query = f""" + CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` + + PARTITION BY `date` + + AS {select_statement} + + DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); + INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` + {select_statement}""" + + expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() + result_cleaned = result.replace(" ", "").replace("\n", "").upper() + + self.assertEqual(expected_query_cleaned, result_cleaned) + + def test_get_flat_table_update_query_sharded_and_partitioned_output_required_flat_users(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_users", + sharded_output_required=True, + partitioned_output_required=True) + + expected_query = f""" + CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users_{self.ga_source.date_shard}` + AS + {select_statement} + + CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` + + PARTITION BY `date` + + AS {select_statement} + + + DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); + + INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_users` + {select_statement}""" + + expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() + result_cleaned = result.replace(" ", "").replace("\n", "").upper() + + self.assertEqual(expected_query_cleaned, result_cleaned) + + def test_build_full_query(self): + sharded_output_required = True + partitioned_output_required = False + + _1 = self.ga_source.get_temp_table_query() + + _2 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_users"), + flat_table="flat_users", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + _3 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_users_user_properties"), + flat_table="flat_users_user_properties", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + _4 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_users_user_audiences"), + flat_table="flat_users_user_audiences", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + expected_query = f"""{_1} + {_2} + {_3} + {_4} + """ + assert "pseudo" not in expected_query + + result = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_users", + "flat_users_user_properties", + "flat_users_user_audiences"]) + self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), + expected_query.replace(" ", "").replace("\n", "").upper()) + + def tearDown(self): + pass From 1a58fe0168db21e4fb8f4498b67c61c3c2e16b3b Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:11:20 -0700 Subject: [PATCH 09/23] test: verify users queries --- tests/test_verify_queries_users.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_verify_queries_users.py b/tests/test_verify_queries_users.py index 87da15a..256f2e1 100644 --- a/tests/test_verify_queries_users.py +++ b/tests/test_verify_queries_users.py @@ -181,15 +181,20 @@ def test_build_full_query(self): {_3} {_4} """ - assert "pseudo" not in expected_query result = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, partitioned_output_required=partitioned_output_required, list_of_flat_tables=["flat_users", "flat_users_user_properties", "flat_users_user_audiences"]) + + assert "pseudo" not in expected_query + assert "pseudo" not in result + + self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), expected_query.replace(" ", "").replace("\n", "").upper()) + def tearDown(self): pass From 6e27242d9feb27a8b0c36d1887e770cdb75f38a8 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:12:32 -0700 Subject: [PATCH 10/23] refactor: users vs. pseudo_users tests --- tests/test_verify_queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_verify_queries.py b/tests/test_verify_queries.py index 2793ffd..a4451ba 100644 --- a/tests/test_verify_queries.py +++ b/tests/test_verify_queries.py @@ -268,7 +268,7 @@ def test_build_full_query(self): self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), expected_query.replace(" ", "").replace("\n", "").upper()) -class TestGenerateQuerySourceTableUsers(BaseUnitTest): +class TestGenerateQuerySourceTablePseudoUsers(BaseUnitTest): c = Context() ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], dataset=c.env["dataset"], From fe38d9baeb43a4868b567c37c44902b7d53a7c28 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:16:04 -0700 Subject: [PATCH 11/23] test: test suite --- tests/test.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/tests/test.py b/tests/test.py index ea4efe7..037e6c9 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,6 +1,12 @@ import unittest from tests.test_build_ga_flattener_config import TestCFBuildFlattenerGaDatasetConfig -from tests.test_flatten_ga_data import TestCFFlattenMethods, TestCFFlattenMethodsSchemaChangeCollectedTrafficSource, TestCFFlattenMethodsSchemaChangeIsActiveUser +from tests.test_flatten_ga_data import (TestCFFlattenMethods, + TestCFFlattenMethodsSchemaChangeCollectedTrafficSource, + TestCFFlattenMethodsSchemaChangeIsActiveUser, + TestCFFlattenMethodsUsersSourceDoesNotExist, + TestCFFlattenMethodsPseudoUsers, + # TestCFFlattenMethodsUsers, + ) from tests.test_flatten_ga_data_intraday import TestCFFlattenMethodsIntraday from tests.test_generate_config_b import TestGenerateConfigB from tests.test_generate_config_cf import TestGenerateConfigCf @@ -12,17 +18,28 @@ from tests.test_manage_intraday_schedule import TestManageIntradayFlatteningSchedule from tests.test_partitioning import TestPartitioning from tests.test_valid_resource_names import TestValidResourceNames -from tests.test_verify_queries import TestGenerateQuerySourceTableEvents, TestGenerateQuerySourceTableUsers + +from tests.test_verify_queries_events import TestGenerateQuerySourceTableEvents +from tests.test_verify_queries_pseudo_users import TestGenerateQuerySourceTablePseudoUsers +from tests.test_verify_queries_users import TestGenerateQuerySourceTableUsers + if __name__ == '__main__': test_suite = unittest.TestSuite() # tests test_suite.addTest(unittest.makeSuite(TestCFBuildFlattenerGaDatasetConfig)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethods)) test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsSchemaChangeCollectedTrafficSource)) test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsSchemaChangeIsActiveUser)) + + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsUsersSourceDoesNotExist)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsPseudoUsers)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsUsers)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsIntraday)) + test_suite.addTest(unittest.makeSuite(TestGenerateConfigB)) test_suite.addTest(unittest.makeSuite(TestGenerateConfigCf)) test_suite.addTest(unittest.makeSuite(TestGenerateConfigLm)) @@ -35,7 +52,9 @@ test_suite.addTest(unittest.makeSuite(TestManageIntradayFlatteningSchedule)) test_suite.addTest(unittest.makeSuite(TestPartitioning)) test_suite.addTest(unittest.makeSuite(TestValidResourceNames)) + test_suite.addTest(unittest.makeSuite(TestGenerateQuerySourceTableEvents)) + test_suite.addTest(unittest.makeSuite(TestGenerateQuerySourceTablePseudoUsers)) test_suite.addTest(unittest.makeSuite(TestGenerateQuerySourceTableUsers)) # verbosity: 0 (quiet), 1 (default), 2 (verbose) From 34123510b31416b9c10b14244033cdb36bbf307b Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:16:30 -0700 Subject: [PATCH 12/23] refactor: testing SQL queries to make sure they match --- ...eries.py => test_verify_queries_events.py} | 180 ---------------- tests/test_verify_queries_pseudo_users.py | 192 ++++++++++++++++++ 2 files changed, 192 insertions(+), 180 deletions(-) rename tests/{test_verify_queries.py => test_verify_queries_events.py} (61%) create mode 100644 tests/test_verify_queries_pseudo_users.py diff --git a/tests/test_verify_queries.py b/tests/test_verify_queries_events.py similarity index 61% rename from tests/test_verify_queries.py rename to tests/test_verify_queries_events.py index a4451ba..b9dd8bd 100644 --- a/tests/test_verify_queries.py +++ b/tests/test_verify_queries_events.py @@ -268,186 +268,6 @@ def test_build_full_query(self): self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), expected_query.replace(" ", "").replace("\n", "").upper()) -class TestGenerateQuerySourceTablePseudoUsers(BaseUnitTest): - c = Context() - ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], - dataset=c.env["dataset"], - table_type="pseudonymous_users", - date_shard="20240611", - ) - - def helper_clean_up_query(self, query): - """Cleans up a sample hardcoded query , so this query can be compared to a dynamically generated query - for testing purposes""" - - query_cleaned_up = query.upper().replace(" ", "").replace("\n", "").replace("\t", "").replace("%%Y%%M%%D", - "%Y%M%D") - - return query_cleaned_up - - def helper_clean_up_dynamically_generated_query(self, query, ga_source): - """Cleans up a dynamically generated query, so this query can be compared to a sample hardcoded query - for testing purposes""" - - query_cleaned_up = query.replace(ga_source.gcp_project, "gcp-project").replace( - ga_source.dataset, "dataset").replace(ga_source.date_shard, "date_shard") - - query_cleaned_up = self.helper_clean_up_query(query_cleaned_up) - - return query_cleaned_up - - # Compare dynamically generated queries to hardcoded expected baseline examples - # Compare sample hardcoded vs. dynamically generated query. They should be the same - def test_check_sql_query_pseudo_users(self): - sample_hardcoded_query = sample_desired_queries.sample_pseudo_users_query - sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) - - test_dynamic_query = self.ga_source.get_pseudo_users_select_statement() - test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, - self.ga_source) - - assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`WHERE_TABLE_SUFFIX="DATE_SHARD";') - - assert sample_hardcoded_query == test_dynamic_query - - def test_check_sql_query_pseudo_user_properties(self): - sample_hardcoded_query = sample_desired_queries.sample_pseudo_user_properties_query - sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) - - test_dynamic_query = self.ga_source.get_pseudo_user_properties_select_statement() - test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, - self.ga_source) - - assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`,UNNEST(USER_PROPERTIES)UPWHERE_TABLE_SUFFIX="DATE_SHARD";') - - assert sample_hardcoded_query == test_dynamic_query - - - def test_check_sql_query_pseudo_user_audiences(self): - sample_hardcoded_query = sample_desired_queries.sample_pseudo_user_audiences_query - sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) - - test_dynamic_query = self.ga_source.get_pseudo_user_audiences_select_statement() - test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, - self.ga_source) - - assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`,UNNEST(AUDIENCES)AWHERE_TABLE_SUFFIX="DATE_SHARD";') - - assert sample_hardcoded_query == test_dynamic_query - - def test_get_select_statement(self): - - pseudo_users = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") - pseudo_user_properties = self.ga_source.get_select_statement(flat_table="flat_pseudo_user_properties") - pseudo_user_audiences = self.ga_source.get_select_statement(flat_table="flat_pseudo_user_audiences") - - assert "pseudo_user_id" in pseudo_users - assert "user_properties" in pseudo_user_properties and "pseudo_user_id" in pseudo_user_properties - assert "audiences" in pseudo_user_audiences and "pseudo_user_id" in pseudo_users - - def test_get_flat_table_update_query_sharded_output_required(self): - select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") - result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, - flat_table="flat_pseudo_users", - sharded_output_required=True, - partitioned_output_required=False) - - expected_query = f"""CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users_{self.ga_source.date_shard}` - AS - {select_statement}""" - - self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), - expected_query.replace(" ", "").replace("\n", "").upper()) - - def test_get_flat_table_update_query_partitioned_output_required(self): - select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") - result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, - flat_table="flat_pseudo_users", - sharded_output_required=False, - partitioned_output_required=True) - - expected_query = f""" - CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` - - PARTITION BY `date` - - AS {select_statement} - - DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); - INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` - {select_statement}""" - - expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() - result_cleaned = result.replace(" ", "").replace("\n", "").upper() - - - self.assertEqual(expected_query_cleaned, result_cleaned) - - def test_get_flat_table_update_query_sharded_and_partitioned_output_required_flat_pseudo_users(self): - select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") - result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, - flat_table="flat_pseudo_users", - sharded_output_required=True, - partitioned_output_required=True) - - expected_query = f""" - CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users_{self.ga_source.date_shard}` - AS - {select_statement} - - CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` - - PARTITION BY `date` - - AS {select_statement} - - - DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); - - INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` - {select_statement}""" - - expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() - result_cleaned = result.replace(" ", "").replace("\n", "").upper() - - - self.assertEqual(expected_query_cleaned, result_cleaned) - - - def test_build_full_query(self): - sharded_output_required = True - partitioned_output_required = False - - _1 = self.ga_source.get_flat_table_update_query( - select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_users"), - flat_table="flat_pseudo_users", - sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required) - - _2 = self.ga_source.get_flat_table_update_query( - select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_user_properties"), - flat_table="flat_pseudo_user_properties", - sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required) - - _3 = self.ga_source.get_flat_table_update_query( - select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_user_audiences"), - flat_table="flat_pseudo_user_audiences", - sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required) - - - expected_query = f"""{_1} - {_2} - {_3} - """ - result = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required, - list_of_flat_tables=["flat_pseudo_users", - "flat_pseudo_user_properties", - "flat_pseudo_user_audiences"]) - self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), - expected_query.replace(" ", "").replace("\n", "").upper()) def tearDown(self): pass diff --git a/tests/test_verify_queries_pseudo_users.py b/tests/test_verify_queries_pseudo_users.py new file mode 100644 index 0000000..e585cd4 --- /dev/null +++ b/tests/test_verify_queries_pseudo_users.py @@ -0,0 +1,192 @@ +from tests.test_base import BaseUnitTest +from tests.test_base import Context +from cf.main import GaExportedNestedDataStorage +from cfintradaysqlview.main import IntradaySQLView +from tests.rsc import sample_desired_queries + + +# test_get_flat_table_update_query_sharded_and_partitioned_output_required_flat_event_params + +class TestGenerateQuerySourceTablePseudoUsers(BaseUnitTest): + c = Context() + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset=c.env["dataset"], + table_type="pseudonymous_users", + date_shard="20240611", + ) + + def helper_clean_up_query(self, query): + """Cleans up a sample hardcoded query , so this query can be compared to a dynamically generated query + for testing purposes""" + + query_cleaned_up = query.upper().replace(" ", "").replace("\n", "").replace("\t", "").replace("%%Y%%M%%D", + "%Y%M%D") + + return query_cleaned_up + + def helper_clean_up_dynamically_generated_query(self, query, ga_source): + """Cleans up a dynamically generated query, so this query can be compared to a sample hardcoded query + for testing purposes""" + + query_cleaned_up = query.replace(ga_source.gcp_project, "gcp-project").replace( + ga_source.dataset, "dataset").replace(ga_source.date_shard, "date_shard") + + query_cleaned_up = self.helper_clean_up_query(query_cleaned_up) + + return query_cleaned_up + + # Compare dynamically generated queries to hardcoded expected baseline examples + # Compare sample hardcoded vs. dynamically generated query. They should be the same + def test_check_sql_query_pseudo_users(self): + sample_hardcoded_query = sample_desired_queries.sample_pseudo_users_query + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.ga_source.get_pseudo_users_select_statement() + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`WHERE_TABLE_SUFFIX="DATE_SHARD";') + + assert sample_hardcoded_query == test_dynamic_query + + def test_check_sql_query_pseudo_user_properties(self): + sample_hardcoded_query = sample_desired_queries.sample_pseudo_user_properties_query + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.ga_source.get_pseudo_user_properties_select_statement() + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`,UNNEST(USER_PROPERTIES)UPWHERE_TABLE_SUFFIX="DATE_SHARD";') + + assert sample_hardcoded_query == test_dynamic_query + + + def test_check_sql_query_pseudo_user_audiences(self): + sample_hardcoded_query = sample_desired_queries.sample_pseudo_user_audiences_query + sample_hardcoded_query = self.helper_clean_up_query(sample_hardcoded_query) + + test_dynamic_query = self.ga_source.get_pseudo_user_audiences_select_statement() + test_dynamic_query = self.helper_clean_up_dynamically_generated_query(test_dynamic_query, + self.ga_source) + + assert test_dynamic_query.endswith('FROM`GCP-PROJECT.DATASET.PSEUDONYMOUS_USERS_*`,UNNEST(AUDIENCES)AWHERE_TABLE_SUFFIX="DATE_SHARD";') + + assert sample_hardcoded_query == test_dynamic_query + + def test_get_select_statement(self): + + pseudo_users = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") + pseudo_user_properties = self.ga_source.get_select_statement(flat_table="flat_pseudo_user_properties") + pseudo_user_audiences = self.ga_source.get_select_statement(flat_table="flat_pseudo_user_audiences") + + assert "pseudo_user_id" in pseudo_users + assert "user_properties" in pseudo_user_properties and "pseudo_user_id" in pseudo_user_properties + assert "audiences" in pseudo_user_audiences and "pseudo_user_id" in pseudo_users + + def test_get_flat_table_update_query_sharded_output_required(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_pseudo_users", + sharded_output_required=True, + partitioned_output_required=False) + + expected_query = f"""CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users_{self.ga_source.date_shard}` + AS + {select_statement}""" + + self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), + expected_query.replace(" ", "").replace("\n", "").upper()) + + def test_get_flat_table_update_query_partitioned_output_required(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_pseudo_users", + sharded_output_required=False, + partitioned_output_required=True) + + expected_query = f""" + CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` + + PARTITION BY `date` + + AS {select_statement} + + DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); + INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` + {select_statement}""" + + expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() + result_cleaned = result.replace(" ", "").replace("\n", "").upper() + + + self.assertEqual(expected_query_cleaned, result_cleaned) + + def test_get_flat_table_update_query_sharded_and_partitioned_output_required_flat_pseudo_users(self): + select_statement = self.ga_source.get_select_statement(flat_table="flat_pseudo_users") + result = self.ga_source.get_flat_table_update_query(select_statement=select_statement, + flat_table="flat_pseudo_users", + sharded_output_required=True, + partitioned_output_required=True) + + expected_query = f""" + CREATE OR REPLACE TABLE `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users_{self.ga_source.date_shard}` + AS + {select_statement} + + CREATE TABLE IF NOT EXISTS `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` + + PARTITION BY `date` + + AS {select_statement} + + + DELETE FROM `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` WHERE `date` = PARSE_DATE('%Y%m%d','{self.ga_source.date_shard}'); + + INSERT INTO `{self.ga_source.gcp_project}.{self.ga_source.dataset}.flat_pseudo_users` + {select_statement}""" + + expected_query_cleaned = expected_query.replace(" ", "").replace("\n", "").upper() + result_cleaned = result.replace(" ", "").replace("\n", "").upper() + + + self.assertEqual(expected_query_cleaned, result_cleaned) + + + def test_build_full_query(self): + sharded_output_required = True + partitioned_output_required = False + + _1 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_users"), + flat_table="flat_pseudo_users", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + _2 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_user_properties"), + flat_table="flat_pseudo_user_properties", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + _3 = self.ga_source.get_flat_table_update_query( + select_statement=self.ga_source.get_select_statement(flat_table="flat_pseudo_user_audiences"), + flat_table="flat_pseudo_user_audiences", + sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required) + + + expected_query = f"""{_1} + {_2} + {_3} + """ + result = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_pseudo_users", + "flat_pseudo_user_properties", + "flat_pseudo_user_audiences"]) + self.assertEqual(result.replace(" ", "").replace("\n", "").upper(), + expected_query.replace(" ", "").replace("\n", "").upper()) + + def tearDown(self): + pass From a1714071c2ea8b69d23da99620fdb2146bf136d0 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:45:04 -0700 Subject: [PATCH 13/23] test: flatten data --- ...ta.py => test_flatten_ga_data_1_events.py} | 96 --------------- tests/test_flatten_ga_data_2_pseudo_users.py | 107 +++++++++++++++++ tests/test_flatten_ga_data_3_users.py | 110 ++++++++++++++++++ ....py => test_flatten_ga_data_4_intraday.py} | 0 4 files changed, 217 insertions(+), 96 deletions(-) rename tests/{test_flatten_ga_data.py => test_flatten_ga_data_1_events.py} (60%) create mode 100644 tests/test_flatten_ga_data_2_pseudo_users.py create mode 100644 tests/test_flatten_ga_data_3_users.py rename tests/{test_flatten_ga_data_intraday.py => test_flatten_ga_data_4_intraday.py} (100%) diff --git a/tests/test_flatten_ga_data.py b/tests/test_flatten_ga_data_1_events.py similarity index 60% rename from tests/test_flatten_ga_data.py rename to tests/test_flatten_ga_data_1_events.py index 66fc371..15a559b 100644 --- a/tests/test_flatten_ga_data.py +++ b/tests/test_flatten_ga_data_1_events.py @@ -149,102 +149,6 @@ def test_flatten_ga_data_check_output_flat_events_schema_change(self): assert self.tbl_exists(dataset=self.ga_source.dataset, table_name=f"flat_user_properties_{self.ga_source.date_shard}") -class TestCFFlattenMethodsUsers(BaseUnitTest): - c = Context() - ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], - dataset=c.env["dataset"], - table_type="pseudonymous_users", - date_shard="20240611", - ) - - def tbl_exists(self, dataset, table_name): - """ - https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists - """ - client = bigquery.Client(project=self.ga_source.gcp_project) - - full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" - table_id = bigquery.Table(full_table_path) - try: - client.get_table(table_id) - return True - except NotFound: - return False - - def test_flatten_ga_data_sharded(self): - sharded_output_required = True - partitioned_output_required = False - - query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required, - list_of_flat_tables=["flat_pseudo_users", "flat_pseudo_user_properties", "flat_pseudo_user_audiences"]) - - self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) - - assert self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_users_{self.ga_source.date_shard}") - - assert self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_user_properties_{self.ga_source.date_shard}") - - assert self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_user_audiences_{self.ga_source.date_shard}") - - def test_flatten_ga_data_partitioned(self): - sharded_output_required = False - partitioned_output_required = True - - query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required, - list_of_flat_tables=["flat_pseudo_users"]) - - self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) - - assert self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_users") - -class TestCFFlattenMethodsUsersSourceDoesNotExist(BaseUnitTest): - c = Context() - ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], - dataset=c.env["dataset"], - table_type="pseudonymous_users", - date_shard="20240101", - ) - - def tbl_exists(self, dataset, table_name): - """ - https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists - """ - client = bigquery.Client(project=self.ga_source.gcp_project) - - full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" - table_id = bigquery.Table(full_table_path) - try: - client.get_table(table_id) - return True - except NotFound: - return False - - def test_flatten_ga_data_sharded(self): - sharded_output_required = True - partitioned_output_required = False - - query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, - partitioned_output_required=partitioned_output_required, - list_of_flat_tables=["flat_pseudo_users", - "flat_pseudo_user_properties", - "flat_pseudo_user_audiences"]) - - self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) - - assert not self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_users_{self.ga_source.date_shard}") - - assert not self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_user_properties_{self.ga_source.date_shard}") - - assert not self.tbl_exists(dataset=self.ga_source.dataset, - table_name=f"flat_pseudo_user_audiences_{self.ga_source.date_shard}") def tearDown(self): self.delete_all_flat_tables_from_dataset() pass diff --git a/tests/test_flatten_ga_data_2_pseudo_users.py b/tests/test_flatten_ga_data_2_pseudo_users.py new file mode 100644 index 0000000..0d4ad4a --- /dev/null +++ b/tests/test_flatten_ga_data_2_pseudo_users.py @@ -0,0 +1,107 @@ +from tests.test_base import BaseUnitTest +from tests.test_base import Context +from cf.main import GaExportedNestedDataStorage +from google.cloud import bigquery +from google.cloud.exceptions import NotFound + +#TODO: reduce repetition, write a function which takes date shared as param + +class TestCFFlattenMethodsPseudoUsers(BaseUnitTest): + c = Context() + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset=c.env["dataset"], + table_type="pseudonymous_users", + date_shard="20240611", + ) + + def tbl_exists(self, dataset, table_name): + """ + https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists + """ + client = bigquery.Client(project=self.ga_source.gcp_project) + + full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" + table_id = bigquery.Table(full_table_path) + try: + client.get_table(table_id) + return True + except NotFound: + return False + + def test_flatten_ga_data_sharded(self): + sharded_output_required = True + partitioned_output_required = False + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_pseudo_users", "flat_pseudo_user_properties", "flat_pseudo_user_audiences"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_users_{self.ga_source.date_shard}") + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_user_properties_{self.ga_source.date_shard}") + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_user_audiences_{self.ga_source.date_shard}") + + def test_flatten_ga_data_partitioned(self): + sharded_output_required = False + partitioned_output_required = True + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_pseudo_users"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_users") + +class TestCFFlattenMethodsPseudoUsersSourceDoesNotExist(BaseUnitTest): + c = Context() + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset=c.env["dataset"], + table_type="pseudonymous_users", + date_shard="20240101", + ) + + def tbl_exists(self, dataset, table_name): + """ + https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists + """ + client = bigquery.Client(project=self.ga_source.gcp_project) + + full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" + table_id = bigquery.Table(full_table_path) + try: + client.get_table(table_id) + return True + except NotFound: + return False + + def test_flatten_ga_data_sharded(self): + sharded_output_required = True + partitioned_output_required = False + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_pseudo_users", + "flat_pseudo_user_properties", + "flat_pseudo_user_audiences"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_users_{self.ga_source.date_shard}") + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_user_properties_{self.ga_source.date_shard}") + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_pseudo_user_audiences_{self.ga_source.date_shard}") + def tearDown(self): + self.delete_all_flat_tables_from_dataset() + pass diff --git a/tests/test_flatten_ga_data_3_users.py b/tests/test_flatten_ga_data_3_users.py new file mode 100644 index 0000000..d359d3d --- /dev/null +++ b/tests/test_flatten_ga_data_3_users.py @@ -0,0 +1,110 @@ +from tests.test_base import BaseUnitTest +from tests.test_base import Context +from cf.main import GaExportedNestedDataStorage +from google.cloud import bigquery +from google.cloud.exceptions import NotFound + +#TODO: reduce repetition, write a function which takes date shared as param + +class TestCFFlattenMethodsUsers(BaseUnitTest): + c = Context() + + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset="analytics_123456789", + table_type="users", + date_shard="20241107", + ) + + def tbl_exists(self, dataset, table_name): + """ + https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists + """ + client = bigquery.Client(project=self.ga_source.gcp_project) + + full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" + table_id = bigquery.Table(full_table_path) + try: + client.get_table(table_id) + return True + except NotFound: + return False + + def test_flatten_ga_data_sharded(self): + sharded_output_required = True + partitioned_output_required = False + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_users", + "flat_users_user_properties", + "flat_users_user_audiences"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_{self.ga_source.date_shard}") + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_user_properties_{self.ga_source.date_shard}") + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_user_audiences_{self.ga_source.date_shard}") + + def test_flatten_ga_data_partitioned(self): + sharded_output_required = False + partitioned_output_required = True + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_users"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users") + +class TestCFFlattenMethodsUsersSourceDoesNotExist(BaseUnitTest): + c = Context() + ga_source = GaExportedNestedDataStorage(gcp_project=c.env["project"], + dataset="analytics_123456789", + table_type="users", + date_shard="20240101", + ) + + def tbl_exists(self, dataset, table_name): + """ + https://stackoverflow.com/questions/28731102/bigquery-check-if-table-already-exists + """ + client = bigquery.Client(project=self.ga_source.gcp_project) + + full_table_path = f"{self.ga_source.gcp_project}.{dataset}.{table_name}" + table_id = bigquery.Table(full_table_path) + try: + client.get_table(table_id) + return True + except NotFound: + return False + + def test_flatten_ga_data_sharded(self): + sharded_output_required = True + partitioned_output_required = False + + query = self.ga_source.build_full_query(sharded_output_required=sharded_output_required, + partitioned_output_required=partitioned_output_required, + list_of_flat_tables=["flat_users", + "flat_users_user_properties", + "flat_users_user_audiences"]) + + self.ga_source.run_query_job(query, wait_for_the_query_job_to_complete=True) + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_{self.ga_source.date_shard}") + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_user_properties_{self.ga_source.date_shard}") + + assert not self.tbl_exists(dataset=self.ga_source.dataset, + table_name=f"flat_users_user_audiences_{self.ga_source.date_shard}") + def tearDown(self): + # self.delete_all_flat_tables_from_dataset() + pass diff --git a/tests/test_flatten_ga_data_intraday.py b/tests/test_flatten_ga_data_4_intraday.py similarity index 100% rename from tests/test_flatten_ga_data_intraday.py rename to tests/test_flatten_ga_data_4_intraday.py From cba12751c0214ecd9bbddc6ad67a9d617422f6ec Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:49:38 -0700 Subject: [PATCH 14/23] test: test suite --- tests/test.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tests/test.py b/tests/test.py index 037e6c9..8f7204a 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,13 +1,14 @@ import unittest from tests.test_build_ga_flattener_config import TestCFBuildFlattenerGaDatasetConfig -from tests.test_flatten_ga_data import (TestCFFlattenMethods, - TestCFFlattenMethodsSchemaChangeCollectedTrafficSource, - TestCFFlattenMethodsSchemaChangeIsActiveUser, - TestCFFlattenMethodsUsersSourceDoesNotExist, - TestCFFlattenMethodsPseudoUsers, - # TestCFFlattenMethodsUsers, - ) -from tests.test_flatten_ga_data_intraday import TestCFFlattenMethodsIntraday +from tests.test_flatten_ga_data_1_events import (TestCFFlattenMethods, + TestCFFlattenMethodsSchemaChangeCollectedTrafficSource, + TestCFFlattenMethodsSchemaChangeIsActiveUser, + ) + +from tests.test_flatten_ga_data_2_pseudo_users import TestCFFlattenMethodsPseudoUsers, TestCFFlattenMethodsPseudoUsersSourceDoesNotExist +from tests.test_flatten_ga_data_3_users import TestCFFlattenMethodsUsers, TestCFFlattenMethodsUsersSourceDoesNotExist +from tests.test_flatten_ga_data_4_intraday import TestCFFlattenMethodsIntraday + from tests.test_generate_config_b import TestGenerateConfigB from tests.test_generate_config_cf import TestGenerateConfigCf from tests.test_generate_config_lm import TestGenerateConfigLm @@ -30,15 +31,23 @@ # tests test_suite.addTest(unittest.makeSuite(TestCFBuildFlattenerGaDatasetConfig)) + # FLATTEN tests start + # events test_suite.addTest(unittest.makeSuite(TestCFFlattenMethods)) test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsSchemaChangeCollectedTrafficSource)) test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsSchemaChangeIsActiveUser)) - test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsUsersSourceDoesNotExist)) + # pseudo users test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsPseudoUsers)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsPseudoUsersSourceDoesNotExist)) + + # users test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsUsers)) + test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsUsersSourceDoesNotExist)) + # intraday test_suite.addTest(unittest.makeSuite(TestCFFlattenMethodsIntraday)) + # FLATTEN tests end test_suite.addTest(unittest.makeSuite(TestGenerateConfigB)) test_suite.addTest(unittest.makeSuite(TestGenerateConfigCf)) From 0f722065720ef398d4f5b0c3ed82ca21ee44293a Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:50:35 -0700 Subject: [PATCH 15/23] test: CI/CD --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 9120315..0aa7187 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -10,7 +10,7 @@ env: on: push: - branches: [ staging ] + branches: [ staging, development-users ] pull_request: branches: [ master ] From 2f38c4aa41a1c55ad5522dfa3397c39e96c9e34b Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:53:15 -0700 Subject: [PATCH 16/23] docs: todo removed --- dm_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm_helper.py b/dm_helper.py index 410e1d0..471164c 100644 --- a/dm_helper.py +++ b/dm_helper.py @@ -103,7 +103,7 @@ def __init__(self, context_environment_vars): "USERS_USER_PROPERTIES": "users_user_properties", "USERS_USER_AUDIENCES": "users_user_audiences", "LOCATION_ID": "us-central1", - "TOPIC_NAME": self.get_topic_id() #TODO: add users + "TOPIC_NAME": self.get_topic_id() } def get_project(self): From 6ecfcf38fa424deb25a71a3fd4835beeec2b883b Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:54:41 -0700 Subject: [PATCH 17/23] test: cleanup --- tests/test_flatten_ga_data_3_users.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_flatten_ga_data_3_users.py b/tests/test_flatten_ga_data_3_users.py index d359d3d..1cebe00 100644 --- a/tests/test_flatten_ga_data_3_users.py +++ b/tests/test_flatten_ga_data_3_users.py @@ -106,5 +106,5 @@ def test_flatten_ga_data_sharded(self): assert not self.tbl_exists(dataset=self.ga_source.dataset, table_name=f"flat_users_user_audiences_{self.ga_source.date_shard}") def tearDown(self): - # self.delete_all_flat_tables_from_dataset() - pass + self.delete_all_flat_tables_from_dataset() + # pass From e39af0706b1a2f2eede97797946edce143eb3058 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 12:55:52 -0700 Subject: [PATCH 18/23] docs: TODO --- cf/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cf/main.py b/cf/main.py index ce497cb..0e19c26 100644 --- a/cf/main.py +++ b/cf/main.py @@ -1,5 +1,4 @@ -#TODO: (optional for now) - intraday -#TODO: (optional for now) - remaining unit tests other than the most important ones +#TODO: (optional for now) - intraday flattening import base64 import json from google.cloud import bigquery From baccd162ef8f3da63e5ecbbfcd0fe421c621f77b Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 13:04:59 -0700 Subject: [PATCH 19/23] test: backfill for users --- tools/pubsub_message_publish.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tools/pubsub_message_publish.py b/tools/pubsub_message_publish.py index d3c3dd7..f5035d6 100644 --- a/tools/pubsub_message_publish.py +++ b/tools/pubsub_message_publish.py @@ -1,3 +1,5 @@ +from collections import UserString + from google.cloud import pubsub_v1 import json import datetime, time @@ -17,12 +19,21 @@ project_id = "as-dev-ga4-flattener-320623" # GCP project ID, example: [PROJECT_ID] dry_run = False # set to False to Backfill. Setting to True will not pubish any messages to pubsub, but simply show what would have been published. # Desired dates to backfill, both start and end are inclusive -backfill_range_start = datetime.datetime(2024, 6, 11) -backfill_range_end = datetime.datetime(2024, 6, 11) # datetime.datetime.today() -datasets_to_backfill = ["analytics_222460912"] # GA properties to backfill, "analytics_222460912" + +# TESTING EVENTS +# datasets_to_backfill = ["analytics_222460912"] # GA properties to backfill, "analytics_222460912" +# backfill_range_start = datetime.datetime(2024, 6, 11) +# backfill_range_end = datetime.datetime(2024, 6, 11) # datetime.datetime.today() + +# TESTING USERS +datasets_to_backfill = ["analytics_123456789"] # GA properties to backfill, "analytics_222460912" +backfill_range_start = datetime.datetime(2024, 10, 28) +backfill_range_end = datetime.datetime(2024, 11, 7) # datetime.datetime.today() + table_types = [ - "events", - "pseudonymous_users" + # "events", + # "pseudonymous_users", + "users" ] '''*****************************''' ''' Configuration Section End ''' From e24b8d2600d9cb4edcfc8a772c1a1c4e6ca1e858 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 13:16:27 -0700 Subject: [PATCH 20/23] fix: intraday flattening --- cf/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cf/main.py b/cf/main.py index 0e19c26..9379ffb 100644 --- a/cf/main.py +++ b/cf/main.py @@ -99,7 +99,7 @@ def __init__(self, gcp_project, dataset, table_type, date_shard): if self.table_type == "pseudonymous_users" or self.table_type=="users": self.date_field_name = "`date`" - elif self.table_type == "events": + elif self.table_type == "events" or self.table_type == "events_intraday": self.date_field_name = "event_date" def source_table_is_intraday(self): @@ -128,7 +128,7 @@ def get_temp_table_query(self): qry = "" - if self.table_type == "events": + if self.table_type in ("events", "events_intraday" ): qry = f""" CREATE OR REPLACE TEMP TABLE temp_events AS ( @@ -664,7 +664,7 @@ def build_full_query(self, sharded_output_required=True, partitioned_output_requ assert len(list_of_flat_tables) >= 1, "At least 1 flat table needs to be included in the config file" query = "" - if self.table_type in ("events", "users"): + if self.table_type in ("events", "events_intraday", "users"): query += self.get_temp_table_query() From ad88e569cf583e64df628d86638bd04575c01115 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 13:45:14 -0700 Subject: [PATCH 21/23] fix: users_user_properties --- cf/main.py | 2 +- tools/pubsub_message_publish.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cf/main.py b/cf/main.py index 9379ffb..4084871 100644 --- a/cf/main.py +++ b/cf/main.py @@ -71,7 +71,7 @@ def flatten_nested_tables(self): tables = list(set(tables_config) & set(["users", "users_user_properties", - "users_audiences"])) + "users_user_audiences"])) return tables diff --git a/tools/pubsub_message_publish.py b/tools/pubsub_message_publish.py index f5035d6..5856905 100644 --- a/tools/pubsub_message_publish.py +++ b/tools/pubsub_message_publish.py @@ -27,7 +27,7 @@ # TESTING USERS datasets_to_backfill = ["analytics_123456789"] # GA properties to backfill, "analytics_222460912" -backfill_range_start = datetime.datetime(2024, 10, 28) +backfill_range_start = datetime.datetime(2024, 11, 7) backfill_range_end = datetime.datetime(2024, 11, 7) # datetime.datetime.today() table_types = [ From 323590d07d8d3c1e78a11a9bbfd7fd830e48f6d1 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 13:45:26 -0700 Subject: [PATCH 22/23] test: CI/CD - temporarily removed unit tests --- .github/workflows/python-package.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 0aa7187..2907ded 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -75,10 +75,10 @@ jobs: # check for the config file gsutil stat gs://ga4-flattener-deployment-464892960897-adswerve-ga-flat-config/config_datasets.json || echo "ERROR: GCS config file not found" - - name: Test with pytest - run: | - # test - pytest --verbose +# - name: Test with pytest +# run: | +# # test +# pytest --verbose - name: Clean up run: | # clean up From cdb22d124d4dc6f1d7d3939b7fd913f4b80a32c2 Mon Sep 17 00:00:00 2001 From: RuslanBergenov Date: Sat, 9 Nov 2024 13:54:37 -0700 Subject: [PATCH 23/23] test: CI/CD --- .github/workflows/python-package.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 2907ded..86335d8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -10,7 +10,7 @@ env: on: push: - branches: [ staging, development-users ] + branches: [ staging ] pull_request: branches: [ master ] @@ -76,9 +76,9 @@ jobs: gsutil stat gs://ga4-flattener-deployment-464892960897-adswerve-ga-flat-config/config_datasets.json || echo "ERROR: GCS config file not found" # - name: Test with pytest -# run: | -# # test -# pytest --verbose + run: | + # test + pytest --verbose - name: Clean up run: | # clean up