Skip to content

Commit

Permalink
New webhooks playStart, playStop, recordStart issue #4666
Browse files Browse the repository at this point in the history
  • Loading branch information
lastpeony committed Mar 8, 2023
1 parent 7543ea1 commit 2a3a296
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 132 deletions.
88 changes: 70 additions & 18 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import java.util.Queue;
import java.util.Set;

Expand Down Expand Up @@ -47,8 +46,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.errorprone.annotations.NoAllocation;

import io.antmedia.cluster.ClusterNode;
import io.antmedia.cluster.IClusterNotifier;
import io.antmedia.datastore.db.DataStore;
Expand All @@ -65,7 +62,6 @@
import io.antmedia.plugin.api.IFrameListener;
import io.antmedia.plugin.api.IPacketListener;
import io.antmedia.plugin.api.IStreamListener;
import io.antmedia.plugin.api.StreamParametersInfo;
import io.antmedia.rest.RestServiceBase;
import io.antmedia.rest.model.Result;
import io.antmedia.security.AcceptOnlyStreamsInDataStore;
Expand Down Expand Up @@ -100,6 +96,9 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter
public static final String HOOK_ACTION_PUBLISH_TIMEOUT_ERROR = "publishTimeoutError";
public static final String HOOK_ACTION_ENCODER_NOT_OPENED_ERROR = "encoderNotOpenedError";
public static final String HOOK_ACTION_ENDPOINT_FAILED = "endpointFailed";
public static final String HOOK_ACTION_START_PLAY = "playStart";
public static final String HOOK_ACTION_STOP_PLAY = "playStop";
public static final String HOOK_ACTION_START_RECORD = "recordStart";

public static final String STREAMS = "streams";

Expand Down Expand Up @@ -490,7 +489,8 @@ public void closeBroadcast(String streamId) {
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call live stream ended hook for stream:{}",streamId );
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, metaData));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null,
null, metaData));
}

if (broadcast.isZombi()) {
Expand Down Expand Up @@ -541,13 +541,60 @@ public void resetDASHStats(String streamId) {
}
}

public void sendStartPlayWebHook(final String viewerPlayType, final String streamId, final String viewerId){
final Broadcast broadcast = getDataStore().get(streamId);
final String listenerHookURL = broadcast.getListenerHookURL();
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
final String name = broadcast.getName();
final String category = broadcast.getCategory();
logger.info("Setting timer to call viewer play started hook for stream:{}", streamId);
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_PLAY, name, category,
null, null, viewerId, null));
}

public void sendStopPlayWebHook(final String viewerPlayType, final String streamId, final String viewerId){
final Broadcast broadcast = getDataStore().get(streamId);
final String listenerHookURL = broadcast.getListenerHookURL();
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
final String name = broadcast.getName();
final String category = broadcast.getCategory();
logger.info("Setting timer to call viewer play stopped hook for stream:{}", streamId);
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_STOP_PLAY, name, category,
null, null, viewerId, null));
}

public void sendStartRecordWebHook(final String streamId){
final Broadcast broadcast = getDataStore().get(streamId);
final String listenerHookURL = broadcast.getListenerHookURL();
if (listenerHookURL == null || listenerHookURL.isEmpty()) {
return;
}
final String name = broadcast.getName();
final String category = broadcast.getCategory();
logger.info("Setting timer to call stream start recording hook for stream:{}", streamId);
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_RECORD, name, category,
null, null, null, null));
}

private String getRtmpViewerId(){
return "rtmp_" + RandomStringUtils.randomNumeric(8);
}

@Override
public void streamPlayItemPlay(ISubscriberStream stream, IPlayItem item, boolean isLive) {
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), true));
final String streamId = item.getName();
sendStartPlayWebHook(ViewerStats.RTMP_TYPE, streamId, getRtmpViewerId());
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, true));
}
@Override
public void streamPlayItemStop(ISubscriberStream stream, IPlayItem item) {
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), false));
final String streamId = item.getName();
sendStopPlayWebHook(ViewerStats.RTMP_TYPE, streamId, getRtmpViewerId());
vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, false));
}

