Skip to content

Commit

Permalink
feat(azuredevops): restranformate without api client (#8035)
Browse files Browse the repository at this point in the history
* feat(plugins): restranformate without api client

* fix(zentao): fix test errors

* fix(bamboo): fix e2e test

* fix(jira): fix e2e test

* fix(zentao): fix e2e test

* fix(bamboo): fix e2e test

* feat(azuredevops): support param skipCollectors
  • Loading branch information
d4x1 authored Sep 12, 2024
1 parent b80340b commit b5ac0a3
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
4 changes: 2 additions & 2 deletions backend/python/pydevlake/pydevlake/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_connection(self, _, connection: dict):
return self._plugin.test_connection(connection)

@plugin_method
def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict):
def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict,skip_collectors: bool):
connection = self._plugin.connection_type(**connection)
scope_config_pairs = [
(
Expand All @@ -102,7 +102,7 @@ def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connecti
)
for raw_scope, raw_config in scope_config_pairs
]
return self._plugin.make_pipeline(scope_config_pairs, connection)
return self._plugin.make_pipeline(scope_config_pairs, connection, skip_collectors)

@plugin_method
def plugin_info(self, _):
Expand Down
50 changes: 27 additions & 23 deletions backend/python/pydevlake/pydevlake/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@
# limitations under the License.


from typing import Type, Union, Iterable, Optional
from abc import ABC, abstractmethod
from pathlib import Path
import os
import sys
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Type, Union, Iterable, Optional

import fire

import pydevlake.message as msg
import pydevlake.model_info
from pydevlake.subtasks import Subtask
from pydevlake.logger import logger
from pydevlake.ipc import PluginCommands
from pydevlake.context import Context
from pydevlake.stream import Stream
from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
from pydevlake.ipc import PluginCommands
from pydevlake.logger import logger
from pydevlake.migration import MIGRATION_SCRIPTS

from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
from pydevlake.stream import Stream
from pydevlake.subtasks import Subtask

ScopeConfigPair = tuple[ToolScope, ScopeConfig]

Expand Down Expand Up @@ -119,20 +118,20 @@ def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = N
tool_scope.raw_data_params = raw_data_params(connection.id, tool_scope.id)
tool_scope.raw_data_table = self._raw_scope_table_name()
yield msg.RemoteScope(
id=tool_scope.id,
parent_id=group_id,
name=tool_scope.name,
data=tool_scope
)
id=tool_scope.id,
parent_id=group_id,
name=tool_scope.name,
data=tool_scope
)
else:
yield from self.remote_scope_groups(connection)

def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
connection: Connection) -> msg.PipelineData:
connection: Connection, skip_collectors: bool) -> msg.PipelineData:
"""
Make a simple pipeline using the scopes declared by the plugin.
"""
plan = self.make_pipeline_plan(scope_config_pairs, connection)
plan = self.make_pipeline_plan(scope_config_pairs, connection, skip_collectors)
domain_scopes = []
for tool_scope, _ in scope_config_pairs:
for scope in self.domain_scopes(tool_scope):
Expand All @@ -151,13 +150,14 @@ def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
)

def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
connection: Connection) -> list[list[msg.PipelineTask]]:
connection: Connection, skip_collectors: bool) -> list[list[msg.PipelineTask]]:
"""
Generate a pipeline plan with one stage per scope, plus optional additional stages.
Redefine `extra_stages` to add stages at the end of this pipeline.
"""
return [
*(self.make_pipeline_stage(scope, config, connection) for scope, config in scope_config_pairs),
*(self.make_pipeline_stage(scope, config, connection, skip_collectors) for scope, config in
scope_config_pairs),
*self.extra_stages(scope_config_pairs, connection)
]

Expand All @@ -170,7 +170,7 @@ def extra_stages(self, scope_config_pairs: list[ScopeConfigPair],
return []

def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
connection: Connection) -> list[msg.PipelineTask]:
connection: Connection, skip_collectors: bool) -> list[msg.PipelineTask]:
"""
Generate a pipeline stage for the given scope, plus optional additional tasks.
Subtasks are selected from `entity_types` via `select_subtasks`.
Expand All @@ -180,7 +180,7 @@ def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
msg.PipelineTask(
plugin=self.name,
skip_on_fail=False,
subtasks=self.select_subtasks(scope, config),
subtasks=self.select_subtasks(scope, config, skip_collectors),
options={
"scopeId": scope.id,
"scopeName": scope.name,
Expand All @@ -196,14 +196,17 @@ def extra_tasks(self, scope: ToolScope, config: ScopeConfig,
"""Override this method to add tasks to the given scope stage"""
return []

def select_subtasks(self, scope: ToolScope, config: ScopeConfig) -> list[str]:
def select_subtasks(self, scope: ToolScope, config: ScopeConfig, skip_collectors: bool) -> list[str]:
"""
Returns the list of subtasks names that should be run for given scope and entity types.
"""
subtasks = []
for stream in self._streams.values():
if set(stream.domain_types).intersection(config.domain_types) and stream.should_run_on(scope):
for subtask in stream.subtasks:
target_subtasks = stream.subtasks
if skip_collectors:
target_subtasks = stream.subtasks_without_collector
for subtask in target_subtasks:
subtasks.append(subtask.name)
return subtasks

Expand Down Expand Up @@ -234,7 +237,8 @@ def plugin_info(self) -> msg.PluginInfo:
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in
self._streams.values()],
subtask_metas=subtask_metas,
migration_scripts=MIGRATION_SCRIPTS
)
Expand Down
4 changes: 4 additions & 0 deletions backend/python/pydevlake/pydevlake/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def __init__(self, plugin_name: str):
def subtasks(self):
return [self.collector, self.extractor, self.convertor]

@property
def subtasks_without_collector(self):
return [self.extractor, self.convertor]

@property
def name(self):
return type(self).__name__.lower()
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/remote/plugin/plugin_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
}

planData := models.PipelineData{}
err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit b5ac0a3

Please sign in to comment.