From 0751fb2c9b2613dd84daec73d278720dd2ddea09 Mon Sep 17 00:00:00 2001 From: atulya-astronomer Date: Wed, 20 Nov 2024 17:50:38 +0530 Subject: [PATCH 1/6] Added delete APIs --- .../compat/starship_compatability.py | 60 +++++++++++++++++-- astronomer_starship/starship.py | 2 +- astronomer_starship/starship_api.py | 12 ++-- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/astronomer_starship/compat/starship_compatability.py b/astronomer_starship/compat/starship_compatability.py index 413e595..3b767e9 100644 --- a/astronomer_starship/compat/starship_compatability.py +++ b/astronomer_starship/compat/starship_compatability.py @@ -1,6 +1,7 @@ import json +import logging import os -from flask import jsonify +from flask import jsonify, Response from sqlalchemy.orm import Session from typing import TYPE_CHECKING @@ -12,6 +13,9 @@ import pytz +logger = logging.getLogger(__name__) + + def get_from_request(args, json, key, required: bool = False) -> "Any": val = json.get(key, args.get(key)) if val is None and required: @@ -146,6 +150,24 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs): raise e +def generic_delete(session: Session, qualname: str, **kwargs): + from http import HTTPStatus + from sqlalchemy import delete + + (_, thing_cls) = import_from_qualname(qualname) + + try: + filters = [getattr(thing_cls, attr) == val for attr, val in kwargs.items()] + deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount + session.commit() + logger.info(f"Deleted {deleted_rows} rows for table {qualname}") + return Response(None, status=HTTPStatus.NO_CONTENT) + except Exception as e: + logger.error(f"Error deleting rows for table {qualname}: {e}") + session.rollback() + raise e + + def get_test_data(attrs: dict, method: "Union[str, None]" = None) -> "Dict[str, Any]": """ >>> get_test_data(method="POST", attrs={"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}}) @@ -195,10 +217,21 @@ def set_env_vars(cls): res.status_code = 409 raise NotImplementedError() + @classmethod + def delete_env_vars(cls): + """This is not possible to do via API, so return an error""" + res = jsonify({"error": "Not implemented"}) + res.status_code = 405 + raise NotImplementedError() + @classmethod def variable_attrs(cls) -> "Dict[str, AttrDesc]": return { - "key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}, + "key": { + "attr": "key", + "methods": [("POST", True), ("DELETE", True)], + "test_value": "key", + }, "val": {"attr": "val", "methods": [("POST", True)], "test_value": "val"}, "description": { "attr": "description", @@ -217,12 +250,15 @@ def set_variable(self, **kwargs): self.session, "airflow.models.Variable", self.variable_attrs(), **kwargs ) + def delete_variable(self, **kwargs): + return generic_delete(self.session, "airflow.models.Variable", **kwargs) + @classmethod def pool_attrs(cls) -> "Dict[str, AttrDesc]": return { "name": { "attr": "pool", - "methods": [("POST", True)], + "methods": [("POST", True), ("DELETE", True)], "test_value": "test_name", }, "slots": {"attr": "slots", "methods": [("POST", True)], "test_value": 1}, @@ -241,12 +277,15 @@ def set_pool(self, **kwargs): self.session, "airflow.models.Pool", self.pool_attrs(), **kwargs ) + def delete_pool(self, **kwargs): + return generic_delete(self.session, "airflow.models.Pool", **kwargs) + @classmethod def connection_attrs(cls) -> "Dict[str, AttrDesc]": return { "conn_id": { "attr": "conn_id", - "methods": [("POST", True)], + "methods": [("POST", True), ("DELETE", True)], "test_value": "conn_id", }, "conn_type": { @@ -301,6 +340,9 @@ def set_connection(self, **kwargs): self.session, "airflow.models.Connection", self.connection_attrs(), **kwargs ) + def delete_connection(self, **kwargs): + return generic_delete(self.session, "airflow.models.Connection", **kwargs) + @classmethod def dag_attrs(cls) -> "Dict[str, AttrDesc]": return { @@ -442,7 +484,7 @@ def dag_runs_attrs(cls) -> "Dict[str, AttrDesc]": return { "dag_id": { "attr": "dag_id", - "methods": [("GET", True)], + "methods": [("GET", True), ("DELETE", True)], "test_value": "dag_0", }, # Limit is the number of rows to return. @@ -591,6 +633,9 @@ def set_dag_runs(self, dag_runs: list): dag_runs = self.insert_directly("dag_run", dag_runs) return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)} + def delete_dag_runs(self, **kwargs): + return generic_delete(self.session, "airflow.models.DagRun", **kwargs) + @classmethod def task_instances_attrs(cls) -> "Dict[str, AttrDesc]": epoch = datetime.datetime(1970, 1, 1, 0, 0) @@ -600,7 +645,7 @@ def task_instances_attrs(cls) -> "Dict[str, AttrDesc]": return { "dag_id": { "attr": "dag_id", - "methods": [("GET", True)], + "methods": [("GET", True), ("DELETE", True)], "test_value": "dag_0", }, # Limit is the number of rows to return. @@ -853,6 +898,9 @@ def set_task_instances(self, task_instances: list): task_instances = self.insert_directly("task_instance", task_instances) return {"task_instances": task_instances} + def delete_task_instances(self, **kwargs): + return generic_delete(self.session, "airflow.models.TaskInstance", **kwargs) + def insert_directly(self, table_name, items): from sqlalchemy.exc import InvalidRequestError from sqlalchemy import MetaData diff --git a/astronomer_starship/starship.py b/astronomer_starship/starship.py index 28d607c..653d11a 100644 --- a/astronomer_starship/starship.py +++ b/astronomer_starship/starship.py @@ -13,7 +13,7 @@ from airflow.security import permissions from airflow.www import auth -ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH"] +ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH", "DELETE"] class Starship(BaseView): diff --git a/astronomer_starship/starship_api.py b/astronomer_starship/starship_api.py index 0d99cd5..8637d88 100644 --- a/astronomer_starship/starship_api.py +++ b/astronomer_starship/starship_api.py @@ -266,7 +266,7 @@ def env_vars(self): return starship_route(get=starship_compat.get_env_vars) # @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_POOL)]) - @expose("/pools", methods=["GET", "POST"]) + @expose("/pools", methods=["GET", "POST", "DELETE"]) @csrf.exempt def pools(self): """ @@ -310,11 +310,12 @@ def pools(self): return starship_route( get=starship_compat.get_pools, post=starship_compat.set_pool, + delete=starship_compat.delete_pool, kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.pool_attrs()), ) # @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_VARIABLE)]) - @expose("/variables", methods=["GET", "POST"]) + @expose("/variables", methods=["GET", "POST", "DELETE"]) @csrf.exempt def variables(self): """ @@ -357,11 +358,12 @@ def variables(self): return starship_route( get=starship_compat.get_variables, post=starship_compat.set_variable, + delete=starship_compat.delete_variable, kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.variable_attrs()), ) # @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_CONNECTION)]) - @expose("/connections", methods=["GET", "POST"]) + @expose("/connections", methods=["GET", "POST", "DELETE"]) @csrf.exempt def connections(self): """ @@ -419,6 +421,7 @@ def connections(self): return starship_route( get=starship_compat.get_connections, post=starship_compat.set_connection, + delete=starship_compat.delete_connection, kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.connection_attrs()), ) @@ -479,7 +482,7 @@ def dags(self): ) # @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN)]) - @expose("/dag_runs", methods=["GET", "POST"]) + @expose("/dag_runs", methods=["GET", "POST", "DELETE"]) @csrf.exempt def dag_runs(self): """ @@ -567,6 +570,7 @@ def dag_runs(self): return starship_route( get=starship_compat.get_dag_runs, post=starship_compat.set_dag_runs, + delete=starship_compat.delete_dag_runs, kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.dag_runs_attrs()), ) From 1fc6873e2339d95ac4cbdc650bc5085349eccb9b Mon Sep 17 00:00:00 2001 From: atulya-astronomer Date: Mon, 9 Dec 2024 17:24:20 +0530 Subject: [PATCH 2/6] Added delete button to DAG page --- .../src/component/MigrateButton.jsx | 33 ++++++++---- .../src/pages/DAGHistoryPage.jsx | 52 ++++++++++++++++--- 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/astronomer_starship/src/component/MigrateButton.jsx b/astronomer_starship/src/component/MigrateButton.jsx index a68644c..3c3b9c4 100644 --- a/astronomer_starship/src/component/MigrateButton.jsx +++ b/astronomer_starship/src/component/MigrateButton.jsx @@ -2,11 +2,16 @@ import React, { useState } from 'react'; import axios from 'axios'; import { Button, useToast } from '@chakra-ui/react'; -import { MdErrorOutline } from 'react-icons/md'; -import { FaCheck } from 'react-icons/fa'; +import { MdErrorOutline, MdDeleteForever } from 'react-icons/md'; import { GoUpload } from 'react-icons/go'; import PropTypes from 'prop-types'; +function checkStatus(status, exists) { + if (status === 204) + return false; + return status === 200 || exists; +} + export default function MigrateButton({ route, headers, existsInRemote, sendData, isDisabled, }) { @@ -16,13 +21,23 @@ export default function MigrateButton({ const [exists, setExists] = useState(existsInRemote); function handleClick() { setLoading(true); - axios.post(route, sendData, { headers }) + axios({ + method: exists ? 'delete' : 'post', + url: route, + headers, + data: sendData, + }) .then((res) => { setLoading(false); - setExists(res.status === 200); + setExists(checkStatus(res.status, exists)); + toast({ + title: 'Success', + status: 'success', + isClosable: true, + }) }) .catch((err) => { - setExists(false); + setExists(exists); setLoading(false); toast({ title: err.response?.data?.error || err.response?.data || err.message, @@ -34,19 +49,19 @@ export default function MigrateButton({ } return ( ); } diff --git a/astronomer_starship/src/pages/DAGHistoryPage.jsx b/astronomer_starship/src/pages/DAGHistoryPage.jsx index 1f73a8e..2f34da6 100644 --- a/astronomer_starship/src/pages/DAGHistoryPage.jsx +++ b/astronomer_starship/src/pages/DAGHistoryPage.jsx @@ -19,9 +19,8 @@ import { } from '@chakra-ui/react'; import PropTypes from 'prop-types'; import axios from 'axios'; -import { MdErrorOutline } from 'react-icons/md'; +import { MdErrorOutline, MdDeleteForever } from 'react-icons/md'; import { GrDocumentMissing } from 'react-icons/gr'; -import { FaCheck } from 'react-icons/fa'; import { GoUpload } from 'react-icons/go'; import humanFormat from 'human-format'; import { ExternalLinkIcon, RepeatIcon } from '@chakra-ui/icons'; @@ -56,6 +55,44 @@ function DAGHistoryMigrateButton({ const percent = 100; function handleClick() { + + function deleteRuns() { + setLoadPerc(percent * 0.5); + axios({ + method: 'delete', + url: proxyUrl(url + constants.DAG_RUNS_ROUTE), + headers: proxyHeaders(token), + data: { dag_id: dagId }, + }).then((res) => { + setExists(!(res.status === 204)); + dispatch({ + type: 'set-dags-data', + dagsData: { + [dagId]: { + remote: { + dag_run_count: 0, + }, + }, + }, + }); + setLoadPerc(percent * 1); + setLoadPerc(0); + }).catch((err) => { + setExists(false); + setLoadPerc(percent * 0); + toast({ + title: err.response?.data?.error || err.response?.data || err.message, + status: 'error', + isClosable: true, + }); + setError(err); + }); + } + + if (exists) { + deleteRuns(); + return; + } const errFn = (err) => { setExists(false); // noinspection PointlessArithmeticExpressionJS @@ -117,23 +154,23 @@ function DAGHistoryMigrateButton({ return (