Skip to content

Commit

Permalink
feature: 支持批量任务状态查询接口
Browse files Browse the repository at this point in the history
  • Loading branch information
normal-wls committed Aug 14, 2023
1 parent 51ff5ba commit 75dec4f
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 68 deletions.
39 changes: 33 additions & 6 deletions gcloud/iam_auth/view_interceptors/taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
"""

import abc
import ujson as json

from iam import Action, Subject, Request
from iam.exceptions import AuthFailedException
import ujson as json
from iam.exceptions import AuthFailedException, MultiAuthFailedException

from gcloud.iam_auth import IAMMeta
from gcloud.iam_auth import get_iam_client
from gcloud.iam_auth import IAMMeta, get_iam_client, res_factory
from gcloud.iam_auth.intercept import ViewInterceptor
from gcloud.iam_auth import res_factory
from iam import Action, Request, Subject

iam = get_iam_client()

Expand Down Expand Up @@ -95,3 +93,32 @@ class GetNodeLogInterceptor(TaskSingleActionGetInterceptor):

class StatusViewInterceptor(TaskSingleActionGetInterceptor):
action = IAMMeta.TASK_VIEW_ACTION


class BatchStatusViewInterceptor(ViewInterceptor):
def process(self, request, *args, **kwargs):
task_ids = json.loads(request.body).get("task_ids") or []
subject = Subject("user", request.user.username)
action = Action(IAMMeta.TASK_VIEW_ACTION)
resources_list = res_factory.resources_list_for_tasks(task_ids)

if not resources_list:
return

resources_map = {}
for resources in resources_list:
resources_map[resources[0].id] = resources

request = Request(IAMMeta.SYSTEM_ID, subject, action, [], {})
result = iam.batch_is_allowed(request, resources_list)

if not result:
raise MultiAuthFailedException(IAMMeta.SYSTEM_ID, subject, action, resources_list)

not_allowed_list = []
for tid, allow in result.items():
if not allow:
not_allowed_list.append(resources_map[tid])

if not_allowed_list:
raise MultiAuthFailedException(IAMMeta.SYSTEM_ID, subject, action, not_allowed_list)
117 changes: 58 additions & 59 deletions gcloud/taskflow3/apis/django/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,61 @@
import traceback

import ujson as json
from blueapps.account.decorators import login_exempt
from cryptography.fernet import Fernet
from django.db import transaction
from django.http import JsonResponse
from django.utils.translation import ugettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET, require_POST
from django.utils.translation import ugettext_lazy as _
from drf_yasg.utils import swagger_auto_schema
from rest_framework.decorators import api_view

import env
from blueapps.account.decorators import login_exempt
from gcloud.utils.throttle import check_task_operation_throttle, get_task_operation_frequence

from iam.contrib.http import HTTP_AUTH_FORBIDDEN_CODE
from iam.exceptions import RawAuthFailedException
from rest_framework.decorators import api_view

import env
from gcloud import err_code
from gcloud.core.models import EngineConfig
from gcloud.utils.decorators import request_validate
from gcloud.conf import settings
from gcloud.constants import TASK_CREATE_METHOD, PROJECT, JobBizScopeType
from gcloud.taskflow3.models import TaskFlowInstance, TimeoutNodeConfig
from gcloud.taskflow3.domains.context import TaskContext
from gcloud.constants import PROJECT, TASK_CREATE_METHOD, JobBizScopeType
from gcloud.contrib.analysis.analyse_items import task_flow_instance
from gcloud.contrib.operate_record.constants import OperateType, RecordType
from gcloud.contrib.operate_record.decorators import record_operation
from gcloud.contrib.operate_record.constants import RecordType, OperateType
from gcloud.taskflow3.apis.django.validators import (
StatusValidator,
DataValidator,
DetailValidator,
GetJobInstanceLogValidator,
TaskActionValidator,
NodesActionValidator,
SpecNodesTimerResetValidator,
TaskCloneValidator,
TaskFuncClaimValidator,
PreviewTaskTreeValidator,
QueryTaskCountValidator,
GetNodeLogValidator,
)
from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
from gcloud.taskflow3.domains.auto_retry import AutoRetryNodeStrategyCreator
from gcloud.core.models import EngineConfig
from gcloud.iam_auth.intercept import iam_intercept
from gcloud.iam_auth.view_interceptors.taskflow import (
BatchStatusViewInterceptor,
DataViewInterceptor,
DetailViewInterceptor,
TaskActionInterceptor,
GetNodeLogInterceptor,
NodesActionInterceptor,
SpecNodesTimerResetInpterceptor,
StatusViewInterceptor,
TaskActionInterceptor,
TaskCloneInpterceptor,
TaskFuncClaimInterceptor,
GetNodeLogInterceptor,
StatusViewInterceptor,
)
from gcloud.openapi.schema import AnnotationAutoSchema

from gcloud.taskflow3.apis.django.validators import (
BatchStatusValidator,
DataValidator,
DetailValidator,
GetJobInstanceLogValidator,
GetNodeLogValidator,
NodesActionValidator,
PreviewTaskTreeValidator,
QueryTaskCountValidator,
SpecNodesTimerResetValidator,
StatusValidator,
TaskActionValidator,
TaskCloneValidator,
TaskFuncClaimValidator,
)
from gcloud.taskflow3.domains.auto_retry import AutoRetryNodeStrategyCreator
from gcloud.taskflow3.domains.context import TaskContext
from gcloud.taskflow3.domains.dispatchers import NodeCommandDispatcher, TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance, TimeoutNodeConfig
from gcloud.utils.decorators import request_validate
from gcloud.utils.throttle import check_task_operation_throttle, get_task_operation_frequence
from pipeline_web.preview import preview_template_tree

logger = logging.getLogger("root")
Expand Down Expand Up @@ -102,12 +101,7 @@ def status(request, project_id):
message = _(f"任务查询失败: 任务[ID: {instance_id}]不存在, 请检查 | status")
logger.error(message)
return JsonResponse(
{
"result": False,
"message": message,
"data": None,
"code": err_code.CONTENT_NOT_EXIST.code,
}
{"result": False, "message": message, "data": None, "code": err_code.CONTENT_NOT_EXIST.code}
)

dispatcher = TaskCommandDispatcher(
Expand All @@ -117,6 +111,27 @@ def status(request, project_id):
return JsonResponse(result)


@require_POST
@request_validate(BatchStatusValidator)
@iam_intercept(BatchStatusViewInterceptor())
def batch_status(request, project_id):
"""用于批量获取独立子流程状态"""
body = json.loads(request.body)
task_ids = body.get("task_ids") or []
tasks = TaskFlowInstance.objects.filter(id__in=task_ids, project_id=project_id)
total_result = {"result": True, "data": {}, "code": err_code.SUCCESS.code, "message": ""}
for task in set(tasks):
dispatcher = TaskCommandDispatcher(
engine_ver=task.engine_ver,
taskflow_id=task.id,
pipeline_instance=task.pipeline_instance,
project_id=project_id,
)
total_result["data"][task.id] = dispatcher.get_task_status()

return JsonResponse(total_result)


@require_GET
@request_validate(DataValidator)
@iam_intercept(DataViewInterceptor())
Expand Down Expand Up @@ -241,13 +256,7 @@ def task_action(request, action, project_id):
allowed_times, scope_seconds = frequence_data
message = _(f"任务操作失败: 项目[ID: {project_id}]启动任务的极限: {allowed_times}/{scope_seconds}(单位:秒)")
logger.error(message)
return JsonResponse(
{
"result": False,
"message": message,
"code": err_code.INVALID_OPERATION.code,
}
)
return JsonResponse({"result": False, "message": message, "code": err_code.INVALID_OPERATION.code})

ctx = task.task_action(action, username)
return JsonResponse(ctx)
Expand Down Expand Up @@ -437,17 +446,9 @@ def get_node_log(request, project_id, node_id):

task = TaskFlowInstance.objects.get(pk=task_id, project_id=project_id)
if not task.has_node(node_id):
message = _(
f"节点状态请求失败: 任务[ID: {task.id}]中未找到节点[ID: {node_id}]. 请重试. 如持续失败可联系管理员处理 | get_node_log"
)
message = _(f"节点状态请求失败: 任务[ID: {task.id}]中未找到节点[ID: {node_id}]. 请重试. 如持续失败可联系管理员处理 | get_node_log")
logger.error(message)
return JsonResponse(
{
"result": False,
"data": None,
"message": message,
}
)
return JsonResponse({"result": False, "data": None, "message": message})

dispatcher = NodeCommandDispatcher(engine_ver=task.engine_ver, node_id=node_id, taskflow_id=task_id)
return JsonResponse(dispatcher.get_node_log(history_id))
Expand Down Expand Up @@ -480,9 +481,7 @@ def node_callback(request, token):
try:
callback_data = json.loads(request.body)
except Exception:
message = _(
f"节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {traceback.format_exc()} | api node_callback"
)
message = _(f"节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {traceback.format_exc()} | api node_callback")
logger.error(message)
return JsonResponse({"result": False, "message": message}, status=400)

Expand Down
12 changes: 11 additions & 1 deletion gcloud/taskflow3/apis/django/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import ujson as json

from gcloud.utils.validate import RequestValidator, ObjectJsonBodyValidator
from gcloud.utils.validate import ObjectJsonBodyValidator, RequestValidator


class StatusValidator(RequestValidator):
Expand All @@ -27,6 +27,16 @@ def validate(self, request, *args, **kwargs):
return True, ""


class BatchStatusValidator(RequestValidator):
def validate(self, request, *args, **kwargs):
try:
json.loads(request.body)
except Exception as e:
return False, f"request body is not a valid json string: {e}"

return True, ""


class DataValidator(RequestValidator):
def validate(self, request, *args, **kwargs):
task_id = request.GET.get("instance_id")
Expand Down
5 changes: 3 additions & 2 deletions gcloud/taskflow3/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
specific language governing permissions and limitations under the License.
"""

