Skip to content

Commit

Permalink
Added delete button for Variables, Pools, Connections and Dag Runs (#114
Browse files Browse the repository at this point in the history
)

* Added delete APIs

* Added delete button to DAG page

* Changed to params from body

* Added tests and updated version to 2.2

* Updated documentation

* Added docker tests

All tests passing. Merging and creating a new release
  • Loading branch information
atulya-astronomer authored Dec 17, 2024
1 parent ee99464 commit b854d53
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 30 deletions.
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.1.0"
__version__ = "2.2.0"


def get_provider_info():
Expand Down
69 changes: 63 additions & 6 deletions astronomer_starship/compat/starship_compatability.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
from flask import jsonify
from flask import jsonify, Response
from sqlalchemy.orm import Session
from typing import TYPE_CHECKING

Expand All @@ -12,6 +13,9 @@
import pytz


logger = logging.getLogger(__name__)


def get_from_request(args, json, key, required: bool = False) -> "Any":
val = json.get(key, args.get(key))
if val is None and required:
Expand Down Expand Up @@ -146,6 +150,24 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs):
raise e


def generic_delete(session: Session, qualname: str, **kwargs) -> Response:
from http import HTTPStatus
from sqlalchemy import delete

(_, thing_cls) = import_from_qualname(qualname)

try:
filters = [getattr(thing_cls, attr) == val for attr, val in kwargs.items()]
deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount
session.commit()
logger.info(f"Deleted {deleted_rows} rows for table {qualname}")
return Response(status=HTTPStatus.NO_CONTENT)
except Exception as e:
logger.error(f"Error deleting row(s) for table {qualname}: {e}")
session.rollback()
raise e


