From 88444c5f1b466b25717c9d33d23f41edcfaffdaa Mon Sep 17 00:00:00 2001 From: lastpeony Date: Tue, 3 Jan 2023 11:48:27 +0300 Subject: [PATCH] New webhooks playStart, playStop, recordStart issue #4666 --- .../antmedia/AntMediaApplicationAdapter.java | 84 ++++++++++-- .../io/antmedia/filter/AbstractFilter.java | 12 ++ .../antmedia/filter/DashStatisticsFilter.java | 6 +- .../antmedia/filter/HlsStatisticsFilter.java | 4 +- .../java/io/antmedia/muxer/MuxAdaptor.java | 9 +- .../antmedia/statistic/DashViewerStats.java | 5 +- .../io/antmedia/statistic/HlsViewerStats.java | 6 +- .../io/antmedia/statistic/IStreamStats.java | 4 +- .../io/antmedia/statistic/ViewerStats.java | 34 +++-- .../AntMediaApplicationAdaptorUnitTest.java | 77 +++++++++-- .../java/io/antmedia/test/Application.java | 6 +- .../java/io/antmedia/test/MuxerUnitTest.java | 2 +- .../test/filter/DashStatisticsFilterTest.java | 66 ++++++--- .../test/filter/HlsStatisticsFilterTest.java | 26 ++-- .../test/statistic/DashViewerStatsTest.java | 128 ++++++++++++++---- .../test/statistic/HlsViewerStatsTest.java | 43 ++++-- 16 files changed, 383 insertions(+), 129 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index aef8b80db..f9e2372e2 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -15,7 +15,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.regex.Pattern; import java.util.Queue; import java.util.Set; @@ -47,8 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.errorprone.annotations.NoAllocation; - import io.antmedia.cluster.ClusterNode; import io.antmedia.cluster.IClusterNotifier; import io.antmedia.datastore.db.DataStore; @@ -65,7 +62,6 @@ import io.antmedia.plugin.api.IFrameListener; import io.antmedia.plugin.api.IPacketListener; import io.antmedia.plugin.api.IStreamListener; -import io.antmedia.plugin.api.StreamParametersInfo; import io.antmedia.rest.RestServiceBase; import io.antmedia.rest.model.Result; import io.antmedia.security.AcceptOnlyStreamsInDataStore; @@ -99,6 +95,9 @@ public class AntMediaApplicationAdapter extends MultiThreadedApplicationAdapter public static final String HOOK_ACTION_PUBLISH_TIMEOUT_ERROR = "publishTimeoutError"; public static final String HOOK_ACTION_ENCODER_NOT_OPENED_ERROR = "encoderNotOpenedError"; public static final String HOOK_ACTION_ENDPOINT_FAILED = "endpointFailed"; + public static final String HOOK_ACTION_START_PLAY = "playStart"; + public static final String HOOK_ACTION_STOP_PLAY = "playStop"; + public static final String HOOK_ACTION_START_RECORD = "recordStart"; public static final String STREAMS = "streams"; @@ -488,7 +487,7 @@ public void closeBroadcast(String streamId) { final String name = broadcast.getName(); final String category = broadcast.getCategory(); logger.info("Setting timer to call live stream ended hook for stream:{}",streamId ); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, null)); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_END_LIVE_STREAM, name, category, null, null, null, null)); } if (broadcast.isZombi()) { @@ -539,13 +538,60 @@ public void resetDASHStats(String streamId) { } } + public void sendStartPlayWebHook(final String viewerPlayType, final String streamId, final String viewerId){ + final Broadcast broadcast = getDataStore().get(streamId); + final String listenerHookURL = broadcast.getListenerHookURL(); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + final String name = broadcast.getName(); + final String category = broadcast.getCategory(); + logger.info("Setting timer to call viewer play started hook for stream:{}", streamId); + vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_PLAY, name, category, + null, null, viewerId, null)); + } + + public void sendStopPlayWebHook(final String viewerPlayType, final String streamId, final String viewerId){ + final Broadcast broadcast = getDataStore().get(streamId); + final String listenerHookURL = broadcast.getListenerHookURL(); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + final String name = broadcast.getName(); + final String category = broadcast.getCategory(); + logger.info("Setting timer to call viewer play stopped hook for stream:{}", streamId); + vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_STOP_PLAY, name, category, + null, null, viewerId, null)); + } + + public void sendStartRecordWebHook(final String streamId){ + final Broadcast broadcast = getDataStore().get(streamId); + final String listenerHookURL = broadcast.getListenerHookURL(); + if (listenerHookURL == null || listenerHookURL.isEmpty()) { + return; + } + final String name = broadcast.getName(); + final String category = broadcast.getCategory(); + logger.info("Setting timer to call stream start recording hook for stream:{}", streamId); + vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_RECORD, name, category, + null, null, null, null)); + } + + private String getRtmpViewerId(){ + return "rtmp_" + RandomStringUtils.randomNumeric(8); + } + @Override public void streamPlayItemPlay(ISubscriberStream stream, IPlayItem item, boolean isLive) { - vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), true)); + final String streamId = item.getName(); + sendStartPlayWebHook(ViewerStats.RTMP_TYPE, streamId, getRtmpViewerId()); + vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, true)); } @Override public void streamPlayItemStop(ISubscriberStream stream, IPlayItem item) { - vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(item.getName(), false)); + final String streamId = item.getName(); + sendStopPlayWebHook(ViewerStats.RTMP_TYPE, streamId, getRtmpViewerId()); + vertx.setTimer(1, l -> getDataStore().updateRtmpViewerCount(streamId, false)); } @Override @@ -557,7 +603,6 @@ public void streamSubscriberClose(ISubscriberStream stream) { public void startPublish(String streamId, long absoluteStartTimeMs, String publishType) { vertx.executeBlocking( handler -> { try { - Broadcast broadcast = updateBroadcastStatus(streamId, absoluteStartTimeMs, publishType, getDataStore().get(streamId)); final String listenerHookURL = getListenerHookURL(broadcast); @@ -567,7 +612,13 @@ public void startPublish(String streamId, long absoluteStartTimeMs, String publi final String category = broadcast.getCategory(); logger.info("Setting timer to call live stream started hook for stream:{}",streamId ); vertx.setTimer(10, e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_START_LIVE_STREAM, name, category, - null, null, null)); + null, null, null, null)); + } + + if ((broadcast.getMp4Enabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM || broadcast.getWebMEnabled() == MuxAdaptor.RECORDING_ENABLED_FOR_STREAM) + || (appSettings.isMp4MuxingEnabled() || appSettings.isWebMMuxingEnabled()) + ) { + sendStartRecordWebHook(streamId); } int ingestingStreamLimit = appSettings.getIngestingStreamLimit(); @@ -735,7 +786,7 @@ public void muxingFinished(final String streamId, File file, long startTime, lon final String baseName = vodName.substring(0, index); String finalListenerHookURL = listenerHookURL; logger.info("Setting timer for calling vod ready hook for stream:{}", streamId); - vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, null)); + vertx.runOnContext(e -> notifyHook(finalListenerHookURL, streamId, HOOK_ACTION_VOD_READY, null, null, baseName, vodIdFinal, null, null)); } String muxerFinishScript = appSettings.getMuxerFinishScript(); @@ -806,7 +857,7 @@ public static String getRelativePath(String filePath){ * @return */ public StringBuilder notifyHook(String url, String id, String action, String streamName, String category, - String vodName, String vodId, String metadata) { + String vodName, String vodId, String viewerId, String metadata) { StringBuilder response = null; logger.info("Running notify hook url:{} stream id: {} action:{} vod name:{} vod id:{}", url, id, action, vodName, vodId); if (url != null && url.length() > 0) { @@ -829,6 +880,10 @@ public StringBuilder notifyHook(String url, String id, String action, String str variables.put("vodId", vodId); } + if(viewerId != null){ + variables.put("viewerId", viewerId); + } + if (metadata != null) { variables.put("metadata", metadata); } @@ -1304,7 +1359,7 @@ public synchronized void incrementEncoderNotOpenedError(String streamId) { final String name = broadcast.getName(); final String category = broadcast.getCategory(); logger.info("Setting timer to call encoder not opened error for stream:{}", streamId); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, null)); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENCODER_NOT_OPENED_ERROR, name, category, null, null, null,null)); } } } @@ -1329,7 +1384,7 @@ public synchronized void publishTimeoutError(String streamId) { final String name = broadcast.getName(); final String category = broadcast.getCategory(); logger.info("Setting timer to call hook that means live stream is not started to the publish timeout for stream:{}", streamId); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, null)); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_PUBLISH_TIMEOUT_ERROR, name, category, null, null, null, null)); } } } @@ -1441,7 +1496,6 @@ private boolean isEncoderSettingsValid(List encoderSettingsList /** * * @param newSettings - * @param checkUpdateTime * @return true if timing is valid, false if it is invalid */ public boolean isIncomingTimeValid(AppSettings newSettings) @@ -1686,7 +1740,7 @@ public void endpointFailedUpdate(String streamId, String url) { logger.info("Setting timer to call rtmp endpoint failed hook for stream:{}", streamId); JSONObject jsonObject = new JSONObject(); jsonObject.put("rtmp-url", url); - vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, jsonObject.toJSONString())); + vertx.runOnContext(e -> notifyHook(listenerHookURL, streamId, HOOK_ACTION_ENDPOINT_FAILED, name, category, null, null, null, jsonObject.toJSONString())); } } } diff --git a/src/main/java/io/antmedia/filter/AbstractFilter.java b/src/main/java/io/antmedia/filter/AbstractFilter.java index f15b73eec..35552a668 100644 --- a/src/main/java/io/antmedia/filter/AbstractFilter.java +++ b/src/main/java/io/antmedia/filter/AbstractFilter.java @@ -9,6 +9,7 @@ import javax.servlet.FilterConfig; import javax.servlet.ServletException; +import io.antmedia.AntMediaApplicationAdapter; import org.apache.catalina.util.NetMask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,5 +155,16 @@ public Broadcast getBroadcast(String streamId) { } return broadcast; } + + protected AntMediaApplicationAdapter getAntMediaApplicationAdapter(){ + AntMediaApplicationAdapter antMediaApplicationAdapter = null; + ApplicationContext context = getAppContext(); + if (context != null) + { + antMediaApplicationAdapter= (AntMediaApplicationAdapter)context.getBean(AntMediaApplicationAdapter.BEAN_NAME); + } + return antMediaApplicationAdapter; + + } } diff --git a/src/main/java/io/antmedia/filter/DashStatisticsFilter.java b/src/main/java/io/antmedia/filter/DashStatisticsFilter.java index 980e0c9a6..51e3b6dd3 100644 --- a/src/main/java/io/antmedia/filter/DashStatisticsFilter.java +++ b/src/main/java/io/antmedia/filter/DashStatisticsFilter.java @@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.HttpMethod; +import io.antmedia.statistic.ViewerStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +26,7 @@ public class DashStatisticsFilter extends AbstractFilter { public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - + System.out.println("i got called"); HttpServletRequest httpRequest =(HttpServletRequest)request; String method = httpRequest.getMethod(); @@ -52,8 +53,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status); IStreamStats stats = getStreamStats(DashViewerStats.BEAN_NAME); if (stats != null) { - stats.registerNewViewer(streamId, sessionId, subscriberId); - + stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.DASH_TYPE, getAntMediaApplicationAdapter()); } } } diff --git a/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java b/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java index 0e3ff9f68..d7ab64e09 100644 --- a/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java +++ b/src/main/java/io/antmedia/filter/HlsStatisticsFilter.java @@ -10,6 +10,7 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.HttpMethod; +import io.antmedia.statistic.ViewerStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +53,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status); IStreamStats stats = getStreamStats(HlsViewerStats.BEAN_NAME); if (stats != null) { - stats.registerNewViewer(streamId, sessionId, subscriberId); - + stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.HLS_TYPE, getAntMediaApplicationAdapter()); } } } diff --git a/src/main/java/io/antmedia/muxer/MuxAdaptor.java b/src/main/java/io/antmedia/muxer/MuxAdaptor.java index 2c19d852d..a28c3fc9f 100644 --- a/src/main/java/io/antmedia/muxer/MuxAdaptor.java +++ b/src/main/java/io/antmedia/muxer/MuxAdaptor.java @@ -1681,7 +1681,6 @@ private Muxer addMp4Muxer() { * @return */ public RecordMuxer startRecording(RecordType recordType) { - if (!isRecording.get()) { logger.warn("Starting recording return false for stream:{} because stream is being prepared", streamId); return null; @@ -1692,7 +1691,6 @@ public RecordMuxer startRecording(RecordType recordType) { return null; } - RecordMuxer muxer = null; if(recordType == RecordType.MP4) { Mp4Muxer mp4Muxer = createMp4Muxer(); @@ -1862,10 +1860,15 @@ public Result startRtmpStreaming(String rtmpUrl, int resolutionHeight) } public void sendEndpointErrorNotifyHook(String url){ + AntMediaApplicationAdapter adaptor = getAntMediaApplicationAdaptor(); + adaptor.endpointFailedUpdate(this.streamId, url); + } + + protected AntMediaApplicationAdapter getAntMediaApplicationAdaptor(){ IContext context = MuxAdaptor.this.scope.getContext(); ApplicationContext appCtx = context.getApplicationContext(); AntMediaApplicationAdapter adaptor = (AntMediaApplicationAdapter) appCtx.getBean(AntMediaApplicationAdapter.BEAN_NAME); - adaptor.endpointFailedUpdate(this.streamId, url); + return adaptor; } /** diff --git a/src/main/java/io/antmedia/statistic/DashViewerStats.java b/src/main/java/io/antmedia/statistic/DashViewerStats.java index 74b6197c2..1f160570c 100644 --- a/src/main/java/io/antmedia/statistic/DashViewerStats.java +++ b/src/main/java/io/antmedia/statistic/DashViewerStats.java @@ -1,6 +1,7 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -28,11 +29,11 @@ public void setApplicationContext(ApplicationContext applicationContext) { AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME); timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, DASH_TYPE); - + final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt-> { synchronized (lock) { - updateViewerCountProcess(DASH_TYPE); + updateViewerCountProcess(DASH_TYPE, antMediaApplicationAdapter); } }); } diff --git a/src/main/java/io/antmedia/statistic/HlsViewerStats.java b/src/main/java/io/antmedia/statistic/HlsViewerStats.java index 126fe9e3a..ca7a0a611 100644 --- a/src/main/java/io/antmedia/statistic/HlsViewerStats.java +++ b/src/main/java/io/antmedia/statistic/HlsViewerStats.java @@ -1,5 +1,6 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -28,11 +29,12 @@ public void setApplicationContext(ApplicationContext applicationContext) { AppSettings settings = (AppSettings)applicationContext.getBean(AppSettings.BEAN_NAME); timeoutMS = getTimeoutMSFromSettings(settings, timeoutMS, HLS_TYPE); - + final AntMediaApplicationAdapter antMediaApplicationAdapter = (AntMediaApplicationAdapter)applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); + vertx.setPeriodic(DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT, yt-> { synchronized (lock) { - updateViewerCountProcess(HLS_TYPE); + updateViewerCountProcess(HLS_TYPE, antMediaApplicationAdapter); } }); } diff --git a/src/main/java/io/antmedia/statistic/IStreamStats.java b/src/main/java/io/antmedia/statistic/IStreamStats.java index 00d250c1e..81a2d9e25 100644 --- a/src/main/java/io/antmedia/statistic/IStreamStats.java +++ b/src/main/java/io/antmedia/statistic/IStreamStats.java @@ -1,5 +1,7 @@ package io.antmedia.statistic; +import io.antmedia.AntMediaApplicationAdapter; + public interface IStreamStats { /** @@ -7,7 +9,7 @@ public interface IStreamStats { * @param streamId * @param sessionId */ - void registerNewViewer(String streamId, String sessionId, String subscriberId); + void registerNewViewer(String streamId, String sessionId, String subscriberId, String playType, AntMediaApplicationAdapter antMediaApplicationAdapter); /** diff --git a/src/main/java/io/antmedia/statistic/ViewerStats.java b/src/main/java/io/antmedia/statistic/ViewerStats.java index 252bcce25..c3617dd7a 100644 --- a/src/main/java/io/antmedia/statistic/ViewerStats.java +++ b/src/main/java/io/antmedia/statistic/ViewerStats.java @@ -6,6 +6,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import io.antmedia.AntMediaApplicationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,16 +21,18 @@ public class ViewerStats { protected static Logger logger = LoggerFactory.getLogger(ViewerStats.class); - + protected Vertx vertx; public static final String HLS_TYPE = "hls"; public static final String DASH_TYPE = "dash"; - + public static final String RTMP_TYPE = "rtmp"; + public static final String WEBRTC_TYPE = "webrtc"; + + private DataStore dataStore; protected DataStoreFactory dataStoreFactory; - public static final int DEFAULT_TIME_PERIOD_FOR_VIEWER_COUNT = 10000; /** @@ -49,14 +52,12 @@ public class ViewerStats { */ protected int timeoutMS = 20000; - public void registerNewViewer(String streamId, String sessionId, String subscriberId) + public void registerNewViewer(String streamId, String sessionId, String subscriberId, String viewerPlayType, AntMediaApplicationAdapter antMediaApplicationAdapter) { //do not block the thread, run in vertx event queue vertx.runOnContext(h -> { - synchronized (lock) { //synchronize with database update calculations, because some odd cases may happen - Map viewerMap = streamsViewerMap.get(streamId); if (viewerMap == null) { viewerMap = new ConcurrentHashMap<>(); @@ -66,7 +67,11 @@ public void registerNewViewer(String streamId, String sessionId, String subscrib int streamIncrementCounter = getIncreaseCounterMap(streamId); streamIncrementCounter++; increaseCounterMap.put(streamId, streamIncrementCounter); - + if(subscriberId != null && !subscriberId.equals("undefined")){ + antMediaApplicationAdapter.sendStartPlayWebHook(viewerPlayType, streamId, subscriberId); + }else{ + antMediaApplicationAdapter.sendStartPlayWebHook(viewerPlayType, streamId, sessionId); + } } viewerMap.put(sessionId, System.currentTimeMillis()); streamsViewerMap.put(streamId, viewerMap); @@ -85,7 +90,7 @@ public void registerNewViewer(String streamId, String sessionId, String subscrib }); } - + public void resetViewerMap(String streamID, String type) { Iterator> viewerIterator; @@ -110,7 +115,7 @@ public void resetViewerMap(String streamID, String type) { logger.info("Reset {} Stream ID: {} remove failed or null", type, streamID); } } - + public int getViewerCount(String streamId) { Map viewerMap = streamsViewerMap.get(streamId); int viewerCount = 0; @@ -206,7 +211,7 @@ public void setVertx(Vertx vertx) { this.vertx = vertx; } - public void updateViewerCountProcess(String type) { + public void updateViewerCountProcess(String type, AntMediaApplicationAdapter antMediaApplicationAdapter) { Iterator>> streamIterator = streamsViewerMap.entrySet().iterator(); @@ -243,9 +248,15 @@ public void updateViewerCountProcess(String type) { // regard it as not a viewer viewerIterator.remove(); numberOfDecrement++; - + String sessionId = viewer.getKey(); String subscriberId = sessionId2subscriberId.get(sessionId); + + if(subscriberId !=null && !subscriberId.equals("undefined")){ + antMediaApplicationAdapter.sendStopPlayWebHook(type,streamId,subscriberId); + }else{ + antMediaApplicationAdapter.sendStopPlayWebHook(type,streamId,sessionId); + } // set subscriber status to not connected if(subscriberId != null) { // add a disconnected event to the subscriber @@ -307,5 +318,4 @@ public void updateViewerCountProcess(String type) { } } - } diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 8341e015e..eaee0a3f9 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import io.antmedia.statistic.ViewerStats; import org.apache.commons.lang3.RandomUtils; import org.apache.http.HttpEntity; import org.apache.http.StatusLine; @@ -641,10 +643,10 @@ public void testNotifyHook() { AntMediaApplicationAdapter spyAdaptor = Mockito.spy(adapter); - StringBuilder notifyHook = spyAdaptor.notifyHook(null, null, null, null, null, null, null, null); + StringBuilder notifyHook = spyAdaptor.notifyHook(null, null, null, null, null, null, null, null, null); assertNull(notifyHook); - notifyHook = spyAdaptor.notifyHook("", null, null, null, null, null, null, null); + notifyHook = spyAdaptor.notifyHook("", null, null, null, null, null, null, null, null); assertNull(notifyHook); @@ -656,8 +658,10 @@ public void testNotifyHook() { String vodName = "vod name" + String.valueOf((Math.random() * 10000)); String vodId = String.valueOf((Math.random() * 10000)); + String viewerId = String.valueOf((Math.random() * 10000)); + String url = "this is url"; - notifyHook = spyAdaptor.notifyHook(url, id, action, streamName, category, vodName, vodId, null); + notifyHook = spyAdaptor.notifyHook(url, id, action, streamName, category, vodName, vodId, viewerId, null); assertNull(notifyHook); try { @@ -673,6 +677,7 @@ public void testNotifyHook() { assertEquals(category, variablesMap.get("category")); assertEquals(vodName, variablesMap.get("vodName")); assertEquals(vodId, variablesMap.get("vodId")); + assertEquals(viewerId, variablesMap.get("viewerId")); } catch (IOException e) { e.printStackTrace(); @@ -681,7 +686,7 @@ public void testNotifyHook() { url = "this is second url"; - notifyHook = spyAdaptor.notifyHook(url, id, null, null, null, null, null, null); + notifyHook = spyAdaptor.notifyHook(url, id, null, null, null, null, null, null, null); assertNull(notifyHook); try { @@ -734,6 +739,7 @@ public void testNotifyHookErrors(){ ArgumentCaptor captureCategory = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodName = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerId = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureMetadata = ArgumentCaptor.forClass(String.class); /* @@ -748,7 +754,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -774,7 +780,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(2)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -799,7 +805,7 @@ public void testNotifyHookErrors(){ //verify that notifyHook is called 1 time verify(spyAdaptor, times(3)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(),captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -854,6 +860,7 @@ public void testNotifyHookFromMuxingFinished() { ArgumentCaptor captureCategory = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodName = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureVodId = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captureViewerId = ArgumentCaptor.forClass(String.class); ArgumentCaptor captureMetadata = ArgumentCaptor.forClass(String.class); @@ -863,7 +870,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is never called verify(spyAdaptor, never()).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); /* @@ -889,7 +896,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is called 1 time verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -923,7 +930,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that no new notifyHook is called verify(spyAdaptor, times(1)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); called = true; } @@ -952,7 +959,7 @@ public void testNotifyHookFromMuxingFinished() { //verify that notifyHook is called 2 times verify(spyAdaptor, times(2)).notifyHook(captureUrl.capture(), captureId.capture(), captureAction.capture(), - captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureMetadata.capture()); + captureStreamName.capture(), captureCategory.capture(), captureVodName.capture(), captureVodId.capture(), captureViewerId.capture(), captureMetadata.capture()); assertEquals(captureUrl.getValue(), broadcast.getListenerHookURL()); assertEquals(captureId.getValue(), broadcast.getStreamId()); @@ -1775,5 +1782,53 @@ public void testAppDeletion() adapter.stopApplication(true); verify(dataStore, timeout(ClusterNode.NODE_UPDATE_PERIOD+1000)).close(true); } + @Test + public void testRecordStartedHook() throws Exception { + final AntMediaApplicationAdapter spyAdaptor = Mockito.spy(adapter); + AppSettings appSettings = new AppSettings(); + spyAdaptor.setAppSettings(appSettings); + + Broadcast broadcast = new Broadcast(); + assertNull(spyAdaptor.getListenerHookURL(broadcast)); + broadcast.setMp4Enabled(MuxAdaptor.RECORDING_ENABLED_FOR_STREAM); + String hookURL = "listener_hook_url"; + appSettings.setListenerHookURL(hookURL); + + assertEquals(hookURL, spyAdaptor.getListenerHookURL(broadcast)); + + + appSettings = new AppSettings(); + spyAdaptor.setServerSettings(new ServerSettings()); + spyAdaptor.setAppSettings(appSettings); + DataStore dataStore = new InMemoryDataStore("testHook"); + DataStoreFactory dsf = Mockito.mock(DataStoreFactory.class); + Mockito.when(dsf.getDataStore()).thenReturn(dataStore); + spyAdaptor.setDataStoreFactory(dsf); + spyAdaptor.setDataStore(dataStore); + broadcast.setStreamId("stream1"); + broadcast.setName("name"); + broadcast.setCategory("category"); + broadcast.setListenerHookURL(hookURL); + dataStore.save(broadcast); + String streamId = broadcast.getStreamId(); + + + doReturn(new StringBuilder()).when(spyAdaptor).sendPOST(anyString(),anyMap()); + + spyAdaptor.startPublish(streamId, 0, IAntMediaStreamHandler.PUBLISH_TYPE_WEBRTC); + verify(spyAdaptor, times(1)).sendStartRecordWebHook(streamId); + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdaptor,times(1)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_START_RECORD, broadcast.getName(),broadcast.getCategory(),null,null,null,null); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + } } diff --git a/src/test/java/io/antmedia/test/Application.java b/src/test/java/io/antmedia/test/Application.java index e2a28ac58..f3d1fb427 100644 --- a/src/test/java/io/antmedia/test/Application.java +++ b/src/test/java/io/antmedia/test/Application.java @@ -23,7 +23,8 @@ public class Application extends AntMediaApplicationAdapter implements IAntMedia public static boolean enableSourceHealthUpdate = false; public static List notifyVodId = new ArrayList<>();; - + + public static List notifyViewerId = new ArrayList<>();; @Override @@ -51,7 +52,7 @@ public static void resetFields() { @Override public StringBuilder notifyHook(String url, String id, String action, String streamName, String category, - String vodName, String vodId, String metadata) { + String vodName, String vodId, String viewerId, String metadata) { logger.info("notify hook action: {}", action); notifyHookAction.add(action); notitfyURL.add(url); @@ -60,6 +61,7 @@ public StringBuilder notifyHook(String url, String id, String action, String str notifyCategory.add(category); notifyVodName.add(vodName); notifyVodId.add(vodId); + notifyViewerId.add(viewerId); return null; } diff --git a/src/test/java/io/antmedia/test/MuxerUnitTest.java b/src/test/java/io/antmedia/test/MuxerUnitTest.java index 51b1aa8ac..f21f0eaca 100644 --- a/src/test/java/io/antmedia/test/MuxerUnitTest.java +++ b/src/test/java/io/antmedia/test/MuxerUnitTest.java @@ -1679,7 +1679,7 @@ public void testMp4MuxingAndNotifyCallback() { Application app = (Application) applicationContext.getBean("web.handler"); AntMediaApplicationAdapter appAdaptor = Mockito.spy(app); - doReturn(new StringBuilder("")).when(appAdaptor).notifyHook(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()); + doReturn(new StringBuilder("")).when(appAdaptor).notifyHook(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()); assertNotNull(appAdaptor); //just check below value that it is not null, this is not related to this case but it should be tested diff --git a/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java b/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java index 0f83e17b2..2c2597e8b 100644 --- a/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java +++ b/src/test/java/io/antmedia/test/filter/DashStatisticsFilterTest.java @@ -1,14 +1,11 @@ package io.antmedia.test.filter; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; import java.io.IOException; +import java.util.concurrent.TimeUnit; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -18,8 +15,13 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.statistic.ViewerStats; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.awaitility.Awaitility; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -130,8 +132,10 @@ public void testDoFilter() { .thenReturn(context); when(filterconfig.getServletContext()).thenReturn(servletContext); - - + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + try { dashStatisticsFilter.init(filterconfig); //when(dashStatisticsFilter.getStreamStats()).thenReturn(streamStats); @@ -160,12 +164,31 @@ public void testDoFilter() { logger.info("session id {}, stream id {}", sessionId, streamId); dashStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain); - - - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); - - - + + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); + /*verify(antMediaApplicationAdapter, times(1)).sendStartPlayWebHook(ViewerStats.DASH_TYPE, streamId, null); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> { + boolean called = false; + try { + Broadcast broadcast = dataStore.get(streamId); + String url = broadcast.getListenerHookURL(); + String id = streamId; + String action = AntMediaApplicationAdapter.HOOK_ACTION_START_PLAY; + String streamName = broadcast.getName(); + String category = broadcast.getCategory(); + //String viewerId = sessionId; + verify(antMediaApplicationAdapter,times(1)).notifyHook(url,id,action,streamName,category, null, null,null, null); + called = true; + } catch (Exception e) { + e.printStackTrace(); + + } + return called; + });*/ + + + } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); @@ -189,7 +212,10 @@ public void testDASHViewerLimit() { IStreamStats streamStats = mock(IStreamStats.class); when(context.getBean(DashViewerStats.BEAN_NAME)).thenReturn(streamStats); - + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + when(context.isRunning()).thenReturn(true); DataStoreFactory dsf = mock(DataStoreFactory.class); when(context.getBean(DataStoreFactory.BEAN_NAME)).thenReturn(dsf); @@ -211,17 +237,17 @@ public void testDASHViewerLimit() { try { dashStatisticsFilter.init(filterconfig); //when(dashStatisticsFilter.getStreamStats()).thenReturn(streamStats); - + String sessionId = requestDash(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); broadcast.setDashViewerCount(1); String sessionId2 = requestDash(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); broadcast.setDashViewerCount(2); String sessionId3 = requestDash(streamId); - verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null); + verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); diff --git a/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java b/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java index a2a233ff0..eb65f6a2b 100644 --- a/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java +++ b/src/test/java/io/antmedia/test/filter/HlsStatisticsFilterTest.java @@ -18,6 +18,8 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.statistic.ViewerStats; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.After; @@ -130,7 +132,8 @@ public void testDoFilter() { .thenReturn(context); when(filterconfig.getServletContext()).thenReturn(servletContext); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); try { hlsStatisticsFilter.init(filterconfig); @@ -159,12 +162,9 @@ public void testDoFilter() { logger.info("session id {}, stream id {}", sessionId, streamId); hlsStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain); - - - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); - - - + + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); + } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); @@ -205,20 +205,22 @@ public void testHLSViewerLimit() { broadcast.setHlsViewerLimit(2); when(dataStore.get(streamId)).thenReturn(broadcast); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + try { hlsStatisticsFilter.init(filterconfig); - + String sessionId = requestHls(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); broadcast.setHlsViewerCount(1); String sessionId2 = requestHls(streamId); - verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null); + verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); broadcast.setHlsViewerCount(2); String sessionId3 = requestHls(streamId); - verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null); + verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); } catch (ServletException|IOException e) { logger.error(ExceptionUtils.getStackTrace(e)); fail(ExceptionUtils.getStackTrace(e)); diff --git a/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java b/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java index 86fd027f3..edaf90bc3 100644 --- a/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java +++ b/src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java @@ -4,17 +4,25 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import io.antmedia.security.AcceptOnlyStreamsInDataStore; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; import org.awaitility.Awaitility; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.red5.server.api.IContext; +import org.red5.server.api.scope.IScope; import org.springframework.context.ApplicationContext; import ch.qos.logback.classic.Logger; @@ -34,8 +42,8 @@ public class DashViewerStatsTest { - static Vertx vertx; - + static Vertx vertx; + @BeforeClass public static void beforeClass() { vertx = io.vertx.core.Vertx.vertx(); @@ -64,10 +72,11 @@ public void testDASHViewerCount() { // TODO Auto-generated catch block e.printStackTrace(); } + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); for (int i = 0; i < 100; i++) { String sessionId = String.valueOf((Math.random() * 999999)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -81,7 +90,7 @@ public void testDASHViewerCount() { //Add same session ID for (int i = 0; i < 10; i++) { String sessionId = "sameSessionID"; - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -115,7 +124,9 @@ public void testSubscriberEvents() { String sessionId = String.valueOf((Math.random() * 999999)); // check if viewer is added - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -154,6 +165,10 @@ public void testGetTimeout() { @Test public void testSetApplicationContextSubscribers() { ApplicationContext context = mock(ApplicationContext.class); + AntMediaApplicationAdapter adapter = new AntMediaApplicationAdapter(); + AppSettings appSettings = new AppSettings(); + adapter.setAppSettings(appSettings); + adapter.setVertx(vertx); try { @@ -173,20 +188,39 @@ public void testSetApplicationContextSubscribers() { when(context.getBean(AppSettings.BEAN_NAME)).thenReturn(settings); when(context.getBean(ServerSettings.BEAN_NAME)).thenReturn(new ServerSettings()); - + + IScope scope = mock(IScope.class); + + when(scope.getName()).thenReturn("junit"); + adapter.setScope(scope); + adapter.setDataStoreFactory(dsf); + AntMediaApplicationAdapter spyAdapter = Mockito.spy(adapter); + + when(context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(spyAdapter); + + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + Mockito.doReturn(httpClient).when(spyAdapter).getHttpClient(); + + CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); + Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + Mockito.when(httpResponse.getStatusLine()).thenReturn(Mockito.mock(StatusLine.class)); + + Mockito.when(httpResponse.getEntity()).thenReturn(null); + DashViewerStats viewerStats = new DashViewerStats(); viewerStats.setTimePeriodMS(1000); viewerStats.setApplicationContext(context); - Broadcast broadcast = new Broadcast(); broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); broadcast.setName("name"); + broadcast.setListenerHookURL("url"); dsf.setWriteStatsToDatastore(true); dsf.setApplicationContext(context); String streamId = dsf.getDataStore().save(broadcast); + assertEquals(1000, viewerStats.getTimePeriodMS()); assertEquals(10000, viewerStats.getTimeoutMS()); @@ -214,11 +248,28 @@ public void testSetApplicationContextSubscribers() { subscriberPlay3.setSubscriberId("subscriber3"); subscriberPlay3.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay3.setType(Subscriber.PLAY_TYPE); - dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); - - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); - viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId()); - + dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); + + //spyAdapter.setDataStoreFactory(dsf); + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter); + viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter); + + + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdapter, times(2)).sendStartPlayWebHook(ViewerStats.DASH_TYPE, streamId, subscriberPlay.getSubscriberId()); + verify(spyAdapter,times(2)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_START_PLAY, broadcast.getName(),broadcast.getCategory(),null,null,subscriberPlay.getSubscriberId(),null); + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 2 ); @@ -227,11 +278,11 @@ public void testSetApplicationContextSubscribers() { Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getTotalViewerCount() == 2 ); - + //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId()); - viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId()); - + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter); + viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter); + Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -246,7 +297,6 @@ public void testSetApplicationContextSubscribers() { return subData.isConnected() && subData.getCurrentConcurrentConnections() == 2 && eventExist; }); - // Check viewer is online Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getDashViewerCount() == 2); @@ -254,7 +304,24 @@ public void testSetApplicationContextSubscribers() { // Wait some time for detect disconnect Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().get(streamId).getDashViewerCount() == 0); - + + + Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( + ()-> { + boolean called = false; + try{ + verify(spyAdapter, times(2)).sendStopPlayWebHook(ViewerStats.DASH_TYPE, streamId, subscriberPlay2.getSubscriberId()); + + verify(spyAdapter,times(2)).notifyHook(broadcast.getListenerHookURL(),streamId,AntMediaApplicationAdapter.HOOK_ACTION_STOP_PLAY, broadcast.getName(),broadcast.getCategory(),null,null,subscriberPlay2.getSubscriberId(),null); + + called = true; + }catch (Exception e){ + e.printStackTrace(); + } + return called; + }); + + assertEquals(0, viewerStats.getViewerCount(streamId)); assertEquals(0, viewerStats.getIncreaseCounterMap(streamId)); assertEquals(0, viewerStats.getTotalViewerCount()); @@ -276,7 +343,7 @@ public void testSetApplicationContextSubscribers() { ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter); Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); @@ -321,11 +388,13 @@ public void testSetApplicationContext() { ApplicationContext context = mock(ApplicationContext.class); try { - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); DataStoreFactory dsf = new DataStoreFactory(); dsf.setDbType("memorydb"); dsf.setDbName("datastore"); when(context.getBean(DataStoreFactory.BEAN_NAME)).thenReturn(dsf); + antMediaApplicationAdapter.setDataStoreFactory(dsf); when(context.containsBean(AppSettings.BEAN_NAME)).thenReturn(true); @@ -356,9 +425,13 @@ public void testSetApplicationContext() { assertEquals(10000, viewerStats.getTimeoutMS()); String sessionId = "sessionId" + (int)(Math.random() * 10000); + antMediaApplicationAdapter.setAppSettings(settings); + + + //AntMediaApplicationAdapter spyAdapter = Mockito.spy(new AntMediaApplicationAdapter()); + + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); - viewerStats.registerNewViewer(streamId, sessionId, null); - Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); @@ -369,7 +442,7 @@ public void testSetApplicationContext() { ()->viewerStats.getTotalViewerCount() == 1 ); //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); // Check viewer is online Awaitility.await().atMost(30, TimeUnit.SECONDS).until( @@ -390,9 +463,8 @@ public void testSetApplicationContext() { Awaitility.await().atMost(30, TimeUnit.SECONDS).until( ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - - viewerStats.registerNewViewer(streamId, sessionId, null); - + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); diff --git a/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java b/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java index fd3a3eadb..6849444af 100644 --- a/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java +++ b/src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java @@ -65,10 +65,11 @@ public void testHLSViewerCount() { // TODO Auto-generated catch block e.printStackTrace(); } + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); for (int i = 0; i < 100; i++) { String sessionId = String.valueOf((Math.random() * 999999)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -82,7 +83,7 @@ public void testHLSViewerCount() { //Add same session ID for (int i = 0; i < 10; i++) { String sessionId = "sameSessionID"; - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); } Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( @@ -115,8 +116,12 @@ public void testSubscriberEvents() { dataStore.addSubscriber(subscriberPlay.getStreamId(), subscriberPlay); String sessionId = String.valueOf((Math.random() * 999999)); + + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + + // check if viewer is added - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { boolean eventExist = false; @@ -168,7 +173,10 @@ public void testSetApplicationContextSubscribers() { when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); AppSettings settings = mock(AppSettings.class); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + antMediaApplicationAdapter.setDataStoreFactory(dsf); + antMediaApplicationAdapter.setAppSettings(settings); //set hls time to 1 when(settings.getHlsTime()).thenReturn("1"); @@ -213,10 +221,11 @@ public void testSetApplicationContextSubscribers() { subscriberPlay3.setSubscriberId("subscriber3"); subscriberPlay3.setB32Secret("6qsp6qhndryqs56zjmvs37i6gqtjsdvc"); subscriberPlay3.setType(Subscriber.PLAY_TYPE); - dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); - - - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId()); + dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3); + + + + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); @@ -226,9 +235,10 @@ public void testSetApplicationContextSubscribers() { Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getTotalViewerCount() == 1 ); - + + //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()-> { @@ -275,7 +285,7 @@ public void testSetApplicationContextSubscribers() { ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId()); + viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1); @@ -329,7 +339,10 @@ public void testSetApplicationContext() { when(context.getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME)).thenReturn(vertx); AppSettings settings = mock(AppSettings.class); - + AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class); + when((AntMediaApplicationAdapter) context.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(antMediaApplicationAdapter); + antMediaApplicationAdapter.setDataStoreFactory(dsf); + antMediaApplicationAdapter.setAppSettings(settings); //set hls time to 1 when(settings.getHlsTime()).thenReturn("1"); @@ -354,7 +367,7 @@ public void testSetApplicationContext() { String sessionId = "sessionId" + (int)(Math.random() * 10000); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until( ()->viewerStats.getViewerCount(streamId) == 1 ); @@ -366,7 +379,7 @@ public void testSetApplicationContext() { ()->viewerStats.getTotalViewerCount() == 1 ); //Viewer timeout increase - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); // Check viewer is online Awaitility.await().atMost(20, TimeUnit.SECONDS).until( @@ -388,7 +401,7 @@ public void testSetApplicationContext() { ()-> dsf.getDataStore().save(broadcast).equals(streamId)); - viewerStats.registerNewViewer(streamId, sessionId, null); + viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter); Awaitility.await().atMost(20, TimeUnit.SECONDS).until( ()-> viewerStats.getViewerCount(streamId) == 1);