@Override
Expand All @@ -559,7 +606,6 @@ public void streamSubscriberClose(ISubscriberStream stream) {
public void startPublish(String streamId, long absoluteStartTimeMs, String publishType) {
vertx.executeBlocking( handler -> {
try {

Broadcast broadcast = updateBroadcastStatus(streamId, absoluteStartTimeMs, publishType, getDataStore().get(streamId));

final String listenerHookURL = getListenerHookURL(broadcast);
Expand All @@ -569,8 +615,13 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call live stream started hook for stream:{}",streamId );
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_LIVE_STREAM, name, category,
null, null, metaData));
vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_LIVE_STREAM, name, category, null, null,null, metaData));
}

if ((broadcast.getMp4Enabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM || broadcast.getWebMEnabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM)
|| (appSettings.isMp4MuxingEnabled() || appSettings.isWebMMuxingEnabled())
) {
sendStartRecordWebHook(streamId);
}

int ingestingStreamLimit = appSettings.getIngestingStreamLimit();
Expand Down Expand Up @@ -739,7 +790,7 @@ public void muxingFinished(final String streamId, File file, long startTime, lon
final String metaData = (broadcast != null) ? broadcast.getMetaData() : null;
String finalListenerHookURL = listenerHookURL;
logger.info("Setting timer for calling vod ready hook for stream:{}", streamId);
vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, metaData));
vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, null, metaData));
}

String muxerFinishScript = appSettings.getMuxerFinishScript();
Expand Down Expand Up @@ -810,7 +861,7 @@ public static String getRelativePath(String filePath){
* @return
*/
public StringBuilder notifyHook(String url, String id, String action, String streamName, String category,
String vodName, String vodId, String metadata) {
String vodName, String vodId, String viewerId, String metadata) {
StringBuilder response = null;
logger.info("Running notify hook url:{} stream id: {} action:{} vod name:{} vod id:{}", url, id, action, vodName, vodId);
if (url != null && url.length() > 0) {
Expand All @@ -833,6 +884,10 @@ public StringBuilder notifyHook(String url, String id, String action, String str
variables.put("vodId", vodId);
}

if(viewerId != null){
variables.put("viewerId", viewerId);
}

if (metadata != null) {
variables.put("metadata", metadata);
}
Expand Down Expand Up @@ -1309,7 +1364,7 @@ public synchronized void incrementEncoderNotOpenedError(String streamId) {
final String category = broadcast.getCategory();
final String metaData = broadcast.getMetaData();
logger.info("Setting timer to call encoder not opened error for stream:{}", streamId);
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, metaData));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, null,metaData));
}
}
}
Expand All @@ -1334,11 +1389,9 @@ public synchronized void publishTimeoutError(String streamId, String subscriberI
final String name = broadcast.getName();
final String category = broadcast.getCategory();
logger.info("Setting timer to call hook that means live stream is not started to the publish timeout for stream:{}", streamId);

JSONObject jsonResponse = new JSONObject();
jsonResponse.put(WebSocketConstants.SUBSCRIBER_ID, subscriberId);

vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, jsonResponse.toJSONString()));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, null, jsonResponse.toJSONString()));
}
}
}
Expand Down Expand Up @@ -1450,7 +1503,6 @@ private boolean isEncoderSettingsValid(List<EncoderSettings> encoderSettingsList
/**
*
* @param newSettings
* @param checkUpdateTime
* @return true if timing is valid, false if it is invalid
*/
public boolean isIncomingTimeValid(AppSettings newSettings)
Expand Down Expand Up @@ -1697,7 +1749,7 @@ public void endpointFailedUpdate(String streamId, String url) {
logger.info("Setting timer to call rtmp endpoint failed hook for stream:{}", streamId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("rtmp-url", url);
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, jsonObject.toJSONString()));
vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, null, jsonObject.toJSONString()));
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/antmedia/filter/AbstractFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;

import io.antmedia.AntMediaApplicationAdapter;
import org.apache.catalina.util.NetMask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,5 +155,16 @@ public Broadcast getBroadcast(String streamId) {
}
return broadcast;
}

