# [Issue #3148] Generate extracts of tables for opportunity data (#3153)
## Summary
Fixes #3148

### Time to review: __10 mins__

## Changes proposed
Create a task we can run that generates CSV extracts for a given set of tables.

## Context for reviewers
These extracts will serve as input to an analytics process that is currently being written: we'll pull the extract files and load them into the analytics database.

This uses Postgres's standard `COPY` command, which is very efficient at reading and writing CSV files:
https://www.postgresql.org/docs/current/sql-copy.html
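
For a sense of the mechanics, this is essentially psycopg 3's `copy()` streaming `COPY ... TO STDOUT` output straight to a file. A minimal standalone sketch (the connection string, `api` schema, and output path are illustrative only; the real task reuses the app's SQLAlchemy session and config):

```python
import psycopg

# Illustrative connection string; the task itself borrows the app's existing DB session.
with psycopg.connect("dbname=app") as conn, conn.cursor() as cur:
    # COPY ... TO STDOUT streams the table server-side as CSV bytes.
    with cur.copy(
        "COPY api.opportunity TO STDOUT WITH (FORMAT CSV, HEADER TRUE, FORCE_QUOTE *)"
    ) as copy:
        with open("/tmp/opportunity.csv", "wb") as outfile:
            for data in copy:
                outfile.write(data)
```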

## Additional information
Running `make cmd args="task create-analytics-db-csvs"` locally with ~35k opportunities in my DB takes about 2 seconds for these 5 tables:
![Screenshot 2024-12-09 at 4 48 06 PM](https://github.com/user-attachments/assets/36dc8df4-41fc-4db4-a715-e240404f2fa7)

Example files generated:

### Opportunity
```csv
opportunity_id,opportunity_number,opportunity_title,agency_code,opportunity_category_id,category_explanation,is_draft,revision_number,modified_comments,publisher_user_id,publisher_profile_id,created_at,updated_at
"1","HHS-OPHS-02-685","Donaldson-Montgomery 2004 award","DOI-BIA","2",,"f","0",,,,"2024-12-09 19:47:53.327412+00","2024-12-09 19:47:53.327412+00"
"2","THUS-824-49","Embassy program for Sport and exercise psychologist in Thailand","DOD-AMRAA","2",,"f","0",,,,"2024-12-09 19:47:53.385385+00","2024-12-09 19:47:53.385385+00"
"3","LEAST-443-18","Embassy program for Multimedia specialist in Bangladesh","USDA-FAS","2",,"f","0",,,,"2024-12-09 19:47:53.423241+00","2024-12-09 19:47:53.423241+00"
"4","HHS-IHS-07-253","Jones and Sons 1999 award","DOD-COE-FW","5","Include while thought.","f","0",,,,"2024-12-09 19:47:53.448716+00","2024-12-09 19:47:53.448716+00"
"5","DOI-USGS1-54-707","Austin Boyd Foundation Grant for cultivate collaborative niches","DOE-NETL","5","Successful.","f","0",,,,"2024-12-09 19:47:53.473957+00","2024-12-09 19:47:53.473957+00"
"6","DOC-EDA-40-090","Christine Garcia Foundation Grant for enable best-of-breed convergence","HHS-CDC-NCCDPHP","5","Degree family.","f","0",,,,"2024-12-09 19:47:53.510048+00","2024-12-09 19:47:53.510048+00"
```

### Current Opportunity Summary
```csv
opportunity_id,opportunity_summary_id,opportunity_status_id,created_at,updated_at
"1","1","1","2024-12-09 19:47:53.373247+00","2024-12-09 19:47:53.373247+00"
"2","2","1","2024-12-09 19:47:53.41556+00","2024-12-09 19:47:53.41556+00"
"3","3","1","2024-12-09 19:47:53.441631+00","2024-12-09 19:47:53.441631+00"
"4","4","1","2024-12-09 19:47:53.46677+00","2024-12-09 19:47:53.46677+00"
"5","5","1","2024-12-09 19:47:53.489682+00","2024-12-09 19:47:53.489682+00"
```

### Opportunity Summary

```csv
opportunity_summary_id,opportunity_id,summary_description,is_cost_sharing,is_forecast,post_date,close_date,close_date_description,archive_date,unarchive_date,expected_number_of_awards,estimated_total_program_funding,award_floor,award_ceiling,additional_info_url,additional_info_url_description,forecasted_post_date,forecasted_close_date,forecasted_close_date_description,forecasted_award_date,forecasted_project_start_date,fiscal_year,revision_number,modification_comments,funding_category_description,applicant_eligibility_description,agency_code,agency_name,agency_phone_number,agency_contact_description,agency_email_address,agency_email_address_description,is_deleted,can_send_mail,publisher_profile_id,publisher_user_id,updated_by,created_by,created_at,updated_at,version_number
"1","1","DOD-AFRL is looking to further investigate this topic. Machine blue door few market team run. Chance letter standard.","t","t","2024-12-05",,,"2025-01-02",,"2","7095000","3547500","7095000","sam.gov","Full Announcement","2024-12-29","2025-02-19","Speech defense finally.","2025-06-14","2025-07-21","2025",,,,,"DOI-BIA","Agency for Interior","123-456-0000","google.com Contact Center
Hours of operation are 24 hours a day, 7 days a week.
[email protected]","[email protected]","Contact Agency for Interior via email","f",,,,,,"2021-11-16 15:04:04.542119+00","2022-02-26 04:08:04.982404+00","12"
"2","2","The purpose of this Notice of Funding Opportunity (NOFO) is to support research into Pensions consultant and how we might Implemented contextually-based application.","f","t","2024-12-04",,,"2025-01-05",,"5","7345000","1469000","7345000","grants.gov","Program Announcement","2024-12-24","2025-02-27","Public environmental outside couple each common baby.","2025-06-12","2025-11-09","2025",,,"Participant trip up.","New open ready likely possible. Audience set doctor. Before or degree must out turn why. Billion example know woman. Big wide city pass too wide. Participant edge chair budget.","DOD-AMRAA","Agency for Housing","123-456-0001","Webmaster
[email protected]","[email protected]","Contact Agency for Housing via email","f",,,,,,"2021-09-18 07:41:17.520437+00","2023-02-01 08:42:17.682068+00","10"
```

### Opportunity Category
```csv
opportunity_category_id,description,created_at,updated_at
"1","discretionary","2024-12-09 19:46:40.288036+00","2024-12-09 19:46:40.288036+00"
"2","mandatory","2024-12-09 19:46:40.290119+00","2024-12-09 19:46:40.290119+00"
"3","continuation","2024-12-09 19:46:40.292564+00","2024-12-09 19:46:40.292564+00"
"4","earmark","2024-12-09 19:46:40.293851+00","2024-12-09 19:46:40.293851+00"
"5","other","2024-12-09 19:46:40.295182+00","2024-12-09 19:46:40.295182+00"
```

### Opportunity Status
```csv
opportunity_status_id,description,created_at,updated_at
"1","forecasted","2024-12-09 19:46:40.366307+00","2024-12-09 19:46:40.366307+00"
"2","posted","2024-12-09 19:46:40.367838+00","2024-12-09 19:46:40.367838+00"
"3","closed","2024-12-09 19:46:40.369048+00","2024-12-09 19:46:40.369048+00"
"4","archived","2024-12-09 19:46:40.370318+00","2024-12-09 19:46:40.370318+00"
```
chouinar authored Dec 16, 2024 · commit eb45d02 (1 parent: b05225b)
Showing 6 changed files with 203 additions and 0 deletions.
### api/local.env (3 additions, 0 deletions)
```diff
@@ -130,6 +130,9 @@ IS_LOCAL_FOREIGN_TABLE=true
 # File path for the export_opportunity_data task
 EXPORT_OPP_DATA_FILE_PATH=/tmp
 
+# File path for the create-analytics-db-csvs task
+ANALYTICS_DB_CSV_FILE_PATH=/tmp
+
 ############################
 # Deploy Metadata
 ############################
```
### api/src/task/__init__.py (1 addition, 0 deletions)
```diff
@@ -3,5 +3,6 @@
 # import any of the other files so they get initialized and attached to the blueprint
 import src.task.opportunities.set_current_opportunities_task  # noqa: F401 E402 isort:skip
 import src.task.opportunities.export_opportunity_data_task  # noqa: F401 E402 isort:skip
+import src.task.analytics.create_analytics_db_csvs  # noqa: F401 E402 isort:skip
 
 __all__ = ["task_blueprint"]
```
### api/src/task/analytics/__init__.py (1 addition, 0 deletions)
```diff
@@ -0,0 +1 @@
+"""Contains tasks that feed data into the analytics system"""
```
### api/src/task/analytics/create_analytics_db_csvs.py (new file, 114 additions)
```python
import logging
import time
from enum import StrEnum

import click
import sqlalchemy
from pydantic_settings import SettingsConfigDict

import src.adapters.db as db
import src.adapters.db.flask_db as flask_db
from src.db.models import metadata as api_metadata
from src.task.task import Task
from src.task.task_blueprint import task_blueprint
from src.util import file_util
from src.util.env_config import PydanticBaseEnvConfig

logger = logging.getLogger(__name__)

TABLES_TO_EXTRACT = [
    "opportunity",
    "opportunity_summary",
    "current_opportunity_summary",
    "lk_opportunity_category",
    "lk_opportunity_status",
]


@task_blueprint.cli.command(
    "create-analytics-db-csvs",
    help="Create extract CSVs of our database tables that analytics can use",
)
@click.option("--tables-to-extract", "-t", help="Tables to extract to a CSV file", multiple=True)
@flask_db.with_db_session()
def create_analytics_db_csvs(db_session: db.Session, tables_to_extract: list[str]) -> None:
    logger.info("Create extract CSV file start")

    CreateAnalyticsDbCsvsTask(db_session, tables_to_extract).run()

    logger.info("Create extract CSV file complete")


class CreateAnalyticsDbCsvsConfig(PydanticBaseEnvConfig):
    model_config = SettingsConfigDict(env_prefix="ANALYTICS_DB_CSV_")

    # ANALYTICS_DB_CSV_FILE_PATH
    file_path: str

    # Override the schema for where the tables exist, only needed
    # for testing right now
    db_schema: str | None = None


class CreateAnalyticsDbCsvsTask(Task):

    class Metrics(StrEnum):
        TABLE_COUNT = "table_count"
        ROW_COUNT = "row_count"

    def __init__(
        self,
        db_session: db.Session,
        tables_to_extract: list[str] | None = None,
        config: CreateAnalyticsDbCsvsConfig | None = None,
    ) -> None:
        super().__init__(db_session)

        if tables_to_extract is None or len(tables_to_extract) == 0:
            tables_to_extract = TABLES_TO_EXTRACT

        # We only want to process tables that were configured
        self.tables: list[sqlalchemy.Table] = [
            t for t in api_metadata.tables.values() if t.name in tables_to_extract
        ]

        if config is None:
            config = CreateAnalyticsDbCsvsConfig()
        self.config = config

    def run_task(self) -> None:
        for table in self.tables:
            self.generate_csv(table)

    def generate_csv(self, table: sqlalchemy.Table) -> None:
        """Generate the CSV file of a given table"""
        output_path = file_util.join(self.config.file_path, f"{table.name}.csv")
        log_extra = {
            "table_name": table.name,
            "output_path": output_path,
        }
        logger.info("Generating CSV extract for table", extra=log_extra)

        start_time = time.monotonic()

        # Drop down to the raw psycopg cursor so we can stream COPY ... TO STDOUT output
        cursor = self.db_session.connection().connection.cursor()
        schema = table.schema if self.config.db_schema is None else self.config.db_schema

        with cursor.copy(
            f"COPY {schema}.{table.name} TO STDOUT with (DELIMITER ',', FORMAT CSV, HEADER TRUE, FORCE_QUOTE *, encoding 'utf-8')"
        ) as cursor_copy:
            with file_util.open_stream(output_path, "wb") as outfile:
                for data in cursor_copy:
                    outfile.write(data)

        row_count = cursor.rowcount

        duration = round(time.monotonic() - start_time, 3)
        self.increment(self.Metrics.TABLE_COUNT)
        self.set_metrics({f"{table.name}.time": duration})
        self.increment(self.Metrics.ROW_COUNT, row_count, prefix=table.name)

        logger.info(
            "Generated CSV extract for table",
            extra=log_extra | {"table_extract_duration_sec": duration, "row_count": row_count},
        )
```
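
By default the task extracts all five tables in `TABLES_TO_EXTRACT`; the repeatable `-t`/`--tables-to-extract` option limits a run to a subset, e.g. `make cmd args="task create-analytics-db-csvs -t opportunity -t current_opportunity_summary"` (an illustrative invocation of the same `make` wrapper used above).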
### api/tests/src/task/analytics/__init__.py

Empty file.
### api/tests/src/task/analytics/test_create_analytics_db_csvs.py (new file, 84 additions)
```python
import csv

import pytest

import src.util.file_util as file_util
from src.task.analytics.create_analytics_db_csvs import (
    CreateAnalyticsDbCsvsConfig,
    CreateAnalyticsDbCsvsTask,
)
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import OpportunityFactory


def validate_file(file_path: str, expected_record_count: int) -> list[dict]:
    with file_util.open_stream(file_path) as csvfile:
        records = [record for record in csv.DictReader(csvfile)]

    assert len(records) == expected_record_count

    return records


class TestCreateAnalyticsDbCsvsTask(BaseTestClass):

    @pytest.fixture(scope="class")
    def opportunities(self, truncate_opportunities, enable_factory_create):
        # Create a variety of opportunities
        opps = []
        opps.extend(OpportunityFactory.create_batch(size=5, is_posted_summary=True))
        opps.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
        opps.extend(OpportunityFactory.create_batch(size=4, is_closed_summary=True))
        opps.extend(OpportunityFactory.create_batch(size=2, is_archived_non_forecast_summary=True))
        opps.extend(OpportunityFactory.create_batch(size=4, is_archived_forecast_summary=True))
        opps.extend(OpportunityFactory.create_batch(size=2, is_draft=True))
        opps.extend(OpportunityFactory.create_batch(size=1, no_current_summary=True))
        return opps

    @pytest.fixture()
    def task(self, db_session, mock_s3_bucket, test_api_schema):
        config = CreateAnalyticsDbCsvsConfig(
            file_path=f"s3://{mock_s3_bucket}/table-extracts", db_schema=test_api_schema
        )
        return CreateAnalyticsDbCsvsTask(db_session, config=config)

    def test_create_analytics_db_csvs(self, db_session, task, opportunities):
        task.run()

        # Validate the opportunity file
        csv_opps = validate_file(task.config.file_path + "/opportunity.csv", len(opportunities))
        opportunity_ids = set([o.opportunity_id for o in opportunities])
        csv_opportunity_ids = set([int(record["opportunity_id"]) for record in csv_opps])
        assert opportunity_ids == csv_opportunity_ids

        # Validate the current opportunity summary file
        current_opportunity_summaries = [
            o.current_opportunity_summary
            for o in opportunities
            if o.current_opportunity_summary is not None
        ]
        csv_current_summaries = validate_file(
            task.config.file_path + "/current_opportunity_summary.csv",
            len(current_opportunity_summaries),
        )
        current_summary_ids = set(
            [(o.opportunity_id, o.opportunity_summary_id) for o in current_opportunity_summaries]
        )
        csv_current_summary_ids = set(
            [
                (int(record["opportunity_id"]), int(record["opportunity_summary_id"]))
                for record in csv_current_summaries
            ]
        )
        assert current_summary_ids == csv_current_summary_ids

        # Validate the opportunity summary file
        opportunity_summaries = [o.opportunity_summary for o in current_opportunity_summaries]
        csv_summaries = validate_file(
            task.config.file_path + "/opportunity_summary.csv", len(opportunity_summaries)
        )
        opportunity_summary_ids = set([o.opportunity_summary_id for o in opportunity_summaries])
        csv_opportunity_summary_ids = set(
            [int(record["opportunity_summary_id"]) for record in csv_summaries]
        )
        assert opportunity_summary_ids == csv_opportunity_summary_ids
```
