From df72f299774c1e1862ff16c14b345e594acf08fa Mon Sep 17 00:00:00 2001
From: John Wilger
Date: Thu, 22 Feb 2024 07:25:28 -0800
Subject: [PATCH] Retry runs on time/token-based rate limiting

When the run status indicates that OpenAI has rate-limited the request
due to tokens-per-minute or requests-per-minute limits, we retry the
run after a configurable delay, up to a configurable number of times
(the default is no retries).

With this commit, we slightly modify the error code returned with the
RunFailed event to indicate whether the failure is due to temporary
rate limiting or to the account quota being reached, as well as whether
a retry will be attempted. We publish the event even when a retry will
be attempted, so that subscribers can be aware that rate limiting is
occurring.
---
 config/config.exs       |   6 +++
 lib/gpt_agent.ex        | 107 +++++++++++++++++++++++++++++++++++++++-
 mix.exs                 |   2 +-
 test/gpt_agent_test.exs | 105 ++++++++++++++++++++++++++++++++++++++-
 4 files changed, 217 insertions(+), 3 deletions(-)

diff --git a/config/config.exs b/config/config.exs
index f416b13..744668c 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -8,12 +8,18 @@ case config_env() do
 
     config :logger, level: :debug
 
+    config :gpt_agent, :rate_limit_retry_delay, 30_000
+    config :gpt_agent, :rate_limit_max_retries, 10
+
   :test ->
     config :open_ai_client, :openai_api_key, "test"
     config :open_ai_client, :openai_organization_id, "test"
 
     config :logger, level: :warning
 
+    config :gpt_agent, :rate_limit_retry_delay, 100
+    config :gpt_agent, :rate_limit_max_retries, 2
+
   _ ->
     nil
 end
diff --git a/lib/gpt_agent.ex b/lib/gpt_agent.ex
index 2572915..7052b6a 100644
--- a/lib/gpt_agent.ex
+++ b/lib/gpt_agent.ex
@@ -26,6 +26,9 @@ defmodule GptAgent do
   # two minutes
   @timeout_ms 120_000
 
+  @rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 0)
+  @rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 0)
+
   typedstruct do
     field :assistant_id, Types.assistant_id(), enforce: true
     field :thread_id, Types.thread_id(), enforce: true
@@ -35,6 +38,7 @@ defmodule GptAgent do
     field :tool_calls, [ToolCallRequested.t()], default: []
     field :tool_outputs, [ToolCallOutputRecorded.t()], default: []
     field :timeout_ms, non_neg_integer(), default: @timeout_ms
+    field :rate_limit_retry_attempt, non_neg_integer(), default: 0
   end
 
   @type connect_opt() ::
@@ -239,6 +243,7 @@ defmodule GptAgent do
     )
 
     state
