Skip to content

Commit

Permalink
New webhooks playStart, playStop, recordStart issue #4666
Browse files Browse the repository at this point in the history
  • Loading branch information
lastpeony committed Jan 17, 2023
1 parent 8d66694 commit 878221c
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 40 deletions.
3 changes: 1 addition & 2 deletions src/main/java/io/antmedia/filter/DashStatisticsFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;

import io.antmedia.statistic.ViewerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,7 +50,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status);
IStreamStats stats = getStreamStats(DashViewerStats.BEAN_NAME);
if (stats != null) {
stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.DASH_TYPE, getAntMediaApplicationAdapter());
stats.registerNewViewer(streamId, sessionId, subscriberId, getAntMediaApplicationAdapter());
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/antmedia/filter/HlsStatisticsFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;

import io.antmedia.statistic.ViewerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,7 +52,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
logger.debug("req ip {} session id {} stream id {} status {}", request.getRemoteHost(), sessionId, streamId, status);
IStreamStats stats = getStreamStats(HlsViewerStats.BEAN_NAME);
if (stats != null) {
stats.registerNewViewer(streamId, sessionId, subscriberId, ViewerStats.HLS_TYPE, getAntMediaApplicationAdapter());
stats.registerNewViewer(streamId, sessionId, subscriberId, getAntMediaApplicationAdapter());
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/antmedia/statistic/IStreamStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ public interface IStreamStats {

/**
* Register a new viewer to a stream
*
* @param streamId
* @param sessionId
*/
void registerNewViewer(String streamId, String sessionId, String subscriberId, String playType, AntMediaApplicationAdapter antMediaApplicationAdapter);
void registerNewViewer(String streamId, String sessionId, String subscriberId, AntMediaApplicationAdapter antMediaApplicationAdapter);


/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/antmedia/statistic/ViewerStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ViewerStats {
*/
protected int timeoutMS = 20000;

public void registerNewViewer(String streamId, String sessionId, String subscriberId, String viewerPlayType, AntMediaApplicationAdapter antMediaApplicationAdapter)
public void registerNewViewer(String streamId, String sessionId, String subscriberId, AntMediaApplicationAdapter antMediaApplicationAdapter)
{
//do not block the thread, run in vertx event queue
vertx.runOnContext(h -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand All @@ -16,12 +15,8 @@
import javax.servlet.http.HttpSession;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.statistic.ViewerStats;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.awaitility.Awaitility;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -165,7 +160,7 @@ public void testDoFilter() {
logger.info("session id {}, stream id {}", sessionId, streamId);
dashStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain);

verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
/*verify(antMediaApplicationAdapter, times(1)).sendStartPlayWebHook(ViewerStats.DASH_TYPE, streamId, null);
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(()-> {
Expand Down Expand Up @@ -239,15 +234,15 @@ public void testDASHViewerLimit() {
//when(dashStatisticsFilter.getStreamStats()).thenReturn(streamStats);

String sessionId = requestDash(streamId);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
broadcast.setDashViewerCount(1);

String sessionId2 = requestDash(streamId);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, antMediaApplicationAdapter);
broadcast.setDashViewerCount(2);

String sessionId3 = requestDash(streamId);
verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, antMediaApplicationAdapter);
} catch (ServletException|IOException e) {
logger.error(ExceptionUtils.getStackTrace(e));
fail(ExceptionUtils.getStackTrace(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import javax.servlet.http.HttpSession;

import io.antmedia.AntMediaApplicationAdapter;
import io.antmedia.statistic.ViewerStats;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.After;
Expand Down Expand Up @@ -163,7 +162,7 @@ public void testDoFilter() {
logger.info("session id {}, stream id {}", sessionId, streamId);
hlsStatisticsFilter.doFilter(mockRequest, mockResponse, mockChain);

verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

} catch (ServletException|IOException e) {
logger.error(ExceptionUtils.getStackTrace(e));
Expand Down Expand Up @@ -212,15 +211,15 @@ public void testHLSViewerLimit() {
hlsStatisticsFilter.init(filterconfig);

String sessionId = requestHls(streamId);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
broadcast.setHlsViewerCount(1);

String sessionId2 = requestHls(streamId);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
verify(streamStats, times(1)).registerNewViewer(streamId, sessionId2, null, antMediaApplicationAdapter);
broadcast.setHlsViewerCount(2);

String sessionId3 = requestHls(streamId);
verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
verify(streamStats, never()).registerNewViewer(streamId, sessionId3, null, antMediaApplicationAdapter);
} catch (ServletException|IOException e) {
logger.error(ExceptionUtils.getStackTrace(e));
fail(ExceptionUtils.getStackTrace(e));
Expand Down
22 changes: 11 additions & 11 deletions src/test/java/io/antmedia/test/statistic/DashViewerStatsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testDASHViewerCount() {

for (int i = 0; i < 100; i++) {
String sessionId = String.valueOf((Math.random() * 999999));
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
Expand All @@ -86,7 +86,7 @@ public void testDASHViewerCount() {
//Add same session ID
for (int i = 0; i < 10; i++) {
String sessionId = "sameSessionID";
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testSubscriberEvents() {
// check if viewer is added
AntMediaApplicationAdapter antMediaApplicationAdapter = mock(AntMediaApplicationAdapter.class);

viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), antMediaApplicationAdapter);
Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()-> {
boolean eventExist = false;
Expand Down Expand Up @@ -248,8 +248,8 @@ public void testSetApplicationContextSubscribers() {

//spyAdapter.setDataStoreFactory(dsf);

viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay.getSubscriberId(), spyAdapter);


Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
Expand All @@ -276,8 +276,8 @@ public void testSetApplicationContextSubscribers() {
()->viewerStats.getTotalViewerCount() == 2 );

//Viewer timeout increase
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId2, subscriberPlay2.getSubscriberId(), spyAdapter);

Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()-> {
Expand Down Expand Up @@ -339,7 +339,7 @@ public void testSetApplicationContextSubscribers() {
()-> dsf.getDataStore().save(broadcast).equals(streamId));


viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), ViewerStats.DASH_TYPE, spyAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), spyAdapter);

Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
()-> viewerStats.getViewerCount(streamId) == 1);
Expand Down Expand Up @@ -423,7 +423,7 @@ public void testSetApplicationContext() {
String sessionId = "sessionId" + (int)(Math.random() * 10000);
antMediaApplicationAdapter.setAppSettings(settings);

viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()->viewerStats.getViewerCount(streamId) == 1 );
Expand All @@ -435,7 +435,7 @@ public void testSetApplicationContext() {
()->viewerStats.getTotalViewerCount() == 1 );

//Viewer timeout increase
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

// Check viewer is online
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
Expand All @@ -456,7 +456,7 @@ public void testSetApplicationContext() {
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
()-> dsf.getDataStore().save(broadcast).equals(streamId));

viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.DASH_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
()-> viewerStats.getViewerCount(streamId) == 1);
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/io/antmedia/test/statistic/HlsViewerStatsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testHLSViewerCount() {

for (int i = 0; i < 100; i++) {
String sessionId = String.valueOf((Math.random() * 999999));
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
Expand All @@ -83,7 +83,7 @@ public void testHLSViewerCount() {
//Add same session ID
for (int i = 0; i < 10; i++) {
String sessionId = "sameSessionID";
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testSubscriberEvents() {


// check if viewer is added
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), antMediaApplicationAdapter);
Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()-> {
boolean eventExist = false;
Expand Down Expand Up @@ -223,7 +223,7 @@ public void testSetApplicationContextSubscribers() {
subscriberPlay3.setType(Subscriber.PLAY_TYPE);
dsf.getDataStore().addSubscriber(subscriberPlay3.getStreamId(), subscriberPlay3);

viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay.getSubscriberId(), antMediaApplicationAdapter);

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()->viewerStats.getViewerCount(streamId) == 1 );
Expand All @@ -236,7 +236,7 @@ public void testSetApplicationContextSubscribers() {


//Viewer timeout increase
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay2.getSubscriberId(), antMediaApplicationAdapter);

Awaitility.await().atMost(15, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()-> {
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testSetApplicationContextSubscribers() {
()-> dsf.getDataStore().save(broadcast).equals(streamId));


viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, subscriberPlay3.getSubscriberId(), antMediaApplicationAdapter);

Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
()-> viewerStats.getViewerCount(streamId) == 1);
Expand Down Expand Up @@ -365,7 +365,7 @@ public void testSetApplicationContext() {

String sessionId = "sessionId" + (int)(Math.random() * 10000);

viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
()->viewerStats.getViewerCount(streamId) == 1 );
Expand All @@ -377,7 +377,7 @@ public void testSetApplicationContext() {
()->viewerStats.getTotalViewerCount() == 1 );

//Viewer timeout increase
viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

// Check viewer is online
Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
Expand All @@ -399,7 +399,7 @@ public void testSetApplicationContext() {
()-> dsf.getDataStore().save(broadcast).equals(streamId));


viewerStats.registerNewViewer(streamId, sessionId, null, ViewerStats.HLS_TYPE, antMediaApplicationAdapter);
viewerStats.registerNewViewer(streamId, sessionId, null, antMediaApplicationAdapter);

Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
()-> viewerStats.getViewerCount(streamId) == 1);
Expand Down

0 comments on commit 878221c

Please sign in to comment.