From afcf43ed20752ac97d64dde786a73eb968d96d0a Mon Sep 17 00:00:00 2001 From: Divya Sampath Kumar Date: Thu, 21 Mar 2024 11:12:50 -0700 Subject: [PATCH 01/16] Refactor TWCC (#1934) * Remove stackqueue usage * Use hashtable instead..working logic * Cleanup, increase hash table size, fix loop bug * method 2, array of pointers * Add rolling window * rolling window with hashtable * hash table with rw * Fix bug * Fix twcc unit test * Cleanup rw logic * Cleanup * Cleanup logic * Update README * unused var fix * Use defines for hash table size * Address comments, disable TWCC by default * readme * Fix windows gst issue * Comments --- samples/Common.c | 2 + src/source/PeerConnection/PeerConnection.c | 131 ++++++++++++----- src/source/PeerConnection/PeerConnection.h | 18 +-- src/source/PeerConnection/Rtcp.c | 133 +++++++++++++----- .../PeerConnection/SessionDescription.h | 2 +- tst/RtcpFunctionalityTest.cpp | 65 +++++++-- 6 files changed, 261 insertions(+), 90 deletions(-) diff --git a/samples/Common.c b/samples/Common.c index 35bc2c3b80..eabfb2d30c 100644 --- a/samples/Common.c +++ b/samples/Common.c @@ -365,6 +365,8 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP // Set the ICE mode explicitly configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL; + configuration.kvsRtcConfiguration.enableIceStats = pSampleConfiguration->enableIceStats; + configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE; // Set the STUN server PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX; // If region is in CN, add CN region uri postfix diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 2aee95885d..c2fae98d6f 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1005,6 +1005,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE); pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager)); CHK(pKvsPeerConnection->pTwccManager != NULL, STATUS_NOT_ENOUGH_MEMORY); + CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH, + &pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); } *ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection; @@ -1041,10 +1043,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; - PKvsPeerConnection pKvsPeerConnection; + PKvsPeerConnection pKvsPeerConnection = NULL; PDoubleListNode pCurNode = NULL; UINT64 item = 0; UINT64 startTime; + UINT32 twccHashTableCount = 0; + BOOL twccLocked = FALSE; CHK(ppPeerConnection != NULL, STATUS_NULL_ARG); @@ -1113,12 +1117,21 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) } if (pKvsPeerConnection->pTwccManager != NULL) { + MUTEX_LOCK(pKvsPeerConnection->twccLock); + twccLocked = TRUE; + if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) { + DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount); + } + CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry)); + CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { + if (twccLocked) { + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + twccLocked = FALSE; + } MUTEX_FREE(pKvsPeerConnection->twccLock); + pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; } - // twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets) - // we should not deallocate items but we do need to clear the queue - CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE)); SAFE_MEMFREE(pKvsPeerConnection->pTwccManager); } @@ -1127,8 +1140,17 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); SAFE_MEMFREE(*ppPeerConnection); + ppPeerConnection = NULL; CleanUp: - + if (ppPeerConnection != NULL) { + if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { + if (twccLocked) { + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + twccLocked = FALSE; + } + MUTEX_FREE(pKvsPeerConnection->twccLock); + } + } LEAVES(); return retStatus; } @@ -1826,47 +1848,88 @@ STATUS deinitKvsWebRtc(VOID) return retStatus; } -STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket) +// Not thread safe. Ensure this function is invoked in a guarded section +static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum) +{ + ENTERS(); + STATUS retStatus = STATUS_SUCCESS; + UINT16 updatedSeqNum = 0; + PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL; + UINT64 ageOfOldest = 0, firstRtpTime = 0; + UINT64 twccPacketValue = 0; + BOOL isCheckComplete = FALSE; + + CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG); + + updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow; + do { + // If the seqNum is not present in the hash table, it is ok. We move on to the next + if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) { + tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue; + } + if (tempTwccRtpPktInfo != NULL) { + firstRtpTime = tempTwccRtpPktInfo->localTimeKvs; + // Would be the case if the timestamps are not monotonically increasing. + if (pRtpPacket->sentTime >= firstRtpTime) { + ageOfOldest = pRtpPacket->sentTime - firstRtpTime; + if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { + // If the seqNum is not present in the hash table, move on. However, this case should not happen + // given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists + if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) { + SAFE_MEMFREE(tempTwccRtpPktInfo); + } + updatedSeqNum++; + } else { + isCheckComplete = TRUE; + } + } else { + // Move to the next seqNum to check if we can remove the next one atleast + DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, + updatedSeqNum, firstRtpTime, pRtpPacket->sentTime); + updatedSeqNum++; + } + } else { + updatedSeqNum++; + } + // reset before next iteration + tempTwccRtpPktInfo = NULL; + } while (!isCheckComplete && updatedSeqNum != (seqNum + 1)); + + // Update regardless. The loop checks until current RTP packets seq number irrespective of the failure + pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum; +CleanUp: + LEAVES(); + return retStatus; +} + +STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; BOOL locked = FALSE; - UINT64 sn = 0; - UINT16 seqNum; - BOOL isEmpty = FALSE; - INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest; - CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG); - CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS); + UINT16 seqNum = 0; + PTwccRtpPacketInfo pTwccRtpPktInfo = NULL; + + CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG); + CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS); CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS); - MUTEX_LOCK(pc->twccLock); + MUTEX_LOCK(pKvsPeerConnection->twccLock); locked = TRUE; + CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY); + + pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength; + pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime; + pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME; seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload); - CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum)); - pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum; - pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength; - pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime; - pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; - pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime; - - // cleanup queue until it contains up to 2 seconds of sent packets - do { - CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn)); - firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - lastLocalTimeKvs = pRtpPacket->sentTime; - ageOfOldest = lastLocalTimeKvs - firstTimeKvs; - if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { - CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn)); - CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty)); - } else { - break; - } - } while (!isEmpty); + CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo)); + // Ensure twccRollingWindowDeletion is run in a guarded section + CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum)); CleanUp: if (locked) { - MUTEX_UNLOCK(pc->twccLock); + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); } CHK_LOG_ERR(retStatus); diff --git a/src/source/PeerConnection/PeerConnection.h b/src/source/PeerConnection/PeerConnection.h index 79d5d85085..7311ec9794 100644 --- a/src/source/PeerConnection/PeerConnection.h +++ b/src/source/PeerConnection/PeerConnection.h @@ -31,6 +31,8 @@ extern "C" { #define CODEC_HASH_TABLE_BUCKET_LENGTH 2 #define RTX_HASH_TABLE_BUCKET_COUNT 50 #define RTX_HASH_TABLE_BUCKET_LENGTH 2 +#define TWCC_HASH_TABLE_BUCKET_COUNT 100 +#define TWCC_HASH_TABLE_BUCKET_LENGTH 2 #define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200 #define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2 @@ -47,17 +49,16 @@ typedef enum { } RTX_CODEC; typedef struct { - UINT16 seqNum; - UINT16 packetSize; UINT64 localTimeKvs; UINT64 remoteTimeKvs; -} TwccPacket, *PTwccPacket; + UINT32 packetSize; +} TwccRtpPacketInfo, *PTwccRtpPacketInfo; typedef struct { - StackQueue twccPackets; - TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality - UINT64 lastLocalTimeKvs; - UINT16 lastReportedSeqNum; + PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket] + UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window + UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response + UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response } TwccManager, *PTwccManager; typedef struct { @@ -70,7 +71,7 @@ typedef struct { typedef struct { RtcPeerConnection peerConnection; - // UINT32 padding padding makes transportWideSequenceNumber 64bit aligned + // UINT32 padding makes transportWideSequenceNumber 64bit aligned // we put atomics at the top of structs because customers application could set the packing to 0 // in which case any atomic operations would result in bus errors if there is a misalignment // for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907 @@ -183,6 +184,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32); STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32); STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE); STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket); +UINT32 parseExtId(PCHAR); // visible for testing only VOID onIceConnectionStateChange(UINT64, UINT64); diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index 0279299467..d10f2a8cfb 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -185,11 +185,13 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) UINT32 statuses; UINT32 i; UINT64 referenceTime; + PTwccRtpPacketInfo pTwccPacket = NULL; + UINT64 twccPktValue = 0; CHK(pTwccManager != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); baseSeqNum = getUnalignedInt16BigEndian(pRtcpPacket->payload + 8); + pTwccManager->prevReportedBaseSeqNum = baseSeqNum; packetStatusCount = TWCC_PACKET_STATUS_COUNT(pRtcpPacket->payload); - referenceTime = (pRtcpPacket->payload[12] << 16) | (pRtcpPacket->payload[13] << 8) | (pRtcpPacket->payload[14] & 0xff); referenceTime = KVS_CONVERT_TIMESCALE(referenceTime * 64, MILLISECONDS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); // TODO: handle lost twcc report packets @@ -205,7 +207,6 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) } chunkOffset += TWCC_FB_PACKETCHUNK_SIZE; } - recvOffset = chunkOffset; chunkOffset = 16; packetSeqNum = baseSeqNum; @@ -227,7 +228,14 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) break; case TWCC_STATUS_SYMBOL_NOTRECEIVED: DLOGS("runLength packetSeqNum %u not received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = TWCC_PACKET_LOST_TIME; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; break; default: @@ -236,11 +244,21 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) if (recvDelta != MIN_INT16) { referenceTime += KVS_CONVERT_TIMESCALE(recvDelta, TWCC_TICKS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); DLOGS("runLength packetSeqNum %u received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = referenceTime; + + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = referenceTime; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; } packetSeqNum++; packetsRemaining--; + // Reset to NULL before next iteration + pTwccPacket = NULL; } } else { statuses = MIN(TWCC_STATUSVECTOR_COUNT(packetChunk), packetsRemaining); @@ -259,7 +277,14 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) break; case TWCC_STATUS_SYMBOL_NOTRECEIVED: DLOGS("statusVector packetSeqNum %u not received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = TWCC_PACKET_LOST_TIME; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; break; default: @@ -268,16 +293,25 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) if (recvDelta != MIN_INT16) { referenceTime += KVS_CONVERT_TIMESCALE(recvDelta, TWCC_TICKS_PER_SECOND, HUNDREDS_OF_NANOS_IN_A_SECOND); DLOGS("statusVector packetSeqNum %u received %lu", packetSeqNum, referenceTime); - pTwccManager->twccPacketBySeqNum[packetSeqNum].remoteTimeKvs = referenceTime; + // If it does not exist it means the packet was already visited + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + pTwccPacket->remoteTimeKvs = referenceTime; + CHK_STATUS(hashTableUpsert(pTwccManager->pTwccRtpPktInfosHashTable, packetSeqNum, (UINT64) pTwccPacket)); + } + } pTwccManager->lastReportedSeqNum = packetSeqNum; } packetSeqNum++; packetsRemaining--; + // Reset to NULL before next iteration + pTwccPacket = NULL; } } chunkOffset += TWCC_FB_PACKETCHUNK_SIZE; } - + DLOGV("Checking seqNum %d to %d of TWCC reports", baseSeqNum, pTwccManager->lastReportedSeqNum); CleanUp: CHK_LOG_ERR(retStatus); return retStatus; @@ -286,44 +320,75 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection) { STATUS retStatus = STATUS_SUCCESS; - PTwccManager twcc; + PTwccManager pTwccManager = NULL; BOOL locked = FALSE; - BOOL empty = TRUE; - UINT64 sn = 0; - INT64 ageOfOldestPacket; + UINT16 baseSeqNum = 0; UINT64 localStartTimeKvs, localEndTimeKvs; UINT64 sentBytes = 0, receivedBytes = 0; UINT64 sentPackets = 0, receivedPackets = 0; INT64 duration = 0; - UINT16 seqNum; - PTwccPacket twccPacket; + UINT16 seqNum = 0; + PTwccRtpPacketInfo pTwccPacket = NULL; + UINT64 twccPktValue = 0; + BOOL localStartTimeRecorded = FALSE; CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS); MUTEX_LOCK(pKvsPeerConnection->twccLock); locked = TRUE; - twcc = pKvsPeerConnection->pTwccManager; - CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, twcc)); - CHK_STATUS(stackQueueIsEmpty(&twcc->twccPackets, &empty)); - CHK(!empty, STATUS_SUCCESS); - CHK_STATUS(stackQueuePeek(&twcc->twccPackets, &sn)); - ageOfOldestPacket = twcc->lastLocalTimeKvs - twcc->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - CHK(ageOfOldestPacket > TWCC_ESTIMATOR_TIME_WINDOW / 2, STATUS_SUCCESS); - localStartTimeKvs = twcc->twccPacketBySeqNum[(UINT16) (sn - 1)].localTimeKvs; - if (localStartTimeKvs == TWCC_PACKET_UNITIALIZED_TIME) { - // time not yet set (only happens for first rtp packet) - localStartTimeKvs = twcc->twccPacketBySeqNum[(UINT16) sn].localTimeKvs; - } - for (seqNum = sn; seqNum != twcc->lastReportedSeqNum; seqNum++) { - twccPacket = &twcc->twccPacketBySeqNum[seqNum]; - localEndTimeKvs = twccPacket->localTimeKvs; - duration = localEndTimeKvs - localStartTimeKvs; - sentBytes += twccPacket->packetSize; - sentPackets++; - if (twccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { - receivedBytes += twccPacket->packetSize; - receivedPackets++; + pTwccManager = pKvsPeerConnection->pTwccManager; + CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, pTwccManager)); + baseSeqNum = pTwccManager->prevReportedBaseSeqNum; + + // Use != instead to cover the case where the group of sequence numbers being checked + // are trending towards MAX_UINT16 and rolling over to 0+, example range [65534, 10] + // We also check for twcc->lastReportedSeqNum + 1 to include the last seq number in the + // report. Without this, we do not check for the seqNum that could cause it to not be cleared + // from memory + for (seqNum = baseSeqNum; seqNum != (pTwccManager->lastReportedSeqNum + 1); seqNum++) { + if (!localStartTimeRecorded) { + // This could happen if the prev packet was deleted as part of rolling window or if there + // is an overlap of RTP packet statuses between TWCC packets. This could also fail if it is + // the first ever packet (seqNum 0) + if (hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum - 1, &twccPktValue) == STATUS_HASH_KEY_NOT_PRESENT) { + localStartTimeKvs = TWCC_PACKET_UNITIALIZED_TIME; + } else { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localStartTimeKvs = pTwccPacket->localTimeKvs; + localStartTimeRecorded = TRUE; + } + } + if (localStartTimeKvs == TWCC_PACKET_UNITIALIZED_TIME) { + // time not yet set. If prev seqNum was deleted + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localStartTimeKvs = pTwccPacket->localTimeKvs; + localStartTimeRecorded = TRUE; + } + } + } + } + + // The time it would not succeed is if there is an overlap in the RTP packet status between the TWCC + // packets + if (STATUS_SUCCEEDED(hashTableGet(pTwccManager->pTwccRtpPktInfosHashTable, seqNum, &twccPktValue))) { + pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; + if (pTwccPacket != NULL) { + localEndTimeKvs = pTwccPacket->localTimeKvs; + duration = localEndTimeKvs - localStartTimeKvs; + sentBytes += pTwccPacket->packetSize; + sentPackets++; + if (pTwccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { + receivedBytes += pTwccPacket->packetSize; + receivedPackets++; + if (STATUS_SUCCEEDED(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum))) { + SAFE_MEMFREE(pTwccPacket); + } + } + } } } diff --git a/src/source/PeerConnection/SessionDescription.h b/src/source/PeerConnection/SessionDescription.h index 63b88ec7fc..a7e5ac4903 100644 --- a/src/source/PeerConnection/SessionDescription.h +++ b/src/source/PeerConnection/SessionDescription.h @@ -81,7 +81,7 @@ extern "C" { // https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 #define TWCC_SDP_ATTR "transport-cc" -#define TWCC_EXT_URL "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" +#define TWCC_EXT_URL (PCHAR) "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" #define CODEC_RTPMAP_PAYLOAD_TYPES_HASH_TABLE_BUCKET_LENGTH 2 diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 00d76ad659..8c3269e946 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -295,27 +295,65 @@ TEST_F(RtcpFunctionalityTest, onpli) freePeerConnection(&pRtcPeerConnection); } +static void testBwHandler(UINT64 customData, UINT32 txBytes, UINT32 rxBytes, UINT32 txPacketsCnt, UINT32 rxPacketsCnt, + UINT64 duration) { + UNUSED_PARAM(customData); + UNUSED_PARAM(txBytes); + UNUSED_PARAM(rxBytes); + UNUSED_PARAM(txPacketsCnt); + UNUSED_PARAM(rxPacketsCnt); + UNUSED_PARAM(duration); + return; +} + static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, const uint32_t expectedNotReceived) { + PRtcPeerConnection pRtcPeerConnection = nullptr; + PKvsPeerConnection pKvsPeerConnection; BYTE payload[256] = {0}; UINT32 payloadLen = 256; hexDecode(const_cast(hex.data()), hex.size(), payload, &payloadLen); RtcpPacket rtcpPacket{}; + RtpPacket rtpPacket{}; + RtcConfiguration config{}; + UINT64 value; + UINT16 twsn; + UINT16 i = 0; + UINT32 extpayload, received = 0, lost = 0; + rtcpPacket.header.packetLength = payloadLen / 4; rtcpPacket.payload = payload; rtcpPacket.payloadLength = payloadLen; - TwccManager twcc{}; - - ASSERT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, &twcc)); - ASSERT_EQ(STATUS_SUCCESS, stackQueueClear(&twcc.twccPackets, FALSE)); - - uint32_t received = 0; - uint32_t lost = 0; - for (const auto& packet : twcc.twccPacketBySeqNum) { - if (packet.remoteTimeKvs == TWCC_PACKET_LOST_TIME) { - lost++; - } else if (packet.remoteTimeKvs != TWCC_PACKET_UNITIALIZED_TIME) { - received++; + + + EXPECT_EQ(STATUS_SUCCESS, createPeerConnection(&config, &pRtcPeerConnection)); + pKvsPeerConnection = reinterpret_cast(pRtcPeerConnection); + EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnSenderBandwidthEstimation(pRtcPeerConnection, 0, + testBwHandler)); + + UINT16 baseSeqNum = getUnalignedInt16BigEndian(rtcpPacket.payload + 8); + UINT16 pktCount = TWCC_PACKET_STATUS_COUNT(rtcpPacket.payload); + + for(i = baseSeqNum; i < baseSeqNum + pktCount; i++) { + rtpPacket.header.extension = TRUE; + rtpPacket.header.extensionProfile = TWCC_EXT_PROFILE; + rtpPacket.header.extensionLength = SIZEOF(UINT32); + twsn = i; + extpayload = TWCC_PAYLOAD(parseExtId(TWCC_EXT_URL), twsn); + rtpPacket.header.extensionPayload = (PBYTE) &extpayload; + EXPECT_EQ(STATUS_SUCCESS, twccManagerOnPacketSent(pKvsPeerConnection, &rtpPacket)); + } + + EXPECT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, pKvsPeerConnection->pTwccManager)); + + for(i = 0; i < MAX_UINT16; i++) { + if(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value) == STATUS_SUCCESS) { + PTwccRtpPacketInfo tempTwccRtpPktInfo = (PTwccRtpPacketInfo) value; + if(tempTwccRtpPktInfo->remoteTimeKvs == TWCC_PACKET_LOST_TIME) { + lost++; + } else if (tempTwccRtpPktInfo->remoteTimeKvs != TWCC_PACKET_UNITIALIZED_TIME) { + received++; + } } } @@ -323,9 +361,10 @@ static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, c EXPECT_EQ(expectedReceived + expectedNotReceived, TWCC_PACKET_STATUS_COUNT(rtcpPacket.payload)); EXPECT_EQ(expectedReceived, received); EXPECT_EQ(expectedNotReceived, lost); + EXPECT_EQ(STATUS_SUCCESS, freePeerConnection(&pRtcPeerConnection)); } -TEST_F(RtcpFunctionalityTest, twcc3) +TEST_F(RtcpFunctionalityTest, twccParsePacketTest) { parseTwcc("", 0, 0); parseTwcc("4487A9E754B3E6FD01810001147A75A62001C801", 1, 0); From d7ae2abcad811bea4bcc7b71861e36dbc17e7d46 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 5 Nov 2024 11:21:17 -0800 Subject: [PATCH 02/16] Remove enableIceStats merge conflict --- samples/Common.c | 1 - 1 file changed, 1 deletion(-) diff --git a/samples/Common.c b/samples/Common.c index eabfb2d30c..8593ffdc8f 100644 --- a/samples/Common.c +++ b/samples/Common.c @@ -365,7 +365,6 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP // Set the ICE mode explicitly configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL; - configuration.kvsRtcConfiguration.enableIceStats = pSampleConfiguration->enableIceStats; configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE; // Set the STUN server PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX; From 7f3ff911a4799dbb1f67f759f3f103826f87083c Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Wed, 6 Nov 2024 22:58:48 -0800 Subject: [PATCH 03/16] Address comments --- samples/Common.c | 1 - src/source/PeerConnection/PeerConnection.c | 3 ++- tst/RtcpFunctionalityTest.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/Common.c b/samples/Common.c index 8593ffdc8f..35bc2c3b80 100644 --- a/samples/Common.c +++ b/samples/Common.c @@ -365,7 +365,6 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP // Set the ICE mode explicitly configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL; - configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE; // Set the STUN server PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX; // If region is in CN, add CN region uri postfix diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index c2fae98d6f..50650c1fbc 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1140,9 +1140,10 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); SAFE_MEMFREE(*ppPeerConnection); + *ppPeerConnection = NULL; ppPeerConnection = NULL; CleanUp: - if (ppPeerConnection != NULL) { + if (ppPeerConnection != NULL && *ppPeerConnection != NULL ) { if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { if (twccLocked) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 8c3269e946..9d5a1cc4e9 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -347,7 +347,7 @@ static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, c EXPECT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, pKvsPeerConnection->pTwccManager)); for(i = 0; i < MAX_UINT16; i++) { - if(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value) == STATUS_SUCCESS) { + if(STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value) == STATUS_SUCCESS)) { PTwccRtpPacketInfo tempTwccRtpPktInfo = (PTwccRtpPacketInfo) value; if(tempTwccRtpPktInfo->remoteTimeKvs == TWCC_PACKET_LOST_TIME) { lost++; From b7882529ab7b72a4c6cacffdbda206f37fdb8d19 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 10:26:11 -0800 Subject: [PATCH 04/16] Fix left over typo --- tst/RtcpFunctionalityTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 9d5a1cc4e9..08101a55b7 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -347,7 +347,7 @@ static void parseTwcc(const std::string& hex, const uint32_t expectedReceived, c EXPECT_EQ(STATUS_SUCCESS, parseRtcpTwccPacket(&rtcpPacket, pKvsPeerConnection->pTwccManager)); for(i = 0; i < MAX_UINT16; i++) { - if(STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value) == STATUS_SUCCESS)) { + if(STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, i, &value))) { PTwccRtpPacketInfo tempTwccRtpPktInfo = (PTwccRtpPacketInfo) value; if(tempTwccRtpPktInfo->remoteTimeKvs == TWCC_PACKET_LOST_TIME) { lost++; From 9f726b35da2059a837b8af8a8312496d054d39b7 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 21:23:16 -0800 Subject: [PATCH 05/16] Address comments, refactor updaing of TWCC hash table in onRtcpTwccPacket for testing --- src/source/PeerConnection/PeerConnection.c | 1 - src/source/PeerConnection/Rtcp.c | 59 ++++++++++++++-------- src/source/PeerConnection/Rtcp.h | 1 + tst/RtcpFunctionalityTest.cpp | 54 ++++++++++++++++++++ 4 files changed, 93 insertions(+), 22 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 50650c1fbc..2de2079c97 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1140,7 +1140,6 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); SAFE_MEMFREE(*ppPeerConnection); - *ppPeerConnection = NULL; ppPeerConnection = NULL; CleanUp: if (ppPeerConnection != NULL && *ppPeerConnection != NULL ) { diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index d10f2a8cfb..a285cbeca0 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -317,28 +317,20 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) return retStatus; } -STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection) +STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 receivedBytes, PUINT64 receivedPackets, PUINT64 sentBytes, PUINT64 sentPackets) { STATUS retStatus = STATUS_SUCCESS; - PTwccManager pTwccManager = NULL; - BOOL locked = FALSE; + UINT64 localStartTimeKvs, localEndTimeKvs = 0; UINT16 baseSeqNum = 0; - UINT64 localStartTimeKvs, localEndTimeKvs; - UINT64 sentBytes = 0, receivedBytes = 0; - UINT64 sentPackets = 0, receivedPackets = 0; - INT64 duration = 0; - UINT16 seqNum = 0; - PTwccRtpPacketInfo pTwccPacket = NULL; - UINT64 twccPktValue = 0; BOOL localStartTimeRecorded = FALSE; + UINT64 twccPktValue = 0; + PTwccRtpPacketInfo pTwccPacket = NULL; + UINT16 seqNum = 0; - CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); - CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS); + duration, *sentBytes, *receivedBytes, *receivedPackets, *sentBytes, *sentPackets = 0; + + CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, STATUS_NULL_ARG); - MUTEX_LOCK(pKvsPeerConnection->twccLock); - locked = TRUE; - pTwccManager = pKvsPeerConnection->pTwccManager; - CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, pTwccManager)); baseSeqNum = pTwccManager->prevReportedBaseSeqNum; // Use != instead to cover the case where the group of sequence numbers being checked @@ -378,12 +370,12 @@ STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConn pTwccPacket = (PTwccRtpPacketInfo) twccPktValue; if (pTwccPacket != NULL) { localEndTimeKvs = pTwccPacket->localTimeKvs; - duration = localEndTimeKvs - localStartTimeKvs; - sentBytes += pTwccPacket->packetSize; - sentPackets++; + *duration = localEndTimeKvs - localStartTimeKvs; + *sentBytes += pTwccPacket->packetSize; + *sentPackets++; if (pTwccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { - receivedBytes += pTwccPacket->packetSize; - receivedPackets++; + *receivedBytes += pTwccPacket->packetSize; + *receivedPackets++; if (STATUS_SUCCEEDED(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum))) { SAFE_MEMFREE(pTwccPacket); } @@ -392,6 +384,31 @@ STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConn } } +CleanUp: + CHK_LOG_ERR(retStatus); + return retStatus; +} + + +STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection) +{ + STATUS retStatus = STATUS_SUCCESS; + PTwccManager pTwccManager = NULL; + BOOL locked = FALSE; + UINT64 sentBytes = 0, receivedBytes = 0; + UINT64 sentPackets = 0, receivedPackets = 0; + INT64 duration = 0; + + CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); + CHK(pKvsPeerConnection->pTwccManager != NULL && pKvsPeerConnection->onSenderBandwidthEstimation != NULL, STATUS_SUCCESS); + + MUTEX_LOCK(pKvsPeerConnection->twccLock); + locked = TRUE; + pTwccManager = pKvsPeerConnection->pTwccManager; + CHK_STATUS(parseRtcpTwccPacket(pRtcpPacket, pTwccManager)); + + updateTwccHashTable(pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets); + if (duration > 0) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); locked = FALSE; diff --git a/src/source/PeerConnection/Rtcp.h b/src/source/PeerConnection/Rtcp.h index 229cebbe3f..ae2e505e02 100644 --- a/src/source/PeerConnection/Rtcp.h +++ b/src/source/PeerConnection/Rtcp.h @@ -12,6 +12,7 @@ STATUS onRtcpRembPacket(PRtcpPacket, PKvsPeerConnection); STATUS onRtcpPLIPacket(PRtcpPacket, PKvsPeerConnection); STATUS parseRtcpTwccPacket(PRtcpPacket, PTwccManager); STATUS onRtcpTwccPacket(PRtcpPacket, PKvsPeerConnection); +STATUS updateTwccHashTable(PTwccManager, PINT64, PUINT64, PUINT64, PUINT64, PUINT64); // https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 // Deltas are represented as multiples of 250us: diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 08101a55b7..7c158c290b 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -404,6 +404,60 @@ TEST_F(RtcpFunctionalityTest, twccParsePacketTest) parseTwcc("4487A9E754B3E6FD04B60036147CAA852024C002D999D6407800000000000000000000000000040000000000000000", 43, 11); parseTwcc("4487A9E754B3E6FD040200E4147C9F81202700B7E6649000000000000000000004000000000008000018000000001", 43, 185); } + +TEST_F(RtcpFunctionalityTest, twccHandleTwccPacketTest) +{ + PRtcPeerConnection pRtcPeerConnection = NULL; + PKvsPeerConnection pKvsPeerConnection = NULL; + RtcConfiguration config{}; + UINT64 receivedBytes, receivedPackets, sentBytes, sentPackets = 0; + INT64 duration = 0; + PTwccRtpPacketInfo pTwccRtpPacketInfo = NULL; + PHashTable pTwccRtpPktInfosHashTable = NULL; + UINT16 hashTableInsertionCount = 0; + UINT16 lowerBound = UINT16_MAX - 3; + UINT16 upperBound = 3; + UINT32 hashTableItemCount = 0; + UINT16 i = 0; + + // Initialize structs and members. + EXPECT_EQ(STATUS_SUCCESS, createPeerConnection(&config, &pRtcPeerConnection)); + pKvsPeerConnection = reinterpret_cast(pRtcPeerConnection); + EXPECT_EQ(STATUS_SUCCESS, peerConnectionOnSenderBandwidthEstimation(pRtcPeerConnection, 0, testBwHandler)); + + // Grab the hash table. + pTwccRtpPktInfosHashTable = pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable; + + pKvsPeerConnection->pTwccManager->prevReportedBaseSeqNum = lowerBound; + pKvsPeerConnection->pTwccManager->lastReportedSeqNum = upperBound + 10; + + // Breakup the packet indexes to be across the max int overflow. + for (i = lowerBound; i < UINT16_MAX; i++) + { + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + for (i = 0; i < upperBound; i++) + { + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + + // Add at a non-monotonically-increased index. + pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, upperBound + 10, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + + // Validate hash table size after and before updating (onRtcpTwccPacket case). + EXPECT_EQ(hashTableInsertionCount, pTwccRtpPktInfosHashTable->itemCount); + EXPECT_EQ(STATUS_SUCCESS, updateTwccHashTable(pKvsPeerConnection->pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets)); + EXPECT_EQ(0, pTwccRtpPktInfosHashTable->itemCount); + + EXPECT_EQ(STATUS_SUCCESS, freePeerConnection(&pRtcPeerConnection)); +} + } // namespace webrtcclient } // namespace video } // namespace kinesis From e8634becb1d5a57b7ffcae29bb0e7e07da7addc4 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 21:44:24 -0800 Subject: [PATCH 06/16] Clang format, fix compiler Werror --- src/source/PeerConnection/PeerConnection.c | 2 +- src/source/PeerConnection/Rtcp.c | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 2de2079c97..5bce9fcd00 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1142,7 +1142,7 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) SAFE_MEMFREE(*ppPeerConnection); ppPeerConnection = NULL; CleanUp: - if (ppPeerConnection != NULL && *ppPeerConnection != NULL ) { + if (ppPeerConnection != NULL && *ppPeerConnection != NULL) { if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { if (twccLocked) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index a285cbeca0..fee8083f82 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -317,7 +317,8 @@ STATUS parseRtcpTwccPacket(PRtcpPacket pRtcpPacket, PTwccManager pTwccManager) return retStatus; } -STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 receivedBytes, PUINT64 receivedPackets, PUINT64 sentBytes, PUINT64 sentPackets) +STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 receivedBytes, PUINT64 receivedPackets, PUINT64 sentBytes, + PUINT64 sentPackets) { STATUS retStatus = STATUS_SUCCESS; UINT64 localStartTimeKvs, localEndTimeKvs = 0; @@ -327,9 +328,14 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r PTwccRtpPacketInfo pTwccPacket = NULL; UINT16 seqNum = 0; - duration, *sentBytes, *receivedBytes, *receivedPackets, *sentBytes, *sentPackets = 0; + duration = 0; + *sentBytes = 0; + *receivedBytes = 0; + *receivedPackets = 0; + *sentPackets = 0; - CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, STATUS_NULL_ARG); + CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, + STATUS_NULL_ARG); baseSeqNum = pTwccManager->prevReportedBaseSeqNum; @@ -385,11 +391,10 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r } CleanUp: - CHK_LOG_ERR(retStatus); - return retStatus; + CHK_LOG_ERR(retStatus); + return retStatus; } - STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConnection) { STATUS retStatus = STATUS_SUCCESS; @@ -398,7 +403,7 @@ STATUS onRtcpTwccPacket(PRtcpPacket pRtcpPacket, PKvsPeerConnection pKvsPeerConn UINT64 sentBytes = 0, receivedBytes = 0; UINT64 sentPackets = 0, receivedPackets = 0; INT64 duration = 0; - + CHK(pKvsPeerConnection != NULL && pRtcpPacket != NULL, STATUS_NULL_ARG); CHK(pKvsPeerConnection->pTwccManager != NULL && pKvsPeerConnection->onSenderBandwidthEstimation != NULL, STATUS_SUCCESS); From 4b9f88516f2024dff158f27b158b58f709221d5a Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 22:29:11 -0800 Subject: [PATCH 07/16] Correct typos --- src/source/PeerConnection/Rtcp.c | 13 ++++++++----- tst/RtcpFunctionalityTest.cpp | 4 ++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index fee8083f82..9a38c41826 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -328,15 +328,15 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r PTwccRtpPacketInfo pTwccPacket = NULL; UINT16 seqNum = 0; - duration = 0; - *sentBytes = 0; + CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, + STATUS_NULL_ARG); + + *duration = 0; *receivedBytes = 0; *receivedPackets = 0; + *sentBytes = 0; *sentPackets = 0; - CHK(pTwccManager != NULL && duration != NULL && receivedBytes != NULL && receivedPackets != NULL && sentBytes != NULL && sentPackets != NULL, - STATUS_NULL_ARG); - baseSeqNum = pTwccManager->prevReportedBaseSeqNum; // Use != instead to cover the case where the group of sequence numbers being checked @@ -390,6 +390,9 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r } } + UNUSED_PARAM(sentPackets); + UNUSED_PARAM(receivedPackets); + CleanUp: CHK_LOG_ERR(retStatus); return retStatus; diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 7c158c290b..5cf7d5eaad 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -405,12 +405,12 @@ TEST_F(RtcpFunctionalityTest, twccParsePacketTest) parseTwcc("4487A9E754B3E6FD040200E4147C9F81202700B7E6649000000000000000000004000000000008000018000000001", 43, 185); } -TEST_F(RtcpFunctionalityTest, twccHandleTwccPacketTest) +TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) { PRtcPeerConnection pRtcPeerConnection = NULL; PKvsPeerConnection pKvsPeerConnection = NULL; RtcConfiguration config{}; - UINT64 receivedBytes, receivedPackets, sentBytes, sentPackets = 0; + UINT64 receivedBytes = 0, receivedPackets = 0, sentBytes = 0, sentPackets = 0; INT64 duration = 0; PTwccRtpPacketInfo pTwccRtpPacketInfo = NULL; PHashTable pTwccRtpPktInfosHashTable = NULL; From 13d2dd125bd88c4f4c1f57620dae057260ef1b63 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 22:56:20 -0800 Subject: [PATCH 08/16] Fix "expression result unused" Werrors --- src/source/PeerConnection/Rtcp.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index 9a38c41826..572e48d195 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -378,10 +378,10 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r localEndTimeKvs = pTwccPacket->localTimeKvs; *duration = localEndTimeKvs - localStartTimeKvs; *sentBytes += pTwccPacket->packetSize; - *sentPackets++; + (*sentPackets)++; if (pTwccPacket->remoteTimeKvs != TWCC_PACKET_LOST_TIME) { *receivedBytes += pTwccPacket->packetSize; - *receivedPackets++; + (*receivedPackets)++; if (STATUS_SUCCEEDED(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum))) { SAFE_MEMFREE(pTwccPacket); } @@ -390,9 +390,6 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r } } - UNUSED_PARAM(sentPackets); - UNUSED_PARAM(receivedPackets); - CleanUp: CHK_LOG_ERR(retStatus); return retStatus; From 97b17e871ff3c43049383c952462cfe58bee4d9a Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Thu, 7 Nov 2024 23:18:55 -0800 Subject: [PATCH 09/16] Remove unused variable --- tst/RtcpFunctionalityTest.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 5cf7d5eaad..0e7328d3dd 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -417,7 +417,6 @@ TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) UINT16 hashTableInsertionCount = 0; UINT16 lowerBound = UINT16_MAX - 3; UINT16 upperBound = 3; - UINT32 hashTableItemCount = 0; UINT16 i = 0; // Initialize structs and members. From 73175a5bd47cbcd0be2a9af18814a32942856c86 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Fri, 8 Nov 2024 10:32:01 -0800 Subject: [PATCH 10/16] Address comments --- src/source/PeerConnection/PeerConnection.c | 4 ++-- tst/RtcpFunctionalityTest.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 5bce9fcd00..4b34b6320a 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1849,7 +1849,7 @@ STATUS deinitKvsWebRtc(VOID) } // Not thread safe. Ensure this function is invoked in a guarded section -static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum) +static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 endingSeqNum) { ENTERS(); STATUS retStatus = STATUS_SUCCESS; @@ -1893,7 +1893,7 @@ static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, P } // reset before next iteration tempTwccRtpPktInfo = NULL; - } while (!isCheckComplete && updatedSeqNum != (seqNum + 1)); + } while (!isCheckComplete && updatedSeqNum != (endingSeqNum + 1)); // Update regardless. The loop checks until current RTP packets seq number irrespective of the failure pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum; diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 0e7328d3dd..417f27db99 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -431,7 +431,7 @@ TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) pKvsPeerConnection->pTwccManager->lastReportedSeqNum = upperBound + 10; // Breakup the packet indexes to be across the max int overflow. - for (i = lowerBound; i < UINT16_MAX; i++) + for (i = lowerBound; i <= UINT16_MAX; i++) { pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); From 6ad90fb32fba3cf882146a5f430c07c51da259c7 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:52:55 -0800 Subject: [PATCH 11/16] Fix endless loop --- tst/RtcpFunctionalityTest.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index 417f27db99..ee4a371511 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -431,8 +431,9 @@ TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) pKvsPeerConnection->pTwccManager->lastReportedSeqNum = upperBound + 10; // Breakup the packet indexes to be across the max int overflow. - for (i = lowerBound; i <= UINT16_MAX; i++) + for (i = lowerBound; i <= UINT16_MAX && i != 0 ; i++) { + DLOGD("HERE: %d", i); pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); hashTableInsertionCount++; From a2798ee76db51d7b3abaf45be9bffdf19de20c84 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:48:01 -0800 Subject: [PATCH 12/16] Address comment --- src/source/PeerConnection/PeerConnection.c | 2 +- tst/RtcpFunctionalityTest.cpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 4b34b6320a..85d2501d7b 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1884,7 +1884,7 @@ static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, P } } else { // Move to the next seqNum to check if we can remove the next one atleast - DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, + DLOGV("Non-monotonic timestamp detected for RTP packet seqNum %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, updatedSeqNum, firstRtpTime, pRtpPacket->sentTime); updatedSeqNum++; } diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index ee4a371511..db9e55e35a 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -433,7 +433,6 @@ TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) // Breakup the packet indexes to be across the max int overflow. for (i = lowerBound; i <= UINT16_MAX && i != 0 ; i++) { - DLOGD("HERE: %d", i); pTwccRtpPacketInfo = (PTwccRtpPacketInfo) MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo)); EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); hashTableInsertionCount++; From 9310996354cef5ece5fc5f9f446da4b2a14c67b6 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Mon, 11 Nov 2024 15:17:40 -0800 Subject: [PATCH 13/16] Add removal of null hashTable items, add test for null item in hashTable --- src/source/PeerConnection/PeerConnection.c | 35 ++++++++++++---------- src/source/PeerConnection/Rtcp.c | 2 ++ tst/RtcpFunctionalityTest.cpp | 14 +++++++++ 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 85d2501d7b..5934a4f7d7 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1866,26 +1866,29 @@ static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, P // If the seqNum is not present in the hash table, it is ok. We move on to the next if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) { tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue; - } - if (tempTwccRtpPktInfo != NULL) { - firstRtpTime = tempTwccRtpPktInfo->localTimeKvs; - // Would be the case if the timestamps are not monotonically increasing. - if (pRtpPacket->sentTime >= firstRtpTime) { - ageOfOldest = pRtpPacket->sentTime - firstRtpTime; - if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { - // If the seqNum is not present in the hash table, move on. However, this case should not happen - // given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists - if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) { - SAFE_MEMFREE(tempTwccRtpPktInfo); + if (tempTwccRtpPktInfo != NULL) { + firstRtpTime = tempTwccRtpPktInfo->localTimeKvs; + // Would be the case if the timestamps are not monotonically increasing. + if (pRtpPacket->sentTime >= firstRtpTime) { + ageOfOldest = pRtpPacket->sentTime - firstRtpTime; + if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) { + // If the seqNum is not present in the hash table, move on. However, this case should not happen + // given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists + if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) { + SAFE_MEMFREE(tempTwccRtpPktInfo); + } + updatedSeqNum++; + } else { + isCheckComplete = TRUE; } - updatedSeqNum++; } else { - isCheckComplete = TRUE; + // Move to the next seqNum to check if we can remove the next one atleast + DLOGV("Non-monotonic timestamp detected for RTP packet seqNum %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, + updatedSeqNum, firstRtpTime, pRtpPacket->sentTime); + updatedSeqNum++; } } else { - // Move to the next seqNum to check if we can remove the next one atleast - DLOGV("Non-monotonic timestamp detected for RTP packet seqNum %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64, - updatedSeqNum, firstRtpTime, pRtpPacket->sentTime); + CHK_STATUS(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum)); updatedSeqNum++; } } else { diff --git a/src/source/PeerConnection/Rtcp.c b/src/source/PeerConnection/Rtcp.c index 572e48d195..f4c8147c49 100644 --- a/src/source/PeerConnection/Rtcp.c +++ b/src/source/PeerConnection/Rtcp.c @@ -386,6 +386,8 @@ STATUS updateTwccHashTable(PTwccManager pTwccManager, PINT64 duration, PUINT64 r SAFE_MEMFREE(pTwccPacket); } } + } else { + CHK_STATUS(hashTableRemove(pTwccManager->pTwccRtpPktInfosHashTable, seqNum)); } } } diff --git a/tst/RtcpFunctionalityTest.cpp b/tst/RtcpFunctionalityTest.cpp index db9e55e35a..7a3cd74da6 100644 --- a/tst/RtcpFunctionalityTest.cpp +++ b/tst/RtcpFunctionalityTest.cpp @@ -454,6 +454,20 @@ TEST_F(RtcpFunctionalityTest, updateTwccHashTableTest) EXPECT_EQ(STATUS_SUCCESS, updateTwccHashTable(pKvsPeerConnection->pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets)); EXPECT_EQ(0, pTwccRtpPktInfosHashTable->itemCount); + hashTableInsertionCount = 0; + pTwccRtpPacketInfo = NULL; + for (i = 0; i <= upperBound; i++) + { + EXPECT_EQ(STATUS_SUCCESS, hashTableUpsert(pTwccRtpPktInfosHashTable, i, (UINT64) pTwccRtpPacketInfo)); + hashTableInsertionCount++; + } + EXPECT_EQ(hashTableInsertionCount, pTwccRtpPktInfosHashTable->itemCount); + EXPECT_EQ(STATUS_SUCCESS, updateTwccHashTable(pKvsPeerConnection->pTwccManager, &duration, &receivedBytes, &receivedPackets, &sentBytes, &sentPackets)); + EXPECT_EQ(0, pTwccRtpPktInfosHashTable->itemCount); + + MUTEX_LOCK(pKvsPeerConnection->twccLock); + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + EXPECT_EQ(STATUS_SUCCESS, freePeerConnection(&pRtcPeerConnection)); } From 35e2b885c1027c8f305f3fd9605c479069402f6e Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:31:10 -0800 Subject: [PATCH 14/16] Address comments --- src/source/PeerConnection/PeerConnection.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 5934a4f7d7..69082c2570 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1140,9 +1140,9 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); SAFE_MEMFREE(*ppPeerConnection); - ppPeerConnection = NULL; + pKvsPeerConnection = NULL; CleanUp: - if (ppPeerConnection != NULL && *ppPeerConnection != NULL) { + if (pKvsPeerConnection != NULL) { if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { if (twccLocked) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); @@ -1901,6 +1901,8 @@ static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, P // Update regardless. The loop checks until current RTP packets seq number irrespective of the failure pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum; CleanUp: + CHK_LOG_ERR(retStatus); + LEAVES(); return retStatus; } From 8b3742411614bbced9d4b35569017c5876740cc9 Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:34:12 -0800 Subject: [PATCH 15/16] Cleanup freeing of KvsPeerConnection --- src/source/PeerConnection/PeerConnection.c | 26 +++++++++------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 69082c2570..7dd38a19c4 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1119,38 +1119,34 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) if (pKvsPeerConnection->pTwccManager != NULL) { MUTEX_LOCK(pKvsPeerConnection->twccLock); twccLocked = TRUE; + if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) { DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount); } + CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry)); CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); - if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { - if (twccLocked) { - MUTEX_UNLOCK(pKvsPeerConnection->twccLock); - twccLocked = FALSE; - } - MUTEX_FREE(pKvsPeerConnection->twccLock); - pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; - } + SAFE_MEMFREE(pKvsPeerConnection->pTwccManager); } // Incase the `RemoteSessionDescription` has not already been freed. SAFE_MEMFREE(pKvsPeerConnection->pRemoteSessionDescription); - PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); - SAFE_MEMFREE(*ppPeerConnection); - pKvsPeerConnection = NULL; -CleanUp: - if (pKvsPeerConnection != NULL) { - if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { + if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { if (twccLocked) { MUTEX_UNLOCK(pKvsPeerConnection->twccLock); twccLocked = FALSE; } MUTEX_FREE(pKvsPeerConnection->twccLock); - } + pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; } + + PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection"); + SAFE_MEMFREE(*ppPeerConnection); + +CleanUp: + LEAVES(); return retStatus; } From 3940ce27a542534216731e0a45e6188babe70fae Mon Sep 17 00:00:00 2001 From: Stefan Kieszkowski <85728496+stefankiesz@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:53:07 -0800 Subject: [PATCH 16/16] Clang format --- src/source/PeerConnection/PeerConnection.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/source/PeerConnection/PeerConnection.c b/src/source/PeerConnection/PeerConnection.c index 7dd38a19c4..ce5dd86bc7 100644 --- a/src/source/PeerConnection/PeerConnection.c +++ b/src/source/PeerConnection/PeerConnection.c @@ -1123,10 +1123,10 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) { DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount); } - + CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry)); CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable)); - + SAFE_MEMFREE(pKvsPeerConnection->pTwccManager); } @@ -1134,12 +1134,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection) SAFE_MEMFREE(pKvsPeerConnection->pRemoteSessionDescription); if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) { - if (twccLocked) { - MUTEX_UNLOCK(pKvsPeerConnection->twccLock); - twccLocked = FALSE; - } - MUTEX_FREE(pKvsPeerConnection->twccLock); - pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; + if (twccLocked) { + MUTEX_UNLOCK(pKvsPeerConnection->twccLock); + twccLocked = FALSE; + } + MUTEX_FREE(pKvsPeerConnection->twccLock); + pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE; } PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection");