Skip to content

Commit

Permalink
updated main flow
Browse files Browse the repository at this point in the history
  • Loading branch information
JimVincentW committed Dec 31, 2024
1 parent 96aabc8 commit 06705e8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
1 change: 1 addition & 0 deletions opol/stack/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ services:
worker-prefect:
image: openpoliticsproject/worker-base:latest
command: ["prefect", "worker", "start", "--pool", "docker-pool"]
restart: always
env_file:
- .env
build:
Expand Down
24 changes: 9 additions & 15 deletions opol/stack/flows/orchestration/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ async def create_jobs_flow():
try:
# Schedule all job creations
await produce_flags()
await create_scrape_jobs()
await create_embedding_jobs()
await create_entity_extraction_jobs()
await create_geocoding_jobs()
Expand All @@ -223,21 +222,17 @@ async def create_jobs_flow():
logger.error(f"Error in create_jobs_flow: {e}")
raise e

@flow(name="trigger-processing-flow")
async def trigger_processing_flow():
await create_jobs_flow()
await geocode_contents()
await extract_entities()
await classify_contents()


@flow(name="save-all-flows")
async def save_all_flows():
await save_scraped_contents()
await save_contents_with_embeddings_flow()
await save_contents_with_entities_flow()
await save_geocoded_contents_flow()
await save_contents_with_classification_flow()
try:
await save_scraped_contents()
await save_contents_with_embeddings_flow()
await save_contents_with_entities_flow()
await save_geocoded_contents_flow()
await save_contents_with_classification_flow()
except Exception as e:
logger.error(f"Error in save_all_flows: {e}")
raise e

# ======================
# Deployment Definitions
Expand All @@ -246,5 +241,4 @@ async def save_all_flows():
@flow(name="meta-flow")
async def meta_flow():
await create_jobs_flow()
await trigger_processing_flow()
await save_all_flows()

0 comments on commit 06705e8

Please sign in to comment.