From eb45d02ebcb7a65528aeca81eee9e7d1c837e271 Mon Sep 17 00:00:00 2001
From: Michael Chouinard <46358556+chouinar@users.noreply.github.com>
Date: Mon, 16 Dec 2024 10:05:23 -0500
Subject: [PATCH] [Issue #3148] Generate extracts of tables for opportunity data (#3153)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary
Fixes #3148

### Time to review: __10 mins__

## Changes proposed
Create a task we can run that generates CSV extracts for a given set of tables.

## Context for reviewers
This dataset will serve as input to an analytics process that is being written. We'll pull these extract files and load them into the analytics database.

This uses routine `COPY` commands from Postgres, which is very efficient at reading and writing CSV files. https://www.postgresql.org/docs/current/sql-copy.html
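For reviewers who want to picture the consuming side, here is a minimal sketch of how the analytics process might load one of these extracts back into a database. It assumes psycopg 3, a placeholder `analytics` database, and an `opportunity` table that already exists with columns matching the CSV; none of this loader code is part of this PR.

```python
# Hypothetical analytics-side loader; a sketch only, not part of this PR.
# Assumes psycopg 3 and a target table whose columns match the extracted CSV.
import psycopg


def load_csv(conn: psycopg.Connection, table_name: str, csv_path: str) -> None:
    with conn.cursor() as cur:
        # COPY ... FROM STDIN mirrors the COPY ... TO STDOUT used by the task
        # in this PR, and is similarly fast for bulk CSV ingestion.
        with cur.copy(f"COPY {table_name} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)") as copy:
            with open(csv_path, "rb") as f:
                while data := f.read(8192):
                    copy.write(data)


with psycopg.connect("dbname=analytics") as conn:  # placeholder connection string
    load_csv(conn, "opportunity", "/tmp/opportunity.csv")
```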
## Additional information
Running `make cmd args="task create-analytics-db-csvs"` locally against ~35k opportunities in my DB takes about 2 seconds for these 5 tables:
![Screenshot 2024-12-09 at 4 48 06 PM](https://github.com/user-attachments/assets/36dc8df4-41fc-4db4-a715-e240404f2fa7)
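By default all five tables above are extracted. The task also accepts a repeatable `-t`/`--tables-to-extract` flag for extracting a subset, e.g. something like `make cmd args="task create-analytics-db-csvs -t opportunity"` (the exact `make` passthrough syntax is an assumption here).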
Example files generated:
### Opportunity
```csv
opportunity_id,opportunity_number,opportunity_title,agency_code,opportunity_category_id,category_explanation,is_draft,revision_number,modified_comments,publisher_user_id,publisher_profile_id,created_at,updated_at
"1","HHS-OPHS-02-685","Donaldson-Montgomery 2004 award","DOI-BIA","2",,"f","0",,,,"2024-12-09 19:47:53.327412+00","2024-12-09 19:47:53.327412+00"
"2","THUS-824-49","Embassy program for Sport and exercise psychologist in Thailand","DOD-AMRAA","2",,"f","0",,,,"2024-12-09 19:47:53.385385+00","2024-12-09 19:47:53.385385+00"
"3","LEAST-443-18","Embassy program for Multimedia specialist in Bangladesh","USDA-FAS","2",,"f","0",,,,"2024-12-09 19:47:53.423241+00","2024-12-09 19:47:53.423241+00"
"4","HHS-IHS-07-253","Jones and Sons 1999 award","DOD-COE-FW","5","Include while thought.","f","0",,,,"2024-12-09 19:47:53.448716+00","2024-12-09 19:47:53.448716+00"
"5","DOI-USGS1-54-707","Austin Boyd Foundation Grant for cultivate collaborative niches","DOE-NETL","5","Successful.","f","0",,,,"2024-12-09 19:47:53.473957+00","2024-12-09 19:47:53.473957+00"
"6","DOC-EDA-40-090","Christine Garcia Foundation Grant for enable best-of-breed convergence","HHS-CDC-NCCDPHP","5","Degree family.","f","0",,,,"2024-12-09 19:47:53.510048+00","2024-12-09 19:47:53.510048+00"
```

### Current Opportunity Summary
```csv
opportunity_id,opportunity_summary_id,opportunity_status_id,created_at,updated_at
"1","1","1","2024-12-09 19:47:53.373247+00","2024-12-09 19:47:53.373247+00"
"2","2","1","2024-12-09 19:47:53.41556+00","2024-12-09 19:47:53.41556+00"
"3","3","1","2024-12-09 19:47:53.441631+00","2024-12-09 19:47:53.441631+00"
"4","4","1","2024-12-09 19:47:53.46677+00","2024-12-09 19:47:53.46677+00"
"5","5","1","2024-12-09 19:47:53.489682+00","2024-12-09 19:47:53.489682+00"
```

### Opportunity Summary
```csv
opportunity_summary_id,opportunity_id,summary_description,is_cost_sharing,is_forecast,post_date,close_date,close_date_description,archive_date,unarchive_date,expected_number_of_awards,estimated_total_program_funding,award_floor,award_ceiling,additional_info_url,additional_info_url_description,forecasted_post_date,forecasted_close_date,forecasted_close_date_description,forecasted_award_date,forecasted_project_start_date,fiscal_year,revision_number,modification_comments,funding_category_description,applicant_eligibility_description,agency_code,agency_name,agency_phone_number,agency_contact_description,agency_email_address,agency_email_address_description,is_deleted,can_send_mail,publisher_profile_id,publisher_user_id,updated_by,created_by,created_at,updated_at,version_number
"1","1","DOD-AFRL is looking to further investigate this topic. Machine blue door few market team run. Chance letter standard.","t","t","2024-12-05",,,"2025-01-02",,"2","7095000","3547500","7095000","sam.gov","Full Announcement","2024-12-29","2025-02-19","Speech defense finally.","2025-06-14","2025-07-21","2025",,,,,"DOI-BIA","Agency for Interior","123-456-0000","google.com Contact Center Hours of operation are 24 hours a day, 7 days a week. mark89@example.net","kimberlypeters@example.net","Contact Agency for Interior via email","f",,,,,,"2021-11-16 15:04:04.542119+00","2022-02-26 04:08:04.982404+00","12"
"2","2","The purpose of this Notice of Funding Opportunity (NOFO) is to support research into Pensions consultant and how we might Implemented contextually-based application.","f","t","2024-12-04",,,"2025-01-05",,"5","7345000","1469000","7345000","grants.gov","Program Announcement","2024-12-24","2025-02-27","Public environmental outside couple each common baby.","2025-06-12","2025-11-09","2025",,,"Participant trip up.","New open ready likely possible. Audience set doctor. Before or degree must out turn why. Billion example know woman. Big wide city pass too wide.
Participant edge chair budget.","DOD-AMRAA","Agency for Housing","123-456-0001","Webmaster allenbrewer@example.net","justinevans@example.com","Contact Agency for Housing via email","f",,,,,,"2021-09-18 07:41:17.520437+00","2023-02-01 08:42:17.682068+00","10"
```

### Opportunity Category
```csv
opportunity_category_id,description,created_at,updated_at
"1","discretionary","2024-12-09 19:46:40.288036+00","2024-12-09 19:46:40.288036+00"
"2","mandatory","2024-12-09 19:46:40.290119+00","2024-12-09 19:46:40.290119+00"
"3","continuation","2024-12-09 19:46:40.292564+00","2024-12-09 19:46:40.292564+00"
"4","earmark","2024-12-09 19:46:40.293851+00","2024-12-09 19:46:40.293851+00"
"5","other","2024-12-09 19:46:40.295182+00","2024-12-09 19:46:40.295182+00"
```

### Opportunity Status
```csv
opportunity_status_id,description,created_at,updated_at
"1","forecasted","2024-12-09 19:46:40.366307+00","2024-12-09 19:46:40.366307+00"
"2","posted","2024-12-09 19:46:40.367838+00","2024-12-09 19:46:40.367838+00"
"3","closed","2024-12-09 19:46:40.369048+00","2024-12-09 19:46:40.369048+00"
"4","archived","2024-12-09 19:46:40.370318+00","2024-12-09 19:46:40.370318+00"
```
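A note on reading these examples: because the `COPY` command below uses `FORCE_QUOTE *`, every non-NULL value is quoted, so a bare empty field (as in `,,`) is a SQL NULL, while `""` would be an empty string.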
---
 api/local.env                                 |   3 +
 api/src/task/__init__.py                      |   1 +
 api/src/task/analytics/__init__.py            |   1 +
 .../analytics/create_analytics_db_csvs.py     | 114 ++++++++++++++++++
 api/tests/src/task/analytics/__init__.py      |   0
 .../test_create_analytics_db_csvs.py          |  84 +++++++++++++
 6 files changed, 203 insertions(+)
 create mode 100644 api/src/task/analytics/__init__.py
 create mode 100644 api/src/task/analytics/create_analytics_db_csvs.py
 create mode 100644 api/tests/src/task/analytics/__init__.py
 create mode 100644 api/tests/src/task/analytics/test_create_analytics_db_csvs.py

diff --git a/api/local.env b/api/local.env
index f22645cee..cddb3adf4 100644
--- a/api/local.env
+++ b/api/local.env
@@ -130,6 +130,9 @@ IS_LOCAL_FOREIGN_TABLE=true
 # File path for the export_opportunity_data task
 EXPORT_OPP_DATA_FILE_PATH=/tmp
 
+# File path for the create-analytics-db-csvs task
+ANALYTICS_DB_CSV_FILE_PATH=/tmp
+
 ############################
 # Deploy Metadata
 ############################
diff --git a/api/src/task/__init__.py b/api/src/task/__init__.py
index 9dc10de50..686934142 100644
--- a/api/src/task/__init__.py
+++ b/api/src/task/__init__.py
@@ -3,5 +3,6 @@
 # import any of the other files so they get initialized and attached to the blueprint
 import src.task.opportunities.set_current_opportunities_task  # noqa: F401 E402 isort:skip
 import src.task.opportunities.export_opportunity_data_task  # noqa: F401 E402 isort:skip
+import src.task.analytics.create_analytics_db_csvs  # noqa: F401 E402 isort:skip
 
 __all__ = ["task_blueprint"]
diff --git a/api/src/task/analytics/__init__.py b/api/src/task/analytics/__init__.py
new file mode 100644
index 000000000..ea8072159
--- /dev/null
+++ b/api/src/task/analytics/__init__.py
@@ -0,0 +1 @@
+"""Contains tasks that feed data into the analytics system"""
diff --git a/api/src/task/analytics/create_analytics_db_csvs.py b/api/src/task/analytics/create_analytics_db_csvs.py
new file mode 100644
index 000000000..cdeff8fbb
--- /dev/null
+++ b/api/src/task/analytics/create_analytics_db_csvs.py
@@ -0,0 +1,114 @@
+import logging
+import time
+from enum import StrEnum
+
+import click
+import sqlalchemy
+from pydantic_settings import SettingsConfigDict
+
+import src.adapters.db as db
+import src.adapters.db.flask_db as flask_db
+from src.db.models import metadata as api_metadata
+from src.task.task import Task
+from src.task.task_blueprint import task_blueprint
+from src.util import file_util
+from src.util.env_config import PydanticBaseEnvConfig
+
+logger = logging.getLogger(__name__)
+
+TABLES_TO_EXTRACT = [
+    "opportunity",
+    "opportunity_summary",
+    "current_opportunity_summary",
+    "lk_opportunity_category",
+    "lk_opportunity_status",
+]
+
+
+@task_blueprint.cli.command(
+    "create-analytics-db-csvs",
+    help="Create extract CSVs of our database tables that analytics can use",
+)
+@click.option("--tables-to-extract", "-t", help="Tables to extract to a CSV file", multiple=True)
+@flask_db.with_db_session()
+def create_analytics_db_csvs(db_session: db.Session, tables_to_extract: list[str]) -> None:
+    logger.info("Create extract CSV file start")
+
+    CreateAnalyticsDbCsvsTask(db_session, tables_to_extract).run()
+
+    logger.info("Create extract CSV file complete")
+
+
+class CreateAnalyticsDbCsvsConfig(PydanticBaseEnvConfig):
+    model_config = SettingsConfigDict(env_prefix="ANALYTICS_DB_CSV_")
+
+    # ANALYTICS_DB_CSV_FILE_PATH
+    file_path: str
+
+    # Override the schema for where the tables exist, only needed
+    # for testing right now
+    db_schema: str | None = None
+
+
+class CreateAnalyticsDbCsvsTask(Task):
+
+    class Metrics(StrEnum):
+        TABLE_COUNT = "table_count"
+        ROW_COUNT = "row_count"
+
+    def __init__(
+        self,
+        db_session: db.Session,
+        tables_to_extract: list[str] | None = None,
+        config: CreateAnalyticsDbCsvsConfig | None = None,
+    ) -> None:
+        super().__init__(db_session)
+
+        if tables_to_extract is None or len(tables_to_extract) == 0:
+            tables_to_extract = TABLES_TO_EXTRACT
+
+        # We only want to process tables that were configured
+        self.tables: list[sqlalchemy.Table] = [
+            t for t in api_metadata.tables.values() if t.name in tables_to_extract
+        ]
+
+        if config is None:
+            config = CreateAnalyticsDbCsvsConfig()
+        self.config = config
+
+    def run_task(self) -> None:
+        for table in self.tables:
+            self.generate_csv(table)
+
+    def generate_csv(self, table: sqlalchemy.Table) -> None:
+        """Generate the CSV file of a given table"""
+        output_path = file_util.join(self.config.file_path, f"{table.name}.csv")
+        log_extra = {
+            "table_name": table.name,
+            "output_path": output_path,
+        }
+        logger.info("Generating CSV extract for table", extra=log_extra)
+
+        start_time = time.monotonic()
+
+        cursor = self.db_session.connection().connection.cursor()
+        schema = table.schema if self.config.db_schema is None else self.config.db_schema
+
+        with cursor.copy(
+            f"COPY {schema}.{table.name} TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER TRUE, FORCE_QUOTE *, ENCODING 'utf-8')"
+        ) as cursor_copy:
+            with file_util.open_stream(output_path, "wb") as outfile:
+                for data in cursor_copy:
+                    outfile.write(data)
+
+        row_count = cursor.rowcount
+
+        duration = round(time.monotonic() - start_time, 3)
+        self.increment(self.Metrics.TABLE_COUNT)
+        self.set_metrics({f"{table.name}.time": duration})
+        self.increment(self.Metrics.ROW_COUNT, row_count, prefix=table.name)
+
+        logger.info(
+            "Generated CSV extract for table",
+            extra=log_extra | {"table_extract_duration_sec": duration, "row_count": row_count},
+        )
diff --git a/api/tests/src/task/analytics/__init__.py b/api/tests/src/task/analytics/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/tests/src/task/analytics/test_create_analytics_db_csvs.py b/api/tests/src/task/analytics/test_create_analytics_db_csvs.py
new file mode 100644
index 000000000..39ac12463
--- /dev/null
+++ b/api/tests/src/task/analytics/test_create_analytics_db_csvs.py
@@ -0,0 +1,84 @@
+import csv
+
+import pytest
+
+import src.util.file_util as file_util
+from src.task.analytics.create_analytics_db_csvs import (
+    CreateAnalyticsDbCsvsConfig,
+    CreateAnalyticsDbCsvsTask,
+)
+from tests.conftest import BaseTestClass
+from tests.src.db.models.factories import OpportunityFactory
+
+
+def validate_file(file_path: str, expected_record_count: int) -> list[dict]:
+    with file_util.open_stream(file_path) as csvfile:
+        records = [record for record in csv.DictReader(csvfile)]
+
+    assert len(records) == expected_record_count
+
+    return records
+
+
+class TestCreateAnalyticsDbCsvsTask(BaseTestClass):
+
+    @pytest.fixture(scope="class")
+    def opportunities(self, truncate_opportunities, enable_factory_create):
+        # Create a variety of opportunities
+        opps = []
+        opps.extend(OpportunityFactory.create_batch(size=5, is_posted_summary=True))
+        opps.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
+        opps.extend(OpportunityFactory.create_batch(size=4, is_closed_summary=True))
+        opps.extend(OpportunityFactory.create_batch(size=2, is_archived_non_forecast_summary=True))
+        opps.extend(OpportunityFactory.create_batch(size=4, is_archived_forecast_summary=True))
+        opps.extend(OpportunityFactory.create_batch(size=2, is_draft=True))
+        opps.extend(OpportunityFactory.create_batch(size=1, no_current_summary=True))
+        return opps
+
+    @pytest.fixture()
+    def task(self, db_session, mock_s3_bucket, test_api_schema):
+        config = CreateAnalyticsDbCsvsConfig(
+            file_path=f"s3://{mock_s3_bucket}/table-extracts", db_schema=test_api_schema
+        )
+        return CreateAnalyticsDbCsvsTask(db_session, config=config)
+
+    def test_create_analytics_db_csvs(self, db_session, task, opportunities):
+        task.run()
+
+        # Validate the opportunity file
+        csv_opps = validate_file(task.config.file_path + "/opportunity.csv", len(opportunities))
+        opportunity_ids = set([o.opportunity_id for o in opportunities])
+        csv_opportunity_ids = set([int(record["opportunity_id"]) for record in csv_opps])
+        assert opportunity_ids == csv_opportunity_ids
+
+        # Validate the current opportunity file
+        current_opportunity_summaries = [
+            o.current_opportunity_summary
+            for o in opportunities
+            if o.current_opportunity_summary is not None
+        ]
+        csv_current_summaries = validate_file(
+            task.config.file_path + "/current_opportunity_summary.csv",
+            len(current_opportunity_summaries),
+        )
+        current_summary_ids = set(
+            [(o.opportunity_id, o.opportunity_summary_id) for o in current_opportunity_summaries]
+        )
+        csv_current_summary_ids = set(
+            [
+                (int(record["opportunity_id"]), int(record["opportunity_summary_id"]))
+                for record in csv_current_summaries
+            ]
+        )
+        assert current_summary_ids == csv_current_summary_ids
+
+        # Validate the opportunity summary file
+        opportunity_summaries = [o.opportunity_summary for o in current_opportunity_summaries]
+        csv_summaries = validate_file(
+            task.config.file_path + "/opportunity_summary.csv", len(opportunity_summaries)
+        )
+        opportunity_summary_ids = set([o.opportunity_summary_id for o in opportunity_summaries])
+        csv_opportunity_summary_ids = set(
+            [int(record["opportunity_summary_id"]) for record in csv_summaries]
+        )
+        assert opportunity_summary_ids == csv_opportunity_summary_ids