Skip to content

Commit

Permalink
Merge pull request #20 from neha-rajput-salesforce/Neha
Browse files Browse the repository at this point in the history
@W-12178997: Python Connector Types not preserved fix @testfix@
  • Loading branch information
neha-rajput-salesforce authored Feb 15, 2023
2 parents 8b9406b + 7f13807 commit 325542c
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
6 changes: 4 additions & 2 deletions salesforcecdpconnector/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
AUTH_PARAM_CDP_SUBJECT_TOKEN_TYPE_VALUE = 'urn:ietf:params:oauth:token-type:access_token'
AUTH_PARAM_REFRESH_TOKEN_GRANT_TYPE = 'refresh_token'

DATA_TYPE_TIMESTAMP = 'TIMESTAMP'

ENCODING_ASCII = 'ascii'

DATA_TYPE_DECIMAL = 'DECIMAL'
DATA_TYPE_TIMESTAMP = 'TIMESTAMP'
DATA_TYPE_TIMESTAMP_WITH_TIMEZONE = 'TIMESTAMP WITH TIME ZONE'

QUERY_RESPONSE_KEY_ARROW_STREAM = 'arrowStream'
QUERY_RESPONSE_KEY_DATA = 'data'
QUERY_RESPONSE_KEY_METADATA = 'metadata'
Expand Down
31 changes: 28 additions & 3 deletions salesforcecdpconnector/pandas_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
#

import base64
import decimal

import dateutil.parser
import pyarrow

from .constants import API_VERSION_V2
from .constants import API_VERSION_V2, DATA_TYPE_DECIMAL, DATA_TYPE_TIMESTAMP_WITH_TIMEZONE,DATA_TYPE_TIMESTAMP
from .constants import QUERY_RESPONSE_KEY_DONE
from .constants import QUERY_RESPONSE_KEY_NEXT_BATCH_ID
from .constants import QUERY_RESPONSE_KEY_ARROW_STREAM
from .constants import QUERY_RESPONSE_KEY_METADATA
from .constants import QUERY_RESPONSE_KEY_METADATA_TYPE
from .constants import DATA_TYPE_TIMESTAMP
from .constants import ENCODING_ASCII
from .query_submitter import QuerySubmitter

Expand Down Expand Up @@ -45,6 +45,9 @@ def get_dataframe(connection, query):
if len(arrow_stream_list) > 0:
pandas_df = pyarrow.concat_tables(arrow_stream_list).to_pandas()
date_columns = PandasUtils._get_date_columns(result)
decimal_columns = PandasUtils._get_decimal_columns(result)
for decimal_column in decimal_columns:
pandas_df = pandas_df.apply(lambda row: PandasUtils._convert_to_decimal(row, decimal_column), axis=1)
for date_column in date_columns:
pandas_df = pandas_df.apply(lambda row: PandasUtils._convert_to_date(row, date_column), axis=1)
return pandas_df
Expand All @@ -71,7 +74,29 @@ def _istimestamp(key, metadata_list):
metadata_type = metadata_list[key][QUERY_RESPONSE_KEY_METADATA_TYPE]
if metadata_type is not None:
metadata_type = metadata_type.upper()
return metadata_type == DATA_TYPE_TIMESTAMP
return metadata_type == DATA_TYPE_TIMESTAMP or metadata_type == DATA_TYPE_TIMESTAMP_WITH_TIMEZONE

@staticmethod
def _get_decimal_columns(result):
    """
    Collect the keys of the result metadata whose declared type is DECIMAL.

    :param result: query response; its QUERY_RESPONSE_KEY_METADATA entry is
        a mapping from column key to that column's metadata
    :return: list of the metadata keys accepted by PandasUtils._isdecimal,
        in the mapping's iteration order
    """
    column_metadata = result[QUERY_RESPONSE_KEY_METADATA]
    selected = []
    for column_key in column_metadata.keys():
        if PandasUtils._isdecimal(column_key, column_metadata):
            selected.append(column_key)
    return selected

@staticmethod
def _isdecimal(key, metadata_list):
    """
    Tell whether the metadata entry stored under ``key`` declares a DECIMAL type.

    The declared type name is compared case-insensitively (it is upper-cased
    before the comparison); a missing (None) type never matches.

    :param key: key of the column inside ``metadata_list``
    :param metadata_list: mapping from column key to that column's metadata
    :return: True when the upper-cased type name equals DATA_TYPE_DECIMAL
    """
    declared_type = metadata_list[key][QUERY_RESPONSE_KEY_METADATA_TYPE]
    normalized_type = declared_type if declared_type is None else declared_type.upper()
    return normalized_type == DATA_TYPE_DECIMAL

@staticmethod
def _convert_to_decimal(row, column):
value = row[column]
if isinstance(value, decimal.Decimal):
row[column] = float(value)
else:
row[column] = None
return row

@staticmethod
def _get_pyarrow_table(encoded_arrow_stream):
Expand Down
10 changes: 7 additions & 3 deletions salesforcecdpconnector/query_result_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .constants import QUERY_RESPONSE_KEY_METADATA
from .constants import QUERY_RESPONSE_KEY_DONE
from .constants import QUERY_RESPONSE_KEY_NEXT_BATCH_ID
from .constants import DATA_TYPE_TIMESTAMP
from .constants import DATA_TYPE_TIMESTAMP,DATA_TYPE_TIMESTAMP_WITH_TIMEZONE,DATA_TYPE_DECIMAL
from .constants import QUERY_RESPONSE_KEY_PLACE_IN_ORDER
from .parsed_query_result import QueryResult

Expand Down Expand Up @@ -47,10 +47,14 @@ def _convert_timestamps(data, description):
:return: None
"""
for i in range(0, len(description)):
if description[i][1] == DATA_TYPE_TIMESTAMP:
if description[i][1] == DATA_TYPE_TIMESTAMP or description[i][1] == DATA_TYPE_TIMESTAMP_WITH_TIMEZONE:
for data_row in data:
if data_row[i] is not None and isinstance(data_row[i], str) and len(data_row[i]) > 0:
data_row[i] = dateutil.parser.parse(data_row[i])
data_row[i] = str(dateutil.parser.parse(data_row[i]))
elif description[i][1] == DATA_TYPE_DECIMAL:
for data_row in data:
if data_row[i] is not None and isinstance(data_row[i], str) and len(data_row[i]) > 0:
data_row[i] = float(data_row[i])

@staticmethod
def _convert_metadata_item_to_description_item(metadata_item):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = salesforce-cdp-connector
version = 1.0.9
version = 1.0.10
author = Query Service
description = Python Connector for Salesforce CDP
long_description = file: README.md
Expand Down

0 comments on commit 325542c

Please sign in to comment.