diff --git a/dune_client/client.py b/dune_client/client.py index 1d391bd..63b7397 100644 --- a/dune_client/client.py +++ b/dune_client/client.py @@ -10,10 +10,10 @@ from typing import Any, Optional, Union import requests +from deprecated import deprecated from requests import Response, JSONDecodeError from dune_client.base_client import BaseDuneClient -from dune_client.interface import DuneInterface from dune_client.models import ( ExecutionResponse, ExecutionResultCSV, @@ -23,12 +23,11 @@ ResultsResponse, ExecutionState, ) - from dune_client.query import QueryBase, DuneQuery from dune_client.types import QueryParameter -class DuneClient(DuneInterface, BaseDuneClient): +class DuneClient(BaseDuneClient): # pylint: disable=too-many-public-methods """ An interface for Dune API with a few convenience methods combining the use of endpoints (e.g. refresh) @@ -93,41 +92,24 @@ def _patch(self, route: str, params: Any) -> Any: ) return self._handle_response(response) + @deprecated(version="1.2.1", reason="Please use execute_query") def execute( self, query: QueryBase, performance: Optional[str] = None ) -> ExecutionResponse: """Post's to Dune API for execute `query`""" - params = query.request_format() - params["performance"] = performance or self.performance - - self.logger.info( - f"executing {query.query_id} on {performance or self.performance} cluster" - ) - response_json = self._post( - route=f"/query/{query.query_id}/execute", - params=params, - ) - try: - return ExecutionResponse.from_dict(response_json) - except KeyError as err: - raise DuneError(response_json, "ExecutionResponse", err) from err + return self.execute_query(query, performance) + @deprecated(version="1.2.1", reason="Please use get_execution_status") def get_status(self, job_id: str) -> ExecutionStatusResponse: """GET status from Dune API for `job_id` (aka `execution_id`)""" - response_json = self._get(route=f"/execution/{job_id}/status") - try: - return ExecutionStatusResponse.from_dict(response_json) - except KeyError as err: - raise DuneError(response_json, "ExecutionStatusResponse", err) from err + return self.get_execution_status(job_id) + @deprecated(version="1.2.1", reason="Please use get_execution_results") def get_result(self, job_id: str) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - response_json = self._get(route=f"/execution/{job_id}/results") - try: - return ResultsResponse.from_dict(response_json) - except KeyError as err: - raise DuneError(response_json, "ResultsResponse", err) from err + return self.get_execution_results(job_id) + @deprecated(version="1.2.1", reason="Please use get_execution_results_csv") def get_result_csv(self, job_id: str) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -136,12 +118,7 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV: use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ - route = f"/execution/{job_id}/results/csv" - url = self._route_url(f"/execution/{job_id}/results/csv") - self.logger.debug(f"GET CSV received input url={url}") - response = self._get(route=route, raw=True) - response.raise_for_status() - return ExecutionResultCSV(data=BytesIO(response.content)) + return self.get_execution_results_csv(job_id) def get_latest_result(self, query: Union[QueryBase, str, int]) -> ResultsResponse: """ @@ -193,20 +170,21 @@ def _refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. """ - job_id = self.execute(query=query, performance=performance).execution_id - status = self.get_status(job_id) + job_id = self.execute_query(query=query, performance=performance).execution_id + status = self.get_execution_status(job_id) while status.state not in ExecutionState.terminal_states(): self.logger.info( f"waiting for query execution {job_id} to complete: {status}" ) time.sleep(ping_frequency) - status = self.get_status(job_id) + status = self.get_execution_status(job_id) if status.state == ExecutionState.FAILED: self.logger.error(status) raise QueryFailed(f"{status}. Perhaps your query took too long to run!") return job_id + @deprecated(version="1.2.1", reason="Please use run_query") def refresh( self, query: QueryBase, @@ -218,11 +196,9 @@ def refresh( fetches and returns the results. Sleeps `ping_frequency` seconds between each status request. """ - job_id = self._refresh( - query, ping_frequency=ping_frequency, performance=performance - ) - return self.get_result(job_id) + return self.run_query(query, ping_frequency, performance) + @deprecated(version="1.2.1", reason="Please use run_query_csv") def refresh_csv( self, query: QueryBase, @@ -234,11 +210,9 @@ def refresh_csv( fetches and the results in CSV format (use it load the data directly in pandas.from_csv() or similar frameworks) """ - job_id = self._refresh( - query, ping_frequency=ping_frequency, performance=performance - ) - return self.get_result_csv(job_id) + return self.run_query_csv(query, ping_frequency, performance) + @deprecated(version="1.2.1", reason="Please use run_query_dataframe") def refresh_into_dataframe( self, query: QueryBase, performance: Optional[str] = None ) -> Any: @@ -248,14 +222,7 @@ def refresh_into_dataframe( This is a convenience method that uses refresh_csv underneath """ - try: - import pandas # type: ignore # pylint: disable=import-outside-toplevel - except ImportError as exc: - raise ImportError( - "dependency failure, pandas is required but missing" - ) from exc - data = self.refresh_csv(query, performance=performance).data - return pandas.read_csv(data) + return self.run_query_dataframe(query, performance) # CRUD Operations: https://dune.com/docs/api/api-reference/edit-queries/ def create_query( @@ -397,3 +364,103 @@ def upload_csv(self, table_name: str, data: str, description: str = "") -> bool: return bool(response_json["success"]) except KeyError as err: raise DuneError(response_json, "upload_csv response", err) from err + + def execute_query( + self, query: QueryBase, performance: Optional[str] = None + ) -> ExecutionResponse: + """Post's to Dune API for execute `query`""" + params = query.request_format() + params["performance"] = performance or self.performance + + self.logger.info( + f"executing {query.query_id} on {performance or self.performance} cluster" + ) + response_json = self._post( + route=f"/query/{query.query_id}/execute", + params=params, + ) + try: + return ExecutionResponse.from_dict(response_json) + except KeyError as err: + raise DuneError(response_json, "ExecutionResponse", err) from err + + def get_execution_status(self, job_id: str) -> ExecutionStatusResponse: + """GET status from Dune API for `job_id` (aka `execution_id`)""" + response_json = self._get(route=f"/execution/{job_id}/status") + try: + return ExecutionStatusResponse.from_dict(response_json) + except KeyError as err: + raise DuneError(response_json, "ExecutionStatusResponse", err) from err + + def get_execution_results(self, job_id: str) -> ResultsResponse: + """GET results from Dune API for `job_id` (aka `execution_id`)""" + response_json = self._get(route=f"/execution/{job_id}/results") + try: + return ResultsResponse.from_dict(response_json) + except KeyError as err: + raise DuneError(response_json, "ResultsResponse", err) from err + + def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV: + """ + GET results in CSV format from Dune API for `job_id` (aka `execution_id`) + + this API only returns the raw data in CSV format, it is faster & lighterweight + use this method for large results where you want lower CPU and memory overhead + if you need metadata information use get_results() or get_status() + """ + route = f"/execution/{job_id}/results/csv" + url = self._route_url(f"/execution/{job_id}/results/csv") + self.logger.debug(f"GET CSV received input url={url}") + response = self._get(route=route, raw=True) + response.raise_for_status() + return ExecutionResultCSV(data=BytesIO(response.content)) + + def run_query( + self, + query: QueryBase, + ping_frequency: int = 5, + performance: Optional[str] = None, + ) -> ResultsResponse: + """ + Executes a Dune `query`, waits until execution completes, + fetches and returns the results. + Sleeps `ping_frequency` seconds between each status request. + """ + job_id = self._refresh( + query, ping_frequency=ping_frequency, performance=performance + ) + return self.get_execution_results(job_id) + + def run_query_csv( + self, + query: QueryBase, + ping_frequency: int = 5, + performance: Optional[str] = None, + ) -> ExecutionResultCSV: + """ + Executes a Dune query, waits till execution completes, + fetches and the results in CSV format + (use it load the data directly in pandas.from_csv() or similar frameworks) + """ + job_id = self._refresh( + query, ping_frequency=ping_frequency, performance=performance + ) + return self.get_execution_results_csv(job_id) + + def run_query_dataframe( + self, query: QueryBase, performance: Optional[str] = None + ) -> Any: + """ + Execute a Dune Query, waits till execution completes, + fetched and returns the result as a Pandas DataFrame + + This is a convenience method that uses refresh_csv underneath + """ + try: + import pandas # type: ignore # pylint: disable=import-outside-toplevel + except ImportError as exc: + raise ImportError( + "dependency failure, pandas is required but missing" + ) from exc + data = self.run_query_csv(query, performance=performance).data + return pandas.read_csv(data) diff --git a/requirements/prod.txt b/requirements/prod.txt index 5adf4d4..a3208d4 100644 --- a/requirements/prod.txt +++ b/requirements/prod.txt @@ -5,3 +5,5 @@ types-requests>=2.31.0.2 python-dateutil>=2.8.2 requests>=2.31.0 ndjson>=0.3.1 +Deprecated>=1.2.14 +types-Deprecated==1.2.9.3 diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py index 934052b..c62e533 100644 --- a/tests/e2e/test_client.py +++ b/tests/e2e/test_client.py @@ -38,28 +38,28 @@ def test_from_env_constructor(self): except KeyError: self.fail("DuneClient.from_env raised unexpectedly!") - def test_get_status(self): + def test_get_execution_status(self): query = QueryBase(name="No Name", query_id=1276442, params=[]) dune = DuneClient(self.valid_api_key) - job_id = dune.execute(query).execution_id - status = dune.get_status(job_id) + job_id = dune.execute_query(query).execution_id + status = dune.get_execution_status(job_id) self.assertTrue( status.state in [ExecutionState.EXECUTING, ExecutionState.PENDING] ) - def test_refresh(self): + def test_run_query(self): dune = DuneClient(self.valid_api_key) - results = dune.refresh(self.query).get_rows() + results = dune.run_query(self.query).get_rows() self.assertGreater(len(results), 0) - def test_refresh_performance_large(self): + def test_run_query_performance_large(self): dune = DuneClient(self.valid_api_key) - results = dune.refresh(self.query, performance="large").get_rows() + results = dune.run_query(self.query, performance="large").get_rows() self.assertGreater(len(results), 0) - def test_refresh_into_dataframe(self): + def test_run_query_dataframe(self): dune = DuneClient(self.valid_api_key) - pd = dune.refresh_into_dataframe(self.query) + pd = dune.run_query_dataframe(self.query) self.assertGreater(len(pd), 0) def test_parameters_recognized(self): @@ -75,7 +75,7 @@ def test_parameters_recognized(self): self.assertEqual(query.parameters(), new_params) dune = DuneClient(self.valid_api_key) - results = dune.refresh(query) + results = dune.run_query(query) self.assertEqual( results.get_rows(), [ @@ -90,14 +90,14 @@ def test_parameters_recognized(self): def test_endpoints(self): dune = DuneClient(self.valid_api_key) - execution_response = dune.execute(self.query) + execution_response = dune.execute_query(self.query) self.assertIsInstance(execution_response, ExecutionResponse) job_id = execution_response.execution_id - status = dune.get_status(job_id) + status = dune.get_execution_status(job_id) self.assertIsInstance(status, ExecutionStatusResponse) - while dune.get_status(job_id).state != ExecutionState.COMPLETED: + while dune.get_execution_status(job_id).state != ExecutionState.COMPLETED: time.sleep(1) - results = dune.get_result(job_id).result.rows + results = dune.get_execution_results(job_id).result.rows self.assertGreater(len(results), 0) def test_cancel_execution(self): @@ -106,31 +106,31 @@ def test_cancel_execution(self): name="Long Running Query", query_id=1229120, ) - execution_response = dune.execute(query) + execution_response = dune.execute_query(query) job_id = execution_response.execution_id # POST Cancellation success = dune.cancel_execution(job_id) self.assertTrue(success) - results = dune.get_result(job_id) + results = dune.get_execution_results(job_id) self.assertEqual(results.state, ExecutionState.CANCELLED) def test_invalid_api_key_error(self): dune = DuneClient(api_key="Invalid Key") with self.assertRaises(DuneError) as err: - dune.execute(self.query) + dune.execute_query(self.query) self.assertEqual( str(err.exception), "Can't build ExecutionResponse from {'error': 'invalid API Key'}", ) with self.assertRaises(DuneError) as err: - dune.get_status("wonky job_id") + dune.get_execution_status("wonky job_id") self.assertEqual( str(err.exception), "Can't build ExecutionStatusResponse from {'error': 'invalid API Key'}", ) with self.assertRaises(DuneError) as err: - dune.get_result("wonky job_id") + dune.get_execution_results("wonky job_id") self.assertEqual( str(err.exception), "Can't build ResultsResponse from {'error': 'invalid API Key'}", @@ -142,7 +142,7 @@ def test_query_not_found_error(self): query.query_id = 99999999 # Invalid Query Id. with self.assertRaises(DuneError) as err: - dune.execute(query) + dune.execute_query(query) self.assertEqual( str(err.exception), "Can't build ExecutionResponse from {'error': 'Query not found'}", @@ -155,7 +155,7 @@ def test_internal_error(self): query.query_id = 9999999999999 with self.assertRaises(DuneError) as err: - dune.execute(query) + dune.execute_query(query) self.assertEqual( str(err.exception), "Can't build ExecutionResponse from {'error': 'An internal error occured'}", @@ -164,7 +164,7 @@ def test_internal_error(self): def test_invalid_job_id_error(self): dune = DuneClient(self.valid_api_key) with self.assertRaises(DuneError) as err: - dune.get_status("Wonky Job ID") + dune.get_execution_status("Wonky Job ID") self.assertEqual( str(err.exception), "Can't build ExecutionStatusResponse from "