diff --git a/recidiviz/ingest/direct/views/view_config.py b/recidiviz/ingest/direct/views/view_config.py index a2fabc95b8..a717956a1c 100644 --- a/recidiviz/ingest/direct/views/view_config.py +++ b/recidiviz/ingest/direct/views/view_config.py @@ -38,9 +38,10 @@ def get_direct_ingest_view_builders() -> Sequence[BigQueryViewBuilder]: # table in all regions DirectIngestRawDataTableLatestViewCollector( region_code=state_code.value.lower(), - raw_data_source_instance=instance, + # We only deploy latest views for PRIMARY - views for SECONDARY can be + # loaded via a sandbox. + raw_data_source_instance=DirectIngestInstance.PRIMARY, ).collect_view_builders() - for instance in DirectIngestInstance for state_code in get_existing_direct_ingest_states() ) ) diff --git a/recidiviz/tests/tools/load_end_to_end_sandbox_test.py b/recidiviz/tests/tools/load_end_to_end_sandbox_test.py index b83bbee114..93115c4762 100644 --- a/recidiviz/tests/tools/load_end_to_end_sandbox_test.py +++ b/recidiviz/tests/tools/load_end_to_end_sandbox_test.py @@ -273,6 +273,7 @@ def test_no_post_ingest_pipelines(self) -> None: with local_project_id_override("recidiviz-456"): overrides_dict = get_view_update_input_dataset_overrides_dict( + state_code=StateCode.US_CA, ingest_pipeline_params=ingest_pipeline_params, post_ingest_pipeline_params=[], ) @@ -334,6 +335,7 @@ def test_complex(self) -> None: with local_project_id_override("recidiviz-456"): overrides_dict = get_view_update_input_dataset_overrides_dict( + state_code=StateCode.US_CA, ingest_pipeline_params=ingest_pipeline_params, post_ingest_pipeline_params=post_ingest_pipeline_params, ) diff --git a/recidiviz/tools/load_end_to_end_data_sandbox.py b/recidiviz/tools/load_end_to_end_data_sandbox.py index 80c52cab9e..252d951635 100644 --- a/recidiviz/tools/load_end_to_end_data_sandbox.py +++ b/recidiviz/tools/load_end_to_end_data_sandbox.py @@ -30,6 +30,14 @@ --state_code US_XX \ --sandbox_prefix my_prefix \ --load_up_to_addresses aggregated_metrics.incarceration_state_aggregated_metrics,aggregated_metrics.supervision_state_aggregated_metrics + +# Read only from raw data in `us_xx_raw_data_secondary` (e.g. to test a raw data +# reimport in SECONDARY). +python -m recidiviz.tools.load_end_to_end_data_sandbox \ + --state_code US_XX \ + --sandbox_prefix my_prefix \ + --raw_data_source_instance SECONDARY \ + --load_up_to_datasets sessions """ import argparse import json @@ -40,6 +48,7 @@ from recidiviz.big_query.address_overrides import BigQueryAddressOverrides from recidiviz.big_query.big_query_address import BigQueryAddress from recidiviz.common.constants.states import StateCode +from recidiviz.ingest.direct.dataset_config import raw_tables_dataset_for_region from recidiviz.ingest.direct.types.direct_ingest_instance import DirectIngestInstance from recidiviz.metrics.export.export_config import ExportViewCollectionConfig from recidiviz.pipelines.config_paths import PIPELINE_CONFIG_YAML_PATH @@ -217,6 +226,7 @@ def get_sandbox_post_ingest_pipeline_params( def get_view_update_input_dataset_overrides_dict( + state_code: StateCode, ingest_pipeline_params: IngestPipelineParameters, post_ingest_pipeline_params: list[ MetricsPipelineParameters | SupplementalPipelineParameters @@ -226,6 +236,15 @@ def get_view_update_input_dataset_overrides_dict( when loading sandbox views that read from the given sandbox pipelines. """ input_dataset_overrides_dict: dict[str, str] = {} + + ingest_input_instance = DirectIngestInstance( + ingest_pipeline_params.raw_data_source_instance + ) + if ingest_input_instance != DirectIngestInstance.PRIMARY: + input_dataset_overrides_dict[ + raw_tables_dataset_for_region(state_code, DirectIngestInstance.PRIMARY) + ] = raw_tables_dataset_for_region(state_code, ingest_input_instance) + for params in [ingest_pipeline_params, *post_ingest_pipeline_params]: output_dataset_overrides = assert_type( params.output_dataset_overrides, BigQueryAddressOverrides @@ -277,6 +296,8 @@ def _get_params_summary(params_list: list[PipelineParameters]) -> str: } if isinstance(params, MetricsPipelineParameters): metadata["metric_types"] = params.metric_types + elif isinstance(params, IngestPipelineParameters): + metadata["raw_data_source_instance"] = params.raw_data_source_instance table_data.append([params.job_name, json.dumps(metadata, indent=2)]) @@ -328,7 +349,7 @@ def load_end_to_end_sandbox( print("\nCollecting views to load after pipelines run...\n") view_update_input_dataset_overrides_dict = ( get_view_update_input_dataset_overrides_dict( - ingest_pipeline_params, post_ingest_pipeline_params + state_code, ingest_pipeline_params, post_ingest_pipeline_params ) ) view_builders_to_load = collect_changed_views_and_descendants_to_load(