From f8f2029d5b5d0935c87f6bbe986e49d102ad7cb5 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 13 Nov 2024 17:37:38 -0800 Subject: [PATCH] Memory Optimization - Refactor TWCC (#1934) (#2075) * Refactor TWCC (#1934) * Remove stackqueue usage * Use hashtable instead..working logic * Cleanup, increase hash table size, fix loop bug * method 2, array of pointers * Add rolling window * rolling window with hashtable * hash table with rw * Fix bug * Fix twcc unit test * Cleanup rw logic * Cleanup * Cleanup logic * Update README * unused var fix * Use defines for hash table size * Address comments, disable TWCC by default * readme * Fix windows gst issue * Comments * Remove enableIceStats merge conflict * Address comments * Fix left over typo * Address comments, refactor updaing of TWCC hash table in onRtcpTwccPacket for testing * Clang format, fix compiler Werror * Correct typos * Fix "expression result unused" Werrors * Remove unused variable * Address comments * Fix endless loop * Address comment * Add removal of null hashTable items, add test for null item in hashTable * Address comments * Cleanup freeing of KvsPeerConnection * Clang format --------- Co-authored-by: Divya Sampath Kumar --- src/source/PeerConnection/PeerConnection.c | 134 ++++++++++---- src/source/PeerConnection/PeerConnection.h | 18 +- src/source/PeerConnection/Rtcp.c | 163 ++++++++++++++---- src/source/PeerConnection/Rtcp.h | 1 + .../PeerConnection/SessionDescription.h | 2 +- tst/RtcpFunctionalityTest.cpp | 132 ++++++++++++-- 6 files changed, 356 insertions(+), 94 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 2aee95885d..ce5dd86bc7 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1005,6 +1005,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE); pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager)); CHK(pKvsPeerConnection->pTwccManager != NULL, STATUS_NOT_ENOUGH_MEMORY); + CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH, + &pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); } *ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection; @@ -1041,10 +1043,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; - PKvsPeerConnection pKvsPeerConnection; + PKvsPeerConnection pKvsPeerConnection = NULL; PDoubleListNode pCurNode = NULL; UINT64 item = 0; UINT64 startTime; + UINT32 twccHashTableCount = 0; + BOOL twccLocked = FALSE; CHK(ppPeerConnection != NULL, STATUS_NULL_ARG); @@ -1113,20 +1117,34 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) } if (pKvsPeerConnection->pTwccManager != NULL) { - if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { - MUTEX_FREE(pKvsPeerConnection->twccLock); + MUTEX_LOCK(pKvsPeerConnection->twccLock); + twccLocked = TRUE; + + if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) { + DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount); } - // twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets) - // we should not deallocate items but we do need to clear the queue - CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE)); + + CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry)); + CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); + SAFE_MEMFREE(pKvsPeerConnection->pTwccManager); } // Incase the `RemoteSessionDescription` has not already been freed. SAFE_MEMFREE(pKvsPeerConnection->pRemoteSessionDescription); + if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { + if (twccLocked) { + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + twccLocked = FALSE; + } + MUTEX_FREE(pKvsPeerConnection->twccLock); + pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; + } + PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); SAFE_MEMFREE(*ppPeerConnection); + CleanUp: LEAVES(); @@ -1826,47 +1844,93 @@ STATUS deinitKvsWebRtc(VOID) return retStatus; } -STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket) +// Not thread safe. Ensure this function is invoked in a guarded section +static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 endingSeqNum) +{ + ENTERS(); + STATUS retStatus = STATUS_SUCCESS; + UINT16 updatedSeqNum = 0; + PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL; + UINT64 ageOfOldest = 0, firstRtpTime = 0; + UINT64 twccPacketValue = 0; + BOOL isCheckComplete = FALSE; + + CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG); + + updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow; + do { + // If the seqNum is not present in the hash table, it is ok. We move on to the next + if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) { + tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue; + if (tempTwccRtpPktInfo != NULL) { + firstRtpTime = tempTwccRtpPktInfo->localTimeKvs; + // Would be the case if the timestamps are not monotonically increasing. + if (pRtpPacket->sentTime >= firstRtpTime) { + ageOfOldest = pRtpPacket->sentTime - firstRtpTime; + if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { + // If the seqNum is not present in the hash table, move on. However, this case should not happen + // given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists + if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) { + SAFE_MEMFREE(tempTwccRtpPktInfo); + } + updatedSeqNum++; + } else { + isCheckComplete = TRUE; + } + } else { + // Move to the next seqNum to check if we can remove the next one atleast + DLOGV("Non-monotonic timestamp detected for RTP packet seqNum %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, + updatedSeqNum, firstRtpTime, pRtpPacket->sentTime); + updatedSeqNum++; + } + } else { + CHK_STATUS(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum)); + updatedSeqNum++; + } + } else { + updatedSeqNum++; + } + // reset before next iteration + tempTwccRtpPktInfo = NULL; + } while (!isCheckComplete && updatedSeqNum != (endingSeqNum + 1)); + + // Update regardless. The loop checks until current RTP packets seq number irrespective of the failure + pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum; +CleanUp: + CHK_LOG_ERR(retStatus); + + LEAVES(); + return retStatus; +} + +STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; BOOL locked = FALSE; - UINT64 sn = 0; - UINT16 seqNum; - BOOL isEmpty = FALSE; - INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest; - CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG); - CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS); + UINT16 seqNum = 0; + PTwccRtpPacketInfo pTwccRtpPktInfo = NULL; + + CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG); + CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS); CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS); - MUTEX_LOCK(pc->twccLock); + MUTEX_LOCK(pKvsPeerConnection->twccLock); locked = TRUE; + CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY); + + pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength; + pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime; + pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME; seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload); - CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum)); - pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum; - pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength; - pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime; - pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; - pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime; - - // cleanup queue until it contains up to 2 seconds of sent packets - do { - CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn)); - firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - lastLocalTimeKvs = pRtpPacket->sentTime; - ageOfOldest = lastLocalTimeKvs - firstTimeKvs; - if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { - CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn)); - CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty)); - } else { - break; - } - } while (!isEmpty); + CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo)); + // Ensure twccRollingWindowDeletion is run in a guarded section + CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum)); CleanUp: if (locked) { - MUTEX_UNLOCK(pc->twccLock); + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); } CHK_LOG_ERR(retStatus); diff --git a/src/source/PeerConnection/PeerConnection.h b/src/source/PeerConnection/PeerConnection.h index 79d5d85085..7311ec9794 100644 --- a/src/source/PeerConnection/PeerConnection.h +++ b/src/source/PeerConnection/PeerConnection.h @@ -31,6 +31,8 @@ extern "C" { #define CODEC_HASH_TABLE_BUCKET_LENGTH 2 #define RTX_HASH_TABLE_BUCKET_COUNT 50 #define RTX_HASH_TABLE_BUCKET_LENGTH 2 +#define TWCC_HASH_TABLE_BUCKET_COUNT 100 +#define TWCC_HASH_TABLE_BUCKET_LENGTH 2 #define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200 #define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2 @@ -47,17 +49,16 @@ typedef enum { } RTX_CODEC; typedef struct { - UINT16 seqNum; - UINT16 packetSize; UINT64 localTimeKvs; UINT64 remoteTimeKvs; -} TwccPacket, *PTwccPacket; + UINT32 packetSize; +} TwccRtpPacketInfo, *PTwccRtpPacketInfo; typedef struct { - StackQueue twccPackets; - TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality - UINT64 lastLocalTimeKvs; - UINT16 lastReportedSeqNum; + PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket] + UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window + UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response + UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response } TwccManager, *PTwccManager; typedef struct { @@ -70,7 +71,7 @@ typedef struct { typedef struct { RtcPeerConnection peerConnection; - // UINT32 padding padding makes transportWideSequenceNumber 64bit aligned + // UINT32 padding makes transportWideSequenceNumber 64bit aligned // we put atomics at the top of structs because customers application could set the packing to 0 // in which case any atomic operations would result in bus errors if there is a misalignment // for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907 @@ -183,6 +184,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32); STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32); STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE); STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket); +UINT32 parseExtId(PCHAR); // visible for testing only VOID onIceConnectionStateChange(UINT64, UINT64); diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index 0279299467..f4c8147c49 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -185,11 +185,13 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) UINT32 statuses; UINT32 i; UINT64 referenceTime; + PTwccRtpPacketInfo pTwccPacket = NULL; + UINT64 twccPktValue = 0; CHK(pTwccManager != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); baseSeqNum = getUnalignedInt16BigEndian(pRtcpPacket->payload + 8); + pTwccManager->prevReportedBaseSeqNum = baseSeqNum; packetStatusCount = TWCC_PACKET_STATUS_COUNT(pRtcpPacket->payload); - referenceTime = (pRtcpPacket->payload[12] << 16) | (pRtcpPacket->payload[13] << 8) | (pRtcpPacket->payload[14] & 0xff); referenceTime = KVS_CONVERT_TIMESCALE(referenceTime * 64, MILLISECONDS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); // TODO: handle lost twcc report packets @@ -205,7 +207,6 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) } chunkOffset += TWCC_FB_PACKETCHUNK_SIZE; } - recvOffset = chunkOffset; chunkOffset = 16; packetSeqNum = baseSeqNum; @@ -227,7 +228,14 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) break; case TWCC_STATUS_SYMBOL_NOTRECEIVED: DLOGS("runLength packetSeqNum %u not received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = TWCC_PACKET_LOST_TIME; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; break; default: @@ -236,11 +244,21 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) if (recvDelta != MIN_INT16) { referenceTime += KVS_CONVERT_TIMESCALE(recvDelta, TWCC_TICKS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); DLOGS("runLength packetSeqNum %u received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = referenceTime; + + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = referenceTime; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; } packetSeqNum++; packetsRemaining--; + // Reset to NULL before next iteration + pTwccPacket = NULL; } } else { statuses = MIN(TWCC_STATUSVECTOR_COUNT(packetChunk), packetsRemaining); @@ -259,7 +277,14 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) break; case TWCC_STATUS_SYMBOL_NOTRECEIVED: DLOGS("statusVector packetSeqNum %u not received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = TWCC_PACKET_LOST_TIME; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; break; default: @@ -268,15 +293,104 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) if (recvDelta != MIN_INT16) { referenceTime += KVS_CONVERT_TIMESCALE(recvDelta, TWCC_TICKS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); DLOGS("statusVector packetSeqNum %u received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = referenceTime; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = referenceTime; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; } packetSeqNum++; packetsRemaining--; + // Reset to NULL before next iteration + pTwccPacket = NULL; } } chunkOffset += TWCC_FB_PACKETCHUNK_SIZE; } + DLOGV("Checking seqNum %d to %d of TWCC reports", baseSeqNum, pTwccManager->lastReportedSeqNum); +CleanUp: + CHK_LOG_ERR(retStatus); + return retStatus; +} + +STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 receivedBytes, PUINT64 receivedPackets, PUINT64 sentBytes, + PUINT64 sentPackets) +{ + STATUS retStatus = STATUS_SUCCESS; + UINT64 localStartTimeKvs, localEndTimeKvs = 0; + UINT16 baseSeqNum = 0; + BOOL localStartTimeRecorded = FALSE; + UINT64 twccPktValue = 0; + PTwccRtpPacketInfo pTwccPacket = NULL; + UINT16 seqNum = 0; + + CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, + STATUS_NULL_ARG); + + *duration = 0; + *receivedBytes = 0; + *receivedPackets = 0; + *sentBytes = 0; + *sentPackets = 0; + + baseSeqNum = pTwccManager->prevReportedBaseSeqNum; + + // Use != instead to cover the case where the group of sequence numbers being checked + // are trending towards MAX_UINT16 and rolling over to 0+, example range [65534, 10] + // We also check for twcc->lastReportedSeqNum + 1 to include the last seq number in the + // report. Without this, we do not check for the seqNum that could cause it to not be cleared + // from memory + for (seqNum = baseSeqNum; seqNum != (pTwccManager->lastReportedSeqNum + 1); seqNum++) { + if (!localStartTimeRecorded) { + // This could happen if the prev packet was deleted as part of rolling window or if there + // is an overlap of RTP packet statuses between TWCC packets. This could also fail if it is + // the first ever packet (seqNum 0) + if (hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum - 1, &twccPktValue) == STATUS_HASH_KEY_NOT_PRESENT) { + localStartTimeKvs = TWCC_PACKET_UNITIALIZED_TIME; + } else { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localStartTimeKvs = pTwccPacket->localTimeKvs; + localStartTimeRecorded = TRUE; + } + } + if (localStartTimeKvs == TWCC_PACKET_UNITIALIZED_TIME) { + // time not yet set. If prev seqNum was deleted + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localStartTimeKvs = pTwccPacket->localTimeKvs; + localStartTimeRecorded = TRUE; + } + } + } + } + + // The time it would not succeed is if there is an overlap in the RTP packet status between the TWCC + // packets + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localEndTimeKvs = pTwccPacket->localTimeKvs; + *duration = localEndTimeKvs - localStartTimeKvs; + *sentBytes += pTwccPacket->packetSize; + (*sentPackets)++; + if (pTwccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { + *receivedBytes += pTwccPacket->packetSize; + (*receivedPackets)++; + if (STATUS_SUCCEEDED(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum))) { + SAFE_MEMFREE(pTwccPacket); + } + } + } else { + CHK_STATUS(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum)); + } + } + } CleanUp: CHK_LOG_ERR(retStatus); @@ -286,46 +400,21 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection) { STATUS retStatus = STATUS_SUCCESS; - PTwccManager twcc; + PTwccManager pTwccManager = NULL; BOOL locked = FALSE; - BOOL empty = TRUE; - UINT64 sn = 0; - INT64 ageOfOldestPacket; - UINT64 localStartTimeKvs, localEndTimeKvs; UINT64 sentBytes = 0, receivedBytes = 0; UINT64 sentPackets = 0, receivedPackets = 0; INT64 duration = 0; - UINT16 seqNum; - PTwccPacket twccPacket; CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); - CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS); + CHK(pKvsPeerConnection->pTwccManager != NULL && pKvsPeerConnection->onSenderBandwidthEstimation != NULL, STATUS_SUCCESS); MUTEX_LOCK(pKvsPeerConnection->twccLock); locked = TRUE; - twcc = pKvsPeerConnection->pTwccManager; - CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, twcc)); - CHK_STATUS(stackQueueIsEmpty(&twcc->twccPackets, &empty)); - CHK(!empty, STATUS_SUCCESS); - CHK_STATUS(stackQueuePeek(&twcc->twccPackets, &sn)); - ageOfOldestPacket = twcc->lastLocalTimeKvs - twcc->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - CHK(ageOfOldestPacket > TWCC_ESTIMATOR_TIME_WINDOW / 2, STATUS_SUCCESS); - localStartTimeKvs = twcc->twccPacketBySeqNum[(UINT16) (sn - 1)].localTimeKvs; - if (localStartTimeKvs == TWCC_PACKET_UNITIALIZED_TIME) { - // time not yet set (only happens for first rtp packet) - localStartTimeKvs = twcc->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - } - for (seqNum = sn; seqNum != twcc->lastReportedSeqNum; seqNum++) { - twccPacket = &twcc->twccPacketBySeqNum[seqNum]; - localEndTimeKvs = twccPacket->localTimeKvs; - duration = localEndTimeKvs - localStartTimeKvs; - sentBytes += twccPacket->packetSize; - sentPackets++; - if (twccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { - receivedBytes += twccPacket->packetSize; - receivedPackets++; - } - } + pTwccManager = pKvsPeerConnection->pTwccManager; + CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, pTwccManager)); + + updateTwccHashTable(pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets); if (duration > 0) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); diff --git a/src/source/PeerConnection/Rtcp.h b/src/source/PeerConnection/Rtcp.h index 229cebbe3f..ae2e505e02 100644 --- a/src/source/PeerConnection/Rtcp.h +++ b/src/source/PeerConnection/Rtcp.h @@ -12,6 +12,7 @@ STATUS onRtcpRembPacket(PRtcpPacket, PKvsPeerConnection); STATUS onRtcpPLIPacket(PRtcpPacket, PKvsPeerConnection); STATUS parseRtcpTwccPacket(PRtcpPacket, PTwccManager); STATUS onRtcpTwccPacket(PRtcpPacket, PKvsPeerConnection); +STATUS updateTwccHashTable(PTwccManager, PINT64, PUINT64, PUINT64, PUINT64, PUINT64); // https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 // Deltas are represented as multiples of 250us: diff --git a/src/source/PeerConnection/SessionDescription.h b/src/source/PeerConnection/SessionDescription.h index 63b88ec7fc..a7e5ac4903 100644 --- a/src/source/PeerConnection/SessionDescription.h +++ b/src/source/PeerConnection/SessionDescription.h @@ -81,7 +81,7 @@ extern "C" { // https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 #define TWCC_SDP_ATTR "transport-cc" -#define TWCC_EXT_URL "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" +#define TWCC_EXT_URL (PCHAR) "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" #define CODEC_RTPMAP_PAYLOAD_TYPES_HASH_TABLE_BUCKET_LENGTH 2 diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 00d76ad659..7a3cd74da6 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -295,27 +295,65 @@ TEST_F(RtcpFunctionalityTest, onpli) freePeerConnection(&pRtcPeerConnection); } +static void testBwHandler(UINT64 customData, UINT32 txBytes, UINT32 rxBytes, UINT32 txPacketsCnt, UINT32 rxPacketsCnt, + UINT64 duration) { + UNUSED_PARAM(customData); + UNUSED_PARAM(txBytes); + UNUSED_PARAM(rxBytes); + UNUSED_PARAM(txPacketsCnt); + UNUSED_PARAM(rxPacketsCnt); + UNUSED_PARAM(duration); + return; +} + static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, const uint32_t expectedNotReceived) { + PRtcPeerConnection pRtcPeerConnection = nullptr; + PKvsPeerConnection pKvsPeerConnection; BYTE payload[256] = {0}; UINT32 payloadLen = 256; hexDecode(const_cast(hex.data()), hex.size(), payload, &payloadLen); RtcpPacket rtcpPacket{}; + RtpPacket rtpPacket{}; + RtcConfiguration config{}; + UINT64 value; + UINT16 twsn; + UINT16 i = 0; + UINT32 extpayload, received = 0, lost = 0; + rtcpPacket.header.packetLength = payloadLen / 4; rtcpPacket.payload = payload; rtcpPacket.payloadLength = payloadLen; - TwccManager twcc{}; - - ASSERT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, &twcc)); - ASSERT_EQ(STATUS_SUCCESS, stackQueueClear(&twcc.twccPackets, FALSE)); - - uint32_t received = 0; - uint32_t lost = 0; - for (const auto& packet : twcc.twccPacketBySeqNum) { - if (packet.remoteTimeKvs == TWCC_PACKET_LOST_TIME) { - lost++; - } else if (packet.remoteTimeKvs != TWCC_PACKET_UNITIALIZED_TIME) { - received++; + + + EXPECT_EQ(STATUS_SUCCESS, createPeerConnection(&config, &pRtcPeerConnection)); + pKvsPeerConnection = reinterpret_cast(pRtcPeerConnection); + EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnSenderBandwidthEstimation(pRtcPeerConnection, 0, + testBwHandler)); + + UINT16 baseSeqNum = getUnalignedInt16BigEndian(rtcpPacket.payload + 8); + UINT16 pktCount = TWCC_PACKET_STATUS_COUNT(rtcpPacket.payload); + + for(i = baseSeqNum; i < baseSeqNum + pktCount; i++) { + rtpPacket.header.extension = TRUE; + rtpPacket.header.extensionProfile = TWCC_EXT_PROFILE; + rtpPacket.header.extensionLength = SIZEOF(UINT32); + twsn = i; + extpayload = TWCC_PAYLOAD(parseExtId(TWCC_EXT_URL), twsn); + rtpPacket.header.extensionPayload = (PBYTE) &extpayload; + EXPECT_EQ(STATUS_SUCCESS, twccManagerOnPacketSent(pKvsPeerConnection, &rtpPacket)); + } + + EXPECT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, pKvsPeerConnection->pTwccManager)); + + for(i = 0; i < MAX_UINT16; i++) { + if(STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value))) { + PTwccRtpPacketInfo tempTwccRtpPktInfo = (PTwccRtpPacketInfo) value; + if(tempTwccRtpPktInfo->remoteTimeKvs == TWCC_PACKET_LOST_TIME) { + lost++; + } else if (tempTwccRtpPktInfo->remoteTimeKvs != TWCC_PACKET_UNITIALIZED_TIME) { + received++; + } } } @@ -323,9 +361,10 @@ static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, c EXPECT_EQ(expectedReceived + expectedNotReceived, TWCC_PACKET_STATUS_COUNT(rtcpPacket.payload)); EXPECT_EQ(expectedReceived, received); EXPECT_EQ(expectedNotReceived, lost); + EXPECT_EQ(STATUS_SUCCESS, freePeerConnection(&pRtcPeerConnection)); } -TEST_F(RtcpFunctionalityTest, twcc3) +TEST_F(RtcpFunctionalityTest, twccParsePacketTest) { parseTwcc("", 0, 0); parseTwcc("4487A9E754B3E6FD01810001147A75A62001C801", 1, 0); @@ -365,6 +404,73 @@ TEST_F(RtcpFunctionalityTest, twcc3) parseTwcc("4487A9E754B3E6FD04B60036147CAA852024C002D999D6407800000000000000000000000000040000000000000000", 43, 11); parseTwcc("4487A9E754B3E6FD040200E4147C9F81202700B7E6649000000000000000000004000000000008000018000000001", 43, 185); } + +TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) +{ + PRtcPeerConnection pRtcPeerConnection = NULL; + PKvsPeerConnection pKvsPeerConnection = NULL; + RtcConfiguration config{}; + UINT64 receivedBytes = 0, receivedPackets = 0, sentBytes = 0, sentPackets = 0; + INT64 duration = 0; + PTwccRtpPacketInfo pTwccRtpPacketInfo = NULL; + PHashTable pTwccRtpPktInfosHashTable = NULL; + UINT16 hashTableInsertionCount = 0; + UINT16 lowerBound = UINT16_MAX - 3; + UINT16 upperBound = 3; + UINT16 i = 0; + + // Initialize structs and members. + EXPECT_EQ(STATUS_SUCCESS, createPeerConnection(&config, &pRtcPeerConnection)); + pKvsPeerConnection = reinterpret_cast(pRtcPeerConnection); + EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnSenderBandwidthEstimation(pRtcPeerConnection, 0, testBwHandler)); + + // Grab the hash table. + pTwccRtpPktInfosHashTable = pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable; + + pKvsPeerConnection->pTwccManager->prevReportedBaseSeqNum = lowerBound; + pKvsPeerConnection->pTwccManager->lastReportedSeqNum = upperBound + 10; + + // Breakup the packet indexes to be across the max int overflow. + for (i = lowerBound; i <= UINT16_MAX && i != 0 ; i++) + { + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + for (i = 0; i < upperBound; i++) + { + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + + // Add at a non-monotonically-increased index. + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, upperBound + 10, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + + // Validate hash table size after and before updating (onRtcpTwccPacket case). + EXPECT_EQ(hashTableInsertionCount, pTwccRtpPktInfosHashTable->itemCount); + EXPECT_EQ(STATUS_SUCCESS, updateTwccHashTable(pKvsPeerConnection->pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets)); + EXPECT_EQ(0, pTwccRtpPktInfosHashTable->itemCount); + + hashTableInsertionCount = 0; + pTwccRtpPacketInfo = NULL; + for (i = 0; i <= upperBound; i++) + { + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + EXPECT_EQ(hashTableInsertionCount, pTwccRtpPktInfosHashTable->itemCount); + EXPECT_EQ(STATUS_SUCCESS, updateTwccHashTable(pKvsPeerConnection->pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets)); + EXPECT_EQ(0, pTwccRtpPktInfosHashTable->itemCount); + + MUTEX_LOCK(pKvsPeerConnection->twccLock); + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + + EXPECT_EQ(STATUS_SUCCESS, freePeerConnection(&pRtcPeerConnection)); +} + } // namespace webrtcclient } // namespace video } // namespace kinesis