Skip to content

Commit

Permalink
updated classification flow
Browse files Browse the repository at this point in the history
  • Loading branch information
JimVincentW committed Dec 30, 2024
1 parent 2a077b6 commit f84480f
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions opol/stack/flows/classification/classification_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,29 +184,35 @@ async def classify_contents_flow(batch_size: int):
for content_dict, content in zip(results, contents):
if not content_dict:
continue
if content_dict.get("type") != "Other":
llm_evaluation = await evaluate_content(content)
db_evaluation = ContentEvaluation(
content_id=content.id,
**llm_evaluation.model_dump()
)

content_dict_processed = {
'url': content.url,
'title': content.title,
'evaluations': db_evaluation.model_dump(exclude={'id'})
}
evaluated_contents.append(content_dict_processed)
else:
logger.info(f"Content classified as irrelevant: {content.title[:50]}...")
redis_conn = await Redis.from_url(get_redis_url(), db=4)
await redis_conn.rpush('filtered_out_queue', json.dumps(content.model_dump(), cls=UUIDEncoder))
try:
if content_dict.get("type") != "Other":
llm_evaluation = await evaluate_content(content)
db_evaluation = ContentEvaluation(
content_id=content.id,
**llm_evaluation.model_dump()
)

content_dict_processed = {
'url': content.url,
'title': content.title,
'evaluations': db_evaluation.model_dump(exclude={'id'})
}
evaluated_contents.append(content_dict_processed)
else:
logger.info(f"Content classified as irrelevant: {content.title[:50]}...")
redis_conn = await Redis.from_url(get_redis_url(), db=4)
await redis_conn.rpush('filtered_out_queue', json.dumps(content.model_dump(), cls=UUIDEncoder))
except Exception as e:
logger.error(f"Error processing content: {content.title[:50]}...")
logger.error(f"Error: {e}")
continue

if evaluated_contents:
logger.info(f"Writing {len(evaluated_contents)} evaluated contents to Redis")
await write_contents_to_redis(evaluated_contents)
return evaluated_contents

@task(log_prints=True)
async def process_content(content):
"""
This function can be used for individual content processing if needed.
Expand Down

0 comments on commit f84480f

Please sign in to comment.