Skip to content

Commit

Permalink
Merge branch 'feature/handle-xml-n-csv' into feature/dss12-sc-155806-…
Browse files Browse the repository at this point in the history
…add-secure-basic-preset
  • Loading branch information
alexbourret committed Mar 25, 2024
2 parents cd877f7 + c37132f commit 254fd21
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 24 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
# Changelog

## [Version 1.2.1](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.1) - Feature release - 2023-11-20
## [Version 1.2.2](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.2) - Feature release - 2024-02-14

- Handle XML and CSV endpoints
- Add secure SSO preset
- Add secure username / password preset

## [Version 1.2.1](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.1) - Bugfix release - 2023-12-13

- Fix the `Add an error column` error behaviour on the recipe
- Code-env descriptor for DSS 12

## [Version 1.1.5](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.5) - Bugfix release - 2023-12-13

- Fix the `Add an error column` error behaviour on the recipe
- Code-env descriptor for DSS 11

## [Version 1.2.0](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.0) - Feature and bugfix release - 2023-05-31

- Add Brotli compression
- Faster recurring calls
- dku_error column kept at all time in API-Connect recipe output schema
- Add XML to JSON conversion
- Add CSV decoding
- Updated code-env descriptor for DSS 12

## [Version 1.1.4](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.4) - Feature and bugfix release - 2023-02-28
Expand Down
1 change: 1 addition & 0 deletions code-env/python/spec/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ jsonpath-ng==1.5.3
requests_ntlm==1.1.0
requests==2.26.0
Brotli==1.0.9
xmltodict==0.13.0
2 changes: 1 addition & 1 deletion plugin.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"id": "api-connect",
"version": "1.2.1",
"version": "1.2.2",
"meta": {
"label": "API Connect",
"description": "Retrieve data from any REST API",
Expand Down
12 changes: 10 additions & 2 deletions python-connectors/api-connect_dataset/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
from dataikuapi.utils import DataikuException
from safe_logger import SafeLogger
from rest_api_client import RestAPIClient
from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path, get_secure_credentials
from dku_utils import (
get_dku_key_values, get_endpoint_parameters,
parse_keys_for_json, get_value_from_path, get_secure_credentials, decode_csv_data
)
from dku_constants import DKUConstants
import json

Expand Down Expand Up @@ -50,9 +53,14 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
record_count += len(data)
for row in data:
yield self.format_output(row, metadata)
else:
elif isinstance(data, dict):
record_count += 1
yield self.format_output(data, metadata)
else:
data = decode_csv_data(data)
record_count += len(data)
for row in data:
yield self.format_output(row, metadata)
if is_records_limit and record_count >= records_limit:
break

Expand Down
2 changes: 1 addition & 1 deletion python-lib/dku_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ class DKUConstants(object):
API_RESPONSE_KEY = "api_response"
FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token"]
FORM_DATA_BODY_FORMAT = "FORM_DATA"
PLUGIN_VERSION = "1.2.1"
PLUGIN_VERSION = "1.2.2-beta.1"
RAW_BODY_FORMAT = "RAW"
REPONSE_ERROR_KEY = "dku_error"
38 changes: 38 additions & 0 deletions python-lib/dku_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import json
import copy
from jsonpath_ng.ext import parse
from safe_logger import SafeLogger


logger = SafeLogger("api-connect plugin utils")


def get_dku_key_values(endpoint_query_string):
Expand Down Expand Up @@ -116,3 +120,37 @@ def extract_key_using_json_path(json_dictionary, json_path):
return res
else:
return None


def is_reponse_xml(response):
content_types = response.headers.get("Content-Type", "").split(";")
for content_type in content_types:
if content_type in ["text/xml", "application/soap+xml", "text/plain", "application/xml"]:
return True
return False


def xml_to_json(content):
import xmltodict
json_response = None
try:
json_response = xmltodict.parse(content)
except Exception as error:
logger.error("XML to JSON conversion failed, processing as STRING ({})".format(error))
json_response = content
return json_response


def decode_csv_data(data):
import csv
import io
json_data = None
if isinstance(data, bytes):
data = data.decode("utf-8")
try:
reader = csv.DictReader(io.StringIO(data))
json_data = list(reader)
except Exception as error:
logger.error("Could not extract csv data. Error={}".format(error))
json_data = data
return json_data
44 changes: 32 additions & 12 deletions python-lib/rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pagination import Pagination
from safe_logger import SafeLogger
from loop_detector import LoopDetector
from dku_utils import get_dku_key_values, template_dict, format_template
from dku_utils import get_dku_key_values, template_dict, format_template, is_reponse_xml, xml_to_json
from dku_constants import DKUConstants
from rest_api_auth import get_auth

Expand Down Expand Up @@ -153,18 +153,9 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
if response.status_code in [204]:
self.pagination.update_next_page({}, response.links)
return self.empty_json_response()
try:
json_response = response.json()
except Exception as err:
self.pagination.update_next_page({}, None)
error_message = "Error '{}' when decoding JSON".format(str(err)[:100])
logger.error(error_message)
logger.error("response.content={}".format(response.content))
if can_raise_exeption:
raise RestAPIClientError("The API did not return JSON as expected. {}".format(error_message))
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}

