diff --git a/lib/syskit/network_generation/engine.rb b/lib/syskit/network_generation/engine.rb index d969caa7d..90ef2cc26 100644 --- a/lib/syskit/network_generation/engine.rb +++ b/lib/syskit/network_generation/engine.rb @@ -132,21 +132,61 @@ def compute_deployed_network( def apply_deployed_network_to_plan # Finally, we map the deployed network to the currently # running tasks - @deployment_tasks, @deployed_tasks = + @deployment_tasks, @reused_deployed_tasks, @new_deployed_tasks = log_timepoint_group "finalize_deployed_tasks" do finalize_deployed_tasks end + @deployed_tasks = @reused_deployed_tasks + @new_deployed_tasks + + sever_old_plan_from_new_plan if @dataflow_dynamics @dataflow_dynamics.apply_merges(merge_solver) log_timepoint "apply_merged_to_dataflow_dynamics" end + Engine.deployment_postprocessing.each do |block| block.call(self, work_plan) log_timepoint "postprocessing:#{block}" end end + # "Cut" relations between the "old" plan and the new one + # + # At this stage, old components (task contexts and compositions) + # that are not part of the new plan may still be child of bits of + # the new plan. This happens if they are added as children of other + # task contexts. The transformer does this to register dynamic + # transformation producers + # + # This pass looks for all proxies of compositions and task contexts + # that are not the target of a merge operation. When this happens, + # we know that the component is not being reused, and we remove all + # dependency relations where it is child and where the parent is + # "useful" + # + # Note that we do this only for relations between Syskit + # components. Relations with "plan" Roby tasks are updated because + # we replace toplevel tasks. + def sever_old_plan_from_new_plan + old_tasks = + work_plan + .find_local_tasks(Syskit::Component) + .find_all(&:transaction_proxy?) + + merge_leaves = merge_solver.each_merge_leaf.to_set + old_tasks.each do |old_task| + next if merge_leaves.include?(old_task) + + parents = + old_task + .each_parent_task + .find_all { |t| merge_leaves.include?(t) } + + parents.each { |t| t.remove_child(old_task) } + end + end + class << self # Set of blocks registered with # register_instanciation_postprocessing @@ -349,8 +389,9 @@ def finalize_deployed_tasks end log_timepoint "select_deployments" - reused_deployed_tasks = - reconfigure_tasks_on_static_port_modification(reused_deployed_tasks) + reconfigure_tasks_on_static_port_modification( + reused_deployed_tasks, newly_deployed_tasks + ) log_timepoint "reconfigure_tasks_on_static_port_modification" debug do @@ -366,7 +407,7 @@ def finalize_deployed_tasks merge_solver.merge_identical_tasks log_timepoint "merge" - [selected_deployment_tasks, reused_deployed_tasks | newly_deployed_tasks] + [selected_deployment_tasks, reused_deployed_tasks, newly_deployed_tasks] end # Process a single deployment in {#finalize_deployed_tasks} @@ -395,18 +436,18 @@ def handle_required_deployment(required, usable, not_reusable) end if usable - newly_deployed_tasks = [] - reused_deployed_tasks = adapt_existing_deployment(required, usable) + new_deployed_tasks, reused_deployed_tasks = + adapt_existing_deployment(required, usable) selected = usable else # Nothing to do, we leave the plan as it is - newly_deployed_tasks = required.each_executed_task + new_deployed_tasks = required.each_executed_task reused_deployed_tasks = [] selected = required end selected.should_start_after(not_reusable.stop_event) if not_reusable - [selected, newly_deployed_tasks, reused_deployed_tasks] + [selected, new_deployed_tasks, reused_deployed_tasks] end # Validate that the usable deployment we found is actually usable @@ -525,9 +566,9 @@ def import_existing_deployments(used_deployments) # Note that tasks that are already reconfigured because of # {#adapt_existing_deployment} will be fine as the task is not # configured yet - def reconfigure_tasks_on_static_port_modification(deployed_tasks) - final_deployed_tasks = deployed_tasks.dup - + def reconfigure_tasks_on_static_port_modification( + reused_deployed_tasks, newly_deployed_tasks + ) # We filter against 'deployed_tasks' to always select the tasks # that have been selected in this deployment. It does mean that # the task is always the 'current' one, that is we would pick @@ -538,7 +579,7 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks) .find_tasks(Syskit::TaskContext).not_finished.not_finishing .find_all { |t| !t.read_only? } .find_all do |t| - deployed_tasks.include?(t) && (t.setting_up? || t.setup?) + reused_deployed_tasks.include?(t) && (t.setting_up? || t.setup?) end already_setup_tasks.each do |t| @@ -552,10 +593,9 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks) new_task = t.execution_agent.task(t.orocos_name, t.concrete_model) merge_solver.apply_merge_group(t => new_task) new_task.should_configure_after t.stop_event - final_deployed_tasks.delete(t) - final_deployed_tasks << new_task + reused_deployed_tasks.delete(t) + newly_deployed_tasks << new_task end - final_deployed_tasks end # Find the "last" deployed task in a set of related deployed tasks @@ -597,14 +637,13 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task) (orocos_name_to_existing[t.orocos_name] ||= []) << t end - applied_merges = Set.new deployed_tasks = deployment_task.each_executed_task.to_a + new_deployed_tasks = [] + reused_deployed_tasks = [] deployed_tasks.each do |task| existing_tasks = orocos_name_to_existing[task.orocos_name] || [] - unless existing_tasks.empty? - existing_task = find_current_deployed_task(existing_tasks) - end + existing_task = find_current_deployed_task(existing_tasks) if !existing_task || !task.can_be_deployed_by?(existing_task) debug do @@ -629,25 +668,19 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task) "to finish before reconfiguring" end - parent_task_contexts = - previous_task - .each_parent_task - .find_all { |t| t.kind_of?(Syskit::TaskContext) } - - parent_task_contexts.each do |t| - t.remove_child(previous_task) - end new_task.should_configure_after(previous_task.stop_event) end + new_deployed_tasks << new_task existing_task = new_task + else + reused_deployed_tasks << existing_task end merge_solver.apply_merge_group(task => existing_task) - applied_merges << existing_task debug { " using #{existing_task} for #{task} (#{task.orocos_name})" } end work_plan.remove_task(deployment_task) - applied_merges + [new_deployed_tasks, reused_deployed_tasks] end # Computes the set of requirement tasks that should be used for @@ -830,8 +863,6 @@ def apply_system_network_to_plan( log_timepoint "final_network_postprocessing:#{block}" end - # Finally, we should now only have deployed tasks. Verify it - # and compute the connection policies if garbage_collect && validate_final_network validate_final_network(required_instances, work_plan, compute_deployments: compute_deployments) @@ -839,6 +870,8 @@ def apply_system_network_to_plan( end commit_work_plan + + validate_reconfigured_tasks_are_not_held(@new_deployed_tasks) end def discard_work_plan @@ -899,7 +932,13 @@ def handle_resolution_exception(e, on_error: :discard) def validate_final_network( required_instances, plan, compute_deployments: true ) - # Check that all device instances are proper tasks (not proxies) + validate_required_instances_are_tasks(required_instances) + + super if defined? super + end + + def validate_required_instances_are_tasks(required_instances) + # Check that the final set of root required instances are proper tasks required_instances.each do |_req_task, task| if task.transaction_proxy? raise InternalError, @@ -910,8 +949,32 @@ def validate_final_network( "instance definition #{task} has been removed from plan" end end + end - super if defined? super + # Exception added to the plan when we detect that a task being reconfigured + # is held against garbage collection. + # + # This is an internal error (i.e. should not happen), but not triggering + # this causes the system to "hold on" forever. + class InternalErrorReconfiguredTaskIsHeld < Roby::LocalizedError + end + + # Validate that "old" tasks in a reconfigured pair will be garbage collected + # + # This must be called at the very end + def validate_reconfigured_tasks_are_not_held(new_deployed_tasks) + reconfigured_tasks = new_deployed_tasks.flat_map do |task| + task.start_event.parent_objects( + Roby::EventStructure::SyskitConfigurationPrecedence + ).map(&:task).to_a + end + + useful_tasks = real_plan.useful_tasks + reconfigured_tasks.each do |t| + next unless useful_tasks.include?(t) + + t.add_error(InternalErrorReconfiguredTaskIsHeld.new(t)) + end end @@dot_index = 0 diff --git a/lib/syskit/network_generation/merge_solver.rb b/lib/syskit/network_generation/merge_solver.rb index e59b750e4..ee2d57527 100644 --- a/lib/syskit/network_generation/merge_solver.rb +++ b/lib/syskit/network_generation/merge_solver.rb @@ -572,6 +572,14 @@ def display_merge_graph(title, merge_graph) break end end + + def each_merge_leaf + return enum_for(__method__) unless block_given? + + task_replacement_graph.each_vertex do |v| + yield(v) if task_replacement_graph.leaf?(v) + end + end end end end diff --git a/lib/syskit/test/network_manipulation.rb b/lib/syskit/test/network_manipulation.rb index f4ad9329e..ebc6a22fd 100644 --- a/lib/syskit/test/network_manipulation.rb +++ b/lib/syskit/test/network_manipulation.rb @@ -210,11 +210,13 @@ def syskit_deploy( resolve_options = Hash[on_error: :commit].merge(resolve_options) begin syskit_engine_resolve_handle_plan_export do - syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan) - syskit_engine.resolve( - default_deployment_group: default_deployment_group, - **resolve_options - ) + execute do + syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan) + syskit_engine.resolve( + default_deployment_group: default_deployment_group, + **resolve_options + ) + end end rescue StandardError => e expect_execution do diff --git a/test/network_generation/test_engine.rb b/test/network_generation/test_engine.rb index a53c61644..c3fdd67e9 100644 --- a/test/network_generation/test_engine.rb +++ b/test/network_generation/test_engine.rb @@ -163,92 +163,55 @@ def work_plan end describe "#reconfigure_tasks_on_static_port_modification" do - it "reconfigures already-configured tasks whose static input ports have been modified" do - task = syskit_stub_deploy_and_configure("Task", as: "task") { input_port("in", "/double").static } - proxy = work_plan[task] - flexmock(proxy).should_receive(:transaction_modifies_static_ports?).once.and_return(true) - syskit_engine.reconfigure_tasks_on_static_port_modification([proxy]) + attr_reader :task, :proxy + + before do + @task = syskit_stub_deploy_and_configure("Task", as: "task") do + input_port("in", "/double").static + end + @proxy = work_plan[task] + end + + it "reconfigures already-configured tasks whose static input "\ + "ports have been modified" do + flexmock(proxy).should_receive(:transaction_modifies_static_ports?) + .once.and_return(true) + + syskit_engine.reconfigure_tasks_on_static_port_modification( + reused = [proxy], new = [] + ) + assert_equal [], reused tasks = work_plan.find_local_tasks(Syskit::TaskContext) .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal 2, tasks.size - tasks.delete(proxy) - new_task = tasks.first + assert_equal(Set[proxy, *new], tasks.to_set) + new_task = new.first - assert_child_of proxy.stop_event, new_task.start_event, - Roby::EventStructure::SyskitConfigurationPrecedence + assert_configures_after proxy.stop_event, new_task.start_event end - it "does not reconfigure already-configured tasks whose static input ports have not been modified" do - task = syskit_stub_deploy_and_configure("Task", as: "task") { input_port("in", "/double").static } - proxy = work_plan[task] - flexmock(proxy).should_receive(:transaction_modifies_static_ports?).once.and_return(false) - syskit_engine.reconfigure_tasks_on_static_port_modification([proxy]) + it "does not reconfigure already-configured tasks whose "\ + "static input ports have not been modified" do + flexmock(proxy).should_receive(:transaction_modifies_static_ports?) + .once.and_return(false) + syskit_engine.reconfigure_tasks_on_static_port_modification( + reused = [proxy], new = [] + ) + assert_equal [proxy], reused + assert_equal [], new tasks = work_plan.find_local_tasks(Syskit::TaskContext) .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal work_plan.wrap([task]), tasks + assert_equal [proxy], tasks end it "does not reconfigure not-setup tasks" do - task = syskit_stub_and_deploy("Task") { input_port("in", "/double").static } - syskit_engine.reconfigure_tasks_on_static_port_modification([task]) + syskit_engine.reconfigure_tasks_on_static_port_modification( + reused = [proxy], new = [] + ) + assert_equal [proxy], reused + assert_equal [], new tasks = work_plan.find_local_tasks(Syskit::TaskContext) .with_arguments(orocos_name: task.orocos_name).to_a - assert_equal work_plan.wrap([task]), tasks - end - - describe "when child of a composition" do - it "ensures that the existing deployment will be garbage collected" do - task_m = Syskit::TaskContext.new_submodel - cmp_m = Syskit::Composition.new_submodel - cmp_m.add task_m, as: "test" - - syskit_stub_configured_deployment(task_m) - cmp = syskit_deploy(cmp_m) - original_task = cmp.test_child - flexmock(task_m).new_instances.should_receive(:can_be_deployed_by?) - .with(->(proxy) { proxy.__getobj__ == cmp.test_child }).and_return(false) - new_cmp = syskit_deploy(cmp_m) - - # Should have instanciated a new composition since the children - # differ - refute_equal new_cmp, cmp - # Should have of course created a new task - refute_equal new_cmp.test_child, cmp.test_child - # And the old tasks should be ready to garbage-collect - assert_equal [cmp, original_task].to_set, - execute { plan.static_garbage_collect.to_set } - end - end - - describe "when child of a task" do - it "ensures that the existing deployment will be garbage collected" do - child_m = Syskit::TaskContext.new_submodel - parent_m = Syskit::TaskContext.new_submodel - parent_m.singleton_class.class_eval do - define_method(:instanciate) do |*args, **kw| - task = super(*args, **kw) - task.depends_on(child_m.instanciate(*args, **kw), - role: "test") - task - end - end - - syskit_stub_configured_deployment(child_m) - parent_m = syskit_stub_requirements(parent_m) - parent = syskit_deploy(parent_m) - child = parent.test_child - - flexmock(child_m).new_instances.should_receive(:can_be_deployed_by?) - .with(->(proxy) { proxy.__getobj__ == child }).and_return(false) - new_parent = syskit_deploy(parent_m) - new_child = new_parent.test_child - - assert_equal new_parent, parent - refute_equal new_child, child - # And the old tasks should be ready to garbage-collect - assert_equal [child].to_set, - execute { plan.static_garbage_collect.to_set } - end + assert_equal [proxy], tasks end end @@ -343,6 +306,133 @@ def work_plan end end + describe "when scheduling tasks for reconfiguration" do + it "ensures that the old task is garbage collected "\ + "when child of a composition" do + task_m = Syskit::TaskContext.new_submodel + cmp_m = Syskit::Composition.new_submodel + cmp_m.add task_m, as: "test" + + syskit_stub_configured_deployment(task_m) + cmp = syskit_deploy(cmp_m) + original_task = cmp.test_child + flexmock(task_m).new_instances.should_receive(:can_be_deployed_by?) + .with(->(proxy) { proxy.__getobj__ == cmp.test_child }).and_return(false) + new_cmp = syskit_deploy(cmp_m) + + # Should have instanciated a new composition since the children + # differ + refute_equal new_cmp, cmp + # Should have of course created a new task + refute_equal new_cmp.test_child, cmp.test_child + # And the old tasks should be ready to garbage-collect + assert_equal [cmp, original_task].to_set, + execute { plan.static_garbage_collect.to_set } + end + + it "ensures that the old task gets garbage collected when child "\ + "of another still useful task" do + child_m = Syskit::TaskContext.new_submodel + parent_m = Syskit::TaskContext.new_submodel + parent_m.singleton_class.class_eval do + define_method(:instanciate) do |*args, **kw| + task = super(*args, **kw) + task.depends_on(child_m.instanciate(*args, **kw), + role: "test") + task + end + end + + syskit_stub_configured_deployment(child_m) + parent_m = syskit_stub_requirements(parent_m) + parent = syskit_deploy(parent_m) + child = parent.test_child + + flexmock(child_m) + .new_instances.should_receive(:can_be_deployed_by?) + .with(->(proxy) { proxy.__getobj__ == child }).and_return(false) + new_parent = syskit_deploy(parent_m) + new_child = new_parent.test_child + + assert_equal new_parent, parent + refute_equal new_child, child + # And the old tasks should be ready to garbage-collect + assert_equal [child].to_set, + execute { plan.static_garbage_collect.to_set } + end + + it "ensures that the old task gets garbage collected when child "\ + "of a composition, itself child of a useful task" do + child_m = Syskit::TaskContext.new_submodel + cmp_m = Syskit::Composition.new_submodel + cmp_m.add child_m, as: "task" + parent_m = Syskit::TaskContext.new_submodel + parent_m.singleton_class.class_eval do + define_method(:instanciate) do |*args, **kw| + task = super(*args, **kw) + task.depends_on(cmp_m.instanciate(*args, **kw), + role: "test") + task + end + end + + syskit_stub_configured_deployment(child_m) + parent_m = syskit_stub_requirements(parent_m) + parent = syskit_deploy(parent_m) + child = parent.test_child + child_task = child.task_child + + flexmock(child_m) + .new_instances.should_receive(:can_be_deployed_by?) + .with(->(proxy) { proxy.__getobj__ == child_task }) + .and_return(false) + new_parent = syskit_deploy(parent_m) + new_child = new_parent.test_child + new_child_task = new_child.task_child + + assert_equal new_parent, parent + refute_equal new_child, child + refute_equal new_child_task, child_task + # And the old tasks should be ready to garbage-collect + assert_equal [child, child_task].to_set, + execute { plan.static_garbage_collect.to_set } + end + + it "detects if the scheduling code fails to 'liberate' the old task" do + flexmock(Engine) + .new_instances.should_receive(:sever_old_plan_from_new_plan) + + child_m = Syskit::TaskContext.new_submodel + parent_m = Syskit::TaskContext.new_submodel + parent_m.singleton_class.class_eval do + define_method(:instanciate) do |*args, **kw| + task = super(*args, **kw) + task.depends_on(child_m.instanciate(*args, **kw), + role: "test") + task + end + end + + syskit_stub_configured_deployment(child_m) + parent_m = syskit_stub_requirements(parent_m) + parent = syskit_deploy(parent_m) + child = parent.test_child + + flexmock(child_m) + .new_instances.should_receive(:can_be_deployed_by?) + .with(->(proxy) { proxy.__getobj__ == child }).and_return(false) + e = assert_raises( + Roby::Test::ExecutionExpectations::UnexpectedErrors + ) do + syskit_deploy(parent_m) + end + e = e.each_execution_exception.first + assert_equal child, e.origin + assert_kind_of Engine::InternalErrorReconfiguredTaskIsHeld, + e.exception + end + end + describe "#find_current_deployed_task" do it "ignores garbage tasks that have not been finalized yet" do component_m = Syskit::Component.new_submodel @@ -414,7 +504,7 @@ def work_plan required_deployment, (required0, task2) = add_deployment_and_tasks(work_plan, deployment_m, %w[task0 task2]) - selected_deployments, selected_deployed_tasks = + selected_deployments, reused_tasks, new_tasks = syskit_engine.finalize_deployed_tasks expected_deployment = work_plan[existing_deployment] @@ -427,8 +517,8 @@ def work_plan assert_equal [work_plan[task0], work_plan[task1], task2].to_set, expected_deployment.each_executed_task.to_set - assert_equal [work_plan[task0], task2].to_set, - selected_deployed_tasks.to_set + assert_equal [work_plan[task0]], reused_tasks.to_a + assert_equal [task2], new_tasks.to_a end it "maintains the dependencies" do @@ -522,15 +612,22 @@ def add_deployment_and_tasks(plan, deployment_m, task_names) deployed = syskit_deploy(composition_model) # This deregisters the task from the list of requirements in the # syskit engine - execute { plan.remove_task(deployed.planning_task) } + execute do + plan.remove_task(deployed.planning_task) + plan.unmark_mission_task(deployed) + end new_deployed = syskit_deploy( - composition_model.use("child" => task_model.with_conf("non_default")) + composition_model.use( + "child" => task_model.with_conf("non_default") + ) ) assert_equal(["non_default"], new_deployed.child_child.conf) - assert_equal [deployed.child_child.stop_event], - new_deployed.child_child.start_event.parent_objects(Roby::EventStructure::SyskitConfigurationPrecedence).to_a + assert_configures_after( + deployed.child_child.stop_event, + new_deployed.child_child.start_event + ) end it "reconfigures a toplevel task if its configuration changed" do @@ -543,9 +640,9 @@ def add_deployment_and_tasks(plan, deployment_m, task_names) deployed_reconf = syskit_deploy(task_model.with_conf("non_default")) plan.add_mission_task(deployed_reconf) - assert_equal [deployed_task.stop_event], - deployed_reconf.start_event.parent_objects(Roby::EventStructure::SyskitConfigurationPrecedence).to_a - plan.useful_tasks + assert_configures_after( + deployed_task.stop_event, deployed_reconf.start_event + ) assert_equal([planning_task, deployed_task].to_set, execute { plan.static_garbage_collect.to_set }) assert(["non_default"], deployed_reconf.conf) @@ -566,8 +663,7 @@ def add_deployment_and_tasks(plan, deployment_m, task_names) new_cmp, = syskit_deploy(composition_model.use("child" => task_model)) new_child = new_cmp.child_child - assert_equal [child.stop_event], - new_child.start_event.parent_objects(Roby::EventStructure::SyskitConfigurationPrecedence).to_a + assert_configures_after child.stop_event, new_child.start_event end it "does not change anything if asked to deploy the same composition twice" do @@ -850,6 +946,13 @@ def deploy_dev_and_bus end end end + + def assert_configures_after(expected_event, event) + should_configure_after = event.parent_objects( + Roby::EventStructure::SyskitConfigurationPrecedence + ).to_a + assert_equal [expected_event], should_configure_after + end end class EngineTestStubDeployment < Roby::Task