from django.conf.urls import include, url
from django.urls import path
from django.conf.urls import url, include

from gcloud.taskflow3.apis.django import api
from gcloud.taskflow3.apis.django.v4.urls import v4_urlpatterns
from gcloud.taskflow3.apis.drf.viewsets.render_current_constants import RenderCurrentConstantsView
from gcloud.taskflow3.apis.drf.viewsets.engine_v2_node_log import EngineV2NodeLogView
from gcloud.taskflow3.apis.drf.viewsets.preview_task_tree import PreviewTaskTreeWithSchemesView
from gcloud.taskflow3.apis.drf.viewsets.render_current_constants import RenderCurrentConstantsView
from gcloud.taskflow3.apis.drf.viewsets.update_task_constants import UpdateTaskConstantsView

urlpatterns = [
url(r"^api/context/$", api.context),
url(r"^api/status/(?P<project_id>\d+)/$", api.status),
url(r"^api/batch_status/(?P<project_id>\d+)/$", api.batch_status),
url(r"^api/clone/(?P<project_id>\d+)/$", api.task_clone),
url(r"^api/action/(?P<action>\w+)/(?P<project_id>\d+)/$", api.task_action),
url(r"^api/flow/claim/(?P<project_id>\d+)/$", api.task_func_claim),
Expand Down

0 comments on commit 75dec4f

Please sign in to comment.