From aa76dc06ba95f7bc9674ffd0701cdc1007fe8c0a Mon Sep 17 00:00:00 2001 From: Paul Abumov Date: Tue, 19 Nov 2024 12:20:07 -0500 Subject: [PATCH] Added doc pages for In-House and Mock providers --- .../workflows/cypress-end-to-end-tests.yml | 12 +- .../guides/how_to_use/providers/inhouse.md | 43 +++++++ .../docs/guides/how_to_use/providers/mock.md | 14 +++ .../common_qualification_flows.md | 25 +++- .../worker_quality/other_methods.md | 12 +- examples/form_composer_demo/README.md | 6 +- .../conf/dynamic_example__local__inhouse.yaml | 3 + .../conf/example__local__inhouse.yaml | 1 + ...xample_with_gold_unit__local__inhouse.yaml | 3 + ...ample_with_onboarding__local__inhouse.yaml | 3 + ...xample_with_screening__local__inhouse.yaml | 3 + .../hydra_configs/conf/_base.yaml | 1 + .../conf/custom_prebuilt__local__inhouse.yaml | 3 + .../conf/custom_simple__local__inhouse.yaml | 3 + .../conf/example__local__inhouse.yaml | 1 + .../onboarding_example__local__inhouse.yaml | 3 + .../conf/example__local__inhouse.yaml | 3 + ...xample_with_screening__local__inhouse.yaml | 3 + .../conf/example__local__inhouse.yaml | 3 + ...ample_with_onboarding__local__inhouse.yaml | 3 + .../static_react_task/data/authorization.csv | 6 + .../conf/example__local__inhouse.yaml | 3 + ...le_with_authorization__local__inhouse.yaml | 31 +++++ ...ample_with_onboarding__local__inhouse.yaml | 3 + ...xample_with_screening__local__inhouse.yaml | 3 + ...task_with_authorization__local__inhouse.py | 41 +++++++ .../conf/example__local__inhouse.yaml | 3 + ...ample_with_onboarding__local__inhouse.yaml | 3 + .../conf/dynamic_example__local__inhouse.yaml | 3 + .../conf/example__local__inhouse.yaml | 1 + .../abstract/static_task/static_blueprint.py | 106 ++++++++++------- .../remote_procedure_blueprint.py | 4 +- mephisto/abstractions/database.py | 9 +- .../abstractions/providers/inhouse/README.md | 2 +- .../providers/inhouse/inhouse_agent.py | 4 +- .../inhouse/inhouse_datastore_export.py | 2 +- 
.../providers/inhouse/inhouse_provider.py | 27 ++++- .../providers/inhouse/inhouse_requester.py | 6 +- .../providers/inhouse/inhouse_unit.py | 5 +- .../providers/inhouse/inhouse_worker.py | 30 ++++- .../providers/inhouse/migrations/__init__.py | 2 +- .../providers/inhouse/wrap_crowd_source.js | 8 +- .../providers/prolific/prolific_provider.py | 23 +++- .../providers/prolific/prolific_worker.py | 30 +++-- mephisto/data_model/task.py | 51 ++++---- mephisto/data_model/task_run.py | 3 + mephisto/data_model/worker.py | 16 ++- mephisto/operations/client_io_handler.py | 6 +- mephisto/operations/datatypes.py | 33 ++++-- mephisto/operations/worker_pool.py | 112 ++++++++++++++---- mephisto/utils/qualifications.py | 98 ++++++++++----- 51 files changed, 611 insertions(+), 211 deletions(-) create mode 100644 docs/web/docs/guides/how_to_use/providers/inhouse.md create mode 100644 docs/web/docs/guides/how_to_use/providers/mock.md create mode 100644 examples/static_react_task/data/authorization.csv create mode 100644 examples/static_react_task/hydra_configs/conf/example_with_authorization__local__inhouse.yaml create mode 100644 examples/static_react_task/run_task_with_authorization__local__inhouse.py diff --git a/.github/workflows/cypress-end-to-end-tests.yml b/.github/workflows/cypress-end-to-end-tests.yml index 0a162d963..382182a2e 100644 --- a/.github/workflows/cypress-end-to-end-tests.yml +++ b/.github/workflows/cypress-end-to-end-tests.yml @@ -209,7 +209,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core package @@ -259,7 +259,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core 
package @@ -309,7 +309,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core package @@ -414,7 +414,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core package @@ -465,7 +465,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core package @@ -516,7 +516,7 @@ jobs: - name: 📂 Set the data directory run: mephisto config core.main_data_directory ~/mephisto/data - - name: 🚚 Create Inhouse provider + - name: 🚚 Create In-House provider run: mephisto register inhouse - name: 📦 Setting up mephisto-core package diff --git a/docs/web/docs/guides/how_to_use/providers/inhouse.md b/docs/web/docs/guides/how_to_use/providers/inhouse.md new file mode 100644 index 000000000..820640732 --- /dev/null +++ b/docs/web/docs/guides/how_to_use/providers/inhouse.md @@ -0,0 +1,43 @@ +--- + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +sidebar_position: 3 +--- + +# In-House + +In-House CrowdProvider allows running live Tasks with no third-party worker platform integration. + + +## Simple In-House authorization + +> Note that this feature is enabled only for In-House provider. + +To prevent unauthorized access to your Task, you can enable a simple authorization. 
+Providing a one-column CSV file with allowed Worker usernames will limit your target audience only to these specific workers. + + +### Enable CSV authorization + +1. Create a CSV file with one column containing allowed Worker usernames + 1. Worker will need to use this username on Task's Welcome page + 2. Alternatively, you can include username in a Task URL as a parameter (that you will send to Worker to invite them to the Task) +2. Place this file in the data directory of your Task (e.g. `examples/static_react_task/data/authorization.csv`) +3. In your Task config, set `provider.authorization_csv` parameter to this file path +```yaml +mephisto: + ... + + provider: + authorization_csv: ${task_dir}/data/authorization.csv + ... +``` +4. Run your Task and try authorizing yourself with one of the allowed usernames from the CSV file. + + +## Example + +To understand how it works, you can run an example Task from our [In-House authorization example](https://github.com/facebookresearch/Mephisto/blob/main/examples/static_react_task/run_task_with_authorization__local__inhouse.py). diff --git a/docs/web/docs/guides/how_to_use/providers/mock.md b/docs/web/docs/guides/how_to_use/providers/mock.md new file mode 100644 index 000000000..b8df79087 --- /dev/null +++ b/docs/web/docs/guides/how_to_use/providers/mock.md @@ -0,0 +1,14 @@ +--- + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +sidebar_position: 4 +--- + +# Mock + +Mock provider is used solely for running tests. +It has a simplified implementation, and cannot be used with any workers. +If you wish to run a live Task without third-party providers, consider [In-House provider](/docs/guides/how_to_use/providers/inhouse/) instead. 
diff --git a/docs/web/docs/guides/how_to_use/worker_quality/common_qualification_flows.md b/docs/web/docs/guides/how_to_use/worker_quality/common_qualification_flows.md index 66c06408b..d36f8c4a1 100644 --- a/docs/web/docs/guides/how_to_use/worker_quality/common_qualification_flows.md +++ b/docs/web/docs/guides/how_to_use/worker_quality/common_qualification_flows.md @@ -10,13 +10,15 @@ sidebar_position: 1 # Using qualifications to improve worker quality Qualification control is a powerful component of Mephisto, allowing you to filter out workers with both manual and automatic controls. Within this are typical allowlists and blocklists, setting up value-based qualifications, making automatic qualifications for onboarding, and also utilizing the qualifications that various crowdsourcing providers have to offer. This document seeks to describe some common use cases for qualifications, and how we currently go about using them. + ### Blocking qualifications When you set a `block_qualification` during a launch, calling `Worker.grant_qualification()` will prevent that worker from working on any tasks that you have set the same `block_qualification` for. You can use this to set up blocklists for specific tasks, or for groups of tasks. + ### Onboarding qualifications Mephisto has an automatic setup for assigning workers qualifications for particular tasks that they've worked on, such that it's possible to specify a qualification that a worker can be granted on the first time they take out a particular task. This qualification is given the name `onboarding_qualification`, and is compatible with any blueprints that have onboarding tasks. -When a worker accepts your task for the first time, they will have neither the passing or failing version of the onboarding qualification, and will be put into a test version of the task that determines if they are qualified. Then only those that qualify the first time will be able to continue working on that task. 
+When a worker accepts your task for the first time, they will have neither the passing nor the failing version of the onboarding qualification, and will be put into a trial version of the task that determines if they are qualified. Then only those that qualify the first time will be able to continue working on that task. The `onboarding_qualification` is shared between all task runs that use the same qualification name, and as such you can ensure that a worker need not repeatedly qualify for the same or similar tasks by sharing the same lists. @@ -29,7 +31,7 @@ from mephisto.utils.qualifications import make_qualification_dict ONBOARDING_QUALIFICATION_NAME = "TEST_ONBOARDING_QUAL_NAME" -# Making a qualification that requires a worker has +# Making a qualification that requires a worker has # passed an onboarding from a different task shared_state.qualifications = [ make_qualification_dict( @@ -39,7 +41,7 @@ shared_state.qualifications = [ ) ] -# Making a qualification that requires that a worker +# Making a qualification that requires that a worker # has not failed a particular onboarding from a different task shared_state.qualifications = [ make_qualification_dict( @@ -82,6 +84,19 @@ shared_state.qualifications = [ ] ``` +### Admitting Workers with no prior qualification + +Let's say your Task requires certain qualifications and you wish to expand your pool of Workers, but you do not wish to use Onboarding Qualifications. In this case you can allow Task access to all workers that lack any failing qualification by setting the `admit_workers_with_no_prior_qualification` provider parameter: + +```yaml +mephisto: + ... + + provider: + admit_workers_with_no_prior_qualification: true + ... 
+``` + ### Adding custom qualifications to SharedTaskState You should be able to specify a qualification in Mephisto using the following: ```python @@ -103,11 +118,13 @@ where `QUAL_COMPARATOR` is any of the comparators available [here](https://githu You can directly grant that qualification to mephisto `Worker`'s using `Worker.grant_qualification("QUALIFICATION_NAME", qualification_value)`. + ### What if I want to block a worker that hasn't connected before? -For this you'll want to use the interface that a `CrowdProvider` has set up to do the granting process directly. An example for this can be found in `abstractions.providers.mturk.utils.script_utils`. +For this you'll want to use the interface that a `CrowdProvider` has set up to do the granting process directly. An example for this can be found in `abstractions.providers.mturk.utils.script_utils`. Note, while you're able to grant these qualifications to a worker that isn't tracked by Mephisto, it will not be possible for Mephisto to help in bookkeeping qualifications granted to workers in this manner. + ### What if I want to use qualifications only set by a provider? For the special case of provider-specific qualifications, `SharedTaskState` has fields for `_specific_qualifications` wherein you can put qualifications in the expected format for that crowd provider. 
For instance, you can do the following for using an [MTurk-specific qualification](https://docs.aws.amazon.com/AWSMechTurk/latest/AWSMturkAPI/ApiReference_QualificationRequirementDataStructureArticle.html#ApiReference_QualificationType-IDs) on a task: ```python diff --git a/docs/web/docs/guides/how_to_use/worker_quality/other_methods.md b/docs/web/docs/guides/how_to_use/worker_quality/other_methods.md index acce0dc87..f8116a7cf 100644 --- a/docs/web/docs/guides/how_to_use/worker_quality/other_methods.md +++ b/docs/web/docs/guides/how_to_use/worker_quality/other_methods.md @@ -7,30 +7,26 @@ sidebar_position: 6 --- -# Other methods for quality control +# Future features While not yet implemented in Mephisto's core codebase, there are a few additional methods of quality control that may be successful. This doc lists a few that we've considered for Mephisto thusfar. ## Worker Agreement -A fairly common method for ensuring that data is of high quality is to check for inter-annotator agreement. Putting the same work out for different annotators to complete is currently supported by mephisto using the `mephisto.blueprint.units_per_assignment` argument on static and remote query tasks. This ensures that the specified number of different workers will complete each task. +A fairly common method for ensuring that data is of high quality is to check for inter-annotator agreement. Putting the same work out for different annotators to complete is currently supported by mephisto using the `mephisto.blueprint.units_per_assignment` argument on static and remote query tasks. This ensures that the specified number of different workers will complete each Assignment from the Task. Once you have multiple completions, you can write your own review script to parse the results for all `Assignment` of your `TaskRun` to see if the `Unit`s within each `Assignment` have similar enough submitted data. 
Partial worker agreement may be a more efficient method of determining whether a worker is performing to your expectations, wherein you sample the tasks from a given worker and relaunch for others to complete and validate. -## Review tasks as tasks +## Tasks for reviewing other Tasks An extension of the above, it may be preferable to create tasks to review the data of other submitted workers. You can then use the results to simplify the time taken reviewing over all samples to just reviewing the borderline cases from your metareviewers. -A review project like this almost certainly would require creating a specific allowlist of workers who are qualified to review the work of others, generally some of your higher performing workers on other tasks or during pilots. +A review project like this almost certainly would require creating a specific allowlist of workers who are qualified to review the work of others, generally some of your higher performing workers on other tasks or during pilots. There's certainly a lot of lift to implement this type of workflow, so we're looking to support this type of functionality within Mephisto in our 1.1 release. -## Multi-tier worker qualification - -Some have found it effective to keep local ratings on worker quality such that allowlist and blocklist can be created on the fly for specific tasks. You can certainly extend any review script you use to allow categorizing workers, and then may find that your higher-tiered workers are more appropriate for sensitive tasks, or those that require a quality comparison. - ## Contributing While all of the above methods these aren't yet codified, they should all be able to hook into Mephisto primatives in some form or other. We'd be excited to review contributions for any of the above. 
diff --git a/examples/form_composer_demo/README.md b/examples/form_composer_demo/README.md index 2a057d2f8..205d9ca7c 100644 --- a/examples/form_composer_demo/README.md +++ b/examples/form_composer_demo/README.md @@ -8,8 +8,8 @@ These form-based questionnaires are example of FormComposer task generator. 2. SSH into running container to run server: `docker exec -it mephisto_dc bash` 3. Inside the container, go to FormComposer examples directory: `cd /mephisto/examples/form_composer_demo` 4. Inside the `examples` directory, run a desired example with one of these commands: - - Simple form with Inhouse provider: `python ./run_task__local__inhouse.py` - - Dynamic form with Inhouse provider: `python ./run_task_dynamic__local__inhouse.py` + - Simple form with In-House provider: `python ./run_task__local__inhouse.py` + - Dynamic form with In-House provider: `python ./run_task_dynamic__local__inhouse.py` - Dynamic form with Mturk on EC2: `python ./run_task_dynamic__ec2__mturk_sandbox.py` - Dynamic form with Prolific on EC2: `python ./run_task_dynamic__ec2__prolific.py` - Dynamic form with Presigned URLs: `python ./run_task_dynamic_presigned_urls__ec2__prolific.py` @@ -56,6 +56,6 @@ mephisto form_composer config --directory /mephisto/examples/form_composer_demo/ mephisto form_composer config --directory /mephisto/examples/form_composer_demo/data/dynamic_presigned_urls/ --extrapolate-token-sets # 2b. 
Run the Task -cd /mephisto/examples/form_composer_demo +cd /mephisto/examples/form_composer_demo python ./run_task_dynamic_presigned_urls__ec2__prolific.py ``` diff --git a/examples/form_composer_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml b/examples/form_composer_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml index 3994d3506..4f43e431f 100644 --- a/examples/form_composer_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml +++ b/examples/form_composer_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml @@ -17,6 +17,9 @@ mephisto: link_task_source: false extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 2 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: "Sample Questionnaire" task_title: "Example how to easily create dynamic form-based Tasks" diff --git a/examples/form_composer_demo/hydra_configs/conf/example__local__inhouse.yaml b/examples/form_composer_demo/hydra_configs/conf/example__local__inhouse.yaml index c9ece7fd7..42f31ee7f 100644 --- a/examples/form_composer_demo/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/form_composer_demo/hydra_configs/conf/example__local__inhouse.yaml @@ -17,6 +17,7 @@ mephisto: link_task_source: false extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 2 + log_level: "debug" provider: ui_base_url: "http://localhost:3001" task: diff --git a/examples/form_composer_demo/hydra_configs/conf/example_with_gold_unit__local__inhouse.yaml b/examples/form_composer_demo/hydra_configs/conf/example_with_gold_unit__local__inhouse.yaml index 0dd48447b..7c7a9bc76 100644 --- a/examples/form_composer_demo/hydra_configs/conf/example_with_gold_unit__local__inhouse.yaml +++ b/examples/form_composer_demo/hydra_configs/conf/example_with_gold_unit__local__inhouse.yaml @@ -22,6 +22,9 @@ mephisto: min_golds: 1 # Required for Gold Units max_incorrect_golds: 1 # Required for Gold Units max_gold_units: 1 # Required for 
Gold Units + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: allowed_concurrent: 1 # Required for Gold Units task_name: "Sample Questionnaire" diff --git a/examples/form_composer_demo/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml b/examples/form_composer_demo/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml index ed0d88938..0b85bb699 100644 --- a/examples/form_composer_demo/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml +++ b/examples/form_composer_demo/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml @@ -18,6 +18,9 @@ mephisto: extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 1 onboarding_qualification: onboarding-qualification # Required for Onboarding + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: "Sample Questionnaire" task_title: "Example how to easily create simple form-based Tasks with onboarding" diff --git a/examples/form_composer_demo/hydra_configs/conf/example_with_screening__local__inhouse.yaml b/examples/form_composer_demo/hydra_configs/conf/example_with_screening__local__inhouse.yaml index 4e636068e..3824ced58 100644 --- a/examples/form_composer_demo/hydra_configs/conf/example_with_screening__local__inhouse.yaml +++ b/examples/form_composer_demo/hydra_configs/conf/example_with_screening__local__inhouse.yaml @@ -21,6 +21,9 @@ mephisto: block_qualification: blocked-qualification # Required for Screening use_screening_task: true # Required for Screening max_screening_units: 1 # Required for Screening + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: allowed_concurrent: 1 # Required for Screening task_name: "Sample Questionnaire" diff --git a/examples/parlai_chat_task_demo/hydra_configs/conf/_base.yaml b/examples/parlai_chat_task_demo/hydra_configs/conf/_base.yaml index ae59af518..afc95fb63 100644 --- a/examples/parlai_chat_task_demo/hydra_configs/conf/_base.yaml +++ 
b/examples/parlai_chat_task_demo/hydra_configs/conf/_base.yaml @@ -8,6 +8,7 @@ defaults: - /mephisto/blueprint: parlai_chat - /mephisto/architect: local - /mephisto/provider: inhouse + mephisto: blueprint: world_file: ${task_dir}/demo_worlds.py diff --git a/examples/parlai_chat_task_demo/hydra_configs/conf/custom_prebuilt__local__inhouse.yaml b/examples/parlai_chat_task_demo/hydra_configs/conf/custom_prebuilt__local__inhouse.yaml index 7584b6035..a8d0b5c87 100644 --- a/examples/parlai_chat_task_demo/hydra_configs/conf/custom_prebuilt__local__inhouse.yaml +++ b/examples/parlai_chat_task_demo/hydra_configs/conf/custom_prebuilt__local__inhouse.yaml @@ -10,6 +10,9 @@ defaults: mephisto: blueprint: custom_source_bundle: ${task_dir}/webapp/build/bundle.js + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: parlai-chat-example task_title: "Test ParlAI Prebuilt Chat Task" diff --git a/examples/parlai_chat_task_demo/hydra_configs/conf/custom_simple__local__inhouse.yaml b/examples/parlai_chat_task_demo/hydra_configs/conf/custom_simple__local__inhouse.yaml index 3d2c45d7f..4dd5b8a8e 100644 --- a/examples/parlai_chat_task_demo/hydra_configs/conf/custom_simple__local__inhouse.yaml +++ b/examples/parlai_chat_task_demo/hydra_configs/conf/custom_simple__local__inhouse.yaml @@ -10,6 +10,9 @@ defaults: mephisto: blueprint: custom_source_dir: ${task_dir}/custom_simple + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: parlai-chat-example task_title: "Test ParlAI Simply Built Chat Task" diff --git a/examples/parlai_chat_task_demo/hydra_configs/conf/example__local__inhouse.yaml b/examples/parlai_chat_task_demo/hydra_configs/conf/example__local__inhouse.yaml index fc9750e7a..a6a2c21b3 100644 --- a/examples/parlai_chat_task_demo/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/parlai_chat_task_demo/hydra_configs/conf/example__local__inhouse.yaml @@ -12,6 +12,7 @@ mephisto: world_file: 
${task_dir}/demo_worlds.py task_description_file: ${task_dir}/task_description.html num_conversations: 1 + log_level: "debug" provider: ui_base_url: "http://localhost:3001" task: diff --git a/examples/parlai_chat_task_demo/hydra_configs/conf/onboarding_example__local__inhouse.yaml b/examples/parlai_chat_task_demo/hydra_configs/conf/onboarding_example__local__inhouse.yaml index 80a45c6f3..d90710c2e 100644 --- a/examples/parlai_chat_task_demo/hydra_configs/conf/onboarding_example__local__inhouse.yaml +++ b/examples/parlai_chat_task_demo/hydra_configs/conf/onboarding_example__local__inhouse.yaml @@ -10,6 +10,9 @@ defaults: mephisto: blueprint: onboarding_qualification: test-parlai-chat-qualification + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: parlai-chat-example task_title: "Test ParlAI Chat Task" diff --git a/examples/remote_procedure/mnist/hydra_configs/conf/example__local__inhouse.yaml b/examples/remote_procedure/mnist/hydra_configs/conf/example__local__inhouse.yaml index 550a38574..47d7b6f67 100644 --- a/examples/remote_procedure/mnist/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/remote_procedure/mnist/hydra_configs/conf/example__local__inhouse.yaml @@ -16,6 +16,9 @@ mephisto: # NOTE pick something based on your task block_qualification: test_qual_block units_per_assignment: 1 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: allowed_concurrent: 1 task_name: remote-procedure-mnist diff --git a/examples/remote_procedure/mnist/hydra_configs/conf/example_with_screening__local__inhouse.yaml b/examples/remote_procedure/mnist/hydra_configs/conf/example_with_screening__local__inhouse.yaml index d6d4cc1a7..7befb9bea 100644 --- a/examples/remote_procedure/mnist/hydra_configs/conf/example_with_screening__local__inhouse.yaml +++ b/examples/remote_procedure/mnist/hydra_configs/conf/example_with_screening__local__inhouse.yaml @@ -18,6 +18,9 @@ mephisto: block_qualification: 
"test-mnist-blocked-qualification" use_screening_task: true max_screening_units: 3 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: remote-procedure-mnist task_title: "Provide feedback on our MNIST model" diff --git a/examples/simple_static_task/hydra_configs/conf/example__local__inhouse.yaml b/examples/simple_static_task/hydra_configs/conf/example__local__inhouse.yaml index e383830d8..1467d3bc5 100644 --- a/examples/simple_static_task/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/simple_static_task/hydra_configs/conf/example__local__inhouse.yaml @@ -16,6 +16,9 @@ mephisto: preview_source: ${task_dir}/server_files/demo_preview.html extra_source_dir: ${task_dir}/server_files/extra_refs units_per_assignment: 2 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: html-static-task-example task_title: "Test static task" diff --git a/examples/simple_static_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml b/examples/simple_static_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml index 4fceb061e..d90145ed9 100644 --- a/examples/simple_static_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml +++ b/examples/simple_static_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml @@ -17,6 +17,9 @@ mephisto: onboarding_source: ${task_dir}/server_files/demo_onboarding.html onboarding_qualification: static-test-onboarding-qual units_per_assignment: 2 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: html-static-task-example task_title: "Test static task" diff --git a/examples/static_react_task/data/authorization.csv b/examples/static_react_task/data/authorization.csv new file mode 100644 index 000000000..ad1eac0b8 --- /dev/null +++ b/examples/static_react_task/data/authorization.csv @@ -0,0 +1,6 @@ +ο»ΏWORKER_USERNAME +WORKER_USERNAME2 +WORKER_USERNAME3 +x +y +z diff --git 
a/examples/static_react_task/hydra_configs/conf/example__local__inhouse.yaml b/examples/static_react_task/hydra_configs/conf/example__local__inhouse.yaml index 1df4b7fff..58a962abc 100644 --- a/examples/static_react_task/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/static_react_task/hydra_configs/conf/example__local__inhouse.yaml @@ -15,6 +15,9 @@ mephisto: link_task_source: false extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 1 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: react-static-task-example task_title: "Rating a sentence as good or bad" diff --git a/examples/static_react_task/hydra_configs/conf/example_with_authorization__local__inhouse.yaml b/examples/static_react_task/hydra_configs/conf/example_with_authorization__local__inhouse.yaml new file mode 100644 index 000000000..4bbd2a243 --- /dev/null +++ b/examples/static_react_task/hydra_configs/conf/example_with_authorization__local__inhouse.yaml @@ -0,0 +1,31 @@ +#@package _global_ + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +defaults: + - /mephisto/blueprint: static_react_task + - /mephisto/architect: local + - /mephisto/provider: inhouse +mephisto: + blueprint: + task_source: ${task_dir}/webapp/build/bundle.js + extra_source_dir: ${task_dir}/webapp/src/static + units_per_assignment: 1 + passed_qualification_name: "test-react-static-passed_qualification" + block_qualification: "test-react-static-blocked-qualification" + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" + authorization_csv: ${task_dir}/data/authorization.csv + admit_workers_with_no_prior_qualification: true + task: + task_name: react-static-task-example + task_title: "Rating a sentence as good or bad" + task_description: "In this task, you'll be given a sentence. 
It is your job to rate it as either good or bad. But only authorized workers can complete this task." + task_reward: 0.05 + task_tags: "test,simple,button" + # We expect to be able to handle 300 concurrent tasks without issue + max_num_concurrent_units: 300 + allowed_concurrent: 1 diff --git a/examples/static_react_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml b/examples/static_react_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml index 88d5f61c6..30c272110 100644 --- a/examples/static_react_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml +++ b/examples/static_react_task/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml @@ -14,6 +14,9 @@ mephisto: extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 1 onboarding_qualification: test-react-static-qualification + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: react-static-task-example task_title: "Rating a sentence as good or bad" diff --git a/examples/static_react_task/hydra_configs/conf/example_with_screening__local__inhouse.yaml b/examples/static_react_task/hydra_configs/conf/example_with_screening__local__inhouse.yaml index 6f64411a1..6f8dd9f53 100644 --- a/examples/static_react_task/hydra_configs/conf/example_with_screening__local__inhouse.yaml +++ b/examples/static_react_task/hydra_configs/conf/example_with_screening__local__inhouse.yaml @@ -17,6 +17,9 @@ mephisto: block_qualification: "test-react-static-blocked-qualification" use_screening_task: true max_screening_units: 3 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: react-static-task-example task_title: "Rating a sentence as good or bad" diff --git a/examples/static_react_task/run_task_with_authorization__local__inhouse.py b/examples/static_react_task/run_task_with_authorization__local__inhouse.py new file mode 100644 index 000000000..a63d36a39 --- /dev/null +++ 
b/examples/static_react_task/run_task_with_authorization__local__inhouse.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from omegaconf import DictConfig + +from mephisto.abstractions.blueprints.abstract.static_task.static_blueprint import ( + SharedStaticTaskState, +) +from mephisto.data_model.qualification import QUAL_GREATER_EQUAL +from mephisto.operations.operator import Operator +from mephisto.tools.building_react_apps import examples +from mephisto.tools.scripts import task_script +from mephisto.utils.qualifications import make_qualification_dict + + +@task_script(default_config_file="example_with_authorization__local__inhouse") +def main(operator: Operator, cfg: DictConfig) -> None: + examples.build_static_react_task( + force_rebuild=cfg.mephisto.task.force_rebuild, + post_install_script=cfg.mephisto.task.post_install_script, + ) + + shared_state = SharedStaticTaskState( + static_task_data=[ + {"text": "This text is good text!"}, + {"text": "This text is bad text!"}, + ], + ) + shared_state.qualifications = [ + make_qualification_dict("test-react-static-passed_qualification", QUAL_GREATER_EQUAL, 2), + ] + + operator.launch_task_run(cfg.mephisto, shared_state) + operator.wait_for_runs_then_shutdown(skip_input=True, log_rate=30) + + +if __name__ == "__main__": + main() diff --git a/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example__local__inhouse.yaml b/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example__local__inhouse.yaml index 0561563fe..15e57792e 100644 --- a/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example__local__inhouse.yaml @@ -14,6 +14,9 @@ mephisto: link_task_source: true extra_source_dir: 
${task_dir}/webapp/src/static units_per_assignment: 1 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: react-static-task-with-tips task_title: "Rating a sentence as good or bad. There is a tips button that can help when completing this task." diff --git a/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml b/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml index 8ce5caa42..7a9e19e29 100644 --- a/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml +++ b/examples/static_react_task_with_worker_opinion/hydra_configs/conf/example_with_onboarding__local__inhouse.yaml @@ -14,6 +14,9 @@ mephisto: extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 1 onboarding_qualification: test-react-static-qualification + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: react-static-task-with-tips task_title: "Rating a sentence as good or bad. There is a tips button that can help when completing this task." 
diff --git a/examples/video_annotator_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml b/examples/video_annotator_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml index f400cb038..521da0819 100644 --- a/examples/video_annotator_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml +++ b/examples/video_annotator_demo/hydra_configs/conf/dynamic_example__local__inhouse.yaml @@ -17,6 +17,9 @@ mephisto: link_task_source: false extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 2 + log_level: "debug" + provider: + ui_base_url: "http://localhost:3001" task: task_name: "Video annotator" task_title: "Example how to easily create dynamic Tasks to annotate video" diff --git a/examples/video_annotator_demo/hydra_configs/conf/example__local__inhouse.yaml b/examples/video_annotator_demo/hydra_configs/conf/example__local__inhouse.yaml index 037e278b7..c383adbff 100644 --- a/examples/video_annotator_demo/hydra_configs/conf/example__local__inhouse.yaml +++ b/examples/video_annotator_demo/hydra_configs/conf/example__local__inhouse.yaml @@ -17,6 +17,7 @@ mephisto: link_task_source: false extra_source_dir: ${task_dir}/webapp/src/static units_per_assignment: 2 + log_level: "debug" provider: ui_base_url: "http://localhost:3001" task: diff --git a/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py b/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py index be2020b5c..a341b8bfc 100644 --- a/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py +++ b/mephisto/abstractions/blueprints/abstract/static_task/static_blueprint.py @@ -4,53 +4,51 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. 
-from mephisto.abstractions.blueprint import ( - Blueprint, - BlueprintArgs, - SharedTaskState, -) -from mephisto.abstractions.blueprints.mixins.onboarding_required import ( - OnboardingRequired, - OnboardingSharedState, - OnboardingRequiredArgs, -) -from dataclasses import dataclass, field -from omegaconf import MISSING, DictConfig -from mephisto.abstractions.blueprints.mixins.screen_task_required import ( - ScreenTaskRequired, - ScreenTaskRequiredArgs, - ScreenTaskSharedState, -) -from mephisto.abstractions.blueprints.mixins.use_gold_unit import ( - UseGoldUnit, - UseGoldUnitArgs, - GoldUnitSharedState, +import csv +import json +import os +import types +from dataclasses import dataclass +from dataclasses import field +from typing import Any +from typing import ClassVar +from typing import Dict +from typing import Iterable +from typing import List +from typing import Type +from typing import TYPE_CHECKING + +from omegaconf import DictConfig +from omegaconf import MISSING + +from mephisto.abstractions.blueprint import Blueprint +from mephisto.abstractions.blueprint import BlueprintArgs +from mephisto.abstractions.blueprint import SharedTaskState +from mephisto.abstractions.blueprints.abstract.static_task.empty_task_builder import ( + EmptyStaticTaskBuilder, ) -from mephisto.data_model.assignment import InitializationData from mephisto.abstractions.blueprints.abstract.static_task.static_agent_state import ( StaticAgentState, ) from mephisto.abstractions.blueprints.abstract.static_task.static_task_runner import ( StaticTaskRunner, ) -from mephisto.abstractions.blueprints.abstract.static_task.empty_task_builder import ( - EmptyStaticTaskBuilder, -) - -import os -import csv -import json -import types - -from typing import ClassVar, Type, Any, Dict, Iterable, TYPE_CHECKING +from mephisto.abstractions.blueprints.mixins.onboarding_required import OnboardingRequired +from mephisto.abstractions.blueprints.mixins.onboarding_required import OnboardingRequiredArgs +from 
mephisto.abstractions.blueprints.mixins.onboarding_required import OnboardingSharedState +from mephisto.abstractions.blueprints.mixins.screen_task_required import ScreenTaskRequired +from mephisto.abstractions.blueprints.mixins.screen_task_required import ScreenTaskRequiredArgs +from mephisto.abstractions.blueprints.mixins.screen_task_required import ScreenTaskSharedState +from mephisto.abstractions.blueprints.mixins.use_gold_unit import GoldUnitSharedState +from mephisto.abstractions.blueprints.mixins.use_gold_unit import UseGoldUnit +from mephisto.abstractions.blueprints.mixins.use_gold_unit import UseGoldUnitArgs +from mephisto.data_model.assignment import InitializationData if TYPE_CHECKING: + from mephisto.abstractions.blueprint import AgentState + from mephisto.abstractions.blueprint import TaskBuilder + from mephisto.abstractions.blueprint import TaskRunner from mephisto.data_model.task_run import TaskRun - from mephisto.abstractions.blueprint import ( - AgentState, - TaskRunner, - TaskBuilder, - ) BLUEPRINT_TYPE_STATIC = "abstract_static" @@ -140,9 +138,13 @@ def __init__( shared_state: "SharedStaticTaskState", ): super().__init__(task_run, args, shared_state) + # Originally just a list of dicts, but can also be a generator of dicts - self._initialization_data_dicts: Iterable[Dict[str, Any]] = [] + self._initialization_data_dicts: List[Dict[str, Any]] = [] + blue_args = args.blueprint + + # CSV if blue_args.get("data_csv", None) is not None: csv_file = os.path.expanduser(blue_args.data_csv) with open(csv_file, "r", encoding="utf-8-sig") as csv_fp: @@ -153,12 +155,16 @@ def __init__( for i, col in enumerate(row): row_data[headers[i]] = col self._initialization_data_dicts.append(row_data) + + # JSON elif blue_args.get("data_json", None) is not None: json_file = os.path.expanduser(blue_args.data_json) with open(json_file, "r", encoding="utf-8-sig") as json_fp: json_data = json.load(json_fp) for jd in json_data: self._initialization_data_dicts.append(jd) + + # 
JSONL elif blue_args.get("data_jsonl", None) is not None: jsonl_file = os.path.expanduser(blue_args.data_jsonl) with open(jsonl_file, "r", encoding="utf-8-sig") as jsonl_fp: @@ -167,6 +173,8 @@ def __init__( j = json.loads(line) self._initialization_data_dicts.append(j) line = jsonl_fp.readline() + + # Task data (dynamically passed as `static_task_data` argument - not from files) elif shared_state.static_task_data is not None: self._initialization_data_dicts = shared_state.static_task_data else: @@ -181,20 +189,33 @@ def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"): assert isinstance( shared_state, SharedStaticTaskState ), "Must use SharedStaticTaskState for static blueprints" + blue_args = args.blueprint + + # CSV if blue_args.get("data_csv", None) is not None: csv_file = os.path.expanduser(blue_args.data_csv) + assert os.path.exists(csv_file), f"Provided csv file {csv_file} doesn't exist" + + # JSON elif blue_args.get("data_json", None) is not None: json_file = os.path.expanduser(blue_args.data_json) + assert os.path.exists(json_file), f"Provided JSON file {json_file} doesn't exist" + + # JSONL elif blue_args.get("data_jsonl", None) is not None: jsonl_file = os.path.expanduser(blue_args.data_jsonl) + assert os.path.exists(jsonl_file), f"Provided JSON-L file {jsonl_file} doesn't exist" + + # Task data (dynamically passed as `static_task_data` argument - not from files) elif shared_state.static_task_data is not None: if isinstance(shared_state.static_task_data, types.GeneratorType): # TODO(#97) can we check something about this? 
- # Some discussion here: https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start + # Some discussion here: + # https://stackoverflow.com/questions/661603/how-do-i-know-if-a-generator-is-empty-from-the-start pass else: assert ( @@ -204,9 +225,7 @@ def assert_task_args(cls, args: DictConfig, shared_state: "SharedTaskState"): raise AssertionError("Must provide one of a data csv, json, json-L, or a list of tasks") def get_initialization_data(self) -> Iterable["InitializationData"]: - """ - Return the InitializationData retrieved from the specified stream - """ + """Return the InitializationData retrieved from the specified stream""" if isinstance(self._initialization_data_dicts, types.GeneratorType): def data_generator() -> Iterable["InitializationData"]: @@ -220,7 +239,8 @@ def data_generator() -> Iterable["InitializationData"]: else: return [ InitializationData( - shared=d, unit_data=[{}] * self.args.blueprint.units_per_assignment + shared=d, + unit_data=[{}] * self.args.blueprint.units_per_assignment, ) for d in self._initialization_data_dicts ] diff --git a/mephisto/abstractions/blueprints/remote_procedure/remote_procedure_blueprint.py b/mephisto/abstractions/blueprints/remote_procedure/remote_procedure_blueprint.py index 99a59a987..7923ef39b 100644 --- a/mephisto/abstractions/blueprints/remote_procedure/remote_procedure_blueprint.py +++ b/mephisto/abstractions/blueprints/remote_procedure/remote_procedure_blueprint.py @@ -245,9 +245,7 @@ def assert_task_args(cls, args: "DictConfig", shared_state: "SharedTaskState") - ) def get_initialization_data(self) -> Iterable["InitializationData"]: - """ - Return the InitializationData retrieved from the specified stream - """ + """Return the InitializationData retrieved from the specified stream""" if isinstance(self._initialization_data_dicts, types.GeneratorType): def data_generator() -> Iterable["InitializationData"]: diff --git a/mephisto/abstractions/database.py 
b/mephisto/abstractions/database.py index 6a09f0e87..fa1e46a91 100644 --- a/mephisto/abstractions/database.py +++ b/mephisto/abstractions/database.py @@ -994,13 +994,16 @@ def find_granted_qualifications( self, worker_id: Optional[str] = None, qualification_id: Optional[str] = None, + value: Optional[int] = None, ) -> List[GrantedQualification]: """ Find granted qualifications. If nothing supplied, returns all granted qualifications. """ return self._check_granted_qualifications( - worker_id=worker_id, qualification_id=qualification_id + worker_id=worker_id, + qualification_id=qualification_id, + value=value, ) @abstractmethod @@ -1040,7 +1043,9 @@ def check_granted_qualifications( """ warnings.warn("Use 'find_granted_qualifications' instead.", DeprecationWarning) return self._check_granted_qualifications( - qualification_id=qualification_id, worker_id=worker_id, value=value + qualification_id=qualification_id, + worker_id=worker_id, + value=value, ) @abstractmethod diff --git a/mephisto/abstractions/providers/inhouse/README.md b/mephisto/abstractions/providers/inhouse/README.md index 0545848af..4abf69808 100644 --- a/mephisto/abstractions/providers/inhouse/README.md +++ b/mephisto/abstractions/providers/inhouse/README.md @@ -4,4 +4,4 @@ LICENSE file in the root directory of this source tree. --> -Inhouse provider is a local Mephisto provider in case you want to run your tasks without any third-party provider. +In-House provider is a local Mephisto provider in case you want to run your tasks without any third-party provider. 
diff --git a/mephisto/abstractions/providers/inhouse/inhouse_agent.py b/mephisto/abstractions/providers/inhouse/inhouse_agent.py index 32c0d5621..537473820 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_agent.py +++ b/mephisto/abstractions/providers/inhouse/inhouse_agent.py @@ -67,7 +67,7 @@ def new_from_provider_data( bookkeeping information from a crowd provider for this agent """ logger.debug( - f"{cls.log_prefix}Registering Inhouse Submission in datastore from Inhouse. " + f"{cls.log_prefix}Registering In-House Submission in datastore from Inhouse. " f"Data: {provider_data}" ) @@ -139,7 +139,7 @@ def reject_work(self, review_note: Optional[str] = None) -> None: def mark_done(self) -> None: """ - Inhouse agents are marked as done on the side of Inhouse, so if this agent + In-House agents are marked as done on the side of Inhouse, so if this agent is marked as done there's nothing else we need to do as the task has been submitted. """ diff --git a/mephisto/abstractions/providers/inhouse/inhouse_datastore_export.py b/mephisto/abstractions/providers/inhouse/inhouse_datastore_export.py index c5cfd8503..6afd620c8 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_datastore_export.py +++ b/mephisto/abstractions/providers/inhouse/inhouse_datastore_export.py @@ -18,5 +18,5 @@ def export_datastore( task_run_ids: Optional[List[str]] = None, **kwargs, ) -> dict: - """Logic of collecting export data from Inhouse datastore""" + """Logic of collecting export data from In-House datastore""" return {} diff --git a/mephisto/abstractions/providers/inhouse/inhouse_provider.py b/mephisto/abstractions/providers/inhouse/inhouse_provider.py index 114a59609..b230237eb 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_provider.py +++ b/mephisto/abstractions/providers/inhouse/inhouse_provider.py @@ -9,6 +9,7 @@ from dataclasses import field from typing import Any from typing import ClassVar +from typing import Optional from typing import Type from 
typing import TYPE_CHECKING @@ -55,13 +56,29 @@ class InhouseProviderArgs(ProviderArgs): "required": False, }, ) + admit_workers_with_no_prior_qualification: Optional[bool] = field( + default=None, + metadata={ + "help": ( + "If task has required qualifications, but worker is newly created, " + "and this param is True, we allow this worker to complete this Task" + ), + "required": False, + }, + ) + authorization_csv: Optional[str] = field( + default=None, + metadata={ + "help": "Path to csv file containing list of worker names", + }, + ) @register_mephisto_abstraction() class InhouseProvider(CrowdProvider): """ - Inhouse implementation of a CrowdProvider that stores everything - in a local state in the class for use in tests. + Implementation of a CrowdProvider to run tasks + without any integration with third-party services """ UnitClass: ClassVar[Type["Unit"]] = InhouseUnit @@ -83,7 +100,7 @@ def initialize_provider_datastore(self, storage_path: str) -> Any: @property def log_prefix(self) -> str: - return "[Inhouse Provider] " + return "[In-House Provider] " def setup_resources_for_task_run( self, @@ -92,12 +109,12 @@ def setup_resources_for_task_run( shared_state: "SharedTaskState", server_url: str, ) -> None: - logger.debug(f"{self.log_prefix}Setting up Inhouse resources for TaskRun") + logger.debug(f"{self.log_prefix}Setting up In-House resources for TaskRun") return None def cleanup_resources_from_task_run(self, task_run: "TaskRun", server_url: str) -> None: """Cleanup all temporary data for this TaskRun""" - logger.debug(f"{self.log_prefix}Cleanning up Inhouse resources from TaskRun") + logger.debug(f"{self.log_prefix}Cleanning up In-House resources from TaskRun") return None @classmethod diff --git a/mephisto/abstractions/providers/inhouse/inhouse_requester.py b/mephisto/abstractions/providers/inhouse/inhouse_requester.py index 4d74b3267..944a25691 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_requester.py +++ 
b/mephisto/abstractions/providers/inhouse/inhouse_requester.py @@ -61,18 +61,18 @@ def log_prefix(self) -> str: return "[Inhouse Requester] " def register(self, args: Optional[InhouseRequesterArgs] = None) -> None: - logger.debug(f"{self.log_prefix}Registering Inhouse requester") + logger.debug(f"{self.log_prefix}Registering In-House requester") return None def is_registered(self) -> bool: """Return whether this requester has registered yet""" logger.debug( - f"{self.log_prefix}Check if Inhouse requester is registered: {DEFAULT_IS_REGISTERED}" + f"{self.log_prefix}Check if In-House requester is registered: {DEFAULT_IS_REGISTERED}" ) return DEFAULT_IS_REGISTERED def get_available_budget(self) -> float: - logger.debug(f"{self.log_prefix}Check if Inhouse requester is registered: true") + logger.debug(f"{self.log_prefix}Check if In-House requester is registered: true") return DEFAULT_AVAILABLE_BUDGET @staticmethod diff --git a/mephisto/abstractions/providers/inhouse/inhouse_unit.py b/mephisto/abstractions/providers/inhouse/inhouse_unit.py index 56d48b4c3..22d651d8a 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_unit.py +++ b/mephisto/abstractions/providers/inhouse/inhouse_unit.py @@ -63,11 +63,12 @@ def get_pay_amount(self) -> float: return total_amount def launch(self, task_url: str) -> None: - """Publish this Unit on Inhouse (making it available)""" + """Publish this Unit on In-House (making it available)""" logger.debug(f"{self.log_prefix}Launching Unit") self.set_db_status(AssignmentState.LAUNCHED) task_run: TaskRun = self.get_task_run() - ui_base_url = task_run.args.provider.ui_base_url + provider_args = task_run.get_provider_args() + ui_base_url = provider_args.ui_base_url # This param `id` will only be used by `getAssignmentId` from `wrap_crowd_source.js` # as any random pseudo id to pass server validation unit_url = f"{ui_base_url}?worker_id=WORKER_USERNAME&id={self.assignment_id}" diff --git 
a/mephisto/abstractions/providers/inhouse/inhouse_worker.py b/mephisto/abstractions/providers/inhouse/inhouse_worker.py index c01aea435..fa904225d 100644 --- a/mephisto/abstractions/providers/inhouse/inhouse_worker.py +++ b/mephisto/abstractions/providers/inhouse/inhouse_worker.py @@ -4,6 +4,8 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. +import csv +import os from typing import Any from typing import cast from typing import List @@ -68,7 +70,7 @@ def bonus_worker( logger.debug( f"{self.log_prefix}Trying to pay bonuses. " - f"But with Inhouse provider you need to do this manually" + f"But with In-House provider you need to do this manually" ) return True, "" @@ -107,9 +109,33 @@ def is_eligible(self, task_run: "TaskRun") -> bool: """Determine if this worker is eligible for the given task run""" return DEFAULT_IS_ELIGIBLE + def is_authorized(self, task_run: "TaskRun") -> bool: + provider_args = task_run.get_provider_args() + + # 1. If `authorization_csv` was not specified in config, we authorize all by default + if provider_args.get("authorization_csv", None) is None: + return super().is_authorized(task_run) + + # 2. Check if the name of current Worker is in the CSV file + try: + csv_file = os.path.expanduser(provider_args.authorization_csv) + with open(csv_file, "r", encoding="utf-8-sig") as csv_fp: + csv_reader = csv.reader(csv_fp) + authorized_worker_names = [row[0] for row in csv_reader] + + if self.worker_name not in authorized_worker_names: + return False + except (csv.Error, ValueError, OSError): + logger.exception(f"{self.log_prefix}Could not read authorization CSV file") + raise + + return super().is_authorized(task_run) + def send_feedback_message(self, text: str, unit: "Unit") -> bool: """Send feedback message to a worker""" - logger.debug(f"Inhouse sending feedback message to worker: '{text}'. 
Unit: {unit}") + logger.debug( + f"{self.log_prefix}In-House sending feedback message to worker: '{text}'. Unit: {unit}" + ) return True @staticmethod diff --git a/mephisto/abstractions/providers/inhouse/migrations/__init__.py b/mephisto/abstractions/providers/inhouse/migrations/__init__.py index d541d9696..9910edd4d 100644 --- a/mephisto/abstractions/providers/inhouse/migrations/__init__.py +++ b/mephisto/abstractions/providers/inhouse/migrations/__init__.py @@ -5,5 +5,5 @@ # LICENSE file in the root directory of this source tree. migrations = { - # Add migrations here. See Inhouse provider as an example + # Add migrations here. See In-House provider as an example } diff --git a/mephisto/abstractions/providers/inhouse/wrap_crowd_source.js b/mephisto/abstractions/providers/inhouse/wrap_crowd_source.js index b197742c5..55e9a1a3a 100644 --- a/mephisto/abstractions/providers/inhouse/wrap_crowd_source.js +++ b/mephisto/abstractions/providers/inhouse/wrap_crowd_source.js @@ -24,16 +24,16 @@ const UNIT_URL_WORKER_ID_PARAM = 'worker_id'; const UNIT_URL_PSEUDO_ASSIGNMENT_ID_PARAM = 'id'; -// Inhouse IMPLEMENTATION +// In-House IMPLEMENTATION function getWorkerName() { - // Inhouse worker name is passed via url params as UNIT_URL_WORKER_ID_PARAM + // In-House worker name is passed via url params as UNIT_URL_WORKER_ID_PARAM let urlParams = new URLSearchParams(window.location.search); return urlParams.get(UNIT_URL_WORKER_ID_PARAM); } function getAssignmentId() { - // Inhouse assignment ID is passed via url params as UNIT_URL_PSEUDO_ASSIGNMENT_ID_PARAM + // In-House assignment ID is passed via url params as UNIT_URL_PSEUDO_ASSIGNMENT_ID_PARAM // We are not using this id and only insert it to pass server validation let urlParams = new URLSearchParams(window.location.search); return urlParams.get(UNIT_URL_PSEUDO_ASSIGNMENT_ID_PARAM); @@ -41,7 +41,7 @@ function getAssignmentId() { function getAgentRegistration() { - // Inhouse agents are created using the Mephisto `worker_id` and 
Inhouse `submission_id` + // In-House agents are created using the Mephisto `worker_id` and In-House `submission_id` return { worker_name: getWorkerName(), agent_registration_id: getAssignmentId() + "-" + getWorkerName(), diff --git a/mephisto/abstractions/providers/prolific/prolific_provider.py b/mephisto/abstractions/providers/prolific/prolific_provider.py index 9127a10ee..1ba97b5ed 100644 --- a/mephisto/abstractions/providers/prolific/prolific_provider.py +++ b/mephisto/abstractions/providers/prolific/prolific_provider.py @@ -13,6 +13,7 @@ from typing import cast from typing import ClassVar from typing import List +from typing import Optional from typing import Type from typing import TYPE_CHECKING @@ -140,10 +141,7 @@ class ProlificProviderArgs(ProviderArgs): @register_mephisto_abstraction() class ProlificProvider(CrowdProvider): - """ - Prolific implementation of a CrowdProvider that stores everything - in a local state in the class for use in tests. - """ + """Implementation of a CrowdProvider that interfaces with Prolific""" UnitClass: ClassVar[Type["Unit"]] = ProlificUnit @@ -170,6 +168,20 @@ def _get_client(self, requester_name: str) -> ProlificClient: """Get a Prolific client""" return self.datastore.get_client_for_requester(requester_name) + def _get_requester(self) -> "ProlificRequester": + requester: "ProlificRequester" = cast( + "ProlificRequester", + self.db.find_requesters(provider_type=self.provider_type)[-1], + ) + return requester + + def _get_last_task_run(self, requester: Optional["Requester"] = None) -> "TaskRun": + if not requester: + requester = self._get_requester() + + task_runs: List[TaskRun] = requester.get_task_runs() + return task_runs[-1] + def _get_qualified_workers( self, qualifications: List[QualificationType], @@ -181,7 +193,8 @@ def _get_qualified_workers( available_workers = [w for w in workers if w.worker_name not in bloked_participant_ids] for worker in available_workers: - if worker_is_qualified(worker, qualifications): + 
task_run = self._get_last_task_run() + if worker_is_qualified(worker, qualifications, task_run): qualified_workers.append(worker) return qualified_workers diff --git a/mephisto/abstractions/providers/prolific/prolific_worker.py b/mephisto/abstractions/providers/prolific/prolific_worker.py index 7eb0fc7f4..8e012126f 100644 --- a/mephisto/abstractions/providers/prolific/prolific_worker.py +++ b/mephisto/abstractions/providers/prolific/prolific_worker.py @@ -53,6 +53,20 @@ def _get_client(self, requester_name: str) -> Any: """Get a Prolific client for usage with `prolific_utils`""" return self.datastore.get_client_for_requester(requester_name) + def _get_requester(self) -> "ProlificRequester": + requester: "ProlificRequester" = cast( + "ProlificRequester", + self.db.find_requesters(provider_type=self.provider_type)[-1], + ) + return requester + + def _get_last_task_run(self, requester: Optional["Requester"] = None) -> "TaskRun": + if not requester: + requester = self._get_requester() + + task_runs: List[TaskRun] = requester.get_task_runs() + return task_runs[-1] + @property def log_prefix(self) -> str: return f"[Worker {self.db_id}] " @@ -102,11 +116,6 @@ def bonus_worker( return True, "" - @staticmethod - def _get_first_task_run(requester: "Requester") -> "TaskRun": - task_runs: List[TaskRun] = requester.get_task_runs() - return task_runs[0] - def block_worker( self, reason: str, @@ -126,7 +135,7 @@ def block_worker( task_run = unit.get_assignment().get_task_run() requester: "ProlificRequester" = cast("ProlificRequester", task_run.get_requester()) else: - task_run = self._get_first_task_run(requester) + task_run = self._get_last_task_run(requester) logger.debug(f"{self.log_prefix}Task Run: {task_run}") @@ -162,7 +171,7 @@ def unblock_worker(self, reason: str, requester: "Requester") -> Tuple[bool, str """Unblock a blocked worker for the specified reason. 
Return success of unblock""" logger.debug(f"{self.log_prefix}Unlocking worker {self.worker_name}") - task_run = self._get_first_task_run(requester) + task_run = self._get_last_task_run(requester) logger.debug(f"{self.log_prefix}Task Run: {task_run}") @@ -181,8 +190,7 @@ def unblock_worker(self, reason: str, requester: "Requester") -> Tuple[bool, str def is_blocked(self, requester: "Requester") -> bool: """Determine if a worker is blocked""" - task_run = self._get_first_task_run(requester) - requester: "ProlificRequester" = cast("ProlificRequester", requester) + task_run = self._get_last_task_run(requester) is_blocked = self.datastore.get_worker_blocked(self.get_prolific_participant_id()) logger.debug( @@ -228,7 +236,8 @@ def _grant_crowd_qualifications( ] for qualifications, prolific_participant_group_id in qualifications_groups: - if worker_is_qualified(self, qualifications): + task_run = self._get_last_task_run() + if worker_is_qualified(self, qualifications, task_run): # Worker is still qualified or was upgraded, and so is eligible now prolific_utils.give_worker_qualification( client, @@ -302,7 +311,6 @@ def send_feedback_message(self, text: str, unit: "Unit") -> bool: "ProlificRequester", self.db.find_requesters(provider_type=self.provider_type)[-1], ) - # assert isinstance(requester, ProlificRequester), "Must be an Prolific requester" client = self._get_client(requester.requester_name) datastore_unit = self.datastore.get_unit(unit.db_id) diff --git a/mephisto/data_model/task.py b/mephisto/data_model/task.py index fad9970b2..07c600548 100644 --- a/mephisto/data_model/task.py +++ b/mephisto/data_model/task.py @@ -4,34 +4,27 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. 
- -import os from datetime import datetime -from shutil import copytree +from typing import Any +from typing import List +from typing import Mapping +from typing import Optional +from typing import TYPE_CHECKING from dateutil.parser import parse +from mephisto.data_model._db_backed_meta import MephistoDataModelComponentMixin +from mephisto.data_model._db_backed_meta import MephistoDBBackedMeta from mephisto.data_model.project import Project -from mephisto.data_model._db_backed_meta import ( - MephistoDBBackedMeta, - MephistoDataModelComponentMixin, -) from mephisto.utils.dirs import get_dir_for_task -from functools import reduce - -from typing import List, Optional, Mapping, TYPE_CHECKING, Any - if TYPE_CHECKING: from mephisto.abstractions.database import MephistoDB from mephisto.data_model.assignment import Assignment - from mephisto.data_model.worker import Worker - from mephisto.data_model.unit import Unit from mephisto.data_model.task_run import TaskRun - from mephisto.abstractions.crowd_provider import CrowdProvider -def assert_task_is_valid(dir_name, task_type) -> None: +def assert_task_is_valid(dir_name: str, task_type: str) -> None: """ Go through the given task directory and ensure it is valid under the given task type @@ -59,10 +52,14 @@ def __init__( "Direct Task and data model access via Task(db, id) is " "now deprecated in favor of calling Task.get(db, id). 
" ) + self.db: "MephistoDB" = db + if row is None: row = db.get_task(db_id) + assert row is not None, f"Given db_id {db_id} did not exist in given db" + self.db_id: str = row["task_id"] self.task_name: str = row["task_name"] self.task_type: str = row["task_type"] @@ -71,13 +68,11 @@ def __init__( self.creation_date: Optional[datetime] = parse(row["creation_date"]) def get_project(self) -> Optional[Project]: - """ - Get the project for this task, if it exists - """ + """Get the project for this task, if it exists""" if self.project_id is not None: return Project.get(self.db, self.project_id) - else: - return None + + return None def set_project(self, project: Project) -> None: if self.project_id != project.db_id: @@ -85,15 +80,11 @@ def set_project(self, project: Project) -> None: raise NotImplementedError() def get_runs(self) -> List["TaskRun"]: - """ - Return all of the runs of this task that have been launched - """ + """Return all of the runs of this task that have been launched""" return self.db.find_task_runs(task_id=self.db_id) def get_assignments(self) -> List["Assignment"]: - """ - Return all of the assignments for all runs of this task - """ + """Return all of the assignments for all runs of this task""" return self.db.find_assignments(task_id=self.db_id) def get_total_spend(self) -> float: @@ -101,8 +92,10 @@ def get_total_spend(self) -> float: Return the total amount of funding spent for this task. """ total_spend = 0.0 + for task_run in self.get_runs(): total_spend += task_run.get_total_spend() + return total_spend @staticmethod @@ -124,11 +117,13 @@ def new( ), f"A task named {task_name} already exists!" 
new_task_dir = get_dir_for_task(task_name, not_exists_ok=True) + assert new_task_dir is not None, "Should always be able to make a new task dir" + assert_task_is_valid(new_task_dir, task_type) db_id = db.new_task(task_name, task_type) return Task.get(db, db_id) - def __repr__(self): - return f"Task-{self.task_name} [{self.task_type}]" + def __repr__(self): + return f"Task-{self.task_name} [{self.task_type}]" diff --git a/mephisto/data_model/task_run.py b/mephisto/data_model/task_run.py index 2ccc1fbbd..c92993070 100644 --- a/mephisto/data_model/task_run.py +++ b/mephisto/data_model/task_run.py @@ -375,6 +375,9 @@ def get_task(self) -> "Task": def get_task_args(self) -> "DictConfig": return self.args.task + def get_provider_args(self) -> "DictConfig": + return self.args.provider + def get_requester(self) -> Requester: """ Return the requester that started this task. diff --git a/mephisto/data_model/worker.py b/mephisto/data_model/worker.py index d308f7c40..1b1683f8d 100644 --- a/mephisto/data_model/worker.py +++ b/mephisto/data_model/worker.py @@ -147,7 +147,7 @@ def get_granted_qualification( return None return granted_qualifications[0] - def is_disqualified(self, qualification_name: str): + def is_disqualified(self, qualification_name: str) -> bool: """ Find out if the given worker has been disqualified by the given qualification @@ -157,9 +157,10 @@ def is_disqualified(self, qualification_name: str): qualification = self.get_granted_qualification(qualification_name) if qualification is None: return False + return not qualification.value - def is_qualified(self, qualification_name: str): + def is_qualified(self, qualification_name: str) -> bool: """ Find out if the given worker has qualified by the given qualification @@ -169,8 +170,13 @@ def is_qualified(self, qualification_name: str): qualification = self.get_granted_qualification(qualification_name) if qualification is None: return False + return bool(qualification.value) + def is_authorized(self, task_run: 
"TaskRun") -> bool: + """Optional authorization. All workers are authorized by default""" + return True + def revoke_qualification( self, qualification_name: str, @@ -239,9 +245,6 @@ def grant_qualification( return True - def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.db_id})" - # Children classes can implement the following methods def grant_crowd_qualification(self, qualification_name: str, value: int = 1) -> None: @@ -309,3 +312,6 @@ def new(db: "MephistoDB", worker_name: str) -> "Worker": can be successfully created to have it put into the db. """ raise NotImplementedError() + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.db_id})" diff --git a/mephisto/operations/client_io_handler.py b/mephisto/operations/client_io_handler.py index 2f336e2e9..9f14882fd 100644 --- a/mephisto/operations/client_io_handler.py +++ b/mephisto/operations/client_io_handler.py @@ -363,12 +363,11 @@ def _register_agent(self, packet: Packet, channel_id: str) -> None: ) def enqueue_agent_details(self, request_id: str, additional_data: Dict[str, Any]): - """ - Synchronous method to enqueue a message sending the given agent details - """ + """Synchronous method to enqueue a message sending the given agent details""" base_data = {"request_id": request_id} for key, val in additional_data.items(): base_data[key] = val + self.message_queue.put( Packet( packet_type=PACKET_TYPE_AGENT_DETAILS, @@ -378,6 +377,7 @@ def enqueue_agent_details(self, request_id: str, additional_data: Dict[str, Any] ) self.process_outgoing_queue(self.message_queue) self.log_metrics_for_packet(self.request_id_to_packet[request_id]) + # TODO Sometimes this request ID is lost, and we don't quite know why del self.request_id_to_channel_id[request_id] del self.request_id_to_packet[request_id] diff --git a/mephisto/operations/datatypes.py b/mephisto/operations/datatypes.py index c799a2894..597773c82 100644 --- a/mephisto/operations/datatypes.py +++ 
b/mephisto/operations/datatypes.py @@ -9,21 +9,22 @@ to facilitate executing task runs. """ -from dataclasses import dataclass import asyncio -from functools import partial -from typing import Dict, Set, Optional, List, Any, Union, TYPE_CHECKING import threading +from dataclasses import dataclass +from functools import partial +from typing import Any +from typing import Dict +from typing import List +from typing import TYPE_CHECKING if TYPE_CHECKING: - from mephisto.data_model.task_run import TaskRun + from mephisto.abstractions.architect import Architect from mephisto.abstractions.blueprint import TaskRunner, Blueprint from mephisto.abstractions.crowd_provider import CrowdProvider - from mephisto.abstractions.architect import Architect - from mephisto.operations.task_launcher import TaskLauncher - from mephisto.abstractions._subcomponents.channel import Channel - from mephisto.data_model.agent import Agent, OnboardingAgent + from mephisto.data_model.task_run import TaskRun from mephisto.operations.client_io_handler import ClientIOHandler + from mephisto.operations.task_launcher import TaskLauncher from mephisto.operations.worker_pool import WorkerPool @@ -80,7 +81,17 @@ def shutdown(self): class WorkerFailureReasons: NOT_QUALIFIED = "You are not currently qualified to work on this task..." + NOT_AUTHORIZED = "You are not authorized to work on this task..." NO_AVAILABLE_UNITS = "There is currently no available work, please try again later..." - TOO_MANY_CONCURRENT = "You are currently working on too many tasks concurrently to accept another, please finish your current work." - MAX_FOR_TASK = "You have already completed the maximum amount of tasks the requester has set for this task." - TASK_MISSING = "You appear to have already completed this task, or have disconnected long enough for your session to clear..." + TOO_MANY_CONCURRENT = ( + "You are currently working on too many tasks concurrently to accept another, " + "please finish your current work." 
+ ) + MAX_FOR_TASK = ( + "You have already completed the maximum amount of tasks " + "the requester has set for this task." + ) + TASK_MISSING = ( + "You appear to have already completed this task, " + "or have disconnected long enough for your session to clear..." + ) diff --git a/mephisto/operations/worker_pool.py b/mephisto/operations/worker_pool.py index e2b88d72a..70406bf43 100644 --- a/mephisto/operations/worker_pool.py +++ b/mephisto/operations/worker_pool.py @@ -5,12 +5,20 @@ # LICENSE file in the root directory of this source tree. import time +from dataclasses import dataclass +from dataclasses import fields from functools import partial -from dataclasses import dataclass, fields -from prometheus_client import Histogram, Gauge, Counter # type: ignore -from mephisto.data_model.worker import Worker -from mephisto.data_model.agent import Agent, OnboardingAgent -from mephisto.utils.qualifications import worker_is_qualified +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import TYPE_CHECKING +from typing import Union + +from prometheus_client import Counter # type: ignore +from prometheus_client import Gauge +from prometheus_client import Histogram + from mephisto.abstractions.blueprint import AgentState from mephisto.abstractions.blueprints.mixins.onboarding_required import ( OnboardingRequired, @@ -19,28 +27,24 @@ ScreenTaskRequired, ) from mephisto.abstractions.blueprints.mixins.use_gold_unit import UseGoldUnit -from mephisto.operations.task_launcher import ( - SCREENING_UNIT_INDEX, - GOLD_UNIT_INDEX, -) -from mephisto.operations.datatypes import LiveTaskRun, WorkerFailureReasons - -from typing import Sequence, Dict, Union, Optional, List, Any, TYPE_CHECKING +from mephisto.data_model.agent import Agent +from mephisto.data_model.agent import OnboardingAgent +from mephisto.data_model.worker import Worker +from mephisto.operations.datatypes import LiveTaskRun +from 
mephisto.operations.datatypes import WorkerFailureReasons +from mephisto.utils.logger_core import get_logger +from mephisto.utils.qualifications import worker_is_qualified if TYPE_CHECKING: - from mephisto.data_model.unit import Unit from mephisto.abstractions.database import MephistoDB - from mephisto.data_model.task_run import TaskRun - -from mephisto.utils.logger_core import get_logger - -logger = get_logger(name=__name__) + from mephisto.data_model.unit import Unit AGENT_DETAILS_COUNT = Counter( "agent_details_responses", "Responses to agent details requests", ["response"] ) AGENT_DETAILS_COUNT.labels(response="not_qualified") +AGENT_DETAILS_COUNT.labels(response="not_authorized") AGENT_DETAILS_COUNT.labels(response="no_available_units") AGENT_DETAILS_COUNT.labels(response="agent_missing") AGENT_DETAILS_COUNT.labels(response="reconnection") @@ -69,6 +73,8 @@ EXTERNAL_FUNCTION_LATENCY.labels(function="launch_screening_unit") EXTERNAL_FUNCTION_LATENCY.labels(function="get_gold_unit_data_for_worker") +logger = get_logger(name=__name__) + @dataclass class OnboardingInfo: @@ -99,14 +105,17 @@ class WorkerPool: def __init__(self, db: "MephistoDB"): self.db = db + # Tracked agents self.agents: Dict[str, "Agent"] = {} self.onboarding_agents: Dict[str, "OnboardingAgent"] = {} self.onboarding_infos: Dict[str, OnboardingInfo] = {} self.final_onboardings: Dict[str, "OnboardingAgent"] = {} + # Agent status handling self.last_status_check = time.time() + # A mark that this pool is already shutdown self.is_shutdown = False # Deferred initializiation @@ -117,12 +126,15 @@ def register_run(self, live_run: "LiveTaskRun") -> None: assert ( self._live_run is None ), "Cannot associate more than one live run to a worker pool at a time" + self._live_run = live_run def get_live_run(self) -> "LiveTaskRun": """Get the associated live run for this worker pool, asserting it's set""" live_run = self._live_run + assert live_run is not None, "Live run must be registered to use this" + return 
live_run def get_agent_for_id(self, agent_id: str) -> Optional[Union["Agent", "OnboardingAgent"]]: @@ -134,23 +146,25 @@ def get_agent_for_id(self, agent_id: str) -> Optional[Union["Agent", "Onboarding elif agent_id in self.final_onboardings: logger.debug(f"Found agent id {agent_id} in final_onboardings for get_agent_for_id") return self.final_onboardings[agent_id] + return None async def register_worker(self, crowd_data: Dict[str, Any], request_id: str) -> None: - """ - First process the worker registration, then hand off for - registering an agent - """ + """First process the worker registration, then hand off for registering an agent""" live_run = self.get_live_run() loop = live_run.loop_wrap.loop crowd_provider = live_run.provider - is_sandbox = crowd_provider.is_sandbox() worker_name = crowd_data["worker_name"] + + # 1. Append postfix to Worker name if current provider is sandbox if crowd_provider.is_sandbox(): # TODO(WISH) there are better ways to get rid of this designation worker_name += "_sandbox" + + # 2. Get or create Worker workers = await loop.run_in_executor( - None, partial(self.db.find_workers, worker_name=worker_name) + None, + partial(self.db.find_workers, worker_name=worker_name), ) if len(workers) == 0: worker = await loop.run_in_executor( @@ -164,16 +178,34 @@ async def register_worker(self, crowd_data: Dict[str, Any], request_id: str) -> else: worker = workers[0] + # 3. Check if Worker is authorized to work on this task + is_authorized = await loop.run_in_executor( + None, + partial(worker.is_authorized, live_run.task_run), + ) + if not is_authorized: + # 3a. If not, we send an error message and do not continue + AGENT_DETAILS_COUNT.labels(response="not_authorized").inc() + live_run.client_io.enqueue_agent_details( + request_id, + AgentDetails(failure_reason=WorkerFailureReasons.NOT_AUTHORIZED).to_dict(), + ) + return + + # 4. 
Check if Worker qualified to work on this task is_qualified = await loop.run_in_executor( - None, partial(worker_is_qualified, worker, live_run.qualifications) + None, + partial(worker_is_qualified, worker, live_run.qualifications, live_run.task_run), ) if not is_qualified: + # 4a. Send an error message to the client AGENT_DETAILS_COUNT.labels(response="not_qualified").inc() live_run.client_io.enqueue_agent_details( request_id, AgentDetails(failure_reason=WorkerFailureReasons.NOT_QUALIFIED).to_dict(), ) else: + # 4b. Proceed other steps (register Agent) await self.register_agent(crowd_data, worker, request_id) async def _assign_unit_to_agent( @@ -191,10 +223,12 @@ async def _assign_unit_to_agent( logger.debug(f"Worker {worker.db_id} is being assigned one of {len(units)} units.") + unit = None reserved_unit = None while len(units) > 0 and reserved_unit is None: unit = units.pop(0) reserved_unit = task_run.reserve_unit(unit) + if reserved_unit is None: AGENT_DETAILS_COUNT.labels(response="no_available_units").inc() live_run.client_io.enqueue_agent_details( @@ -271,6 +305,7 @@ async def _assign_unit_to_agent( # Mypy not-null cast non_null_agents = [a for a in agents if a is not None] + # Launch the backend for this assignment registered_agents = [ self.agents[a.get_agent_id()] for a in non_null_agents if a is not None @@ -309,6 +344,7 @@ async def register_agent_from_onboarding(self, onboarding_agent: "OnboardingAgen ) assert blueprint.onboarding_qualification_name is not None + worker.grant_qualification(blueprint.onboarding_qualification_name, int(worker_passed)) if not worker_passed: ONBOARDING_OUTCOMES.labels(outcome="failed").inc() @@ -325,6 +361,7 @@ async def register_agent_from_onboarding(self, onboarding_agent: "OnboardingAgen units = await loop.run_in_executor( None, partial(live_run.task_run.get_valid_units_for_worker, worker) ) + with EXTERNAL_FUNCTION_LATENCY.labels(function="filter_units_for_worker").time(): usable_units = await loop.run_in_executor( 
None, @@ -371,6 +408,7 @@ async def reconnect_agent(self, agent_id: str, request_id: str): loop = live_run.loop_wrap.loop task_runner = live_run.task_runner agent = self.get_agent_for_id(agent_id) + if agent is None: logger.info(f"Looking for reconnecting agent {agent_id} but none found locally") AGENT_DETAILS_COUNT.labels(response="agent_missing").inc() @@ -381,6 +419,7 @@ async def reconnect_agent(self, agent_id: str, request_id: str): ).to_dict(), ) return + worker = agent.get_worker() AGENT_DETAILS_COUNT.labels(response="reconnection").inc() if isinstance(agent, OnboardingAgent): @@ -400,7 +439,9 @@ async def reconnect_agent(self, agent_id: str, request_id: str): ) else: blueprint = live_run.blueprint + assert isinstance(blueprint, OnboardingRequired) and blueprint.use_onboarding + onboard_data = blueprint.get_onboarding_data(worker.db_id) live_run.client_io.enqueue_agent_details( request_id, @@ -420,6 +461,7 @@ async def reconnect_agent(self, agent_id: str, request_id: str): agent, ), ) + live_run.client_io.enqueue_agent_details( request_id, AgentDetails( @@ -449,11 +491,14 @@ async def _assign_unit_or_qa( screening_data = await loop.run_in_executor( None, blueprint.get_screening_unit_data ) + if screening_data is not None: launcher = live_run.task_launcher + assert ( launcher is not None ), "LiveTaskRun must have launcher to use screening tasks" + with EXTERNAL_FUNCTION_LATENCY.labels(function="launch_screening_unit").time(): screen_unit = await loop.run_in_executor( None, @@ -474,6 +519,7 @@ async def _assign_unit_or_qa( ) logger.debug(f"No screening units left for {agent_registration_id}.") return + # Check golds if isinstance(blueprint, UseGoldUnit) and blueprint.use_golds: if blueprint.should_produce_gold_for_worker(worker): @@ -483,6 +529,7 @@ async def _assign_unit_or_qa( gold_data = await loop.run_in_executor( None, partial(blueprint.get_gold_unit_data_for_worker, worker) ) + if gold_data is not None: launcher = live_run.task_launcher gold_unit = await 
loop.run_in_executor( @@ -532,16 +579,20 @@ async def register_agent(self, crowd_data: Dict[str, Any], worker: "Worker", req ) logger.debug(f"agent_registration_id {agent_registration_id}, had no valid units.") return + with EXTERNAL_FUNCTION_LATENCY.labels(function="filter_units_for_worker").time(): units = await loop.run_in_executor( None, partial(live_run.task_runner.filter_units_for_worker, units, worker), ) + # If there's onboarding, see if this worker has already been disqualified blueprint = live_run.blueprint if isinstance(blueprint, OnboardingRequired) and blueprint.use_onboarding: qual_name = blueprint.onboarding_qualification_name + assert qual_name is not None, "Cannot be using onboarding and have a null qual" + if worker.is_disqualified(qual_name): AGENT_DETAILS_COUNT.labels(response="not_qualified").inc() live_run.client_io.enqueue_agent_details( @@ -611,6 +662,7 @@ async def push_status_update(self, agent: Union["Agent", "OnboardingAgent"]) -> """ if self.is_shutdown: return # We don't push when shutdown + status = agent.db_status if isinstance(agent, OnboardingAgent): if status in [AgentState.STATUS_APPROVED, AgentState.STATUS_REJECTED]: @@ -634,22 +686,29 @@ def handle_updated_agent_status(self, status_map: Dict[str, str]): if status not in AgentState.valid(): logger.warning(f"Invalid status for agent {agent_id}: {status}") continue + if agent_id in self.final_onboardings: # no longer tracking this onboarding continue + agent = self.get_agent_for_id(agent_id) if agent is None: # no longer tracking agent continue + db_status = agent.get_status() if status != db_status: if status == AgentState.STATUS_COMPLETED: # Frontend agent completed but hasn't confirmed yet continue + if status != AgentState.STATUS_DISCONNECT: # Stale or reconnect, send a status update live_run.loop_wrap.execute_coro(self.push_status_update(agent)) - continue # Only DISCONNECT can be marked remotely, rest are mismatch (except STATUS_COMPLETED) + # Only DISCONNECT can be marked 
remotely, rest are mismatch + # (except STATUS_COMPLETED) + continue + agent.update_status(status) pass @@ -661,6 +720,7 @@ def disconnect_active_agents(self) -> None: for agent in self.agents.values(): if agent.get_status() not in AgentState.complete(): agent.update_status(AgentState.STATUS_DISCONNECT) + for onboarding_agent in self.onboarding_agents.values(): onboarding_agent.update_status(AgentState.STATUS_DISCONNECT) diff --git a/mephisto/utils/qualifications.py b/mephisto/utils/qualifications.py index 00669de87..40848dfe6 100644 --- a/mephisto/utils/qualifications.py +++ b/mephisto/utils/qualifications.py @@ -4,58 +4,92 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. -from typing import List, Optional, Dict, TYPE_CHECKING, Any -from mephisto.data_model.qualification import ( - QUAL_EXISTS, - QUAL_NOT_EXIST, - COMPARATOR_OPERATIONS, - SUPPORTED_COMPARATORS, - QUAL_GREATER, - QUAL_LESS, - QUAL_GREATER_EQUAL, - QUAL_LESS_EQUAL, - QUAL_IN_LIST, - QUAL_NOT_IN_LIST, -) +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import TYPE_CHECKING + +from mephisto.data_model.qualification import COMPARATOR_OPERATIONS +from mephisto.data_model.qualification import QUAL_EXISTS +from mephisto.data_model.qualification import QUAL_GREATER +from mephisto.data_model.qualification import QUAL_GREATER_EQUAL +from mephisto.data_model.qualification import QUAL_IN_LIST +from mephisto.data_model.qualification import QUAL_LESS +from mephisto.data_model.qualification import QUAL_LESS_EQUAL +from mephisto.data_model.qualification import QUAL_NOT_EXIST +from mephisto.data_model.qualification import QUAL_NOT_IN_LIST +from mephisto.data_model.qualification import SUPPORTED_COMPARATORS +from mephisto.utils.logger_core import get_logger if TYPE_CHECKING: from mephisto.abstractions.database import MephistoDB + from mephisto.data_model.task_run import 
TaskRun from mephisto.data_model.worker import Worker -from mephisto.utils.logger_core import get_logger +logger = get_logger(name=__name__) QualificationType = Dict[str, Any] -logger = get_logger(name=__name__) -def worker_is_qualified(worker: "Worker", qualifications: List[QualificationType]): +def worker_is_qualified( + worker: "Worker", + shared_state_qualifications: List[QualificationType], + task_run: Optional["TaskRun"] = None, +): db = worker.db - for qualification in qualifications: - qual_name = qualification["qualification_name"] - qual_objs = db.find_qualifications(qual_name) - if not qual_objs: + + # 1. Check if provider has `admit_workers_with_no_prior_qualification` setting + provider_args = task_run.get_provider_args() + admit_with_no_prior_qualification = provider_args.get( + "admit_workers_with_no_prior_qualification" + ) + all_worker_granted_qualifications = db.find_granted_qualifications(worker_id=worker.db_id) + worker_has_granted_qualifications = len(all_worker_granted_qualifications) > 0 + task_run_has_qualifications = len(shared_state_qualifications) > 0 + if ( + admit_with_no_prior_qualification is True + and task_run_has_qualifications + and not worker_has_granted_qualifications + ): + # If TaskRun has qualifications and Worker has no granted qualifications, + # they should be considered as qualified + return True + + # 2. Check Worker's qualification + for shared_state_qualification in shared_state_qualifications: + qualification_name = shared_state_qualification["qualification_name"] + qualifications = db.find_qualifications(qualification_name) + + if not qualifications: logger.warning( - f"Expected to create qualification for {qual_name}, but none found... skipping." + f"Expected to create qualification for {qualification_name}, " + f"but none found... skipping."
) continue - qual_obj = qual_objs[0] - granted_quals = db.check_granted_qualifications( - qualification_id=qual_obj.db_id, worker_id=worker.db_id + + qualification = qualifications[0] + granted_qualifications = db.find_granted_qualifications( + qualification_id=qualification.db_id, + worker_id=worker.db_id, ) - comp = qualification["comparator"] - compare_value = qualification["value"] - if comp == QUAL_EXISTS and not granted_quals: + comparator = shared_state_qualification["comparator"] + compare_value = shared_state_qualification["value"] + + if comparator == QUAL_EXISTS and not granted_qualifications: return False - elif comp == QUAL_NOT_EXIST and granted_quals: + elif comparator == QUAL_NOT_EXIST and granted_qualifications: return False - elif comp in [QUAL_EXISTS, QUAL_NOT_EXIST]: + elif comparator in [QUAL_EXISTS, QUAL_NOT_EXIST]: continue else: - if not granted_quals: + if not granted_qualifications: return False - granted_qual = granted_quals[0] - if not COMPARATOR_OPERATIONS[comp](granted_qual.value, compare_value): + + granted_qualification = granted_qualifications[0] + if not COMPARATOR_OPERATIONS[comparator](granted_qualification.value, compare_value): return False + return True