Skip to content

Commit

Permalink
Require last_message_id when connecting to an agent (#13)
Browse files Browse the repository at this point in the history
This pull request adds a new requirement for the `last_message_id`
parameter when connecting to an agent. It must always be explicitly set,
even if it is set to `nil`. This ensures that client code is proactively
setting the value, preventing situations where an old value is used and
later messages are not displayed.
  • Loading branch information
jwilger authored Jan 31, 2024
1 parent b392a52 commit 3c920d6
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 42 deletions.
64 changes: 56 additions & 8 deletions lib/gpt_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ defmodule GptAgent do
typedstruct do
field :assistant_id, Types.assistant_id(), enforce: true
field :thread_id, Types.thread_id(), enforce: true
field :last_message_id, Types.message_id() | nil, enforce: true
field :running?, boolean(), default: false
field :run_id, Types.run_id() | nil
field :tool_calls, [ToolCallRequested.t()], default: []
field :tool_outputs, [ToolCallOutputRecorded.t()], default: []
field :last_message_id, Types.message_id() | nil
field :timeout_ms, non_neg_integer(), default: @timeout_ms
end

Expand Down Expand Up @@ -201,6 +201,12 @@ defmodule GptAgent do
{:noreply, %{state | assistant_id: assistant_id}}
end

@impl true
def handle_cast({:set_last_message_id, last_message_id}, %__MODULE__{} = state) do
log("Setting last message ID to #{last_message_id}")
{:noreply, %{state | last_message_id: last_message_id}}
end

@impl true
def handle_call(:shutdown, _caller, %__MODULE__{} = state) do
log("Shutting down")
Expand Down Expand Up @@ -400,6 +406,8 @@ defmodule GptAgent do
defp log(message, level \\ :debug) when is_binary(message),
do: Logger.log(level, "[GptAgent (#{inspect(self())})] " <> message)

defp ok(data), do: {:ok, data}

@impl true
def create_thread do
log("Creating thread")
Expand All @@ -419,7 +427,7 @@ defmodule GptAgent do

@impl true
def connect(opts) when is_list(opts) do
opts = validate_and_convert_opts(opts)
{:ok, opts} = validate_and_convert_opts(opts)

opts
|> connect_to_new_or_existing_agent()
Expand All @@ -431,24 +439,61 @@ defmodule GptAgent do

case Registry.lookup(GptAgent.Registry, opts.thread_id) do
[{pid, :gpt_agent}] ->
handle_existing_agent(pid)
handle_existing_agent(pid, opts.last_message_id)

[] ->
handle_no_existing_agent(opts.thread_id, opts.assistant_id, opts.timeout_ms)
handle_no_existing_agent(
opts.thread_id,
opts.last_message_id,
opts.assistant_id,
opts.timeout_ms
)
end
end

defp validate_and_convert_opts(opts) do
Keyword.validate!(opts, [
:thread_id,
:last_message_id,
:assistant_id,
subscribe: true,
assistant_id: nil,
last_message_id: nil,
timeout_ms: nil
])
|> Enum.into(%{})
|> ok()
|> validate_thread_id()
|> validate_last_message_id()
|> validate_assistant_id()
end

defp validate_thread_id({:ok, %{thread_id: _thread_id} = opts}) do
ok(opts)
end

defp validate_thread_id({:ok, _opts}) do
{:error, :missing_thread_id}
end

defp validate_last_message_id({:ok, %{last_message_id: _last_message_id} = opts}) do
ok(opts)
end

defp validate_last_message_id({:ok, _opts}) do
{:error, :missing_last_message_id}
end

defp validate_last_message_id({:error, _} = error), do: error

defp validate_assistant_id({:ok, %{assistant_id: _assistant_id} = opts}) do
ok(opts)
end

defp validate_assistant_id({:ok, _opts}) do
{:error, :missing_assistant_id}
end

defp validate_assistant_id({:error, _} = error), do: error

defp maybe_subscribe({:ok, _pid} = result, opts) do
if opts.subscribe do
Phoenix.PubSub.subscribe(GptAgent.PubSub, "gpt_agent:#{opts.thread_id}")
Expand All @@ -459,17 +504,20 @@ defmodule GptAgent do

defp maybe_subscribe(result, _opts), do: result

defp handle_existing_agent(pid) do
defp handle_existing_agent(pid, last_message_id) do
log("Found existing GPT Agent with PID #{inspect(pid)}")
log("Updating last message ID to #{inspect(last_message_id)}")
GenServer.cast(pid, {:set_last_message_id, last_message_id})
{:ok, pid}
end

defp handle_no_existing_agent(thread_id, assistant_id, timeout_ms) do
defp handle_no_existing_agent(thread_id, last_message_id, assistant_id, timeout_ms) do
log("No existing GPT Agent found, starting new one")

state =
GptAgent.new!(
thread_id: thread_id,
last_message_id: last_message_id,
assistant_id: assistant_id,
timeout_ms: timeout_ms || default_timeout_ms()
)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
def project do
[
app: :gpt_agent,
version: "5.1.1",
version: "6.0.0",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
aliases: aliases(),
Expand Down
Loading

0 comments on commit 3c920d6

Please sign in to comment.