Retry runs on time/token-based rate limiting
When the run status indicates that OpenAI has rate-limited the request
due to tokens per minute or requests per minute, we will retry the
requests after a configurable delay and up to a configurable number of
times (defaults to no retries).

With this commit, we slightly modify the error code returned with the
RunFailed event to indicate whether the failure is due to temporary rate
limiting or quota being reached as well as whether a retry will be
attempted. We return the event even when a retry will be attempted, so
that subscribers can be aware that rate limiting is occurring.
jwilger committed Feb 22, 2024
1 parent 8d82ed9 commit df72f29
Showing 4 changed files with 217 additions and 3 deletions.
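
For reference, here is a minimal sketch of a subscriber process branching on the new RunFailed codes. The GptAgent.Events.RunFailed module path and the {agent_pid, event} message shape are inferred from the tests below; the subscriber module itself is hypothetical and not part of the library.

defmodule MyApp.GptSubscriber do
  @moduledoc "Hypothetical subscriber illustrating the new RunFailed codes."
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts), do: {:ok, opts}

  @impl true
  def handle_info({_agent_pid, %GptAgent.Events.RunFailed{code: code, message: msg}}, state) do
    case code do
      "rate_limit_exceeded-retrying" ->
        # Temporary rate limiting; the agent has already scheduled another attempt.
        Logger.warning("OpenAI rate limit hit; GptAgent will retry: #{msg}")

      "rate_limit_exceeded-final" ->
        # Retries exhausted or account quota reached; no further attempts will be made.
        Logger.error("Run failed permanently (rate limit/quota): #{msg}")

      other ->
        Logger.error("Run failed (#{other}): #{msg}")
    end

    {:noreply, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
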
6 changes: 6 additions & 0 deletions config/config.exs
@@ -8,12 +8,18 @@ case config_env() do

config :logger, level: :debug

config :gpt_agent, :rate_limit_retry_delay, 30_000
config :gpt_agent, :rate_limit_max_retries, 10

:test ->
config :open_ai_client, :openai_api_key, "test"
config :open_ai_client, :openai_organization_id, "test"

config :logger, level: :warning

config :gpt_agent, :rate_limit_retry_delay, 100
config :gpt_agent, :rate_limit_max_retries, 2

_ ->
nil
end
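
Because the library reads these values with Application.compile_env/3, a consuming application would set them in its own compile-time config. A sketch of what that might look like (the values shown are illustrative, not recommendations):

# config/config.exs of a hypothetical host application
import Config

# Without these keys the library falls back to its defaults of 0, i.e. no retries.
config :gpt_agent,
  rate_limit_retry_delay: 30_000,
  rate_limit_max_retries: 5
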
107 changes: 106 additions & 1 deletion lib/gpt_agent.ex
@@ -26,6 +26,9 @@ defmodule GptAgent do
# two minutes
@timeout_ms 120_000

@rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 0)
@rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 0)

typedstruct do
field :assistant_id, Types.assistant_id(), enforce: true
field :thread_id, Types.thread_id(), enforce: true
@@ -35,6 +38,7 @@
field :tool_calls, [ToolCallRequested.t()], default: []
field :tool_outputs, [ToolCallOutputRecorded.t()], default: []
field :timeout_ms, non_neg_integer(), default: @timeout_ms
field :rate_limit_retry_attempt, non_neg_integer(), default: 0
end

@type connect_opt() ::
@@ -239,6 +243,7 @@ defmodule GptAgent do
)

state
|> Map.put(:rate_limit_retry_attempt, 0)
|> publish_event(
UserMessageAdded.new!(
id: id,
@@ -347,7 +352,10 @@
end
end

@impl true
def handle_info(:run, %__MODULE__{} = state) do
noreply(state, {:continue, :run})
end

def handle_info({:check_run_status, id}, %__MODULE__{} = state) do
log("Checking run status for run ID #{inspect(id)}")

@@ -429,6 +437,103 @@
noreply(%{state | running?: true})
end

defp handle_run_status(
"failed",
id,
%{
"last_error" => %{
"code" => "rate_limit_exceeded",
"message" => "Rate limit reached" <> _
}
} = response,
%__MODULE__{rate_limit_retry_attempt: attempts} = state
)
when attempts < @rate_limit_max_retries do
log(
"Run ID #{inspect(id)} failed due to rate limiting. Will retry run in #{@rate_limit_retry_delay}ms."
)

Process.send_after(self(), :run, @rate_limit_retry_delay)

state
|> Map.update!(:rate_limit_retry_attempt, &(&1 + 1))
|> publish_event(
RunFailed.new!(
id: id,
thread_id: state.thread_id,
assistant_id: state.assistant_id,
code: "rate_limit_exceeded-retrying",
message: response |> Map.get("last_error", %{}) |> Map.get("message")
)
)
|> noreply()
end

defp handle_run_status(
"failed",
id,
%{
"last_error" => %{
"code" => "rate_limit_exceeded",
"message" => "Rate limit reached" <> _
}
} = response,
%__MODULE__{rate_limit_retry_attempt: attempts} = state
)
when attempts >= @rate_limit_max_retries do
log("Run ID #{inspect(id)} failed due to rate limiting. Retries expired")

state
|> Map.update!(:rate_limit_retry_attempt, &(&1 + 1))
|> publish_event(
RunFailed.new!(
id: id,
thread_id: state.thread_id,
assistant_id: state.assistant_id,
code: "rate_limit_exceeded-final",
message: response |> Map.get("last_error", %{}) |> Map.get("message")
)
)
|> noreply()
end

defp handle_run_status(
"failed",
id,
%{
"last_error" => %{
"code" => "rate_limit_exceeded",
"message" =>
"You exceeded your current quota, please check your plan and billing details." <> _
}
} = response,
%__MODULE__{} = state
) do
log("Run ID #{inspect(id)} failed due to OpenAI account quota reached.")

state
|> publish_event(
RunFailed.new!(
id: id,
thread_id: state.thread_id,
assistant_id: state.assistant_id,
code: "rate_limit_exceeded-final",
message: response |> Map.get("last_error", %{}) |> Map.get("message")
)
)
# DEPRECATED: remove this second publish_event call on major version bump to 10.0.0
|> publish_event(
RunFailed.new!(
id: id,
thread_id: state.thread_id,
assistant_id: state.assistant_id,
code: "rate_limit_exceeded",
message: response |> Map.get("last_error", %{}) |> Map.get("message")
)
)
|> noreply()
end

defp handle_run_status(status, id, response, %__MODULE__{} = state) do
log("Run ID #{inspect(id)} failed with status #{inspect(status)}", :warning)
log("Response: #{inspect(response)}")
2 changes: 1 addition & 1 deletion mix.exs
@@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
def project do
[
app: :gpt_agent,
version: "9.1.0",
version: "9.2.0",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
aliases: aliases(),
105 changes: 104 additions & 1 deletion test/gpt_agent_test.exs
@@ -19,6 +19,8 @@ defmodule GptAgentTest do

alias GptAgent.Types.UserMessage

@retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay)

setup _context do
bypass = Bypass.open()
Application.put_env(:open_ai_client, :base_url, "http://localhost:#{bypass.port}")
@@ -1117,7 +1119,97 @@ end
end

@tag capture_log: true
test "when the run is fails, sends the RunFailed event to the callback handler", %{
test "when the run fails due to rate limiting, attempt to perform the run again after a delay, up to two more times",
%{
bypass: bypass,
assistant_id: assistant_id,
thread_id: thread_id,
run_id: run_id
} do
{:ok, pid} =
GptAgent.connect(thread_id: thread_id, last_message_id: nil, assistant_id: assistant_id)

{:ok, counter} = Agent.start_link(fn -> 0 end)

Bypass.expect(bypass, "GET", "/v1/threads/#{thread_id}/runs/#{run_id}", fn conn ->
failed_body = %{
"id" => run_id,
"object" => "thread.run",
"created_at" => 1_699_075_072,
"assistant_id" => assistant_id,
"thread_id" => thread_id,
"status" => "failed",
"started_at" => 1_699_075_072,
"expires_at" => nil,
"cancelled_at" => nil,
"completed_at" => nil,
"failed_at" => 1_699_075_073,
"last_error" => %{
"code" => "rate_limit_exceeded",
"message" =>
"Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?"
},
"model" => "gpt-4-1106-preview",
"instructions" => nil,
"tools" => [],
"file_ids" => [],
"metadata" => %{}
}

response_body =
case Agent.get_and_update(counter, &{&1, &1 + 1}) do
0 -> failed_body
1 -> failed_body
2 -> failed_body
3 -> raise "should not have reached a third retry"
end

conn
|> Plug.Conn.put_resp_content_type("application/json")
|> Plug.Conn.resp(200, Jason.encode!(response_body))
end)

:ok = GptAgent.add_user_message(pid, "Hello")

assert_receive {^pid,
%RunFailed{
id: ^run_id,
thread_id: ^thread_id,
assistant_id: ^assistant_id,
code: "rate_limit_exceeded-retrying",
message:
"Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?"
}}

refute_receive {^pid, %RunFailed{}}, @retry_delay - 10

assert_receive {^pid,
%RunFailed{
id: ^run_id,
thread_id: ^thread_id,
assistant_id: ^assistant_id,
code: "rate_limit_exceeded-retrying",
message:
"Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?"
}},
20

refute_receive {^pid, %RunFailed{}}, @retry_delay - 10

assert_receive {^pid,
%RunFailed{
id: ^run_id,
thread_id: ^thread_id,
assistant_id: ^assistant_id,
code: "rate_limit_exceeded-final",
message:
"Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?"
}},
20
end

@tag capture_log: true
test "when the run fails, sends the RunFailed event to the callback handler", %{
bypass: bypass,
assistant_id: assistant_id,
thread_id: thread_id,
@@ -1159,6 +1251,17 @@

:ok = GptAgent.add_user_message(pid, "Hello")

assert_receive {^pid,
%RunFailed{
id: ^run_id,
thread_id: ^thread_id,
assistant_id: ^assistant_id,
code: "rate_limit_exceeded-final",
message:
"You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors."
}},
5_000

assert_receive {^pid,
%RunFailed{
id: ^run_id,