From d8c07c088c6da98c0ee8f1b8bdc05b7e83d3d525 Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Thu, 24 Oct 2024 13:52:09 -0500 Subject: [PATCH] feat: Export audio to a recorder over a WS. --- .../node/incoming/AudioLevelReader.kt | 6 +- jvb/pom.xml | 4 + .../org/jitsi/videobridge/Conference.java | 19 ++- .../videobridge/PotentialPacketHandler.java | 5 +- .../colibri2/Colibri2ConferenceHandler.kt | 1 + .../org/jitsi/videobridge/export/Exporter.kt | 108 ++++++++++++++++++ .../videobridge/export/MediaJsonEncoder.kt | 102 +++++++++++++++++ pom.xml | 9 +- 8 files changed, 246 insertions(+), 8 deletions(-) create mode 100644 jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt create mode 100644 jvb/src/main/kotlin/org/jitsi/videobridge/export/MediaJsonEncoder.kt diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/AudioLevelReader.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/AudioLevelReader.kt index d57957e3f2..6c28201da5 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/AudioLevelReader.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/AudioLevelReader.kt @@ -74,16 +74,16 @@ class AudioLevelReader( if (!silence) stats.nonSilence(AudioLevelHeaderExtension.getVad(ext)) if (silence && forwardedSilencePackets > forwardedSilencePacketsLimit) { - packetInfo.shouldDiscard = true + // packetInfo.shouldDiscard = true stats.discardedSilence() } else if (this@AudioLevelReader.forceMute) { - packetInfo.shouldDiscard = true + // packetInfo.shouldDiscard = true stats.discardedForceMute() } else { forwardedSilencePackets = if (silence) forwardedSilencePackets + 1 else 0 audioLevelListener?.let { listener -> if (listener.onLevelReceived(audioRtpPacket.ssrc, (127 - level).toPositiveLong())) { - packetInfo.shouldDiscard = true + // packetInfo.shouldDiscard = true stats.discardedRanking() } } diff --git a/jvb/pom.xml b/jvb/pom.xml index 2aaa72ad20..cdd97ef912 100644 --- a/jvb/pom.xml +++ b/jvb/pom.xml @@ -139,6 +139,10 @@ ${project.groupId} jicoco-config + + ${project.groupId} + jicoco-mediajson + ${project.groupId} jicoco-metrics diff --git a/jvb/src/main/java/org/jitsi/videobridge/Conference.java b/jvb/src/main/java/org/jitsi/videobridge/Conference.java index cb5ff73882..854c95072a 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/Conference.java +++ b/jvb/src/main/java/org/jitsi/videobridge/Conference.java @@ -29,6 +29,7 @@ import org.jitsi.utils.logging2.*; import org.jitsi.utils.queue.*; import org.jitsi.videobridge.colibri2.*; +import org.jitsi.videobridge.export.*; import org.jitsi.videobridge.message.*; import org.jitsi.videobridge.metrics.*; import org.jitsi.videobridge.relay.*; @@ -40,7 +41,6 @@ import org.json.simple.*; import org.jxmpp.jid.*; -import java.time.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -180,6 +180,9 @@ public long getLocalVideoSsrc() @Nullable private final String meetingId; + @NotNull + private final Exporter exporter = new Exporter(); + /** * A regex pattern to trim UUIDs to just their first 8 hex characters. */ @@ -599,6 +602,7 @@ void expire() logger.debug(() -> "Expiring endpoints."); getEndpoints().forEach(AbstractEndpoint::expire); getRelays().forEach(Relay::expire); + exporter.stop(); speechActivity.expire(); updateStatisticsOnExpire(); @@ -1118,6 +1122,14 @@ private void sendOut(PacketInfo packetInfo) prevHandler = relay; } } + if (exporter.wants(packetInfo)) + { + if (prevHandler != null) + { + prevHandler.send(packetInfo.clone()); + } + prevHandler = exporter; + } if (prevHandler != null) { @@ -1130,6 +1142,11 @@ private void sendOut(PacketInfo packetInfo) } } + public void setConnects(List exports) + { + exporter.setConnects(exports); + } + public boolean hasRelays() { return !relaysById.isEmpty(); diff --git a/jvb/src/main/java/org/jitsi/videobridge/PotentialPacketHandler.java b/jvb/src/main/java/org/jitsi/videobridge/PotentialPacketHandler.java index 56fd57b4a8..5161a72736 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/PotentialPacketHandler.java +++ b/jvb/src/main/java/org/jitsi/videobridge/PotentialPacketHandler.java @@ -16,6 +16,7 @@ package org.jitsi.videobridge; +import org.jetbrains.annotations.*; import org.jitsi.nlj.*; public interface PotentialPacketHandler @@ -26,11 +27,11 @@ public interface PotentialPacketHandler * @param packet the RTP/RTCP packet * @return true if this handler wants the given packet, false otherwise */ - boolean wants(PacketInfo packet); + boolean wants(@NotNull PacketInfo packet); /** * Send the given RTP/RTCP 'packet' (which came from 'source') * @param packet the RTP/RTCP packet */ - void send(PacketInfo packet); + void send(@NotNull PacketInfo packet); } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/colibri2/Colibri2ConferenceHandler.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/colibri2/Colibri2ConferenceHandler.kt index c0f466e09d..69fd304d4a 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/colibri2/Colibri2ConferenceHandler.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/colibri2/Colibri2ConferenceHandler.kt @@ -76,6 +76,7 @@ class Colibri2ConferenceHandler( for (e in conferenceModifyIQ.endpoints) { responseBuilder.addEndpoint(handleColibri2Endpoint(e, ignoreUnknownEndpoints)) } + conferenceModifyIQ.connects?.let { conference.setConnects(it.getConnects()) } for (r in conferenceModifyIQ.relays) { if (!RelayConfig.config.enabled) { throw IqProcessingException(Condition.feature_not_implemented, "Octo is disabled in configuration.") diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt new file mode 100644 index 0000000000..28efbb6c99 --- /dev/null +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt @@ -0,0 +1,108 @@ +/* + * Copyright @ 2024 - Present, 8x8 Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.videobridge.export + +import org.eclipse.jetty.websocket.api.WebSocketAdapter +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest +import org.eclipse.jetty.websocket.client.WebSocketClient +import org.jitsi.nlj.PacketInfo +import org.jitsi.nlj.rtp.AudioRtpPacket +import org.jitsi.nlj.util.PacketInfoQueue +import org.jitsi.utils.logging2.createLogger +import org.jitsi.videobridge.PotentialPacketHandler +import org.jitsi.videobridge.colibri2.FeatureNotImplementedException +import org.jitsi.videobridge.util.ByteBufferPool +import org.jitsi.videobridge.util.TaskPools +import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig +import org.jitsi.xmpp.extensions.colibri2.Connect + +class Exporter : PotentialPacketHandler { + val logger = createLogger() + var started = false + val queue = PacketInfoQueue( + "${javaClass.simpleName}-packet-queue", + TaskPools.IO_POOL, + this::doHandlePacket, + 128 + ) + + private var wsNotConnectedErrors = 0 + private fun logWsNotConnectedError(): Boolean = (wsNotConnectedErrors++ % 1000) == 0 + private val encoder = MediaJsonEncoder { + if (recorderWebSocket.isConnected) { + recorderWebSocket.remote?.sendString(it.toJson()) + ?: logger.info("Websocket is connected, but remote is null") + } else if (logWsNotConnectedError()) { + logger.info("Can not send packet, websocket is not connected (count=$wsNotConnectedErrors).") + } + } + private var recorderWebSocket = WebSocketAdapter() + + fun setConnects(exports: List) { + when { + started && exports.isNotEmpty() -> throw FeatureNotImplementedException("Changing exports once enabled.") + exports.isEmpty() -> stop() + exports.size > 1 -> throw FeatureNotImplementedException("Multiple exports") + exports[0].video -> throw FeatureNotImplementedException("Video") + else -> start(exports[0]) + } + } + + private fun doHandlePacket(packet: PacketInfo): Boolean { + if (started) { + encoder.encode(packet.packetAs(), packet.endpointId!!) + } + ByteBufferPool.returnBuffer(packet.packet.buffer) + return true + } + + override fun wants(packet: PacketInfo): Boolean = started && packet.packet is AudioRtpPacket + + override fun send(packet: PacketInfo) { + if (started) { + queue.add(packet) + } else { + ByteBufferPool.returnBuffer(packet.packet.buffer) + } + } + + fun stop() { + started = false + logger.info("Stopping.") + recorderWebSocket.session?.close(org.eclipse.jetty.websocket.core.CloseStatus.SHUTDOWN, "closing") + } + + fun start(connect: Connect) { + if (connect.video) throw FeatureNotImplementedException("Video") + if (connect.protocol != Connect.Protocols.MEDIAJSON) { + throw FeatureNotImplementedException("Protocol ${connect.protocol}") + } + if (connect.type != Connect.Types.RECORDER) { + throw FeatureNotImplementedException("Type ${connect.type}") + } + + logger.info("Starting with url=${connect.url}") + webSocketClient.connect(recorderWebSocket, connect.url, ClientUpgradeRequest()) + started = true + } + + companion object { + val webSocketClient = WebSocketClient().apply { + idleTimeout = WebsocketServiceConfig.config.idleTimeout + start() + } + } +} diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/export/MediaJsonEncoder.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/export/MediaJsonEncoder.kt new file mode 100644 index 0000000000..9b119ebe16 --- /dev/null +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/export/MediaJsonEncoder.kt @@ -0,0 +1,102 @@ +/* + * Copyright @ 2024 - Present, 8x8 Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.videobridge.export + +import org.jitsi.mediajson.CustomParameters +import org.jitsi.mediajson.Event +import org.jitsi.mediajson.Media +import org.jitsi.mediajson.MediaEvent +import org.jitsi.mediajson.MediaFormat +import org.jitsi.mediajson.Start +import org.jitsi.mediajson.StartEvent +import org.jitsi.nlj.rtp.AudioRtpPacket +import org.jitsi.nlj.util.Rfc3711IndexTracker +import org.jitsi.rtp.rtp.RtpPacket +import org.jitsi.utils.logging2.createLogger +import java.time.Clock +import java.time.Duration +import kotlin.io.encoding.Base64 +import kotlin.io.encoding.ExperimentalEncodingApi + +/** + * Encodes the media in a conference into a mediajson format. Maintains state for each SSRC in order to maintain a + * common space for timestamps. + * + * Note we're using a common clock with a rate of 48000 for all SSRCs (that's equivalent to the RTP timestamp for opus). + */ +class MediaJsonEncoder( + /** Encoded mediajson events are sent to this function */ + val handleEvent: (Event) -> Unit +) { + val logger = createLogger() + val ref = Clock.systemUTC().instant() + + private data class SsrcState( + val ssrc: Long, + val initialRtpTs: Long, + // Offset of this SSRC since the start time in RTP units + val offset: Long + ) + + private val ssrcsStarted = mutableSetOf() + var seq = 0 + + fun encode(p: AudioRtpPacket, epId: String) = synchronized(ssrcsStarted) { + if (ssrcsStarted.none { it.ssrc == p.ssrc }) { + // This is a new SSRC, save it and produce a StartEvent + val state = SsrcState( + p.ssrc, + p.timestamp, + (Duration.between(ref, Clock.systemUTC().instant()).toNanos() * 48.0e-6).toLong() + ) + ssrcsStarted.add(state) + val startEvent = StartEvent( + ++seq, + Start( + "$epId-${p.ssrc}", + MediaFormat( + "opus", + 48000, + 2 + ), + CustomParameters(endpointId = epId) + ) + ) + handleEvent(startEvent) + } + + seq++ + handleEvent(p.encodeAsJson(epId)) + } + + @OptIn(ExperimentalEncodingApi::class) + private fun RtpPacket.encodeAsJson(epId: String): Event { + val ssrcState = ssrcsStarted.find { it.ssrc == this.ssrc }!! + val elapsedRtpTime = this.timestamp - ssrcState.initialRtpTs + val ts = elapsedRtpTime + ssrcState.offset + val indexTracker = Rfc3711IndexTracker() + val p = MediaEvent( + seq, + media = Media( + "$epId-${this.ssrc}", + indexTracker.update(this.sequenceNumber), + ts, + Base64.encode(this.buffer, this.payloadOffset, this.payloadOffset + this.payloadLength) + ) + ) + return p + } +} diff --git a/pom.xml b/pom.xml index 96df738c07..cba92a6306 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ 5.9.1 5.10.2 1.0-133-g6af1020 - 1.1-143-g175c44b + 1.1-144-ga2c5ec1 1.13.11 3.2.0 3.6.0 @@ -93,6 +93,11 @@ jicoco-metrics ${jicoco.version} + + ${project.groupId} + jicoco-mediajson + ${jicoco.version} + ${project.groupId} jitsi-utils @@ -111,7 +116,7 @@ ${project.groupId} jitsi-xmpp-extensions - 1.0-81-g3816e5a + 1.0-SNAPSHOT