diff --git a/client/app/assets/images/db-logos/yandex_disk.png b/client/app/assets/images/db-logos/yandex_disk.png new file mode 100644 index 0000000000..7b375648df Binary files /dev/null and b/client/app/assets/images/db-logos/yandex_disk.png differ diff --git a/redash/query_runner/python.py b/redash/query_runner/python.py index 28219a7f89..939bcfbf75 100644 --- a/redash/query_runner/python.py +++ b/redash/query_runner/python.py @@ -9,6 +9,7 @@ guarded_unpack_sequence, safe_builtins, ) +from RestrictedPython.transformer import IOPERATOR_TO_STR from redash import models from redash.query_runner import ( @@ -23,16 +24,17 @@ register, ) from redash.utils import json_dumps, json_loads +from redash.utils.pandas import pandas_installed -try: - import numpy as np +if pandas_installed: import pandas as pd - pandas_installed = True -except ImportError: - pandas_installed = False + from redash.utils.pandas import pandas_to_result + + enabled = True +else: + enabled = False -from RestrictedPython.transformer import IOPERATOR_TO_STR logger = logging.getLogger(__name__) @@ -271,26 +273,11 @@ def get_query_result(query_id): return query.latest_query_data.data def dataframe_to_result(self, result, df): - result["rows"] = df.to_dict("records") - - for column_name, column_type in df.dtypes.items(): - if column_type == np.bool_: - redash_type = TYPE_BOOLEAN - elif column_type == np.inexact: - redash_type = TYPE_FLOAT - elif column_type == np.integer: - redash_type = TYPE_INTEGER - elif column_type in (np.datetime64, np.dtype(" 10: - redash_type = TYPE_DATETIME - else: - redash_type = TYPE_DATE - else: - redash_type = TYPE_STRING + converted_result = pandas_to_result(df) - self.add_result_column(result, column_name, column_name, redash_type) + result["rows"] = converted_result["rows"] + for column in converted_result["columns"]: + self.add_result_column(result, column["name"], column["friendly_name"], column["type"]) def get_current_user(self): return self._current_user.to_dict() diff --git a/redash/query_runner/yandex_disk.py b/redash/query_runner/yandex_disk.py new file mode 100644 index 0000000000..145c52b212 --- /dev/null +++ b/redash/query_runner/yandex_disk.py @@ -0,0 +1,166 @@ +import logging +from importlib.util import find_spec + +import requests +import yaml + +from redash.query_runner import BaseSQLQueryRunner, register +from redash.utils import json_dumps +from redash.utils.pandas import pandas_installed + +openpyxl_installed = find_spec("openpyxl") + +if pandas_installed and openpyxl_installed: + import openpyxl # noqa: F401 + import pandas as pd + + from redash.utils.pandas import pandas_to_result + + enabled = True + + EXTENSIONS_READERS = { + "csv": pd.read_csv, + "tsv": pd.read_table, + "xls": pd.read_excel, + "xlsx": pd.read_excel, + } +else: + enabled = False + +logger = logging.getLogger(__name__) + + +class YandexDisk(BaseSQLQueryRunner): + should_annotate_query = False + + @classmethod + def type(cls): + return "yandex_disk" + + @classmethod + def name(cls): + return "Yandex Disk" + + @classmethod + def configuration_schema(cls): + return { + "type": "object", + "properties": { + "token": {"type": "string", "title": "OAuth Token"}, + }, + "secret": ["token"], + "required": ["token"], + } + + def __init__(self, configuration): + super(YandexDisk, self).__init__(configuration) + self.syntax = "yaml" + self.base_url = "https://cloud-api.yandex.net/v1/disk" + self.list_path = "counters" + + def _get_tables(self, schema): + offset = 0 + limit = 100 + + while True: + tmp_response = self._send_query( + "resources/public", media_type="spreadsheet,text", limit=limit, offset=offset + ) + + tmp_items = tmp_response["items"] + + for file_info in tmp_items: + file_name = file_info["name"] + file_path = file_info["path"].replace("disk:", "") + + file_extension = file_name.split(".")[-1].lower() + if file_extension not in EXTENSIONS_READERS: + continue + + schema[file_name] = {"name": file_name, "columns": [file_path]} + + if len(tmp_items) < limit: + break + + offset += limit + + return list(schema.values()) + + def test_connection(self): + self._send_query() + + def _send_query(self, url_path="", **kwargs): + token = kwargs.pop("oauth_token", self.configuration["token"]) + r = requests.get( + f"{self.base_url}/{url_path}", + headers={"Authorization": f"OAuth {token}"}, + params=kwargs, + ) + + response_data = r.json() + + if not r.ok: + error_message = f"Code: {r.status_code}, message: {r.text}" + raise Exception(error_message) + return response_data + + def run_query(self, query, user): + logger.debug("Yandex Disk is about to execute query: %s", query) + data = None + + if not query: + error = "Query is empty" + return data, error + + try: + params = yaml.safe_load(query) + except (ValueError, AttributeError) as e: + logger.exception(e) + error = f"YAML read error: {str(e)}" + return data, error + + if not isinstance(params, dict): + error = "The query format must be JSON or YAML" + return data, error + + if "path" not in params: + error = "The query must contain path" + return data, error + + file_extension = params["path"].split(".")[-1].lower() + + read_params = {} + is_multiple_sheets = False + + if file_extension not in EXTENSIONS_READERS: + error = f"Unsupported file extension: {file_extension}" + return data, error + elif file_extension in ("xls", "xlsx"): + read_params["sheet_name"] = params.get("sheet_name", 0) + if read_params["sheet_name"] is None: + is_multiple_sheets = True + + file_url = self._send_query("resources/download", path=params["path"])["href"] + + try: + df = EXTENSIONS_READERS[file_extension](file_url, **read_params) + except Exception as e: + logger.exception(e) + error = f"Read file error: {str(e)}" + return data, error + + if is_multiple_sheets: + new_df = [] + for sheet_name, sheet_df in df.items(): + sheet_df["sheet_name"] = sheet_name + new_df.append(sheet_df) + new_df = pd.concat(new_df, ignore_index=True) + df = new_df.copy() + + data = json_dumps(pandas_to_result(df)) + error = None + + return data, error + + +register(YandexDisk) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 74842fd769..feea99f287 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -298,6 +298,7 @@ def email_server_is_configured(): "redash.query_runner.clickhouse", "redash.query_runner.tinybird", "redash.query_runner.yandex_metrica", + "redash.query_runner.yandex_disk", "redash.query_runner.rockset", "redash.query_runner.treasuredata", "redash.query_runner.sqlite", diff --git a/redash/utils/pandas.py b/redash/utils/pandas.py new file mode 100644 index 0000000000..40d74b2c5e --- /dev/null +++ b/redash/utils/pandas.py @@ -0,0 +1,47 @@ +import logging +from importlib.util import find_spec + +from redash.query_runner import ( + TYPE_BOOLEAN, + TYPE_DATE, + TYPE_DATETIME, + TYPE_FLOAT, + TYPE_INTEGER, + TYPE_STRING, +) + +logger = logging.getLogger(__name__) + +pandas_installed = find_spec("pandas") and find_spec("numpy") + +if pandas_installed: + import numpy as np + import pandas as pd + + def get_column_types_from_dataframe(df: pd.DataFrame) -> list: + columns = [] + for column_name, column_type in df.dtypes.items(): + if column_type in (np.bool_,): + redash_type = TYPE_BOOLEAN + elif column_type in (np.int64, np.int32): + redash_type = TYPE_INTEGER + elif column_type in (np.float64,): + redash_type = TYPE_FLOAT + elif column_type in (np.datetime64, np.dtype(" 10: + redash_type = TYPE_DATETIME + else: + redash_type = TYPE_DATE + else: + redash_type = TYPE_STRING + + columns.append({"name": column_name, "friendly_name": column_name, "type": redash_type}) + + return columns + + def pandas_to_result(df: pd.DataFrame) -> dict: + columns = get_column_types_from_dataframe(df) + rows = df.to_dict("records") + return {"columns": columns, "rows": rows} diff --git a/tests/query_runner/test_yandex_disk.py b/tests/query_runner/test_yandex_disk.py new file mode 100644 index 0000000000..437e8068bb --- /dev/null +++ b/tests/query_runner/test_yandex_disk.py @@ -0,0 +1,250 @@ +from io import BytesIO +from unittest import mock + +import yaml + +from redash.query_runner.yandex_disk import enabled +from redash.utils import json_dumps + +if enabled: + import pandas as pd + + from redash.query_runner.yandex_disk import EXTENSIONS_READERS, YandexDisk + + test_df = pd.DataFrame( + [ + {"id": 1, "name": "Alice", "age": 20}, + {"id": 2, "name": "Bob", "age": 21}, + {"id": 3, "name": "Charlie", "age": 22}, + {"id": 4, "name": "Dave", "age": 23}, + {"id": 5, "name": "Eve", "age": 24}, + ] + ) + + +import pytest + +test_token = "AAAAQAA" +skip_condition = pytest.mark.skipif(not enabled, reason="pandas and/or openpyxl are not installed") + + +@pytest.fixture +def mock_yandex_disk(): + return YandexDisk(configuration={"token": test_token}) + + +@skip_condition +def test_yandex_disk_type(): + assert YandexDisk.type() == "yandex_disk" + + +@skip_condition +def test_yandex_disk_name(): + assert YandexDisk.name() == "Yandex Disk" + + +@skip_condition +@mock.patch("requests.get") +def test__send_query(mock_requests_get): + mock_requests_get.return_value.ok = True + mock_requests_get.return_value.json.return_value = {"foo": "bar"} + + configuration = {"token": test_token} + disk = YandexDisk(configuration) + response = disk._send_query("test_url") + + assert response == {"foo": "bar"} + mock_requests_get.assert_called_once() + + +@skip_condition +@pytest.mark.parametrize( + "configuration, error_message", + [({"token": test_token}, None), ({"token": ""}, "Code: 400, message: Unauthorized")], +) +@mock.patch("requests.get") +def test_test_connection(mock_requests_get, configuration, error_message): + if error_message: + mock_requests_get.return_value.ok = False + mock_requests_get.return_value.status_code = 400 + mock_requests_get.return_value.text = "Unauthorized" + else: + mock_requests_get.return_value.ok = True + + disk = YandexDisk(configuration) + if error_message: + with pytest.raises(Exception, match=error_message): + disk.test_connection() + else: + assert disk.test_connection() is None + + +@skip_condition +def test_get_tables(mock_yandex_disk): + mock_files = { + "items": [ + {"name": "test_file.csv", "path": "disk:/test_path/test_file.csv"}, + {"name": "invalid_file.txt", "path": "disk:/test_path/invalid_file.txt"}, + ] + } + mock_yandex_disk._send_query = mock.MagicMock(return_value=mock_files) + + tables = mock_yandex_disk._get_tables({}) + assert len(tables) == 1 + assert tables[0]["name"] == "test_file.csv" + assert tables[0]["columns"] == ["/test_path/test_file.csv"] + + +def mock_ext_readers_return(url, **params): + return test_df + + +def mock_ext_readers_return_multiple_sheets(url, **params): + return {"sheet1": test_df} + + +@skip_condition +@mock.patch("requests.get") +def test_run_query(mocked_requests, mock_yandex_disk): + mocked_response = mock.MagicMock() + mocked_response.ok = True + mocked_response.json.return_value = {"href": "test_file.csv"} + mocked_requests.return_value = mocked_response + + mock_readers = EXTENSIONS_READERS.copy() + mock_readers["csv"] = mock_ext_readers_return + + expected_data = json_dumps( + { + "columns": [ + {"name": "id", "friendly_name": "id", "type": "integer"}, + {"name": "name", "friendly_name": "name", "type": "string"}, + {"name": "age", "friendly_name": "age", "type": "integer"}, + ], + "rows": [ + {"id": 1, "name": "Alice", "age": 20}, + {"id": 2, "name": "Bob", "age": 21}, + {"id": 3, "name": "Charlie", "age": 22}, + {"id": 4, "name": "Dave", "age": 23}, + {"id": 5, "name": "Eve", "age": 24}, + ], + } + ) + + with mock.patch.dict("redash.query_runner.yandex_disk.EXTENSIONS_READERS", mock_readers, clear=True): + data, error = mock_yandex_disk.run_query(yaml.dump({"path": "/tmp/file.csv"}), "user") + + assert error is None + assert data == expected_data + + +@skip_condition +def test_run_query_with_empty_query(mock_yandex_disk): + result = mock_yandex_disk.run_query("", "user") + assert result == (None, "Query is empty") + + +@skip_condition +def test_run_query_nonstring_yaml(mock_yandex_disk): + bad_yaml_query = [0, 1] + data, error = mock_yandex_disk.run_query(bad_yaml_query, "user") + assert data is None + assert error.startswith("YAML read error: ") + + +@skip_condition +def test_run_query_bad_yaml(mock_yandex_disk): + bad_yaml_query = "unparseable = yaml" + result = mock_yandex_disk.run_query(bad_yaml_query, "user") + assert result == (None, "The query format must be JSON or YAML") + + +@skip_condition +def test_run_query_without_path(mock_yandex_disk): + bad_yaml_query = "without: path" + result = mock_yandex_disk.run_query(bad_yaml_query, "user") + assert result == (None, "The query must contain path") + + +@skip_condition +def test_run_query_unsupported_extension(mock_yandex_disk): + bad_yaml_query = "path: /tmp/file.txt" + result = mock_yandex_disk.run_query(bad_yaml_query, "user") + assert result == (None, "Unsupported file extension: txt") + + +@skip_condition +def test_run_query_read_file_error(mock_yandex_disk): + mock_yandex_disk._send_query = mock.MagicMock(return_value={"href": "test_file.csv"}) + mock_yandex_disk._get_tables = mock.MagicMock(return_value=[{"name": "test_file.csv", "columns": []}]) + mock_yandex_disk._read_file = mock.MagicMock(side_effect=Exception("Read file error")) + + data, error = mock_yandex_disk.run_query(yaml.dump({"path": "/tmp/file.csv"}), "user") + assert data is None + assert error is not None and error.startswith("Read file error") + + +@skip_condition +@mock.patch("requests.get") +def test_run_query_multiple_sheets(mocked_requests, mock_yandex_disk): + mocked_response = mock.MagicMock() + mocked_response.ok = True + mocked_response.json.return_value = {"href": "test_file.xlsx"} + mocked_requests.return_value = mocked_response + + query = """ + path: /tmp/file.xlsx + sheet_name: null + """ + + mock_readers = EXTENSIONS_READERS.copy() + mock_readers["xlsx"] = mock_ext_readers_return_multiple_sheets + + with mock.patch.dict("redash.query_runner.yandex_disk.EXTENSIONS_READERS", mock_readers, clear=True): + data, error = mock_yandex_disk.run_query(query, "user") + + assert error is None + assert data == json_dumps( + { + "columns": [ + {"name": "id", "friendly_name": "id", "type": "integer"}, + {"name": "name", "friendly_name": "name", "type": "string"}, + {"name": "age", "friendly_name": "age", "type": "integer"}, + {"name": "sheet_name", "friendly_name": "sheet_name", "type": "string"}, + ], + "rows": [ + {"id": 1, "name": "Alice", "age": 20, "sheet_name": "sheet1"}, + {"id": 2, "name": "Bob", "age": 21, "sheet_name": "sheet1"}, + {"id": 3, "name": "Charlie", "age": 22, "sheet_name": "sheet1"}, + {"id": 4, "name": "Dave", "age": 23, "sheet_name": "sheet1"}, + {"id": 5, "name": "Eve", "age": 24, "sheet_name": "sheet1"}, + ], + } + ) + + +@skip_condition +def test_read_xlsx(): + output = BytesIO() + writer = pd.ExcelWriter(output) + test_df.to_excel(writer, index=False) + writer.save() + assert test_df.equals(EXTENSIONS_READERS["xlsx"](output)) + + +@skip_condition +def test_read_csv(): + output = BytesIO() + test_df.to_csv(output, index=False) + output.seek(0) + + assert test_df.equals(EXTENSIONS_READERS["csv"](output)) + + +@skip_condition +def test_tsv(): + output = BytesIO() + test_df.to_csv(output, index=False, sep="\t") + output.seek(0) + + assert test_df.equals(EXTENSIONS_READERS["tsv"](output)) diff --git a/tests/test_utils.py b/tests/test_utils.py index 736d6899e6..78d193ab8f 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,7 +1,17 @@ from collections import namedtuple from unittest import TestCase +import pytest + from redash import create_app +from redash.query_runner import ( + TYPE_BOOLEAN, + TYPE_DATE, + TYPE_DATETIME, + TYPE_FLOAT, + TYPE_INTEGER, + TYPE_STRING, +) from redash.utils import ( build_url, collect_parameters_from_request, @@ -10,9 +20,18 @@ json_dumps, render_template, ) +from redash.utils.pandas import pandas_installed DummyRequest = namedtuple("DummyRequest", ["host", "scheme"]) +skip_condition = pytest.mark.skipif(not pandas_installed, reason="pandas is not installed") + +if pandas_installed: + import numpy as np + import pandas as pd + + from redash.utils.pandas import get_column_types_from_dataframe, pandas_to_result + class TestBuildUrl(TestCase): def test_simple_case(self): @@ -100,3 +119,44 @@ def test_render(self): html, text = [render_template("emails/failures.{}".format(f), d) for f in ["html", "txt"]] self.assertIn("Failure Unit Test", html) self.assertIn("Failure Unit Test", text) + + +@pytest.fixture +@skip_condition +def mock_dataframe(): + df = pd.DataFrame( + { + "boolean_col": [True, False], + "integer_col": [1, 2], + "float_col": [1.1, 2.2], + "date_col": [np.datetime64("2020-01-01"), np.datetime64("2020-05-05")], + "datetime_col": [np.datetime64("2020-01-01 12:00:00"), np.datetime64("2020-05-05 14:30:00")], + "string_col": ["A", "B"], + } + ) + return df + + +@skip_condition +def test_get_column_types_from_dataframe(mock_dataframe): + result = get_column_types_from_dataframe(mock_dataframe) + expected_output = [ + {"name": "boolean_col", "friendly_name": "boolean_col", "type": TYPE_BOOLEAN}, + {"name": "integer_col", "friendly_name": "integer_col", "type": TYPE_INTEGER}, + {"name": "float_col", "friendly_name": "float_col", "type": TYPE_FLOAT}, + {"name": "date_col", "friendly_name": "date_col", "type": TYPE_DATE}, + {"name": "datetime_col", "friendly_name": "datetime_col", "type": TYPE_DATETIME}, + {"name": "string_col", "friendly_name": "string_col", "type": TYPE_STRING}, + ] + + assert result == expected_output + + +@skip_condition +def test_pandas_to_result(mock_dataframe): + result = pandas_to_result(mock_dataframe) + + assert "columns" in result + assert "rows" in result + + assert mock_dataframe.equals(pd.DataFrame(result["rows"]))