Skip to content

Commit

Permalink
Implements consent on Customer Match (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Lopes authored Dec 15, 2023
1 parent 073c4b8 commit 5aef29c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def _get_login_customer_id(self, account_config: AccountConfig, destination: Des
return self._get_customer_id(account_config, destination)

def _get_job_by_list_name(self, offline_user_data_job_service, list_resource_name: str, operator: str,
customer_id: str, login_customer_id: str) -> str:
customer_id: str, login_customer_id: str, consents: Dict[str,Any]) -> str:
cache_key = f"{list_resource_name}:{operator}"

if cache_key in self._job_cache:
Expand All @@ -170,7 +170,8 @@ def _get_job_by_list_name(self, offline_user_data_job_service, list_resource_nam
job_creation_payload = {
'type_': 'CUSTOMER_MATCH_USER_LIST',
'customer_match_user_list_metadata': {
'user_list': list_resource_name
'user_list': list_resource_name,
**(consents)
}
}

Expand All @@ -194,6 +195,21 @@ def _get_remove_all(self, operator: str) -> bool:
def get_filtered_rows(self, rows: List[Any], keys: List[str]) -> List[Dict[str, Any]]:
return [{key: row.get(key) for key in keys if key in row} for row in rows]

def _get_consents(self, destination_metadata: List[str]) -> Dict[str, str]:
if len(destination_metadata) >= 7:
if (
destination_metadata[5] is not None
and destination_metadata[6] is not None
):
return {
"consent": {
"ad_user_data": destination_metadata[5],
"ad_personalization": destination_metadata[6],
},
}

return {}

@utils.safe_process(logger=logging.getLogger(_DEFAULT_LOGGER))
def process(self, batch: Batch, **kwargs):
if not self.active:
Expand All @@ -218,8 +234,10 @@ def process(self, batch: Batch, **kwargs):
execution.destination.destination_metadata))

operator = self._get_list_operator(execution.destination.destination_metadata[1])

consents = self._get_consents(execution.destination.destination_metadata)
job_resource_name = self._get_job_by_list_name(offline_user_data_job_service, list_resource_name, operator,
customer_id, login_customer_id)
customer_id, login_customer_id, consents)

rows = self.get_filtered_rows(batch.elements, self.get_row_keys())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,12 @@ class GoogleAdsCustomerMatchContactInfoUploaderDoFn(GoogleAdsCustomerMatchAbstra
def get_list_definition(self, account_config: AccountConfig, destination_metadata: List[str]) -> Dict[str, Any]:
list_name = destination_metadata[0]
# Defines the list's lifespan to unlimited
life_span = 10000

# Overwrites lifespan value if any
if len(destination_metadata) >=6 and destination_metadata[5]:
life_span = int(destination_metadata[5])

return {
'membership_status': 'OPEN',
'name': list_name,
'description': 'List created automatically by Megalista',
'membership_life_span': life_span,
'membership_life_span': 10000,
'crm_based_user_list': {
'upload_key_type': 'CONTACT_INFO', #CONTACT_INFO, CRM_ID, MOBILE_ADVERTISING_ID
'data_source_type': 'FIRST_PARTY',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,70 @@ def test_upload_add_users_with_mcc_account_override(mocker, uploader):
uploader._get_offline_user_data_job_service.assert_called_with(_ads_mcc_account_id)
uploader._get_offline_user_data_job_service.return_value.add_offline_user_data_job_operations.assert_called_once_with(
request=data_insertion_payload
)
)

def test_upload_add_users_consent(mocker, uploader, error_notifier):
mocker.patch.object(uploader, '_get_offline_user_data_job_service')

uploader._get_offline_user_data_job_service.return_value.create_offline_user_data_job.return_value.resource_name = 'a'

uploader._get_offline_user_data_job_service.return_value.add_offline_user_data_job_operations.return_value = MagicMock().partial_failure_error = None

destination = Destination(
'dest1', DestinationType.ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD, ['user_list', 'ADD', '', '', '', 'GRANTED', 'GRANTED'])
source = Source('orig1', SourceType.BIG_QUERY, ['dt1', 'buyers'])
execution = Execution(_account_config, source, destination)

batch = Batch(execution, [{
'hashed_email': 'email1',
'hashed_phone_number': 'phone1',
'address_info': {
'hashed_first_name': 'first1',
'hashed_last_name': 'last1',
'country_code': 'country1',
'postal_code': 'postal1',
}
}])

uploader.process(batch)
uploader.finish_bundle()

data_insertion_payload = {
'enable_partial_failure': True,
'operations': [
{'create': {'user_identifiers': [{'hashed_email': 'email1'}]}},
{'create': {'user_identifiers': [{'address_info': {
'hashed_first_name': 'first1',
'hashed_last_name': 'last1',
'country_code': 'country1',
'postal_code': 'postal1'}},
]}},
{'create': {'user_identifiers': [{'hashed_phone_number': 'phone1'}]}}
],
'resource_name': 'a',
}

uploader._get_offline_user_data_job_service.assert_called_with(_ads_account_id)

job_creation_payload = {
'type_': 'CUSTOMER_MATCH_USER_LIST',
'customer_match_user_list_metadata': {
'user_list': 'None',
'consent': {
'ad_user_data': 'GRANTED',
'ad_personalization': 'GRANTED'
}
}
}

uploader._get_offline_user_data_job_service.return_value.create_offline_user_data_job.assert_called_once_with(customer_id=_ads_account_id,
job=job_creation_payload
)

uploader._get_offline_user_data_job_service.return_value.add_offline_user_data_job_operations.assert_called_once_with(
request=data_insertion_payload
)

uploader._get_offline_user_data_job_service.return_value.run_offline_user_data_job.assert_called_once()

assert not error_notifier.were_errors_sent

0 comments on commit 5aef29c

Please sign in to comment.