+    |> Map.put(:rate_limit_retry_attempt, 0)
     |> publish_event(
       UserMessageAdded.new!(
         id: id,
@@ -347,7 +352,10 @@ defmodule GptAgent do
     end
   end
 
-  @impl true
+  def handle_info(:run, %__MODULE__{} = state) do
+    noreply(state, {:continue, :run})
+  end
+
   def handle_info({:check_run_status, id}, %__MODULE__{} = state) do
     log("Checking run status for run ID #{inspect(id)}")
 
@@ -429,6 +437,103 @@ defmodule GptAgent do
     noreply(%{state | running?: true})
   end
 
+  defp handle_run_status(
+         "failed",
+         id,
+         %{
+           "last_error" => %{
+             "code" => "rate_limit_exceeded",
+             "message" => "Rate limit reached" <> _
+           }
+         } = response,
+         %__MODULE__{rate_limit_retry_attempt: attempts} = state
+       )
+       when attempts < @rate_limit_max_retries do
+    log(
+      "Run ID #{inspect(id)} failed due to rate limiting. Will retry run in #{@rate_limit_retry_delay}ms."
+    )
+
+    Process.send_after(self(), :run, @rate_limit_retry_delay)
+
+    state
+    |> Map.update!(:rate_limit_retry_attempt, &(&1 + 1))
+    |> publish_event(
+      RunFailed.new!(
+        id: id,
+        thread_id: state.thread_id,
+        assistant_id: state.assistant_id,
+        code: "rate_limit_exceeded-retrying",
+        message: response |> Map.get("last_error", %{}) |> Map.get("message")
+      )
+    )
+    |> noreply()
+  end
+
+  defp handle_run_status(
+         "failed",
+         id,
+         %{
+           "last_error" => %{
+             "code" => "rate_limit_exceeded",
+             "message" => "Rate limit reached" <> _
+           }
+         } = response,
+         %__MODULE__{rate_limit_retry_attempt: attempts} = state
+       )
+       when attempts >= @rate_limit_max_retries do
+    log("Run ID #{inspect(id)} failed due to rate limiting. Retries expired")
+
+    state
+    |> Map.update!(:rate_limit_retry_attempt, &(&1 + 1))
+    |> publish_event(
+      RunFailed.new!(
+        id: id,
+        thread_id: state.thread_id,
+        assistant_id: state.assistant_id,
+        code: "rate_limit_exceeded-final",
+        message: response |> Map.get("last_error", %{}) |> Map.get("message")
+      )
+    )
+    |> noreply()
+  end
+
+  defp handle_run_status(
+         "failed",
+         id,
+         %{
+           "last_error" => %{
+             "code" => "rate_limit_exceeded",
+             "message" =>
+               "You exceeded your current quota, please check your plan and billing details." <> _
+           }
+         } = response,
+         %__MODULE__{} = state
+       ) do
+    log("Run ID #{inspect(id)} failed due to OpenAI account quota reached.")
+
+    state
+    |> publish_event(
+      RunFailed.new!(
+        id: id,
+        thread_id: state.thread_id,
+        assistant_id: state.assistant_id,
+        code: "rate_limit_exceeded-final",
+        message: response |> Map.get("last_error", %{}) |> Map.get("message")
+      )
+    )
+    # DEPRECATED: remove this second publish_event call on major version bump to 10.0.0
+    |> publish_event(
+      RunFailed.new!(
+        id: id,
+        thread_id: state.thread_id,
+        assistant_id: state.assistant_id,
+        code: "rate_limit_exceeded",
+        message: response |> Map.get("last_error", %{}) |> Map.get("message")
+      )
+    )
+    |> noreply()
+  end
+
   defp handle_run_status(status, id, response, %__MODULE__{} = state) do
     log("Run ID #{inspect(id)} failed with status #{inspect(status)}", :warning)
     log("Response: #{inspect(response)}")
diff --git a/mix.exs b/mix.exs
index bb38115..c40d5de 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
   def project do
     [
       app: :gpt_agent,
-      version: "9.1.0",
+      version: "9.2.0",
       elixir: "~> 1.16",
       start_permanent: Mix.env() == :prod,
       aliases: aliases(),
diff --git a/test/gpt_agent_test.exs b/test/gpt_agent_test.exs
index 39f0e41..68d55f9 100644
--- a/test/gpt_agent_test.exs
+++ b/test/gpt_agent_test.exs
@@ -19,6 +19,8 @@ defmodule GptAgentTest do
 
   alias GptAgent.Types.UserMessage
 
+  @retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay)
+
   setup _context do
     bypass = Bypass.open()
     Application.put_env(:open_ai_client, :base_url, "http://localhost:#{bypass.port}")
@@ -1117,7 +1119,97 @@ defmodule GptAgentTest do
   end
 
   @tag capture_log: true
-  test "when the run is fails, sends the RunFailed event to the callback handler", %{
+  test "when the run fails due to rate limiting, attempt to perform the run again after a delay, up to two more times",
+       %{
+         bypass: bypass,
+         assistant_id: assistant_id,
+         thread_id: thread_id,
+         run_id: run_id
+       } do
+    {:ok, pid} =
+      GptAgent.connect(thread_id: thread_id, last_message_id: nil, assistant_id: assistant_id)
+
+    {:ok, counter} = Agent.start_link(fn -> 0 end)
+
+    Bypass.expect(bypass, "GET", "/v1/threads/#{thread_id}/runs/#{run_id}", fn conn ->
+      failed_body = %{
+        "id" => run_id,
+        "object" => "thread.run",
"thread.run", + "created_at" => 1_699_075_072, + "assistant_id" => assistant_id, + "thread_id" => thread_id, + "status" => "failed", + "started_at" => 1_699_075_072, + "expires_at" => nil, + "cancelled_at" => nil, + "completed_at" => nil, + "failed_at" => 1_699_075_073, + "last_error" => %{ + "code" => "rate_limit_exceeded", + "message" => + "Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?" + }, + "model" => "gpt-4-1106-preview", + "instructions" => nil, + "tools" => [], + "file_ids" => [], + "metadata" => %{} + } + + response_body = + case Agent.get_and_update(counter, &{&1, &1 + 1}) do + 0 -> failed_body + 1 -> failed_body + 2 -> failed_body + 3 -> raise "should not have reached a third retry" + end + + conn + |> Plug.Conn.put_resp_content_type("application/json") + |> Plug.Conn.resp(200, Jason.encode!(response_body)) + end) + + :ok = GptAgent.add_user_message(pid, "Hello") + + assert_receive {^pid, + %RunFailed{ + id: ^run_id, + thread_id: ^thread_id, + assistant_id: ^assistant_id, + code: "rate_limit_exceeded-retrying", + message: + "Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?" + }} + + refute_receive {^pid, %RunFailed{}}, @retry_delay - 10 + + assert_receive {^pid, + %RunFailed{ + id: ^run_id, + thread_id: ^thread_id, + assistant_id: ^assistant_id, + code: "rate_limit_exceeded-retrying", + message: + "Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?" + }}, + 20 + + refute_receive {^pid, %RunFailed{}}, @retry_delay - 10 + + assert_receive {^pid, + %RunFailed{ + id: ^run_id, + thread_id: ^thread_id, + assistant_id: ^assistant_id, + code: "rate_limit_exceeded-final", + message: + "Rate limit reached for whatever model blah blah blah, wouldn't it be swell if they used a different error code instead of making me match against a long-ass error message?" + }}, + 20 + end + + @tag capture_log: true + test "when the run fails, sends the RunFailed event to the callback handler", %{ bypass: bypass, assistant_id: assistant_id, thread_id: thread_id, @@ -1159,6 +1251,17 @@ defmodule GptAgentTest do :ok = GptAgent.add_user_message(pid, "Hello") + assert_receive {^pid, + %RunFailed{ + id: ^run_id, + thread_id: ^thread_id, + assistant_id: ^assistant_id, + code: "rate_limit_exceeded-final", + message: + "You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors." + }}, + 5_000 + assert_receive {^pid, %RunFailed{ id: ^run_id,