def get_test_data(attrs: dict, method: "Union[str, None]" = None) -> "Dict[str, Any]":
"""
>>> get_test_data(method="POST", attrs={"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}})
Expand Down Expand Up @@ -195,10 +217,21 @@ def set_env_vars(cls):
res.status_code = 409
raise NotImplementedError()

@classmethod
def delete_env_vars(cls):
"""This is not possible to do via API, so return an error"""
res = jsonify({"error": "Not implemented"})
res.status_code = 405
raise NotImplementedError()

@classmethod
def variable_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"},
"key": {
"attr": "key",
"methods": [("POST", True), ("DELETE", True)],
"test_value": "key",
},
"val": {"attr": "val", "methods": [("POST", True)], "test_value": "val"},
"description": {
"attr": "description",
Expand All @@ -217,12 +250,16 @@ def set_variable(self, **kwargs):
self.session, "airflow.models.Variable", self.variable_attrs(), **kwargs
)

def delete_variable(self, **kwargs):
attrs = {self.variable_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.Variable", **attrs)

@classmethod
def pool_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"name": {
"attr": "pool",
"methods": [("POST", True)],
"methods": [("POST", True), ("DELETE", True)],
"test_value": "test_name",
},
"slots": {"attr": "slots", "methods": [("POST", True)], "test_value": 1},
Expand All @@ -241,12 +278,20 @@ def set_pool(self, **kwargs):
self.session, "airflow.models.Pool", self.pool_attrs(), **kwargs
)

def delete_pool(self, **kwargs):
attrs = {
self.pool_attrs()[k]["attr"]: v
for k, v in kwargs.items()
if k in self.pool_attrs()
}
return generic_delete(self.session, "airflow.models.Pool", **attrs)

@classmethod
def connection_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"conn_id": {
"attr": "conn_id",
"methods": [("POST", True)],
"methods": [("POST", True), ("DELETE", True)],
"test_value": "conn_id",
},
"conn_type": {
Expand Down Expand Up @@ -301,6 +346,10 @@ def set_connection(self, **kwargs):
self.session, "airflow.models.Connection", self.connection_attrs(), **kwargs
)

def delete_connection(self, **kwargs):
attrs = {self.connection_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.Connection", **attrs)

@classmethod
def dag_attrs(cls) -> "Dict[str, AttrDesc]":
return {
Expand Down Expand Up @@ -442,7 +491,7 @@ def dag_runs_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
"methods": [("GET", True)],
"methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
Expand Down Expand Up @@ -591,6 +640,10 @@ def set_dag_runs(self, dag_runs: list):
dag_runs = self.insert_directly("dag_run", dag_runs)
return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)}

def delete_dag_runs(self, **kwargs):
attrs = {self.dag_runs_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.DagRun", **attrs)

@classmethod
def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
epoch = datetime.datetime(1970, 1, 1, 0, 0)
Expand All @@ -600,7 +653,7 @@ def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
"methods": [("GET", True)],
"methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
Expand Down Expand Up @@ -853,6 +906,10 @@ def set_task_instances(self, task_instances: list):
task_instances = self.insert_directly("task_instance", task_instances)
return {"task_instances": task_instances}

def delete_task_instances(self, **kwargs):
attrs = {self.task_instances_attrs()[k]["attr"]: v for k, v in kwargs.items()}
return generic_delete(self.session, "airflow.models.TaskInstance", **attrs)

def insert_directly(self, table_name, items):
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy import MetaData
Expand Down
33 changes: 24 additions & 9 deletions astronomer_starship/src/component/MigrateButton.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
import React, { useState } from 'react';
import axios from 'axios';
import { Button, useToast } from '@chakra-ui/react';
import { MdErrorOutline } from 'react-icons/md';
import { FaCheck } from 'react-icons/fa';
import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GoUpload } from 'react-icons/go';
import PropTypes from 'prop-types';

function checkStatus(status, exists) {
if (status === 204)
return false;
return status === 200 || exists;
}

export default function MigrateButton({
route, headers, existsInRemote, sendData, isDisabled,
}) {
Expand All @@ -16,13 +21,23 @@ export default function MigrateButton({
const [exists, setExists] = useState(existsInRemote);
function handleClick() {
setLoading(true);
axios.post(route, sendData, { headers })
axios({
method: exists ? 'delete' : 'post',
url: route,
headers,
params: sendData,
})
.then((res) => {
setLoading(false);
setExists(res.status === 200);
setExists(checkStatus(res.status, exists));
toast({
title: 'Success',
status: 'success',
isClosable: true,
})
})
.catch((err) => {
setExists(false);
setExists(exists);
setLoading(false);
toast({
title: err.response?.data?.error || err.response?.data || err.message,
Expand All @@ -34,19 +49,19 @@ export default function MigrateButton({
}
return (
<Button
isDisabled={loading || isDisabled || exists}
isDisabled={loading || isDisabled}
isLoading={loading}
loadingText="Loading"
variant="solid"
leftIcon={(
error ? <MdErrorOutline /> : exists ? <FaCheck /> : !loading ? <GoUpload /> : <span />
error ? <MdErrorOutline /> : exists ? <MdDeleteForever /> : !loading ? <GoUpload /> : <span />
)}
colorScheme={
exists ? 'green' : loading ? 'teal' : error ? 'red' : 'teal'
exists ? 'red' : loading ? 'teal' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
{exists ? 'Ok' : loading ? '' : error ? 'Error!' : 'Migrate'}
{exists ? 'Delete' : loading ? '' : error ? 'Error!' : 'Migrate'}
</Button>
);
}
Expand Down
52 changes: 44 additions & 8 deletions astronomer_starship/src/pages/DAGHistoryPage.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import {
} from '@chakra-ui/react';
import PropTypes from 'prop-types';
import axios from 'axios';
import { MdErrorOutline } from 'react-icons/md';
import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GrDocumentMissing } from 'react-icons/gr';
import { FaCheck } from 'react-icons/fa';
import { GoUpload } from 'react-icons/go';
import humanFormat from 'human-format';
import { ExternalLinkIcon, RepeatIcon } from '@chakra-ui/icons';
Expand Down Expand Up @@ -56,6 +55,44 @@ function DAGHistoryMigrateButton({
const percent = 100;

function handleClick() {

function deleteRuns() {
setLoadPerc(percent * 0.5);
axios({
method: 'delete',
url: proxyUrl(url + constants.DAG_RUNS_ROUTE),
headers: proxyHeaders(token),
params: { dag_id: dagId },
}).then((res) => {
setExists(!(res.status === 204));
dispatch({
type: 'set-dags-data',
dagsData: {
[dagId]: {
remote: {
dag_run_count: 0,
},
},
},
});
setLoadPerc(percent * 1);
setLoadPerc(0);
}).catch((err) => {
setExists(false);
setLoadPerc(percent * 0);
toast({
title: err.response?.data?.error || err.response?.data || err.message,
status: 'error',
isClosable: true,
});
setError(err);
});
}

if (exists) {
deleteRuns();
return;
}
const errFn = (err) => {
setExists(false);
// noinspection PointlessArithmeticExpressionJS
Expand Down Expand Up @@ -117,23 +154,23 @@ function DAGHistoryMigrateButton({
return (
<WithTooltip isDisabled={isDisabled}>
<Button
isDisabled={isDisabled || loadPerc || exists}
isDisabled={isDisabled || loadPerc}
// isLoading={loading}
// loadingText="Loading"
variant="solid"
leftIcon={(
error ? <MdErrorOutline />
: exists ? <FaCheck />
: exists ? <MdDeleteForever />
: isDisabled ? <GrDocumentMissing />
: !loadPerc ? <GoUpload />
: <span />
)}
colorScheme={
exists ? 'green' : isDisabled ? 'gray' : error ? 'red' : 'teal'
exists ? 'red' : isDisabled ? 'gray' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
{exists ? 'Ok'
{exists ? 'Delete'
: loadPerc ? (
<CircularProgress thickness="20px" size="30px" value={loadPerc} />
)
Expand Down Expand Up @@ -326,8 +363,7 @@ export default function DAGHistoryPage({ state, dispatch }) {
isDisabled={
!info.row.original.remote?.dag_id ? 'DAG not found in remote'
: !info.row.original.local.dag_run_count ? 'No DAG Runs to migrate'
: info.row.original.remote?.dag_run_count ? 'DAG Runs already exist in remote'
: false
: false
}
dispatch={dispatch}
/>
Expand Down
2 changes: 1 addition & 1 deletion astronomer_starship/starship.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from airflow.security import permissions
from airflow.www import auth

ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH"]
ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH", "DELETE"]


class Starship(BaseView):
Expand Down
Loading

0 comments on commit b854d53

Please sign in to comment.