Skip to content

Commit

Permalink
Bedrock sample (#116)
Browse files Browse the repository at this point in the history
* init + fix: logging

* workflow 2 now gives a chat summary at the end. Added python typing

* formatting

* fix: workflow params, localized try/catch, types, tidying

* refactor: activities to fit blog, workers to single aync function

* feat: sync return result

* refactor: tidy

* refactor: only generate summary at wf end

* fix: query named

* docs: add bedrock link to readme

* refactor: rename bedrock sample directories

* build: use poetry to run bedrock samples

* refactor: rename dirs to allow poe to run without modification

* refactor: shared activity, shared boto3 client, separate workflow ids

* refactor: remove unneeded kwargs

* refactor: minor tidyups

* fix: linting

* fix: start workflow with empty params

* refactor: only summarize once

* refactor: linting

* refactor

* log warning if messages received while chat is closed

* chore: linting

* Update bedrock/basic/README.md

Co-authored-by: Chad Retz <[email protected]>

* Update bedrock/basic/send_message.py

Co-authored-by: Chad Retz <[email protected]>

* Update bedrock/entity/workflows.py

Co-authored-by: Chad Retz <[email protected]>

* chad changes

* prerequisites in README

* refactor: mionr changes

---------

Co-authored-by: Steve Androulakis <[email protected]>
Co-authored-by: Chad Retz <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2024
1 parent ce12576 commit ba5fd0f
Show file tree
Hide file tree
Showing 25 changed files with 1,189 additions and 521 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
<!-- Keep this list in alphabetical order -->
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
Expand Down
25 changes: 25 additions & 0 deletions bedrock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# AI Chatbot example using Amazon Bedrock

Demonstrates how Temporal and Amazon Bedrock can be used to quickly build bulletproof AI applications.

## Samples

* [basic](basic) - A basic Bedrock workflow to process a single prompt.
* [signals_and_queries](signals_and_queries) - Extension to the basic workflow to allow multiple prompts through signals & queries.
* [entity](entity) - Full multi-Turn chat using an entity workflow..

## Pre-requisites

1. An AWS account with Bedrock enabled.
2. A machine that has access to Bedrock.
3. A local Temporal server running on the same machine. See [Temporal's dev server docs](https://docs.temporal.io/cli#start-dev-server) for more information.

These examples use Amazon's Python SDK (Boto3). To configure Boto3 to use your AWS credentials, follow the instructions in [the Boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html).

## Running the samples

For these sample, the optional `bedrock` dependency group must be included. To include, run:

poetry install --with bedrock

There are 3 Bedrock samples, see the README.md in each sub-directory for instructions on running each.
Empty file added bedrock/__init__.py
Empty file.
10 changes: 10 additions & 0 deletions bedrock/basic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Basic Amazon Bedrock workflow

A basic Bedrock workflow. Starts a workflow with a prompt, generates a response and ends the workflow.

To run, first see `samples-python` [README.md](../../README.md), and `bedrock` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory:

1. Run the worker: `poetry run python run_worker.py`
2. In another terminal run the client with a prompt:

e.g. `poetry run python send_message.py 'What animals are marsupials?'`
Empty file added bedrock/basic/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions bedrock/basic/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import concurrent.futures
import logging

from temporalio.client import Client
from temporalio.worker import Worker
from workflows import BasicBedrockWorkflow

from bedrock.shared.activities import BedrockActivities


async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
activities = BedrockActivities()

# Run the worker
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue="bedrock-task-queue",
workflows=[BasicBedrockWorkflow],
activities=[activities.prompt_bedrock],
activity_executor=activity_executor,
)
await worker.run()


if __name__ == "__main__":
print("Starting worker")
print("Then run 'python send_message.py \"<prompt>\"'")

logging.basicConfig(level=logging.INFO)

asyncio.run(main())
29 changes: 29 additions & 0 deletions bedrock/basic/send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
import sys

from temporalio.client import Client
from workflows import BasicBedrockWorkflow


async def main(prompt: str) -> str:
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")

# Start the workflow
workflow_id = "basic-bedrock-workflow"
handle = await client.start_workflow(
BasicBedrockWorkflow.run,
prompt, # Initial prompt
id=workflow_id,
task_queue="bedrock-task-queue",
)
return await handle.result()


if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python send_message.py '<prompt>'")
print("Example: python send_message.py 'What animals are marsupials?'")
else:
result = asyncio.run(main(sys.argv[1]))
print(result)
24 changes: 24 additions & 0 deletions bedrock/basic/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from bedrock.shared.activities import BedrockActivities


@workflow.defn
class BasicBedrockWorkflow:
@workflow.run
async def run(self, prompt: str) -> str:

workflow.logger.info("Prompt: %s" % prompt)

response = await workflow.execute_activity_method(
BedrockActivities.prompt_bedrock,
prompt,
schedule_to_close_timeout=timedelta(seconds=20),
)

workflow.logger.info("Response: %s" % response)

return response
19 changes: 19 additions & 0 deletions bedrock/entity/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Multi-turn chat with Amazon Bedrock Entity Workflow

Multi-Turn Chat using an Entity Workflow. The workflow runs forever unless explicitly ended. The workflow continues as new after a configurable number of chat turns to keep the prompt size small and the Temporal event history small. Each continued-as-new workflow receives a summary of the conversation history so far for context.

To run, first see `samples-python` [README.md](../../README.md), and `bedrock` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory:

1. Run the worker: `poetry run python run_worker.py`
2. In another terminal run the client with a prompt.

Example: `poetry run python send_message.py 'What animals are marsupials?'`

3. View the worker's output for the response.
4. Give followup prompts by signaling the workflow.

Example: `poetry run python send_message.py 'Do they lay eggs?'`
5. Get the conversation history summary by querying the workflow.

Example: `poetry run python get_history.py`
6. To end the chat session, run `poetry run python end_chat.py`
Empty file added bedrock/entity/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions bedrock/entity/end_chat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
import sys

from temporalio.client import Client
from workflows import EntityBedrockWorkflow


async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")

workflow_id = "entity-bedrock-workflow"

handle = client.get_workflow_handle_for(EntityBedrockWorkflow.run, workflow_id)

# Sends a signal to the workflow
await handle.signal(EntityBedrockWorkflow.end_chat)


if __name__ == "__main__":
print("Sending signal to end chat.")
asyncio.run(main())
31 changes: 31 additions & 0 deletions bedrock/entity/get_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio

from temporalio.client import Client
from workflows import EntityBedrockWorkflow


async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
workflow_id = "entity-bedrock-workflow"

handle = client.get_workflow_handle(workflow_id)

# Queries the workflow for the conversation history
history = await handle.query(EntityBedrockWorkflow.get_conversation_history)

print("Conversation History")
print(
*(f"{speaker.title()}: {message}\n" for speaker, message in history), sep="\n"
)

# Queries the workflow for the conversation summary
summary = await handle.query(EntityBedrockWorkflow.get_summary_from_history)

if summary is not None:
print("Conversation Summary:")
print(summary)


if __name__ == "__main__":
asyncio.run(main())
35 changes: 35 additions & 0 deletions bedrock/entity/run_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import concurrent.futures
import logging

from temporalio.client import Client
from temporalio.worker import Worker
from workflows import EntityBedrockWorkflow

from bedrock.shared.activities import BedrockActivities


async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
activities = BedrockActivities()

# Run the worker
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue="bedrock-task-queue",
workflows=[EntityBedrockWorkflow],
activities=[activities.prompt_bedrock],
activity_executor=activity_executor,
)
await worker.run()


if __name__ == "__main__":
print("Starting worker")
print("Then run 'python send_message.py \"<prompt>\"'")

logging.basicConfig(level=logging.INFO)

asyncio.run(main())
30 changes: 30 additions & 0 deletions bedrock/entity/send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
import sys

from temporalio.client import Client
from workflows import BedrockParams, EntityBedrockWorkflow


async def main(prompt):
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")

workflow_id = "entity-bedrock-workflow"

# Sends a signal to the workflow (and starts it if needed)
await client.start_workflow(
EntityBedrockWorkflow.run,
BedrockParams(None, None),
id=workflow_id,
task_queue="bedrock-task-queue",
start_signal="user_prompt",
start_signal_args=[prompt],
)


if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python send_message.py '<prompt>'")
print("Example: python send_message.py 'What animals are marsupials?'")
else:
asyncio.run(main(sys.argv[1]))
Loading

0 comments on commit ba5fd0f

Please sign in to comment.