Skip to content

Commit

Permalink
Prevent hang due to OpenAI receive time
Browse files Browse the repository at this point in the history
We now no longer allow the receive timeout to effectively be longer than
the agent timeout period. This ensures that we won't hang "forever"
waiting on OpenAI to respond.
  • Loading branch information
jwilger committed Mar 1, 2024
1 parent 5ae9a41 commit c8bd228
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ config :open_ai_client, :base_url, System.get_env("OPENAI_BASE_URL") || "https:/
config :open_ai_client, :openai_api_key, System.get_env("OPENAI_API_KEY") || raise("OPENAI_API_KEY is not set")
config :open_ai_client, :openai_organization_id, System.get_env("OPENAI_ORGANIZATION_ID")
config :gpt_agent, :heartbeat_interval_ms, if(config_env() == :test, do: 1, else: 1000)
config :gpt_agent, :timeout_ms, get_env("GPT_AGENT_RECEIVE_TIMEOUT_MS", 120_000, :int)

# if this is set to a number larger than the `:timeout_ms` above, the above value will be used instead
config :gpt_agent, :receive_timeout_ms, get_env("GPT_AGENT_RECEIVE_TIMEOUT_MS", 30_000, :int)
```

Make sure you have the `OPENAI_API_KEY` and (optionally) `OPENAI_ORGANIZATION_ID`
Expand Down Expand Up @@ -123,3 +127,9 @@ normally. If you would like to set a different timeout value, you can pass the
# Shut down the process if it has not received any activity within 200ms
{:ok, pid} = GptAgent.connect(thread_id: thread_id, assistant_id: assistant_id, timeout_ms: 200)
```

There is also a receive timeout value for the HTTP requests to OpenAI. This is
specified via the `:receive_timeout_ms` configuration value in your application
config, however if the agent timeout is set to a smaller value, the agent
timeout will also be used as the receive timeout. This is to prevent situations
where the Agent will hang due to OpenAI taking too long to respond.
24 changes: 17 additions & 7 deletions lib/gpt_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ defmodule GptAgent do
end
end

defp receive_timeout_ms(%__MODULE__{} = state) do
default_receive_timeout_ms = Application.get_env(:gpt_agent, :receive_timeout_ms)
Enum.min([default_receive_timeout_ms, state.timeout_ms])
end

defp retrieve_current_run_status(%__MODULE__{} = state) do
{:ok, %{body: %{"object" => "list", "data" => runs}}} =
OpenAiClient.get("/v1/threads/#{state.thread_id}/runs?limit=1&order=desc",
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
)

case runs do
Expand All @@ -132,7 +137,7 @@ defmodule GptAgent do
json: %{
"assistant_id" => state.assistant_id
},
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
)

Process.send_after(self(), {:check_run_status, id}, heartbeat_interval_ms())
Expand Down Expand Up @@ -175,7 +180,7 @@ defmodule GptAgent do
log("Reading messages with request to #{url}")

{:ok, %{body: %{"object" => "list", "data" => messages}}} =
OpenAiClient.get(url, receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms))
OpenAiClient.get(url, receive_timeout: receive_timeout_ms(state))

state
|> process_messages(messages)
Expand Down Expand Up @@ -242,7 +247,7 @@ defmodule GptAgent do
{:ok, %{body: %{"id" => id}}} =
OpenAiClient.post("/v1/threads/#{state.thread_id}/messages",
json: message,
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
)

state
Expand Down Expand Up @@ -311,7 +316,7 @@ defmodule GptAgent do
{:ok, %{body: %{"object" => "thread.run", "cancelled_at" => nil, "failed_at" => nil}}} =
OpenAiClient.post("/v1/threads/#{state.thread_id}/runs/#{state.run_id}/submit_tool_outputs",
json: %{tool_outputs: state.tool_outputs},
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
)

Process.send_after(self(), {:check_run_status, state.run_id}, heartbeat_interval_ms())
Expand Down Expand Up @@ -364,7 +369,7 @@ defmodule GptAgent do

{:ok, %{body: %{"status" => status} = response}} =
OpenAiClient.get("/v1/threads/#{state.thread_id}/runs/#{id}",
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
)

handle_run_status(status, id, response, state)
Expand Down Expand Up @@ -672,6 +677,11 @@ defmodule GptAgent do

defp maybe_subscribe(result, _opts), do: result

defp receive_timeout_ms(%GptAgent{} = state) do
default_receive_timeout_ms = Application.get_env(:gpt_agent, :receive_timeout_ms)
Enum.min([default_receive_timeout_ms, state.timeout_ms])
end

defp handle_existing_agent(pid, last_message_id, assistant_id) do
log("Found existing GPT Agent with PID #{inspect(pid)}")
log("Updating last message ID to #{inspect(last_message_id)}")
Expand All @@ -692,7 +702,7 @@ defmodule GptAgent do
)

case OpenAiClient.get("/v1/threads/#{thread_id}",
receive_timeout: Application.get_env(:gpt_agent, :receive_timeout_ms)
receive_timeout: receive_timeout_ms(state)
) do
{:ok, %{status: 404}} ->
log("Thread ID #{inspect(thread_id)} not found")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
def project do
[
app: :gpt_agent,
version: "9.2.1",
version: "9.2.2",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
aliases: aliases(),
Expand Down

0 comments on commit c8bd228

Please sign in to comment.