protected AntMediaApplicationAdapter getAntMediaApplicationAdapter(){
AntMediaApplicationAdapter antMediaApplicationAdapter = null;
ApplicationContext context = getAppContext();
if (context != null)
{
antMediaApplicationAdapter= (AntMediaApplicationAdapter)context.getBean(AntMediaApplicationAdapter.BEAN_NAME);
}
return antMediaApplicationAdapter;

}

}
6 changes: 3 additions & 3 deletions src/main/java/io/antmedia/filter/DashStatisticsFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;

import io.antmedia.statistic.ViewerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +26,7 @@ public class DashStatisticsFilter extends AbstractFilter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {


System.out.println("i got called");
HttpServletRequest httpRequest =(HttpServletRequest)request;

String method = httpRequest.getMethod();
Expand All @@ -52,8 +53,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status);
IStreamStats stats = getStreamStats(DashViewerStats.BEAN_NAME);
if (stats != null) {
stats.registerNewViewer(streamId, sessionId, subscriberId);

stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.DASH_TYPE, getAntMediaApplicationAdapter());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/filter/HlsStatisticsFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;

import io.antmedia.statistic.ViewerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,8 +53,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status);
IStreamStats stats = getStreamStats(HlsViewerStats.BEAN_NAME);
if (stats != null) {
stats.registerNewViewer(streamId, sessionId, subscriberId);

stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.HLS_TYPE, getAntMediaApplicationAdapter());
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/antmedia/muxer/MuxAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,6 @@ private Muxer addMp4Muxer() {
* @return
*/
public RecordMuxer startRecording(RecordType recordType) {

if (!isRecording.get()) {
logger.warn("Starting recording return false for stream:{} because stream is being prepared", streamId);
return null;
Expand All @@ -1700,7 +1699,6 @@ public RecordMuxer startRecording(RecordType recordType) {
return null;
}


RecordMuxer muxer = null;
if(recordType == RecordType.MP4) {
Mp4Muxer mp4Muxer = createMp4Muxer();
Expand Down Expand Up @@ -1870,10 +1868,15 @@ public Result startRtmpStreaming(String rtmpUrl, int resolutionHeight)
}

public void sendEndpointErrorNotifyHook(String url){
AntMediaApplicationAdapter adaptor = getAntMediaApplicationAdaptor();
adaptor.endpointFailedUpdate(this.streamId, url);
}

protected AntMediaApplicationAdapter getAntMediaApplicationAdaptor(){
IContext context = MuxAdaptor.this.scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME);
adaptor.endpointFailedUpdate(this.streamId, url);
return adaptor;
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/antmedia/statistic/DashViewerStats.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.antmedia.statistic;


import io.antmedia.AntMediaApplicationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -28,11 +29,11 @@ public void setApplicationContext(ApplicationContext applicationContext) {

AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME);
timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, DASH_TYPE);

final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME);
vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt->
{
synchronized (lock) {
updateViewerCountProcess(DASH_TYPE);
updateViewerCountProcess(DASH_TYPE, antMediaApplicationAdapter);
}
});
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/antmedia/statistic/HlsViewerStats.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.antmedia.statistic;

import io.antmedia.AntMediaApplicationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -28,11 +29,12 @@ public void setApplicationContext(ApplicationContext applicationContext) {

AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME);
timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, HLS_TYPE);

final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME);

vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt->
{
synchronized (lock) {
updateViewerCountProcess(HLS_TYPE);
updateViewerCountProcess(HLS_TYPE, antMediaApplicationAdapter);
}
});
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/antmedia/statistic/IStreamStats.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.antmedia.statistic;

import io.antmedia.AntMediaApplicationAdapter;

public interface IStreamStats {

/**
* Register a new viewer to a stream
* @param streamId
* @param sessionId
*/
void registerNewViewer(String streamId, String sessionId, String subscriberId);
void registerNewViewer(String streamId, String sessionId, String subscriberId, String playType, AntMediaApplicationAdapter antMediaApplicationAdapter);


/**
Expand Down
Loading

0 comments on commit 2a3a296

Please sign in to comment.