Yandex.Disk Query runner #6598

Merged: 22 commits, Nov 22, 2023

Binary file added client/app/assets/images/db-logos/yandex_disk.png
37 changes: 12 additions & 25 deletions redash/query_runner/python.py
@@ -9,6 +9,7 @@
     guarded_unpack_sequence,
     safe_builtins,
 )
+from RestrictedPython.transformer import IOPERATOR_TO_STR

 from redash import models
 from redash.query_runner import (
@@ -23,16 +24,17 @@
     register,
 )
 from redash.utils import json_dumps, json_loads
+from redash.utils.pandas import pandas_installed

-try:
-    import numpy as np
+if pandas_installed:
     import pandas as pd

-    pandas_installed = True
-except ImportError:
-    pandas_installed = False
+    from redash.utils.pandas import pandas_to_result
+
+    enabled = True
+else:
+    enabled = False

-from RestrictedPython.transformer import IOPERATOR_TO_STR
-
 logger = logging.getLogger(__name__)
@@ -271,26 +273,11 @@
         return query.latest_query_data.data

     def dataframe_to_result(self, result, df):
-        result["rows"] = df.to_dict("records")
+        converted_result = pandas_to_result(df)

-        for column_name, column_type in df.dtypes.items():
-            if column_type == np.bool_:
-                redash_type = TYPE_BOOLEAN
-            elif column_type == np.inexact:
-                redash_type = TYPE_FLOAT
-            elif column_type == np.integer:
-                redash_type = TYPE_INTEGER
-            elif column_type in (np.datetime64, np.dtype("<M8[ns]")):
-                if df.empty:
-                    redash_type = TYPE_DATETIME
-                elif len(df[column_name].head(1).astype(str).loc[0]) > 10:
-                    redash_type = TYPE_DATETIME
-                else:
-                    redash_type = TYPE_DATE
-            else:
-                redash_type = TYPE_STRING
+        result["rows"] = converted_result["rows"]

-            self.add_result_column(result, column_name, column_name, redash_type)
+        for column in converted_result["columns"]:
+            self.add_result_column(result, column["name"], column["friendly_name"], column["type"])

     def get_current_user(self):
         return self._current_user.to_dict()
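For reference, a minimal sketch (not part of this PR) of the dict shape the refactored dataframe_to_result expects back from pandas_to_result, using illustrative column names and values; the type constants are the ones redash.query_runner already exports:

from redash.query_runner import TYPE_INTEGER, TYPE_STRING

# Illustrative only: what pandas_to_result(df) hands back for a two-column frame.
converted_result = {
    "columns": [
        {"name": "id", "friendly_name": "id", "type": TYPE_INTEGER},
        {"name": "name", "friendly_name": "name", "type": TYPE_STRING},
    ],
    "rows": [
        {"id": 1, "name": "alpha"},
        {"id": 2, "name": "beta"},
    ],
}
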
166 changes: 166 additions & 0 deletions redash/query_runner/yandex_disk.py
@@ -0,0 +1,166 @@
import logging
from importlib.util import find_spec

import requests
import yaml

from redash.query_runner import BaseSQLQueryRunner, register
from redash.utils import json_dumps
from redash.utils.pandas import pandas_installed

openpyxl_installed = find_spec("openpyxl")

if pandas_installed and openpyxl_installed:
    import openpyxl  # noqa: F401
    import pandas as pd

    from redash.utils.pandas import pandas_to_result

    enabled = True

    EXTENSIONS_READERS = {
        "csv": pd.read_csv,
        "tsv": pd.read_table,
        "xls": pd.read_excel,
        "xlsx": pd.read_excel,
    }
else:
    enabled = False

logger = logging.getLogger(__name__)


class YandexDisk(BaseSQLQueryRunner):
    should_annotate_query = False

    @classmethod
    def type(cls):
        return "yandex_disk"

    @classmethod
    def name(cls):
        return "Yandex Disk"

    @classmethod
    def configuration_schema(cls):
        return {
            "type": "object",
            "properties": {
                "token": {"type": "string", "title": "OAuth Token"},
            },
            "secret": ["token"],
            "required": ["token"],
        }

    def __init__(self, configuration):
        super(YandexDisk, self).__init__(configuration)
        self.syntax = "yaml"
        self.base_url = "https://cloud-api.yandex.net/v1/disk"
        self.list_path = "counters"

    def _get_tables(self, schema):
        offset = 0
        limit = 100

        while True:
            tmp_response = self._send_query(
                "resources/public", media_type="spreadsheet,text", limit=limit, offset=offset
            )

            tmp_items = tmp_response["items"]

            for file_info in tmp_items:
                file_name = file_info["name"]
                file_path = file_info["path"].replace("disk:", "")

                file_extension = file_name.split(".")[-1].lower()
                if file_extension not in EXTENSIONS_READERS:
                    continue

                schema[file_name] = {"name": file_name, "columns": [file_path]}

            if len(tmp_items) < limit:
                break

            offset += limit

        return list(schema.values())

    def test_connection(self):
        self._send_query()

    def _send_query(self, url_path="", **kwargs):
        token = kwargs.pop("oauth_token", self.configuration["token"])
        r = requests.get(
            f"{self.base_url}/{url_path}",
            headers={"Authorization": f"OAuth {token}"},
            params=kwargs,
        )

        response_data = r.json()

        if not r.ok:
            error_message = f"Code: {r.status_code}, message: {r.text}"
            raise Exception(error_message)
        return response_data

    def run_query(self, query, user):
        logger.debug("Yandex Disk is about to execute query: %s", query)
        data = None

        if not query:
            error = "Query is empty"
            return data, error

        try:
            params = yaml.safe_load(query)
        except (ValueError, AttributeError) as e:
            logger.exception(e)
            error = f"YAML read error: {str(e)}"
            return data, error

        if not isinstance(params, dict):
            error = "The query format must be JSON or YAML"
            return data, error

        if "path" not in params:
            error = "The query must contain path"
            return data, error

        file_extension = params["path"].split(".")[-1].lower()

        read_params = {}
        is_multiple_sheets = False

        if file_extension not in EXTENSIONS_READERS:
            error = f"Unsupported file extension: {file_extension}"
            return data, error
        elif file_extension in ("xls", "xlsx"):
            read_params["sheet_name"] = params.get("sheet_name", 0)
            if read_params["sheet_name"] is None:
                is_multiple_sheets = True

        file_url = self._send_query("resources/download", path=params["path"])["href"]

        try:
            df = EXTENSIONS_READERS[file_extension](file_url, **read_params)
        except Exception as e:
            logger.exception(e)
            error = f"Read file error: {str(e)}"
            return data, error

        if is_multiple_sheets:
            new_df = []
            for sheet_name, sheet_df in df.items():
                sheet_df["sheet_name"] = sheet_name
                new_df.append(sheet_df)
            new_df = pd.concat(new_df, ignore_index=True)
            df = new_df.copy()

        data = json_dumps(pandas_to_result(df))
        error = None

        return data, error


register(YandexDisk)
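
For illustration, a hedged usage sketch (not part of the diff) of the new runner, assuming a plain dict works where the configuration container normally goes and that a real OAuth token and Disk path are supplied; run_query parses the YAML body, resolves a download link through the Disk API, reads the file with pandas, and returns a (json_data, error) tuple:

from redash.query_runner.yandex_disk import YandexDisk

# Hypothetical token and path. sheet_name: null asks for every sheet of the
# workbook; the sheets are concatenated with an extra "sheet_name" column.
runner = YandexDisk({"token": "<oauth-token>"})
query = """
path: /reports/sales.xlsx
sheet_name: null
"""

data, error = runner.run_query(query, user=None)
if error is None:
    print(data)  # JSON string with "columns" and "rows" built by pandas_to_result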
1 change: 1 addition & 0 deletions redash/settings/__init__.py
@@ -298,6 +298,7 @@ def email_server_is_configured():
     "redash.query_runner.clickhouse",
     "redash.query_runner.tinybird",
     "redash.query_runner.yandex_metrica",
+    "redash.query_runner.yandex_disk",
     "redash.query_runner.rockset",
     "redash.query_runner.treasuredata",
     "redash.query_runner.sqlite",
47 changes: 47 additions & 0 deletions redash/utils/pandas.py
@@ -0,0 +1,47 @@
import logging
from importlib.util import find_spec

from redash.query_runner import (
    TYPE_BOOLEAN,
    TYPE_DATE,
    TYPE_DATETIME,
    TYPE_FLOAT,
    TYPE_INTEGER,
    TYPE_STRING,
)

logger = logging.getLogger(__name__)

pandas_installed = find_spec("pandas") and find_spec("numpy")

if pandas_installed:
    import numpy as np
    import pandas as pd

    def get_column_types_from_dataframe(df: pd.DataFrame) -> list:
        columns = []
        for column_name, column_type in df.dtypes.items():
            if column_type in (np.bool_,):
                redash_type = TYPE_BOOLEAN
            elif column_type in (np.int64, np.int32):
                redash_type = TYPE_INTEGER
            elif column_type in (np.float64,):
                redash_type = TYPE_FLOAT
            elif column_type in (np.datetime64, np.dtype("<M8[ns]")):
                if df.empty:
                    redash_type = TYPE_DATETIME
                elif len(df[column_name].head(1).astype(str).loc[0]) > 10:
                    redash_type = TYPE_DATETIME
                else:
                    redash_type = TYPE_DATE
            else:
                redash_type = TYPE_STRING

            columns.append({"name": column_name, "friendly_name": column_name, "type": redash_type})

        return columns

    def pandas_to_result(df: pd.DataFrame) -> dict:
        columns = get_column_types_from_dataframe(df)
        rows = df.to_dict("records")
        return {"columns": columns, "rows": rows}
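
As a usage sketch (not part of the diff), pandas_to_result maps pandas dtypes to the Redash type constants and serializes rows as records; for a small frame it returns a dict pairing a "columns" list with a "rows" list:

import pandas as pd

from redash.query_runner import TYPE_BOOLEAN, TYPE_FLOAT, TYPE_INTEGER, TYPE_STRING
from redash.utils.pandas import pandas_to_result

df = pd.DataFrame(
    {
        "id": [1, 2],           # int64   -> TYPE_INTEGER
        "price": [9.99, 14.5],  # float64 -> TYPE_FLOAT
        "paid": [True, False],  # bool    -> TYPE_BOOLEAN
        "name": ["a", "b"],     # object  -> TYPE_STRING
    }
)

result = pandas_to_result(df)
# result["columns"][0] == {"name": "id", "friendly_name": "id", "type": TYPE_INTEGER}
# result["rows"][0]    == {"id": 1, "price": 9.99, "paid": True, "name": "a"}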