diff --git a/api/src/db/migrations/versions/2024_10_31_remove_has_update_column_from_.py b/api/src/db/migrations/versions/2024_10_31_remove_has_update_column_from_.py
new file mode 100644
index 000000000..e278a45df
--- /dev/null
+++ b/api/src/db/migrations/versions/2024_10_31_remove_has_update_column_from_.py
@@ -0,0 +1,98 @@
+"""Remove has_update column from opportunity_search_index_queue
+
+Revision ID: 8b96ade6f6a2
+Revises: a8ebde13a18a
+Create Date: 2024-10-31 16:57:43.256710
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "8b96ade6f6a2"
+down_revision = "a8ebde13a18a"
+branch_labels = None
+depends_on = None
+
+
+create_old_trigger_function = """
+CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
+RETURNS TRIGGER AS $$
+DECLARE
+    opp_id bigint;
+BEGIN
+    -- Determine the opportunity_id based on the table
+    CASE TG_TABLE_NAME
+        WHEN 'link_opportunity_summary_funding_instrument' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_funding_category' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_applicant_type' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        WHEN 'current_opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        ELSE
+            opp_id := NEW.opportunity_id;
+    END CASE;
+
+    INSERT INTO api.opportunity_search_index_queue (opportunity_id, has_update)
+    VALUES (opp_id, TRUE)
+    ON CONFLICT (opportunity_id)
+    DO UPDATE SET has_update = TRUE, updated_at = CURRENT_TIMESTAMP;
+
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+"""
+
+create_trigger_function = """
+CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
+RETURNS TRIGGER AS $$
+DECLARE
+    opp_id bigint;
+BEGIN
+    -- Determine the opportunity_id based on the table
+    CASE TG_TABLE_NAME
+        WHEN 'link_opportunity_summary_funding_instrument' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_funding_category' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'link_opportunity_summary_applicant_type' THEN
+            opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
+        WHEN 'opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        WHEN 'current_opportunity_summary' THEN
+            opp_id := NEW.opportunity_id;
+        ELSE
+            opp_id := NEW.opportunity_id;
+    END CASE;
+
+    INSERT INTO api.opportunity_search_index_queue (opportunity_id)
+    VALUES (opp_id)
+    ON CONFLICT (opportunity_id)
+    DO NOTHING;
+
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+"""
+
+
+def upgrade():
+    # Update the trigger function
+    op.execute(create_trigger_function)
+
+    op.drop_column("opportunity_search_index_queue", "has_update", schema="api")
+
+
+def downgrade():
+    op.execute(create_old_trigger_function)
+
+    op.add_column(
+        "opportunity_search_index_queue",
+        sa.Column("has_update", sa.BOOLEAN(), autoincrement=False, nullable=False),
+        schema="api",
+    )
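Note for context (not part of the patch): the revised trigger makes queueing idempotent, since any number of writes touching the same opportunity collapse to a single queue row via ON CONFLICT (opportunity_id) DO NOTHING. A minimal sketch of that behavior follows; it assumes a SQLAlchemy session bound to a database where the triggers created by earlier migrations are attached to api.opportunity_summary, and the opportunity_summary_id and updated column in the sample UPDATEs are placeholders.

    # Illustrative sketch only -- not introduced by this diff.
    from sqlalchemy import text


    def demo_queue_dedup(db_session) -> None:
        # Two separate updates touching the same (placeholder) summary row each
        # fire update_opportunity_search_queue() ...
        db_session.execute(
            text("UPDATE api.opportunity_summary SET updated_at = now() WHERE opportunity_summary_id = 1")
        )
        db_session.execute(
            text("UPDATE api.opportunity_summary SET updated_at = now() WHERE opportunity_summary_id = 1")
        )

        # ... but ON CONFLICT DO NOTHING leaves at most one queue row for that opportunity.
        queued = db_session.execute(
            text(
                "SELECT count(*) FROM api.opportunity_search_index_queue"
                " WHERE opportunity_id ="
                " (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = 1)"
            )
        ).scalar_one()
        assert queued == 1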
diff --git a/api/src/db/models/opportunity_models.py b/api/src/db/models/opportunity_models.py
index f6c5868e9..194198d4c 100644
--- a/api/src/db/models/opportunity_models.py
+++ b/api/src/db/models/opportunity_models.py
@@ -440,4 +440,3 @@ class OpportunitySearchIndexQueue(ApiSchemaTable, TimestampMixin):
         BigInteger, ForeignKey(Opportunity.opportunity_id), primary_key=True, index=True
     )
     opportunity: Mapped[Opportunity] = relationship(Opportunity)
-    has_update: Mapped[bool]
diff --git a/api/src/search/backend/load_opportunities_to_index.py b/api/src/search/backend/load_opportunities_to_index.py
index 61deb895f..2a01583b6 100644
--- a/api/src/search/backend/load_opportunities_to_index.py
+++ b/api/src/search/backend/load_opportunities_to_index.py
@@ -4,13 +4,17 @@
 from pydantic import Field
 from pydantic_settings import SettingsConfigDict
-from sqlalchemy import select
+from sqlalchemy import delete, select
 from sqlalchemy.orm import noload, selectinload
 
 import src.adapters.db as db
 import src.adapters.search as search
 from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
-from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
+from src.db.models.opportunity_models import (
+    CurrentOpportunitySummary,
+    Opportunity,
+    OpportunitySearchIndexQueue,
+)
 from src.task.task import Task
 from src.util.datetime_util import get_now_us_eastern_datetime
 from src.util.env_config import PydanticBaseEnvConfig
@@ -68,21 +72,70 @@ def run_task(self) -> None:
     def incremental_updates_and_deletes(self) -> None:
         existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()
 
-        # load the records incrementally
-        # TODO - The point of this incremental load is to support upcoming work
-        # to load only opportunities that have changes as we'll eventually be indexing
-        # files which will take longer. However - the structure of the data isn't yet
-        # known so I want to hold on actually setting up any change-detection logic
-        loaded_opportunity_ids = set()
-        for opp_batch in self.fetch_opportunities():
-            loaded_opportunity_ids.update(self.load_records(opp_batch))
+        # Fetch opportunities that need processing from the queue
+        queued_opportunities = (
+            self.db_session.execute(
+                select(Opportunity)
+                .join(OpportunitySearchIndexQueue)
+                .join(CurrentOpportunitySummary)
+                .where(
+                    Opportunity.is_draft.is_(False),
+                    CurrentOpportunitySummary.opportunity_status.isnot(None),
+                )
+                .options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
+            )
+            .scalars()
+            .all()
+        )
+
+        # Process updates and inserts
+        processed_opportunity_ids = set()
+        opportunities_to_index = []
 
-        # Delete
-        opportunity_ids_to_delete = existing_opportunity_ids - loaded_opportunity_ids
+        for opportunity in queued_opportunities:
+            logger.info(
+                "Processing queued opportunity",
+                extra={
+                    "opportunity_id": opportunity.opportunity_id,
+                    "status": (
+                        "update"
+                        if opportunity.opportunity_id in existing_opportunity_ids
+                        else "insert"
+                    ),
+                },
+            )
+
+            # Add to index batch if it's indexable
+            opportunities_to_index.append(opportunity)
+            processed_opportunity_ids.add(opportunity.opportunity_id)
+
+        # Bulk index the opportunities (handles both inserts and updates)
+        if opportunities_to_index:
+            loaded_ids = self.load_records(opportunities_to_index)
+            logger.info(f"Indexed {len(loaded_ids)} opportunities")
+
+        # Handle deletes - opportunities in search but not in our processed set
+        # and not in our database (or are drafts)
+        opportunity_ids_to_delete = existing_opportunity_ids - processed_opportunity_ids
 
-        if len(opportunity_ids_to_delete) > 0:
+        for opportunity_id in opportunity_ids_to_delete:
+            logger.info(
+                "Deleting opportunity from search",
+                extra={"opportunity_id": opportunity_id, "status": "delete"},
+            )
+
+        if opportunity_ids_to_delete:
             self.search_client.bulk_delete(self.index_name, opportunity_ids_to_delete)
 
+        # Clear processed entries from the queue
+        if processed_opportunity_ids:
+            self.db_session.execute(
+                delete(OpportunitySearchIndexQueue).where(
+                    OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
+                )
+            )
+            self.db_session.commit()
+
     def full_refresh(self) -> None:
         # create the index
         self.search_client.create_index(
diff --git a/api/tests/src/db/models/factories.py b/api/tests/src/db/models/factories.py
index b0fed7d11..a4b17411a 100644
--- a/api/tests/src/db/models/factories.py
+++ b/api/tests/src/db/models/factories.py
@@ -1812,3 +1812,11 @@ def create_tgroups_agency(
         groups.append(tgroup)
 
     return groups
+
+
+class OpportunitySearchIndexQueueFactory(BaseFactory):
+    class Meta:
+        model = opportunity_models.OpportunitySearchIndexQueue
+
+    opportunity = factory.SubFactory(OpportunityFactory)
+    opportunity_id = factory.LazyAttribute(lambda s: s.opportunity.opportunity_id)
diff --git a/api/tests/src/search/backend/test_load_opportunities_to_index.py b/api/tests/src/search/backend/test_load_opportunities_to_index.py
index 9a3961f2b..5598c9d10 100644
--- a/api/tests/src/search/backend/test_load_opportunities_to_index.py
+++ b/api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -1,12 +1,13 @@
 import pytest
 
+from src.db.models.opportunity_models import OpportunitySearchIndexQueue
 from src.search.backend.load_opportunities_to_index import (
     LoadOpportunitiesToIndex,
     LoadOpportunitiesToIndexConfig,
 )
 from src.util.datetime_util import get_now_us_eastern_datetime
 from tests.conftest import BaseTestClass
-from tests.src.db.models.factories import OpportunityFactory
+from tests.src.db.models.factories import OpportunityFactory, OpportunitySearchIndexQueueFactory
 
 
 class TestLoadOpportunitiesToIndexFullRefresh(BaseTestClass):
@@ -41,6 +42,11 @@ def test_load_opportunities_to_index(
         OpportunityFactory.create_batch(size=3, is_draft=True)
         OpportunityFactory.create_batch(size=4, no_current_summary=True)
 
+        for opportunity in opportunities:
+            OpportunitySearchIndexQueueFactory.create(
+                opportunity=opportunity,
+            )
+
         load_opportunities_to_index.run()
         # Verify some metrics first
         assert (
@@ -123,6 +129,11 @@ def test_load_opportunities_to_index(
             OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
         )
 
+        for opportunity in opportunities:
+            OpportunitySearchIndexQueueFactory.create(
+                opportunity=opportunity,
+            )
+
         load_opportunities_to_index.run()
 
         resp = search_client.search(opportunity_index_alias, {"size": 100})
@@ -136,6 +147,11 @@ def test_load_opportunities_to_index(
         for opportunity in opportunities_to_delete:
             db_session.delete(opportunity)
 
+        for opportunity in opportunities:
+            OpportunitySearchIndexQueueFactory.create(
+                opportunity=opportunity,
+            )
+
         load_opportunities_to_index.run()
 
         resp = search_client.search(opportunity_index_alias, {"size": 100})
@@ -151,3 +167,29 @@ def test_load_opportunities_to_index_index_does_not_exist(self, db_session, sear
         with pytest.raises(RuntimeError, match="please run the full refresh job"):
             load_opportunities_to_index.run()
+
+    def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_index):
+        """Test that a new opportunity in the queue gets indexed"""
+        test_opportunity = OpportunityFactory.create()
+
+        # Add to queue
+        OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
+
+        load_opportunities_to_index.run()
+
+        # Verify queue was cleared
+        remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
+        assert len(remaining_queue) == 0
+
+    def test_draft_opportunity_not_indexed(self, db_session, load_opportunities_to_index):
+        """Test that draft opportunities are not indexed"""
+        test_opportunity = OpportunityFactory.create(is_draft=True)
+
+        # Add to queue
+        OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
+
+        load_opportunities_to_index.run()
+
+        # Verify queue was not cleared
+        remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
+        assert len(remaining_queue) == 1
diff --git a/documentation/api/database/erds/api-schema.png b/documentation/api/database/erds/api-schema.png
index 79844abc5..4922ea885 100644
Binary files a/documentation/api/database/erds/api-schema.png and b/documentation/api/database/erds/api-schema.png differ
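A possible companion sketch, not introduced by this diff: if application code ever needs to enqueue an opportunity directly (for example, during a manual backfill), the trigger's idempotent insert can be mirrored with SQLAlchemy's PostgreSQL insert construct. The helper name below is hypothetical, and it assumes the queue's timestamp columns are filled by defaults from TimestampMixin.

    # Hypothetical helper -- illustrative only, not part of this patch.
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy.orm import Session

    from src.db.models.opportunity_models import OpportunitySearchIndexQueue


    def enqueue_opportunity(db_session: Session, opportunity_id: int) -> None:
        # Mirror the trigger's ON CONFLICT (opportunity_id) DO NOTHING so that
        # re-enqueueing an already-queued opportunity is a no-op rather than a
        # primary-key violation.
        stmt = (
            insert(OpportunitySearchIndexQueue)
            .values(opportunity_id=opportunity_id)
            .on_conflict_do_nothing(index_elements=["opportunity_id"])
        )
        db_session.execute(stmt)
        db_session.commit()

Calling enqueue_opportunity(session, some_id) twice would leave a single queue row, which the incremental job above would then drain on its next run.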