remove some deepcopy to speed up workflow conductor #256

Open: guzzijones wants to merge 34 commits into master from nocopy
Commits (34; diff shown is from 21 commits)
- 6a6f0b8 remove some deepcopy to speed up workflow conductor (guzzijones, Jul 7, 2023)
- 0f5d374 Update tox.yml (guzzijones, Jul 8, 2023)
- 73522e4 Update tox.yml (guzzijones, Jul 8, 2023)
- 46c7546 Update tox.yml (guzzijones, Jul 8, 2023)
- 3bff3e0 black formatting (guzzijones, Jul 10, 2023)
- 3805276 add back python 3.8 (guzzijones, Jul 10, 2023)
- 8ce4a42 try 18.04 (guzzijones, Jul 10, 2023)
- 61bf846 switch back to 20.04 (guzzijones, Jul 10, 2023)
- bbd2ea4 remove unittest2 (guzzijones, Jul 10, 2023)
- cb52348 install setup.py to load entrypoints (guzzijones, Jul 10, 2023)
- 7bdb543 set language (guzzijones, Jul 10, 2023)
- 8fd8bb1 remove another deepcopy (guzzijones, Jul 10, 2023)
- aac13bf remove staged copy (guzzijones, Jul 10, 2023)
- 4712345 remove reruns deepcopy (guzzijones, Jul 10, 2023)
- f5caf3a remove serialize parent_ctx copy (guzzijones, Jul 10, 2023)
- 6478f9b deepcopy machine.py, deepcopy graphing.py, conducting.py (guzzijones, Jul 11, 2023)
- 0f58986 remove json_util (guzzijones, Jul 11, 2023)
- d3541fb need to copy staged so it isn't mutated for with items (guzzijones, Jul 12, 2023)
- 6e3866a ensure staged is deep copied (guzzijones, Jul 24, 2023)
- 62a23a5 flake8 lint fix (guzzijones, Jul 24, 2023)
- 1364dca add back deepcopy for errors as they are also mutated in st2 (guzzijones, Jul 27, 2023)
- d9816fa do not copy with items context; added benchmarks (guzzijones, Jan 23, 2024)
- 016f825 remove comment about task render (guzzijones, Jan 23, 2024)
- 8cf2ff8 add test requirements (guzzijones, Jan 23, 2024)
- 3e7cfb4 fix conflicts (guzzijones, Jan 23, 2024)
- eb09856 typo in benchmark vs benchmarks (guzzijones, Jan 23, 2024)
- c5dbc16 add __init__.py so imports work (guzzijones, Jan 23, 2024)
- 84a6aa2 remove unused import ctx_util (guzzijones, Jan 23, 2024)
- a1f4685 flake8 fixes (guzzijones, Jan 23, 2024)
- 3c2db2c add license file (guzzijones, Jan 23, 2024)
- 429f0e0 remove extras require (guzzijones, Jan 23, 2024)
- 25cf175 more deep copy removed (guzzijones, Jan 30, 2024)
- d4c848b flake fix (guzzijones, Jan 30, 2024)
- ce8fcdf Merge branch 'master' into nocopy (guzzijones, Sep 13, 2024)
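Several of these commits ("need to copy staged so it isn't mutated for with items", "ensure staged is deep copied", "add back deepcopy for errors as they are also mutated in st2") circle the same hazard: once serialize() stops deep-copying, callers hold references into live conductor state, and any downstream mutation leaks back in. A minimal sketch of that hazard, using an invented `State` class rather than orquesta's actual types:

```python
# A minimal sketch (invented State class, not orquesta code) of the aliasing
# hazard: a serialize() that skips deep copies hands out references to live
# internal state, so caller-side mutation corrupts the conductor.
import copy


class State(object):
    def __init__(self):
        self.staged = [{"id": "task1", "items": [{"status": "running"}]}]

    def serialize_shallow(self):
        return {"staged": self.staged}  # fast, but aliases internal state

    def serialize_deep(self):
        return {"staged": copy.deepcopy(self.staged)}  # slower, but isolated


state = State()
data = state.serialize_shallow()
data["staged"][0]["items"].clear()  # a caller mutates the serialized form...
assert state.staged[0]["items"] == []  # ...and the live state changes with it

state = State()
data = state.serialize_deep()
data["staged"][0]["items"].clear()
assert state.staged[0]["items"] == [{"status": "running"}]  # state intact
```

This is consistent with the diff below, which keeps json_util.deepcopy on `staged` and `errors` (the fields the commit messages say are mutated downstream) while dropping it for fields treated as read-only.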
3 changes: 2 additions & 1 deletion .github/workflows/tox.yml
@@ -12,7 +12,7 @@ on:

 jobs:
   tests:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-20.04
     strategy:
       matrix:
         python-version: ["3.6", "3.8"]
@@ -27,6 +27,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install tox tox-gh-actions
+          pip install .
       - name: "Run tox for ${{ matrix.python-version }}"
         run: |
           tox
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -60,7 +60,7 @@
 #
 # This is also used if you do content translation via gettext catalogs.
 # Usually you set "language" from the command line for these cases.
-language = None
+language = 'English'

 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
56 changes: 27 additions & 29 deletions orquesta/conducting.py
@@ -49,29 +49,29 @@ def __init__(self, conductor=None):

     def serialize(self):
         data = {
-            "contexts": json_util.deepcopy(self.contexts),
-            "routes": json_util.deepcopy(self.routes),
-            "sequence": json_util.deepcopy(self.sequence),
-            "staged": json_util.deepcopy(self.staged),
+            "contexts": self.contexts,
+            "routes": self.routes,
+            "sequence": self.sequence,
+            "staged": self.staged,
             "status": self.status,
-            "tasks": json_util.deepcopy(self.tasks),
+            "tasks": self.tasks,
         }

         if self.reruns:
-            data["reruns"] = json_util.deepcopy(self.reruns)
+            data["reruns"] = self.reruns

         return data

     @classmethod
     def deserialize(cls, data):
         instance = cls()
-        instance.contexts = json_util.deepcopy(data.get("contexts", list()))
-        instance.routes = json_util.deepcopy(data.get("routes", list()))
-        instance.sequence = json_util.deepcopy(data.get("sequence", list()))
+        instance.contexts = data.get("contexts", list())
+        instance.routes = data.get("routes", list())
+        instance.sequence = data.get("sequence", list())
         instance.staged = json_util.deepcopy(data.get("staged", list()))
         instance.status = data.get("status", statuses.UNSET)
-        instance.tasks = json_util.deepcopy(data.get("tasks", dict()))
-        instance.reruns = json_util.deepcopy(data.get("reruns", list()))
+        instance.tasks = data.get("tasks", dict())
+        instance.reruns = data.get("reruns", list())

         return instance

@@ -279,10 +279,10 @@ def serialize(self):
             "spec": self.spec.serialize(),
             "graph": self.graph.serialize(),
             "input": self.get_workflow_input(),
-            "context": self.get_workflow_parent_context(),
+            "context": self._parent_ctx,
             "state": self.workflow_state.serialize(),
-            "log": json_util.deepcopy(self.log),
-            "errors": json_util.deepcopy(self.errors),
+            "log": self.log,
+            "errors": self.errors,
             "output": self.get_workflow_output(),
         }

@@ -292,12 +292,12 @@ def deserialize(cls, data):
         spec = spec_module.WorkflowSpec.deserialize(data["spec"])

         graph = graphing.WorkflowGraph.deserialize(data["graph"])
-        inputs = json_util.deepcopy(data["input"])
-        context = json_util.deepcopy(data["context"])
+        inputs = data["input"]
+        context = data["context"]
         state = WorkflowState.deserialize(data["state"])
-        log = json_util.deepcopy(data.get("log", []))
+        log = data.get("log", [])
         errors = json_util.deepcopy(data["errors"])
-        outputs = json_util.deepcopy(data["output"])
+        outputs = data["output"]

         instance = cls(spec)
         instance.restore(graph, log, errors, state, inputs, outputs, context)

@@ -317,7 +317,7 @@ def workflow_state(self):
             self._workflow_state = WorkflowState(conductor=self)

             # Set any given context as the initial context.
-            init_ctx = self.get_workflow_parent_context()
+            init_ctx = self.get_workflow_parent_context_copy()

             # Render workflow inputs and merge into the initial context.
             workflow_input = self.get_workflow_input()

@@ -370,7 +370,6 @@ def log_entry(
         result=None,
         data=None,
     ):
-
         # Check entry type.
         if entry_type not in ["info", "warn", "error"]:
             raise exc.WorkflowLogEntryError('The log entry type "%s" is not valid.' % entry_type)

@@ -408,11 +407,11 @@ def log_errors(self, errors, task_id=None, route=None, task_transition_id=None):
                 error, task_id=task_id, route=route, task_transition_id=task_transition_id
             )

-    def get_workflow_parent_context(self):
+    def get_workflow_parent_context_copy(self):
         return json_util.deepcopy(self._parent_ctx)

     def get_workflow_input(self):
-        return json_util.deepcopy(self._inputs)
+        return self._inputs

     def get_workflow_status(self):
         return self.workflow_state.status

@@ -461,7 +460,7 @@ def request_workflow_status(self, status):
             raise exc.InvalidWorkflowStatusTransition(current_status, wf_ex_event.name)

     def get_workflow_initial_context(self):
-        return json_util.deepcopy(self.workflow_state.contexts[0])
+        return self.workflow_state.contexts[0]

     def get_workflow_terminal_context(self):
         if self.get_workflow_status() not in statuses.COMPLETED_STATUSES:

@@ -482,8 +481,7 @@ def get_workflow_terminal_context(self):
         for idx, task in other_term_tasks:
             # Remove the initial context since the first task processed above already
             # inclulded that and we only want to apply the differences.
-            in_ctx_idxs = json_util.deepcopy(task["ctxs"]["in"])
-            in_ctx_idxs.remove(0)
+            in_ctx_idxs = [i for index, i in enumerate(task["ctxs"]["in"]) if index != 0]

             wf_term_ctx = dict_util.merge_dicts(
                 wf_term_ctx, self.get_task_context(in_ctx_idxs), overwrite=True

@@ -513,7 +511,7 @@ def render_workflow_output(self):
             self.request_workflow_status(statuses.FAILED)

     def get_workflow_output(self):
-        return json_util.deepcopy(self._outputs) if self._outputs else None
+        return self._outputs if self._outputs else None

     def reset_workflow_output(self):
         self._outputs = None

@@ -780,7 +778,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
         # Setup the retry in the task state.
         task_id = task_state_entry["id"]
         task_retry_spec = self.graph.get_task_retry_spec(task_id)
-        task_state_entry["retry"] = json_util.deepcopy(task_retry_spec)
+        task_state_entry["retry"] = task_retry_spec
         task_state_entry["retry"]["tally"] = 0

         # Get task context for evaluating the expression in delay and count.

@@ -1186,8 +1184,8 @@ def get_task_transition_contexts(self, task_id, route):

     def _request_task_rerun(self, task_id, route, reset_items=False):
         task = self.workflow_state.get_task(task_id, route)
-        task_ctx = json_util.deepcopy(task["ctxs"]["in"])
-        task_prev = json_util.deepcopy(task["prev"])
+        task_ctx = task["ctxs"]["in"]
+        task_prev = task["prev"]
         task_spec = self.spec.tasks.get_task(task_id)

         # Reset terminal status for the rerunnable candidate.
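For a sense of scale, here is a small hypothetical micro-benchmark (separate from the benchmarks this PR adds; the state shape is made up) comparing reference-returning serialization against deep-copying serialization:

```python
# Hypothetical micro-benchmark: deep-copying a nested workflow state on every
# serialize() call costs far more than returning references, which is the
# overhead this PR removes from the conductor's hot paths.
import copy
import timeit

# Made-up stand-in for a conductor's task state.
state = {"task%d" % i: {"ctxs": {"in": list(range(50))}} for i in range(100)}

by_reference = timeit.timeit(lambda: {"tasks": state}, number=1000)
by_deepcopy = timeit.timeit(lambda: {"tasks": copy.deepcopy(state)}, number=1000)

print("by reference: %.4f s" % by_reference)
print("by deepcopy:  %.4f s" % by_deepcopy)
```

The trade-off, as the commit history shows, is that every dropped copy has to be proven safe against downstream mutation.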
2 changes: 1 addition & 1 deletion orquesta/graphing.py
@@ -47,7 +47,7 @@ def serialize(self):

     @classmethod
     def deserialize(cls, data):
-        g = json_graph.adjacency_graph(json_util.deepcopy(data), directed=True, multigraph=True)
+        g = json_graph.adjacency_graph(data, directed=True, multigraph=True)
         return cls(graph=g)

     @staticmethod
11 changes: 7 additions & 4 deletions orquesta/machines.py
@@ -18,7 +18,6 @@
 from orquesta import events
 from orquesta import exceptions as exc
 from orquesta import statuses
-from orquesta.utils import jsonify as json_util


 LOG = logging.getLogger(__name__)
@@ -527,11 +526,15 @@ def add_context_to_task_item_event(cls, workflow_state, task_id, task_route, ac_
         if ac_ex_event.status in requirements:
             # Make a copy of the items and remove current item under evaluation.
             staged_task = workflow_state.get_staged_task(task_id, task_route)
-            items = json_util.deepcopy(staged_task["items"])
-            del items[ac_ex_event.item_id]
-            items_status = [item.get("status", statuses.UNSET) for item in items]
+            items = staged_task["items"]
+            items_status = [
+                item.get("status", statuses.UNSET)
+                for index, item in enumerate(items)
+                if index != ac_ex_event.item_id
+            ]

             # Assess various situations.
+            # todo(aj) loop over list one time and add to each list
             active = list(filter(lambda x: x in statuses.ACTIVE_STATUSES, items_status))
             incomplete = list(filter(lambda x: x not in statuses.COMPLETED_STATUSES, items_status))
             paused = list(filter(lambda x: x in [statuses.PENDING, statuses.PAUSED], items_status))
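The machines.py hunk above swaps a deepcopy-then-delete for a comprehension that skips the current item by index. A quick standalone check (item shape assumed for illustration) that the two approaches yield the same status list:

```python
# Standalone equivalence check (assumed item shape): filtering by index with
# enumerate() matches the old deepcopy-then-delete result without copying.
import copy

UNSET = "unset"
items = [{"status": "succeeded"}, {"status": "running"}, {}]
item_id = 1  # the item under evaluation, excluded from the tally

# Old approach: deep copy, delete the current item, then collect statuses.
old_items = copy.deepcopy(items)
del old_items[item_id]
old_statuses = [item.get("status", UNSET) for item in old_items]

# New approach: skip the current item while iterating; no copy is made.
new_statuses = [
    item.get("status", UNSET)
    for index, item in enumerate(items)
    if index != item_id
]

assert old_statuses == new_statuses == ["succeeded", UNSET]
```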
2 changes: 0 additions & 2 deletions orquesta/tests/hacking/import_aliases_rule.py
@@ -59,7 +59,6 @@


 def get_alias(logical_line):
-
     parts = logical_line.split()

     if (
@@ -68,7 +67,6 @@ def get_alias(logical_line):
         and parts[1] != "__future__"
         and not core.is_import_exception(parts[1])
     ):
-
         # from path.to.module import module
         if len(parts) == 4:
             return ".".join([parts[1], parts[3]]), None
11 changes: 0 additions & 11 deletions orquesta/tests/unit/base.py
@@ -156,7 +156,6 @@ def format_task_item(
         items_count=None,
         items_concurrency=None,
     ):
-
         if not actions and items_count is None:
             actions = [{"action": spec.action, "input": spec.input}]

@@ -210,20 +209,11 @@ def assert_task_list(self, conductor, actual, expected):
         expected_copy = copy.deepcopy(expected)

         for task in actual_copy:
-            for staged_task in task["ctx"]["__state"]["staged"]:
-                if "items" in staged_task:
-                    del staged_task["items"]
-
             task["spec"] = task["spec"].serialize()

         for task in expected_copy:
             task["ctx"]["__current_task"] = {"id": task["id"], "route": task["route"]}
             task["ctx"]["__state"] = conductor.workflow_state.serialize()

-            for staged_task in task["ctx"]["__state"]["staged"]:
-                if "items" in staged_task:
-                    del staged_task["items"]
-
             task["spec"] = task["spec"].serialize()

         self.assertListEqual(actual_copy, expected_copy)
@@ -285,7 +275,6 @@ def assert_task_items(
         concurrency=None,
         mock_ac_ex_results=None,
     ):
-
         # Set up test cases.
         tests = list(zip(mock_ac_ex_statuses, expected_task_statuses, expected_workflow_statuses))
         tk_ex_result = [None] * len(items)
4 changes: 2 additions & 2 deletions orquesta/tests/unit/conducting/test_workflow_conductor.py
@@ -97,7 +97,7 @@ def _prep_conductor(self, context=None, inputs=None, status=None):
         self.assertDictEqual(conductor._inputs, user_inputs)
         self.assertDictEqual(conductor.get_workflow_input(), user_inputs)
         self.assertDictEqual(conductor._parent_ctx, parent_context)
-        self.assertDictEqual(conductor.get_workflow_parent_context(), parent_context)
+        self.assertDictEqual(conductor.get_workflow_parent_context_copy(), parent_context)

         default_inputs = {"a": None, "b": False}
         init_ctx_value = dict_util.merge_dicts(default_inputs, user_inputs, True)
@@ -282,7 +282,7 @@ def test_serialization(self):
             "spec": conductor.spec.serialize(),
             "graph": conductor.graph.serialize(),
             "state": conductor.workflow_state.serialize(),
-            "context": conductor.get_workflow_parent_context(),
+            "context": conductor.get_workflow_parent_context_copy(),
             "input": conductor.get_workflow_input(),
             "output": conductor.get_workflow_output(),
             "errors": conductor.errors,
@@ -25,7 +25,6 @@


 class WorkflowConductorDataFlowTest(test_base.WorkflowConductorTest):
-
     wf_def_yaql = """
     version: 1.0

@@ -157,14 +156,14 @@ def assert_data_flow(self, input_value):

     def assert_unicode_data_flow(self, input_value):
         inputs = {
-            u"a1": (
+            "a1": (
                 str_util.unicode(input_value, encoding_type="utf-8", force=True)
                 if six.PY2
                 else input_value
             )
         }

-        expected_output = {u"a5": inputs["a1"], u"b5": inputs["a1"]}
+        expected_output = {"a5": inputs["a1"], "b5": inputs["a1"]}

         self._assert_data_flow(inputs, expected_output)

20 changes: 20 additions & 0 deletions orquesta/tests/unit/conducting/test_workflow_state.py
@@ -71,6 +71,26 @@ def test_get_tasks_by_task_id(self):

         self.assertListEqual(actual_task_sequence, expected_task_sequence)

+    def test_stage_is_deepcopied(self):
+        # If staged is not deep copied, then an st2 workflow with a failed
+        # with_items task will never finish running. It will remain in a
+        # running state forever. I believe it is due to iteration over staged
+        # tasks and mutation of the staged section.
+        data = copy.deepcopy(MOCK_WORKFLOW_STATE)
+
+        task_sequence = [
+            {"id": "task1", "route": 0},
+            {"id": "task2", "route": 0},
+            {"id": "task2", "route": 1},
+            {"id": "task2", "route": 2},
+            {"id": "task3", "route": 0},
+        ]
+
+        data["sequence"] = copy.deepcopy(task_sequence)
+        state = conducting.WorkflowState.deserialize(data)
+        MOCK_WORKFLOW_STATE["staged"] = ["something"]
+        self.assertNotEqual(len(state.staged), len(MOCK_WORKFLOW_STATE["staged"]))
+
     def test_get_tasks_by_task_id_and_route(self):
         data = copy.deepcopy(MOCK_WORKFLOW_STATE)

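The comment in the new test points at mutation of the staged list while it is being iterated. As a generic illustration (a simplified stand-in, not orquesta's actual staged-task handling), mutating a list mid-iteration silently skips elements, which is one way a with_items task could be left waiting forever:

```python
# Generic illustration (simplified stand-in for staged-task handling): removing
# from a list while iterating over it skips every other element, so some tasks
# are never processed and the workflow never reaches a terminal state.
staged = [{"id": "task1"}, {"id": "task2"}, {"id": "task3"}]

processed = []
for task in staged:
    processed.append(task["id"])
    staged.remove(task)  # mutating the list that is being iterated...

assert processed == ["task1", "task3"]  # ...so task2 was silently skipped
assert staged == [{"id": "task2"}]  # and it is still left in staged
```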
2 changes: 1 addition & 1 deletion orquesta/tests/unit/utils/test_strings.py
@@ -28,7 +28,7 @@ def test_unescape(self):
     def test_unicode(self):
         self.assertEqual(str_util.unicode(123), 123)
         self.assertEqual(str_util.unicode("foobar"), "foobar")
-        self.assertEqual(str_util.unicode(u"fubar" if six.PY2 else str("fubar")), "fubar")
+        self.assertEqual(str_util.unicode("fubar" if six.PY2 else str("fubar")), "fubar")
         self.assertEqual(str_util.unicode("鐵甲奇俠"), "鐵甲奇俠")
         self.assertEqual(str_util.unicode("\xe9\x90\xb5\xe7\x94\xb2"), "\xe9\x90\xb5\xe7\x94\xb2")

1 change: 0 additions & 1 deletion requirements-test.txt
@@ -8,4 +8,3 @@ nosexcover
 pep8>=1.6.0,<1.7
 pylint>=2.5.2,<2.6
 twine
-unittest2