Skip to content

Commit

Permalink
Added auto-submitting of partial Task results right before Unit expir…
Browse files Browse the repository at this point in the history
…ation time
  • Loading branch information
meta-paul committed Nov 25, 2024
1 parent 4902b6d commit cbc5d96
Show file tree
Hide file tree
Showing 23 changed files with 324 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ mephisto:
task_tags: "test,simple,form,form-composer,interactive-image-prompts"
force_rebuild: true
max_num_concurrent_units: 1
auto_submit_before_expiration_sec: 10
assignment_duration_in_seconds: 120
aux_parameters:
max_answer_loops: 3
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function MainApp() {
remoteProcedure,
handleSubmit,
handleFatalError,
setTaskSubmitData,
} = useMephistoRemoteProcedureTask();

if (isLoading || !initialTaskData) {
Expand All @@ -40,6 +41,7 @@ function MainApp() {
onSubmit={handleSubmit}
onError={handleFatalError}
remoteProcedure={remoteProcedure}
setTaskSubmitData={setTaskSubmitData}
/>
</ErrorBoundary>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ function FormComposerBaseFrontend({
onError,
finalResults = null,
remoteProcedure,
setTaskSubmitData,
}) {
const [loadingFormData, setLoadingFormData] = React.useState(false);
const [formData, setFormData] = React.useState(null);
Expand Down Expand Up @@ -95,6 +96,7 @@ function FormComposerBaseFrontend({
<FormComposer
data={formData}
onSubmit={onSubmit}
setTaskSubmitData={setTaskSubmitData}
finalResults={finalResults}
setRenderingErrors={setFormComposerRenderingErrors}
remoteProcedureCollection={remoteProcedure}
Expand Down
24 changes: 8 additions & 16 deletions examples/static_react_task/webapp/src/app.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
* LICENSE file in the root directory of this source tree.
*/

import { ErrorBoundary, useMephistoTask } from "mephisto-core";
import React from "react";
import ReactDOM from "react-dom";
import {
BaseFrontend,
OnboardingComponent,
Instructions,
LoadingScreen,
OnboardingComponent,
StaticReactTaskFrontend,
} from "./components/core_components.jsx";
import { useMephistoTask, ErrorBoundary } from "mephisto-core";

/* ================= Application Components ================= */

Expand All @@ -38,18 +39,7 @@ function MainApp() {
}

if (isPreview) {
return (
<div className="card bg-primary mb-4">
<div className="card-body pt-xl-5 pb-xl-5">
<h2 className="text-white">
This is an incredibly simple React task working with Mephisto!
</h2>
<h5 className="text-white">
Inside you'll be asked to rate a given sentence as good or bad.
</h5>
</div>
</div>
);
return <Instructions />;
}
if (isLoading || !initialTaskData) {
return <LoadingScreen />;
Expand All @@ -61,7 +51,9 @@ function MainApp() {
return (
<div>
<ErrorBoundary handleError={handleFatalError}>
<BaseFrontend
<Instructions />

<StaticReactTaskFrontend
taskData={initialTaskData}
onSubmit={handleSubmit}
isOnboarding={isOnboarding}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ function OnboardingComponent({ onSubmit }) {
);
}

function Instructions() {
return (
<div className="card bg-primary mb-4">
<div className="card-body pt-xl-5 pb-xl-5">
<h2 className="text-white">
This is an incredibly simple React task working with Mephisto!
</h2>
<h5 className="text-white">
Inside you'll be asked to rate a given sentence as good or bad.
</h5>
</div>
</div>
);
}

function LoadingScreen() {
return <Directions>Loading...</Directions>;
}
Expand All @@ -50,7 +65,12 @@ function Directions({ children }) {
);
}

function SimpleFrontend({ taskData, isOnboarding, onSubmit, onError }) {
function StaticReactTaskFrontend({
taskData,
isOnboarding,
onSubmit,
onError,
}) {
const [resonseSubmitted, setResonseSubmitted] = useState(false);

return (
Expand Down Expand Up @@ -100,4 +120,9 @@ function SimpleFrontend({ taskData, isOnboarding, onSubmit, onError }) {
);
}

export { LoadingScreen, SimpleFrontend as BaseFrontend, OnboardingComponent };
export {
Instructions,
LoadingScreen,
OnboardingComponent,
StaticReactTaskFrontend,
};
31 changes: 25 additions & 6 deletions mephisto/abstractions/_subcomponents/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def _launch_and_run_onboarding(
).track_inprogress(), EXECUTION_DURATION_SECONDS.labels(thread_type="onboarding").time():
live_run = onboarding_agent.get_live_run()
onboarding_id = onboarding_agent.get_agent_id()

logger.debug(f"Launching onboarding for {onboarding_agent}")

try:
self.run_onboarding(onboarding_agent)
except (
Expand All @@ -185,13 +187,16 @@ def _launch_and_run_onboarding(
if onboarding_agent.get_status() not in AgentState.complete():
# Absent agents at this stage should be disconnected
onboarding_agent.update_status(AgentState.STATUS_DISCONNECT)

self.cleanup_onboarding(onboarding_agent)
except Exception as e:
logger.exception(
f"Unhandled exception in onboarding {onboarding_agent}",
exc_info=True,
)

self.cleanup_onboarding(onboarding_agent)

del self.running_onboardings[onboarding_id]

# Onboarding now complete
Expand All @@ -207,6 +212,7 @@ async def register_then_cleanup():
f"Onboarding agent {onboarding_id} disconnected or errored, "
f"final status {onboarding_agent.get_status()}."
)

live_run.loop_wrap.execute_coro(cleanup_after())

def execute_unit(
Expand All @@ -218,6 +224,7 @@ def execute_unit(
if unit.db_id in self.running_units:
logger.debug(f"{unit} is already running")
return

unit_thread = threading.Thread(
target=self._launch_and_run_unit,
args=(unit, agent),
Expand Down Expand Up @@ -249,8 +256,11 @@ def _cleanup_special_units(self, unit: "Unit", agent: "Agent") -> None:
if agent.get_status() != AgentState.STATUS_COMPLETED:
if unit.unit_index == SCREENING_UNIT_INDEX:
blueprint = self.task_run.get_blueprint(args=self.args)

assert isinstance(blueprint, ScreenTaskRequired)

blueprint.screening_units_launched -= 1

unit.expire()

def _launch_and_run_unit(
Expand All @@ -272,14 +282,17 @@ def _launch_and_run_unit(
) as e:
# A returned Unit can be worked on again by someone else.
logger.exception(f"Handled exception in unit {unit}")

if unit.get_status() != AssignmentState.EXPIRED:
unit_agent = unit.get_assigned_agent()
if unit_agent is not None and unit_agent.db_id == agent.db_id:
logger.debug(f"Clearing {agent} from {unit} due to {e}")
unit.clear_assigned_agent()

if agent.get_status() not in AgentState.complete():
# Absent agents at this stage should be disconnected
agent.update_status(AgentState.STATUS_DISCONNECT)

self.cleanup_unit(unit)
except Exception as e:
logger.exception(f"Unhandled exception in unit {unit}", exc_info=True)
Expand All @@ -291,6 +304,7 @@ def _launch_and_run_unit(
if not agent.await_submit(timeout=None):
# Wait for a submit to occur
agent.await_submit(timeout=self.args.task.submission_timeout)

agent.update_status(AgentState.STATUS_COMPLETED)
agent.mark_done()

Expand All @@ -316,6 +330,7 @@ def execute_assignment(
if assignment.db_id in self.running_assignments:
logger.debug(f"Assignment {assignment} is already running")
return

assign_thread = threading.Thread(
target=self._launch_and_run_assignment,
args=(assignment, agents),
Expand All @@ -329,6 +344,7 @@ def execute_assignment(
agents=agents,
thread=assign_thread,
)

assign_thread.start()
return

Expand Down Expand Up @@ -359,6 +375,7 @@ def _launch_and_run_assignment(
# Must expire the disconnected unit so that
# new workers aren't shown it
agent.get_unit().expire()

if agent.get_status() not in AgentState.complete():
agent.update_status(AgentState.STATUS_DISCONNECT)
self.cleanup_assignment(assignment)
Expand All @@ -375,6 +392,7 @@ def _launch_and_run_assignment(
if not agent.await_submit(timeout=None):
# Wait for a submit to occur
agent.await_submit(timeout=self.args.task.submission_timeout)

agent.update_status(AgentState.STATUS_COMPLETED)
agent.mark_done()

Expand All @@ -386,6 +404,7 @@ def _launch_and_run_assignment(
f"Unhandled exception in on_unit_submitted for {unit}",
exc_info=True,
)

del self.running_assignments[assignment.db_id]

# Clear reservations
Expand All @@ -398,16 +417,12 @@ def _launch_and_run_assignment(

@staticmethod
def get_data_for_assignment(assignment: "Assignment") -> "InitializationData":
"""
Finds the right data to get for the given assignment.
"""
"""Finds the right data to get for the given assignment."""
return assignment.get_assignment_data()

@abstractmethod
def get_init_data_for_agent(self, agent: "Agent"):
"""
Return the data that an agent will need for their task.
"""
"""Return the data that an agent will need for their task."""
raise NotImplementedError()

def filter_units_for_worker(self, units: List["Unit"], worker: "Worker"):
Expand All @@ -433,17 +448,21 @@ def shutdown(self):
# Shut down the agents
for running_unit in running_units:
running_unit.agent.shutdown()

for running_assignment in running_assignments:
for agent in running_assignment.agents:
agent.shutdown()

for running_onboarding in running_onboardings:
running_onboarding.onboarding_agent.shutdown()

# Join the threads
for running_unit in running_units:
running_unit.thread.join()

for running_assignment in running_assignments:
running_assignment.thread.join()

for running_onboarding in running_onboardings:
running_onboarding.thread.join()

Expand Down
Loading

0 comments on commit cbc5d96

Please sign in to comment.