// RTPAudioStream.cpp: implementation of the RTPAudioStream class. // ////////////////////////////////////////////////////////////////////// #include "RTPAudioStream.h" #include "Parameters.h" #include "AudioSample.h" #include "AudioSampleManager.h" #include "RTPPacket.h" #define STATS_INTERVAL 100 int avgBufferSize(0); int minBufferSize(0); int numRepeats(0); int numPackets(0); using namespace std; ////////////////////////////////////////////////////////////////////// // Construction/Destruction ////////////////////////////////////////////////////////////////////// RTPAudioStream::RTPAudioStream(unsigned long ssrc) { char subFacilityName[100]; sprintf(subFacilityName, "RTPAudioStream:%x", this); tracer.SetSubFacilityName(subFacilityName); SetTraceLevel(); dynamicJitterBuffer = GetRegKeyBool(HKEY_CURRENT_USER, "Software\\Cisco Systems\\CCNMediaTerm\\1.0", "UseDynamicJitterBuffer", true); jitterBufferDepth = 0; lastOutputDuration = 0; lastOutputTimestamp = 0; lastPacketRepeatCount = 0; lastPacketTransmitted = NULL; numSIDPackets = 0; restartThreshold = DEFAULT_JITTERBUFFER_SIZE; silenceIntervalInProgress = true; this->ssrc = ssrc; this->tracer = tracer; InitializeCriticalSection(&packetQueueMutex); InitializeCriticalSection(&audioStreamMutex); } RTPAudioStream::~RTPAudioStream() { EnterCriticalSection(&audioStreamMutex); SetLastTransmittedPacket(NULL); while (packetQueue.size() > 0) { AudioSample *audioSample = NULL;; PopNextPacket(&audioSample); audioSample->Release(this); audioSample = NULL; } LeaveCriticalSection(&audioStreamMutex); DeleteCriticalSection(&packetQueueMutex); DeleteCriticalSection(&audioStreamMutex); } int RTPAudioStream::SetTraceLevel() { long SystemMask = 0; if ((SystemMask = GetRegKeyLong(HKEY_CURRENT_USER, "Software\\Cisco Systems\\MTC\\Tracing", "AllComponents", 0x0)) == 0) { SystemMask = GetRegKeyLong(HKEY_CURRENT_USER, "Software\\Cisco Systems\\MTC\\Tracing", "RTPJitterBuffer", 0x100000); } tracer.SetSystemMask(SystemMask); return 0; } int RTPAudioStream::SetRestartThreshold(int depthInMillisec) { EnterCriticalSection(&audioStreamMutex); restartThreshold = depthInMillisec; LeaveCriticalSection(&audioStreamMutex); return 0; } int RTPAudioStream::NumQueuedPackets() { EnterCriticalSection(&audioStreamMutex); int numQdPackets = packetQueue.size(); LeaveCriticalSection(&audioStreamMutex); return numQdPackets; } int RTPAudioStream::JitterBufferDepth() { EnterCriticalSection(&audioStreamMutex); int depth = jitterBufferDepth; LeaveCriticalSection(&audioStreamMutex); return depth; } unsigned long RTPAudioStream::SSRC() { EnterCriticalSection(&audioStreamMutex); unsigned long ssrcValue= this->ssrc; LeaveCriticalSection(&audioStreamMutex); return ssrcValue; } int RTPAudioStream::SetLastTransmittedPacket(AudioSample *newPacket) { EnterCriticalSection(&audioStreamMutex); if (newPacket != lastPacketTransmitted) { if (lastPacketTransmitted) { lastPacketTransmitted->Release(this); lastPacketTransmitted = NULL; lastPacketRepeatCount = 0; } lastPacketTransmitted = newPacket; if (newPacket) { newPacket->AddRef(this); } } LeaveCriticalSection(&audioStreamMutex); return 0; } int RTPAudioStream::TimeDifference(unsigned long timestamp1, unsigned long timestamp2) { int result(0); unsigned long timediff = timestamp1 > timestamp2 ? timestamp1 - timestamp2 : timestamp2 - timestamp1; if (timediff < 0x8fffffff) { result = timediff; return timestamp1 > timestamp2 ? -result : result; } else { unsigned long temp = (0xffffffff - timediff) + 1; result = temp; return timestamp1 > timestamp2 ? result : -result; } } int RTPAudioStream::TimeDifference(AudioSample *sample1, AudioSample *sample2) { return TimeDifference(sample1->RTPHeader()->Timestamp(), sample2->RTPHeader()->Timestamp()); } int RTPAudioStream::TimeDifferenceMillisec(AudioSample *sample1, AudioSample *sample2) { int timestampDiff = TimeDifference(sample1, sample2); return (int)(((double)timestampDiff * sample1->MinFrameDuration())/1000.0 + 0.5); } int RTPAudioStream::Clear() { EnterCriticalSection(&audioStreamMutex); while(packetQueue.size() > 0) { AudioSample *audioSample = NULL; PopNextPacket(&audioSample); if (audioSample) { audioSample->Release(this); audioSample = NULL; } } silenceIntervalInProgress = true; SetLastTransmittedPacket(NULL); lastOutputTimestamp = 0; lastOutputDuration = 0; lastPacketRepeatCount = 0; numSIDPackets = 0; jitterBufferDepth = 0; numPackets = 0; numRepeats = 0; dynamicJitterBuffer = GetRegKeyBool(HKEY_CURRENT_USER, "Software\\Cisco Systems\\CCNMediaTerm\\1.0", "UseDynamicJitterBuffer", true); LeaveCriticalSection(&audioStreamMutex); return 0; } /** * AddPacket makes sure that all samples added to the queue have an RTP header */ int RTPAudioStream::AddPacket(AudioSample *newPacket) { RTPPacket *rtpHeader = newPacket->RTPHeader(); if (rtpHeader) { EnterCriticalSection(&audioStreamMutex); if (jitterBufferDepth < restartThreshold * JITTERBUFFER_MAXSIZE_MULTIPLE) { if (rtpHeader->PayloadType() == RTP_PAYLOADTYPE_SID) { numSIDPackets++; } newPacket->AddRef(this); packetQueue.push(newPacket); jitterBufferDepth += newPacket->GetDuration(); tracer.tracef(ARB, "RTPAudioStream:%x : AddPacket : pushed audioSample 0x%x, ssrc %u, seqNo %u\n", this, newPacket, rtpHeader->Ssrc(), rtpHeader->SeqNo()); } LeaveCriticalSection(&audioStreamMutex); } return 0; } int RTPAudioStream::PopNextPacket(AudioSample **pAudioSample) { *pAudioSample = NULL; EnterCriticalSection(&audioStreamMutex); if (packetQueue.size() > 0) { *pAudioSample = packetQueue.top(); packetQueue.pop(); RTPPacket *rtpHeader = (*pAudioSample)->RTPHeader(); tracer.tracef(ARB, "RTPAudioStream:%x : PopNextPacket : popped audioSample 0x%x, ssrc %u, seqNo %u\n", this, *pAudioSample, rtpHeader->Ssrc(), rtpHeader->SeqNo()); if (rtpHeader->PayloadType() == RTP_PAYLOADTYPE_SID) { numSIDPackets--; } else { jitterBufferDepth -= (*pAudioSample)->GetDuration(); } } LeaveCriticalSection(&audioStreamMutex); return 0; } int RTPAudioStream::PopNextPacketForTransmit(AudioSample **pOutSample) { AudioSample *audioSample = NULL; EnterCriticalSection(&audioStreamMutex); PopNextPacket(&audioSample); if (audioSample) { RTPPacket *rtpHeader = audioSample->RTPHeader(); SetLastTransmittedPacket(audioSample); *pOutSample = audioSample; lastOutputTimestamp = rtpHeader->Timestamp(); lastOutputDuration = audioSample->GetDuration(); lastPacketRepeatCount = 1; } LeaveCriticalSection(&audioStreamMutex); return 0; } /* makes sure that only audiosamples with valid rtpheader are added */ int RTPAudioStream::InsertRTPPacket(AudioSample *audioSample) { tracer.tracef(DET, "RTPAudioStream:%x : InsertRTPPacket 0x%x\n", this, audioSample); RTPPacket *newPacketRTPHeader = audioSample->RTPHeader(); if (newPacketRTPHeader) { EnterCriticalSection(&audioStreamMutex); if (lastPacketTransmitted && (TimeDifference(lastPacketTransmitted, audioSample) < 0)) { RTPPacket *lastPacketRTPHeader = lastPacketTransmitted->RTPHeader(); // if we receive a packet that is 25 packets before the last transmitted packet, it means a new stream if ((int)(newPacketRTPHeader->SeqNo()) - ((int)lastPacketRTPHeader->SeqNo()) < -25) { Clear(); AddPacket(audioSample); } } else { AddPacket(audioSample); } LeaveCriticalSection(&audioStreamMutex); } else { tracer.tracef(ERR, "RTPAudioStream:%x : InserRTPPacket : audioSample has no rtpHeader\n", this); } return 0; } int RTPAudioStream::GiveNextRTPPacket(AudioSample **pOutSample) { EnterCriticalSection(&audioStreamMutex); *pOutSample = NULL; if (silenceIntervalInProgress && (numSIDPackets > 0 || jitterBufferDepth >= restartThreshold)) { silenceIntervalInProgress = false; } do { if (silenceIntervalInProgress) { break; } if (dynamicJitterBuffer && numPackets == 99) { tracer.tracef(EE, "RTPAudioStream:%x : GiveNextRTPPacket : numRepeats = %d\n", this, numRepeats); if (numRepeats <= 1) { int newThreshold(restartThreshold); if (jitterBufferDepth <= restartThreshold + 40) { newThreshold = restartThreshold - 20; if (newThreshold < 40) newThreshold = 40; } tracer.tracef(EE, "RTPAudioStream:%x : GiveNextRTPPacket : restartThreshold = %d, newThreshold = %d\n", this, restartThreshold, newThreshold); SetRestartThreshold(newThreshold); tracer.tracef(EE, "RTPAudioStream:%x : GiveNextRTPPacket : jitter buffer depth = %d\n", this, jitterBufferDepth); int durationDeleted(0); while(jitterBufferDepth > restartThreshold) { AudioSample *discardSample = NULL; PopNextPacketForTransmit(&discardSample); if (discardSample) { durationDeleted += discardSample->GetDuration(); discardSample->Release(this); discardSample = NULL; } if (durationDeleted >= 60) { break; } } numRepeats = 0; numPackets = 100; } else { numRepeats = 0; restartThreshold = restartThreshold + 20; numPackets = 100; tracer.tracef(EE, "RTPAudioStream:%x : GiveNextRTPPacket : increased restartThreshold by 20 to %d\n", this, restartThreshold); } } if (packetQueue.size() <= 0) { if (lastPacketTransmitted && lastPacketRepeatCount < JITTERBUFFER_PLC_MAXREPEATS) { lastPacketTransmitted->AddRef(this); lastPacketRepeatCount++; if (lastPacketRepeatCount == 2) { numRepeats++; } lastOutputTimestamp += (lastOutputDuration * 1000) / lastPacketTransmitted->MinFrameDuration(); *pOutSample = lastPacketTransmitted; numPackets = (numPackets + 1) % 100; break; } else { silenceIntervalInProgress = true; SetLastTransmittedPacket(NULL); break; } break; } AudioSample *audioSample = packetQueue.top(); tracer.tracef(DET, "RTPAudioStream:%x : GiveNextRTPPacket : packetQueue.top = audioSample 0x%x\n", this, audioSample); RTPPacket *rtpHeader = audioSample->RTPHeader(); if (rtpHeader->PayloadType() == RTP_PAYLOADTYPE_SID) { PopNextPacket(&audioSample); audioSample->Release(this); audioSample = NULL; silenceIntervalInProgress = true; SetLastTransmittedPacket(NULL); break; } if (!lastPacketTransmitted) { // transmit whichever is the next packet PopNextPacketForTransmit(pOutSample); numPackets = (numPackets + 1) % 100; break; } int timeDifference = TimeDifferenceMillisec(lastPacketTransmitted, audioSample) - lastOutputDuration * lastPacketRepeatCount; if (timeDifference < (int)lastOutputDuration) { PopNextPacketForTransmit(pOutSample); numPackets = (numPackets + 1) % 100; break; } if (lastPacketRepeatCount < JITTERBUFFER_PLC_MAXREPEATS) { lastPacketTransmitted->AddRef(this); lastPacketRepeatCount++; lastOutputTimestamp += (lastOutputDuration * 1000) / lastPacketTransmitted->MinFrameDuration(); *pOutSample = lastPacketTransmitted; numPackets = (numPackets + 1) % 100; break; } silenceIntervalInProgress = true; SetLastTransmittedPacket(NULL); } while(false); LeaveCriticalSection(&audioStreamMutex); return 0; }