OpenAI Realtime demo #285

Merged 5 commits on Oct 22, 2024
Changes from all commits
21 changes: 21 additions & 0 deletions livebooks/openai_realtime_with_membrane_webrtc/assets/index.html
@@ -0,0 +1,21 @@
<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <meta http-equiv="X-UA-Compatible" content="ie=edge">
  <title>OpenAI Realtime with Membrane WebRTC demo</title>
</head>

<body
  style="background-color: black; color: white; font-family: Arial, Helvetica, sans-serif; min-height: 100vh; margin: 0px; padding: 5px 0px 5px 0px">
  <main>
    <h1>OpenAI Realtime with Membrane WebRTC demo</h1>
    <div id="status">Connecting</div>
    <audio id="audioPlayer"></audio>
  </main>
  <script src="openai_realtime_demo.js"></script>
</body>

</html>
95 changes: 95 additions & 0 deletions livebooks/openai_realtime_with_membrane_webrtc/assets/openai_realtime_demo.js
@@ -0,0 +1,95 @@
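// The demo uses two separate WebRTC peer connections, each negotiated over its own
// WebSocket: port 8829 carries microphone audio from the browser to the Elixir
// pipeline, and port 8831 carries the AI's audio back to the browser.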
const pcConfig = { iceServers: [{ urls: "stun:stun.l.google.com:19302" }] };
const mediaConstraints = { video: false, audio: true };

const proto = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsBrowserToElixir = new WebSocket(`${proto}//${window.location.hostname}:8829`);
const connBrowserToElixirStatus = document.getElementById("status");
wsBrowserToElixir.onopen = (_) => start_connection_browser_to_elixir(wsBrowserToElixir);
wsBrowserToElixir.onclose = (event) => {
  connBrowserToElixirStatus.innerHTML = "Disconnected";
  console.log("WebSocket connection was terminated:", event);
};

const start_connection_browser_to_elixir = async (ws) => {
  const localStream = await navigator.mediaDevices.getUserMedia(mediaConstraints);
  const pc = new RTCPeerConnection(pcConfig);

  pc.onicecandidate = (event) => {
    if (event.candidate === null) return;
    console.log("Sent ICE candidate:", event.candidate);
    ws.send(JSON.stringify({ type: "ice_candidate", data: event.candidate }));
  };

  pc.onconnectionstatechange = () => {
    if (pc.connectionState === "connected") {
      const button = document.createElement("button");
      button.innerHTML = "Disconnect";
      button.onclick = () => {
        ws.close();
        localStream.getTracks().forEach((track) => track.stop());
      };
      connBrowserToElixirStatus.innerHTML = "Connected ";
      connBrowserToElixirStatus.appendChild(button);
    }
  };

  for (const track of localStream.getTracks()) {
    pc.addTrack(track, localStream);
  }

  ws.onmessage = async (event) => {
    const { type, data } = JSON.parse(event.data);

    switch (type) {
      case "sdp_answer":
        console.log("Received SDP answer:", data);
        await pc.setRemoteDescription(data);
        break;
      case "ice_candidate":
        console.log("Received ICE candidate:", data);
        await pc.addIceCandidate(data);
        break;
    }
  };

  const offer = await pc.createOffer();
  await pc.setLocalDescription(offer);
  console.log("Sent SDP offer:", offer);
  ws.send(JSON.stringify({ type: "sdp_offer", data: offer }));
};

const audioPlayer = document.getElementById("audioPlayer");
const wsElixirToBrowser = new WebSocket(`${proto}//${window.location.hostname}:8831`);
wsElixirToBrowser.onopen = () => start_connection_elixir_to_browser(wsElixirToBrowser);
wsElixirToBrowser.onclose = (event) => console.log("WebSocket connection was terminated:", event);

const start_connection_elixir_to_browser = async (ws) => {
  audioPlayer.srcObject = new MediaStream();

  const pc = new RTCPeerConnection(pcConfig);
  pc.ontrack = (event) => audioPlayer.srcObject.addTrack(event.track);
  pc.onicecandidate = (event) => {
    if (event.candidate === null) return;

    console.log("Sent ICE candidate:", event.candidate);
    ws.send(JSON.stringify({ type: "ice_candidate", data: event.candidate }));
  };

  ws.onmessage = async (event) => {
    const { type, data } = JSON.parse(event.data);

    switch (type) {
      case "sdp_offer": {
        console.log("Received SDP offer:", data);
        await pc.setRemoteDescription(data);
        const answer = await pc.createAnswer();
        await pc.setLocalDescription(answer);
        ws.send(JSON.stringify({ type: "sdp_answer", data: answer }));
        console.log("Sent SDP answer:", answer);
        break;
      }
      case "ice_candidate":
        console.log("Received ICE candidate:", data);
        await pc.addIceCandidate(data);
        break;
    }
  };
};
@@ -0,0 +1,192 @@
# OpenAI Realtime Integration with Membrane WebRTC

```elixir
File.cd(__DIR__)
Logger.configure(level: :info)

Mix.install([
  {:membrane_core, "~> 1.1"},
  {:membrane_webrtc_plugin, "~> 0.22.0"},
  {:membrane_opus_plugin, "~> 0.20.4"},
  {:membrane_raw_audio_parser_plugin, "~> 0.4.0"},
  {:membrane_realtimer_plugin, "~> 0.10.0"},
  {:kino_membrane, "~> 0.3.0"},
  {:websockex, "~> 0.4.3"},
  {:jason, "~> 1.4"}
])
```

## Introduction

This demo shows how to use the Membrane Framework to create a simple WebRTC-based app that allows you to have a conversation with ChatGPT using the new [OpenAI Realtime API](https://openai.com/index/introducing-the-realtime-api/).

## WebSocket handler

The OpenAI Realtime API requires sending and receiving audio over a WebSocket. Let's create a module responsible for handling it with the `WebSockex` library.

```elixir
defmodule OpenAIWebSocket do
  use WebSockex
  require Logger

  def start_link(opts) do
    WebSockex.start_link(
      "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01",
      __MODULE__,
      %{parent: self()},
      opts
    )
  end

  @impl true
  def handle_frame(frame, state) do
    send(state.parent, {:websocket_frame, frame})
    {:ok, state}
  end

  def send_frame(ws, frame), do: WebSockex.send_frame(ws, {:text, frame})
end
```
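
With the socket connected, the session can be configured by sending a `session.update` event before any audio flows. The demo relies on the API defaults, but a minimal sketch (the `voice` and `instructions` fields follow the Realtime API docs; `ws` stands for the pid returned by `OpenAIWebSocket.start_link/1`) could look like this:

```elixir
# illustrative only, not part of the demo: set the assistant's voice and
# system instructions for this session
frame =
  Jason.encode!(%{
    type: "session.update",
    session: %{voice: "alloy", instructions: "Answer briefly and politely."}
  })

:ok = OpenAIWebSocket.send_frame(ws, frame)
```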

## Membrane Components

Then, we will create a Membrane Element that will receive and send raw audio frames via the WebSocket.

```elixir
defmodule OpenAIEndpoint do
  use Membrane.Endpoint
  require Membrane.Logger

  def_input_pad(:input, accepted_format: _any)
  def_output_pad(:output, accepted_format: _any, flow_control: :push)

  def_options(websocket_opts: [])

  @impl true
  def handle_init(_ctx, opts) do
    {:ok, ws} = OpenAIWebSocket.start_link(opts.websocket_opts)
    {[], %{ws: ws}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    # format of the audio returned by the OpenAI API (24 kHz, mono, 16-bit PCM),
    # sent as buffers to the :output pad in handle_info below
    format = %Membrane.RawAudio{channels: 1, sample_rate: 24_000, sample_format: :s16le}
    {[stream_format: {:output, format}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    audio = Base.encode64(buffer.payload)
    frame = %{type: "input_audio_buffer.append", audio: audio} |> Jason.encode!()
    :ok = OpenAIWebSocket.send_frame(state.ws, frame)
    {[], state}
  end

  @impl true
  def handle_info({:websocket_frame, {:text, frame}}, _ctx, state) do
    case Jason.decode!(frame) do
      %{"type" => "response.audio.delta", "delta" => delta} ->
        audio_payload = Base.decode64!(delta)
        {[buffer: {:output, %Membrane.Buffer{payload: audio_payload}}], state}

      %{"type" => "response.audio.done"} ->
        {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

      %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
        Membrane.Logger.info("AI transcription: #{transcript}")
        {[], state}

      %{} ->
        {[], state}
    end
  end
end
```

Now, let's write a pipeline module that exchanges media with the browser using `Membrane.WebRTC.Source` and `Membrane.WebRTC.Sink`, and with the OpenAI server using our `OpenAIEndpoint`.

Because WebRTC requires and provides audio in the Opus format, while the OpenAI Realtime API uses raw audio, we have to place the proper decoder and encoder between the WebRTC and OpenAI elements.

```elixir
defmodule OpenAIPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    spec =
      child(:webrtc_source, %Membrane.WebRTC.Source{
        signaling: {:websocket, port: opts[:webrtc_source_ws_port]}
      })
      |> via_out(:output, options: [kind: :audio])
      |> child(:input_opus_parser, Membrane.Opus.Parser)
      |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 24_000})
      |> child(:open_ai, %OpenAIEndpoint{websocket_opts: opts[:openai_ws_opts]})
      |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
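      # the realtimer below replays audio at wall-clock pace, so generous queue and
      # toilet limits are needed to absorb OpenAI's faster-than-realtime bursts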
      |> via_in(:input, target_queue_size: 1_000_000_000, toilet_capacity: 1_000_000_000)
      |> child(:realtimer, Membrane.Realtimer)
      |> child(:opus_encoder, Membrane.Opus.Encoder)
      |> via_in(:input, options: [kind: :audio])
      |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
        tracks: [:audio],
        signaling: {:websocket, port: opts[:webrtc_sink_ws_port]}
      })

    {[spec: spec], %{}}
  end
end
```

## Getting the OpenAI API key from the env

Let's set the WebSocket options (remember to set the `OPENAI_API_KEY` environment variable).

```elixir
openai_api_key = System.get_env("OPENAI_API_KEY")

if openai_api_key == nil do
  raise "You have to set the OPENAI_API_KEY environment variable"
end

openai_ws_opts = [
  extra_headers: [
    {"Authorization", "Bearer " <> openai_api_key},
    {"OpenAI-Beta", "realtime=v1"}
  ]
]

:ok
```

## Running the server

Now, let's start the pipeline, along with a simple HTTP server that serves the static assets.

```elixir
{:ok, _supervisor, pipeline} =
  Membrane.Pipeline.start_link(OpenAIPipeline,
    openai_ws_opts: openai_ws_opts,
    webrtc_source_ws_port: 8829,
    webrtc_sink_ws_port: 8831
  )

:inets.start()

:inets.start(:httpd,
  bind_address: ~c"localhost",
  port: 8000,
  document_root: ~c"#{__DIR__}/assets",
  server_name: ~c"webrtc",
  server_root: "/tmp"
)

Process.monitor(pipeline)

receive do
  {:DOWN, _ref, :process, ^pipeline, _reason} -> :ok
end
```
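
Since `kino_membrane` is already installed in the setup cell, you can optionally render a pipeline dashboard in Livebook. A minimal sketch (assuming the `KinoMembrane.pipeline_dashboard/1` API), to be evaluated before the blocking `receive` above:

```elixir
# renders an interactive view of the pipeline's topology and metrics
KinoMembrane.pipeline_dashboard(pipeline)
```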

Open <http://localhost:8000/index.html> in a new tab of Google Chrome and start your conversation with the AI!

Transcriptions of the AI's answers will be available in the logs of the cell below.