Commit e15a545

[Issue #2604] Incrementally pull updates from the search queue table (#2686)

Summary
Fixes #2604

Time to review: 60 mins

Changes proposed
Change the incremental indexing process to load from the search queue table.
Handle edge cases for updates: an update re-indexes the whole opportunity (see the sketch below).

Context for reviewers
WIP while we add testing scenarios; this is an initial draft for reviewing the implementation.

Additional information
See unit tests.
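
At a high level, the queue-driven incremental pass works as sketched below. The helper names here (fetch_indexable_opportunities_from_queue, clear_queue_entries, and so on) are illustrative stand-ins, not the actual methods in the diff:

def incremental_pass(db_session, search_client, index_name):
    # ids of the documents currently in the search index
    existing_ids = fetch_existing_opportunity_ids_in_index(search_client, index_name)

    # opportunities enqueued by the database triggers, restricted to
    # indexable ones (non-draft, with a current summary)
    queued_opportunities = fetch_indexable_opportunities_from_queue(db_session)

    # bulk indexing covers both inserts and updates; an update
    # re-indexes the whole opportunity document
    processed_ids = load_records(search_client, index_name, queued_opportunities)

    # anything in the index that this run did not (re)index is deleted
    search_client.bulk_delete(index_name, existing_ids - processed_ids)

    # clear the processed rows from the queue
    clear_queue_entries(db_session, processed_ids)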
mikehgrantsgov authored Nov 1, 2024
1 parent 06ece75 commit e15a545
Showing 6 changed files with 215 additions and 15 deletions.
@@ -0,0 +1,98 @@
"""Remove has_update column from opportunity_search_index_queue
Revision ID: 8b96ade6f6a2
Revises: a8ebde13a18a
Create Date: 2024-10-31 16:57:43.256710
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8b96ade6f6a2"
down_revision = "a8ebde13a18a"
branch_labels = None
depends_on = None


create_old_trigger_function = """
CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
RETURNS TRIGGER AS $$
DECLARE
opp_id bigint;
BEGIN
-- Determine the opportunity_id based on the table
CASE TG_TABLE_NAME
WHEN 'link_opportunity_summary_funding_instrument' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_funding_category' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_applicant_type' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'opportunity_summary' THEN
opp_id := NEW.opportunity_id;
WHEN 'current_opportunity_summary' THEN
opp_id := NEW.opportunity_id;
ELSE
opp_id := NEW.opportunity_id;
END CASE;
INSERT INTO api.opportunity_search_index_queue (opportunity_id, has_update)
VALUES (opp_id, TRUE)
ON CONFLICT (opportunity_id)
DO UPDATE SET has_update = TRUE, updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

create_trigger_function = """
CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
RETURNS TRIGGER AS $$
DECLARE
opp_id bigint;
BEGIN
-- Determine the opportunity_id based on the table
CASE TG_TABLE_NAME
WHEN 'link_opportunity_summary_funding_instrument' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_funding_category' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_applicant_type' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'opportunity_summary' THEN
opp_id := NEW.opportunity_id;
WHEN 'current_opportunity_summary' THEN
opp_id := NEW.opportunity_id;
ELSE
opp_id := NEW.opportunity_id;
END CASE;
INSERT INTO api.opportunity_search_index_queue (opportunity_id)
VALUES (opp_id)
ON CONFLICT (opportunity_id)
DO NOTHING;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""


def upgrade():
# Update the trigger function
op.execute(create_trigger_function)

op.drop_column("opportunity_search_index_queue", "has_update", schema="api")


def downgrade():
op.execute(create_old_trigger_function)

op.add_column(
"opportunity_search_index_queue",
sa.Column("has_update", sa.BOOLEAN(), autoincrement=False, nullable=False),
schema="api",
)
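
Because the new trigger inserts with ON CONFLICT (opportunity_id) DO NOTHING, repeated changes to the same opportunity collapse into a single queue row. A rough illustration of that behavior, issuing the trigger's INSERT directly through a SQLAlchemy session (the opportunity_id value 123 is hypothetical and assumed to exist in api.opportunity):

from sqlalchemy import text

enqueue = text(
    "INSERT INTO api.opportunity_search_index_queue (opportunity_id) "
    "VALUES (:opp_id) ON CONFLICT (opportunity_id) DO NOTHING"
)

# Fire the enqueue twice, as two successive edits to one opportunity would
db_session.execute(enqueue, {"opp_id": 123})
db_session.execute(enqueue, {"opp_id": 123})

# The queue still holds exactly one row for this opportunity
count = db_session.scalar(
    text(
        "SELECT count(*) FROM api.opportunity_search_index_queue "
        "WHERE opportunity_id = :opp_id"
    ),
    {"opp_id": 123},
)
assert count == 1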
1 change: 0 additions & 1 deletion api/src/db/models/opportunity_models.py
@@ -440,4 +440,3 @@ class OpportunitySearchIndexQueue(ApiSchemaTable, TimestampMixin):
BigInteger, ForeignKey(Opportunity.opportunity_id), primary_key=True, index=True
)
opportunity: Mapped[Opportunity] = relationship(Opportunity)
-    has_update: Mapped[bool]
79 changes: 66 additions & 13 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -4,13 +4,17 @@

from pydantic import Field
from pydantic_settings import SettingsConfigDict
-from sqlalchemy import select
+from sqlalchemy import delete, select
from sqlalchemy.orm import noload, selectinload

import src.adapters.db as db
import src.adapters.search as search
from src.api.opportunities_v1.opportunity_schemas import OpportunityV1Schema
-from src.db.models.opportunity_models import CurrentOpportunitySummary, Opportunity
+from src.db.models.opportunity_models import (
+    CurrentOpportunitySummary,
+    Opportunity,
+    OpportunitySearchIndexQueue,
+)
from src.task.task import Task
from src.util.datetime_util import get_now_us_eastern_datetime
from src.util.env_config import PydanticBaseEnvConfig
@@ -68,21 +72,70 @@ def run_task(self) -> None:
def incremental_updates_and_deletes(self) -> None:
existing_opportunity_ids = self.fetch_existing_opportunity_ids_in_index()

-        # load the records incrementally
-        # TODO - The point of this incremental load is to support upcoming work
-        # to load only opportunities that have changes as we'll eventually be indexing
-        # files which will take longer. However - the structure of the data isn't yet
-        # known so I want to hold on actually setting up any change-detection logic
-        loaded_opportunity_ids = set()
-        for opp_batch in self.fetch_opportunities():
-            loaded_opportunity_ids.update(self.load_records(opp_batch))
# Fetch opportunities that need processing from the queue
queued_opportunities = (
self.db_session.execute(
select(Opportunity)
.join(OpportunitySearchIndexQueue)
.join(CurrentOpportunitySummary)
.where(
Opportunity.is_draft.is_(False),
CurrentOpportunitySummary.opportunity_status.isnot(None),
)
.options(selectinload("*"), noload(Opportunity.all_opportunity_summaries))
)
.scalars()
.all()
)

# Process updates and inserts
processed_opportunity_ids = set()
opportunities_to_index = []

-        # Delete
-        opportunity_ids_to_delete = existing_opportunity_ids - loaded_opportunity_ids
for opportunity in queued_opportunities:
logger.info(
"Processing queued opportunity",
extra={
"opportunity_id": opportunity.opportunity_id,
"status": (
"update"
if opportunity.opportunity_id in existing_opportunity_ids
else "insert"
),
},
)

# Add to the index batch (the query above already filtered to indexable records)
opportunities_to_index.append(opportunity)
processed_opportunity_ids.add(opportunity.opportunity_id)

# Bulk index the opportunities (handles both inserts and updates)
if opportunities_to_index:
loaded_ids = self.load_records(opportunities_to_index)
logger.info(f"Indexed {len(loaded_ids)} opportunities")

# Handle deletes - opportunities in search but not in our processed set
# and not in our database (or are drafts)
opportunity_ids_to_delete = existing_opportunity_ids - processed_opportunity_ids

-        if len(opportunity_ids_to_delete) > 0:
for opportunity_id in opportunity_ids_to_delete:
logger.info(
"Deleting opportunity from search",
extra={"opportunity_id": opportunity_id, "status": "delete"},
)

if opportunity_ids_to_delete:
self.search_client.bulk_delete(self.index_name, opportunity_ids_to_delete)

# Clear processed entries from the queue
if processed_opportunity_ids:
self.db_session.execute(
delete(OpportunitySearchIndexQueue).where(
OpportunitySearchIndexQueue.opportunity_id.in_(processed_opportunity_ids)
)
)
self.db_session.commit()

def full_refresh(self) -> None:
# create the index
self.search_client.create_index(
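The delete step then reduces to a set difference over document ids. Conceptually (a toy example, not the class's actual code):

existing_ids = {1, 2, 3, 4}   # ids currently in the search index
processed_ids = {1, 2}        # ids (re)indexed from the queue this run

# anything in the index that was not (re)indexed this run is removed
ids_to_delete = existing_ids - processed_ids
assert ids_to_delete == {3, 4}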
8 changes: 8 additions & 0 deletions api/tests/src/db/models/factories.py
@@ -1812,3 +1812,11 @@ def create_tgroups_agency(
groups.append(tgroup)

return groups


class OpportunitySearchIndexQueueFactory(BaseFactory):
class Meta:
model = opportunity_models.OpportunitySearchIndexQueue

opportunity = factory.SubFactory(OpportunityFactory)
opportunity_id = factory.LazyAttribute(lambda s: s.opportunity.opportunity_id)
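
Typical usage mirrors the tests below: create an opportunity, then enqueue it for indexing.

opportunity = OpportunityFactory.create()
OpportunitySearchIndexQueueFactory.create(opportunity=opportunity)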
44 changes: 43 additions & 1 deletion api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -1,12 +1,13 @@
import pytest

from src.db.models.opportunity_models import OpportunitySearchIndexQueue
from src.search.backend.load_opportunities_to_index import (
LoadOpportunitiesToIndex,
LoadOpportunitiesToIndexConfig,
)
from src.util.datetime_util import get_now_us_eastern_datetime
from tests.conftest import BaseTestClass
-from tests.src.db.models.factories import OpportunityFactory
+from tests.src.db.models.factories import OpportunityFactory, OpportunitySearchIndexQueueFactory


class TestLoadOpportunitiesToIndexFullRefresh(BaseTestClass):
@@ -41,6 +42,11 @@ def test_load_opportunities_to_index(
OpportunityFactory.create_batch(size=3, is_draft=True)
OpportunityFactory.create_batch(size=4, no_current_summary=True)

for opportunity in opportunities:
OpportunitySearchIndexQueueFactory.create(
opportunity=opportunity,
)

load_opportunities_to_index.run()
# Verify some metrics first
assert (
@@ -123,6 +129,11 @@ def test_load_opportunities_to_index(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
)

for opportunity in opportunities:
OpportunitySearchIndexQueueFactory.create(
opportunity=opportunity,
)

load_opportunities_to_index.run()

resp = search_client.search(opportunity_index_alias, {"size": 100})
@@ -136,6 +147,11 @@ def test_load_opportunities_to_index(
for opportunity in opportunities_to_delete:
db_session.delete(opportunity)

for opportunity in opportunities:
OpportunitySearchIndexQueueFactory.create(
opportunity=opportunity,
)

load_opportunities_to_index.run()

resp = search_client.search(opportunity_index_alias, {"size": 100})
@@ -151,3 +167,29 @@ def test_load_opportunities_to_index_index_does_not_exist(self, db_session, search_client):

with pytest.raises(RuntimeError, match="please run the full refresh job"):
load_opportunities_to_index.run()

def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_index):
"""Test that a new opportunity in the queue gets indexed"""
test_opportunity = OpportunityFactory.create()

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)

load_opportunities_to_index.run()

# Verify queue was cleared
remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
assert len(remaining_queue) == 0

def test_draft_opportunity_not_indexed(self, db_session, load_opportunities_to_index):
"""Test that draft opportunities are not indexed"""
test_opportunity = OpportunityFactory.create(is_draft=True)

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)

load_opportunities_to_index.run()

# Verify queue was not cleared
remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
assert len(remaining_queue) == 1
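
The update edge case called out in the commit message, re-indexing the whole opportunity on change, could be exercised along these lines. This is a sketch using the same fixtures as the tests above, with assertions kept to the queue for brevity:

def test_updated_opportunity_is_reindexed(self, db_session, load_opportunities_to_index):
    """Sketch: an already-indexed opportunity is re-indexed when queued again"""
    test_opportunity = OpportunityFactory.create()
    OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
    load_opportunities_to_index.run()

    # Simulate a later update: the trigger would enqueue the opportunity again
    OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
    load_opportunities_to_index.run()

    # The second pass drains the queue as well
    remaining_queue = db_session.query(OpportunitySearchIndexQueue).all()
    assert len(remaining_queue) == 0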
Binary file modified documentation/api/database/erds/api-schema.png