self.pagination.update_next_page(json_response, response.links)
json_response = self.get_json_from_response(response, can_raise_exeption=can_raise_exeption)

return json_response

def request_with_redirect_retry(self, method, url, **kwargs):
Expand Down Expand Up @@ -235,3 +226,32 @@ def assert_secure_domain(self, url):
raise RestAPIClientError("The use of this secure preset is restricted to https URLs")
if not url.startswith(self.secure_domain):
raise RestAPIClientError("The use of this preset is restricted to the {} domain".format(self.secure_domain))

def get_json_from_response(self, response, can_raise_exeption=True):
json_response = None
if is_reponse_xml(response):
logger.info("XML reply detected, converting to JSON")
json_response = xml_to_json(response.content)
else:
try:
json_response = response.json()
except Exception as err:
self.pagination.update_next_page({}, None)
error_message = "Error '{}' when decoding JSON".format(str(err)[:100])
logger.error(error_message)
return response.content

self.pagination.update_next_page(json_response, response.links)
return json_response


def get_status_code(response):
if isinstance(response, requests.Response):
return response.status_code
return None


def get_headers(response):
if isinstance(response, requests.Response):
return response.headers
return None
39 changes: 32 additions & 7 deletions python-lib/rest_api_recipe_session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from dataikuapi.utils import DataikuException
from rest_api_client import RestAPIClient
from safe_logger import SafeLogger
from dku_utils import parse_keys_for_json, get_value_from_path
from dku_utils import parse_keys_for_json, get_value_from_path, decode_csv_data
from dku_constants import DKUConstants
import copy
import json
import requests
import collections


logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)
Expand Down Expand Up @@ -80,13 +81,15 @@ def retrieve_next_page(self, is_raw_output):
logger.info("retrieve_next_page: Calling next page")
json_response = self.client.paginated_api_call(can_raise_exeption=self.can_raise)
default_dict = {
DKUConstants.REPONSE_ERROR_KEY: json_response.get(DKUConstants.REPONSE_ERROR_KEY, None)
DKUConstants.REPONSE_ERROR_KEY: ""
} if self.behaviour_when_error == "keep-error-column" else {}
if isinstance(json_response, dict) and DKUConstants.REPONSE_ERROR_KEY in default_dict:
default_dict[DKUConstants.REPONSE_ERROR_KEY] = json_response.get(DKUConstants.REPONSE_ERROR_KEY, None)
metadata = self.client.get_metadata() if self.display_metadata else default_dict
is_api_returning_dict = True
if self.extraction_key:
data_rows = get_value_from_path(json_response, self.extraction_key.split("."), can_raise=False)
if data_rows is None or type(data_rows) != list:
if data_rows is None:
if self.behaviour_when_error == "ignore":
return []
error_message = "Extraction key '{}' was not found in the incoming data".format(self.extraction_key)
Expand Down Expand Up @@ -117,7 +120,13 @@ def retrieve_next_page(self, is_raw_output):
base_row.update(parse_keys_for_json(row))
base_row.update(self.initial_parameter_columns)
page_rows.append(base_row)

else:
json_response = decode_csv_data(json_response)
for row in json_response:
base_row = copy.deepcopy(metadata)
base_row.update(parse_keys_for_json(row))
base_row.update(self.initial_parameter_columns)
page_rows.append(base_row)
if is_api_returning_dict:
base_row.update(self.initial_parameter_columns)
page_rows.append(base_row)
Expand All @@ -126,7 +135,23 @@ def retrieve_next_page(self, is_raw_output):
def format_page_rows(self, data_rows, is_raw_output, metadata=None):
page_rows = []
metadata = metadata or {}
for data_row in data_rows:
if type(data_rows) in [str, bytes]:
data_rows = decode_csv_data(data_rows)
if type(data_rows) in [list]:
for data_row in data_rows:
base_row = copy.deepcopy(self.initial_parameter_columns)
base_row.update(metadata)
if is_raw_output:
if is_error_message(data_row):
base_row.update(parse_keys_for_json(data_row))
else:
base_row.update({
DKUConstants.API_RESPONSE_KEY: json.dumps(data_row)
})
else:
base_row.update(parse_keys_for_json(data_row))
page_rows.append(base_row)
if type(data_rows) in [dict, collections.OrderedDict]:
base_row = copy.deepcopy(self.initial_parameter_columns)
base_row.update(metadata)
if is_raw_output:
Expand All @@ -137,10 +162,10 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
base_row.update(parse_keys_for_json(data_row))
else:
base_row.update({
DKUConstants.API_RESPONSE_KEY: json.dumps(data_row)
DKUConstants.API_RESPONSE_KEY: json.dumps(data_rows)
})
else:
base_row.update(parse_keys_for_json(data_row))
base_row.update(parse_keys_for_json(data_rows))
page_rows.append(base_row)
return page_rows

Expand Down
8 changes: 8 additions & 0 deletions tests/python/integration/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ def test_run_api_connect_relative_url_pagination(user_dss_clients):

def test_run_api_connect_check_sc_110446(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="CHECKSC110446")


def test_run_api_connect_check_sc_163656_error_column_always_in_schema(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="SC163656")


def test_run_api_connect_xml_handling(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="XML_HANDLING")

0 comments on commit 254fd21

Please sign in to comment.