#include "StreamingMediaContext.h"

#include <algorithm>
#include <array>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

#include "AudioStreamingPart.h"
#include "VideoStreamingPart.h"
#include "absl/types/optional.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#include "absl/types/variant.h"
#include "rtc_base/logging.h"
#include "rtc_base/synchronization/mutex.h"
#include "common_audio/ring_buffer.h"
#include "modules/audio_mixer/frame_combiner.h"
#include "modules/audio_processing/agc2/vad_with_level.h"
#include "modules/audio_processing/audio_buffer.h"
#include "api/video/video_sink_interface.h"
#include "audio/utility/audio_frame_operations.h"

namespace tgcalls {

namespace {

// Audio in this file is produced, mixed and buffered as 48 kHz mono.
constexpr int kSampleRate = 48000;
// Number of samples in one 10 ms chunk at kSampleRate (mono).
constexpr size_t kSamplesPer10ms = 480;
// Capacity of the playout ring buffer in samples (100 ms at kSampleRate).
constexpr size_t kAudioDataRingBufferMaxSize = 4800;
// Period of the render timer: 1000 / 120 ms, truncated to an integer (8 ms).
constexpr int kRenderIntervalMs = static_cast<int>(1000.0 / 120.0);
// Delay before re-requesting a part the server reported as not ready yet.
constexpr int64_t kNotReadyRetryDelayMs = 100;
// Lower bound for the delayed re-check of deferred part requests.
constexpr int64_t kMinDelayedCheckIntervalMs = 10;

// Type tag for the audio part of a pending segment (audio requests need no extra parameters).
struct PendingAudioSegmentData {
};

// Parameters of a pending video part request.
struct PendingVideoSegmentData {
    int32_t channelId = 0;
    VideoChannelDescription::Quality quality = VideoChannelDescription::Quality::Thumbnail;

    PendingVideoSegmentData(int32_t channelId_, VideoChannelDescription::Quality quality_) :
    channelId(channelId_),
    quality(quality_) {
    }
};

// Raw payload of a successfully downloaded part.
struct PendingMediaSegmentPartResult {
    std::vector<uint8_t> data;

    explicit PendingMediaSegmentPartResult(std::vector<uint8_t> &&data_) :
    data(std::move(data_)) {
    }
};

// One audio or video part of a segment that is being downloaded.
struct PendingMediaSegmentPart {
    absl::variant<PendingAudioSegmentData, PendingVideoSegmentData> typeData;

    // When non-zero, the part is not re-requested before this rtc::TimeMillis() value.
    int64_t minRequestTimestamp = 0;
    // In-flight request, if any.
    std::shared_ptr<BroadcastPartTask> task;
    // Set once the part has been downloaded successfully.
    std::shared_ptr<PendingMediaSegmentPartResult> result;
};

// A segment whose parts are still being downloaded.
struct PendingMediaSegment {
    // Segment start in milliseconds; 0 is used for the initial request issued before
    // the request timeline has been established.
    int64_t timestamp = 0;
    std::vector<std::shared_ptr<PendingMediaSegmentPart>> parts;
};

// Video of one channel inside a fully downloaded segment.
struct VideoSegment {
    VideoChannelDescription::Quality quality = VideoChannelDescription::Quality::Thumbnail;
    std::shared_ptr<VideoStreamingPart> part;
    // pts of the last frame delivered to sinks; used to deliver each frame only once.
    double lastFramePts = -1.0;
    int _displayedFrames = 0;
    bool isPlaying = false;
    // Re-download of this segment at a different quality, if one is in flight.
    std::shared_ptr<PendingMediaSegmentPart> pendingVideoQualityUpdatePart;
};

// A fully downloaded segment ready for playback.
struct MediaSegment {
    int64_t timestamp = 0;
    int64_t duration = 0;
    std::shared_ptr<AudioStreamingPart> audio;
    std::vector<std::shared_ptr<VideoSegment>> video;
};

// Owning wrapper around WebRTC's C ring buffer, storing int16_t samples.
// Not copyable: a copy would free the same RingBuffer twice.
class SampleRingBuffer {
public:
    explicit SampleRingBuffer(size_t size) :
    _buffer(WebRtc_CreateBuffer(size, sizeof(int16_t))) {
    }

    ~SampleRingBuffer() {
        if (_buffer) {
            WebRtc_FreeBuffer(_buffer);
        }
    }

    SampleRingBuffer(SampleRingBuffer const &) = delete;
    SampleRingBuffer &operator=(SampleRingBuffer const &) = delete;

    // Number of samples that can currently be written.
    size_t availableForWriting() {
        return WebRtc_available_write(_buffer);
    }

    // Writes up to `count` samples; returns the number actually written.
    size_t write(int16_t const *samples, size_t count) {
        return WebRtc_WriteBuffer(_buffer, samples, count);
    }

    // Reads up to `count` samples into `samples`; returns the number actually read.
    size_t read(int16_t *samples, size_t count) {
        return WebRtc_ReadBuffer(_buffer, nullptr, samples, count);
    }

private:
    RingBuffer *_buffer = nullptr;
};

constexpr int kVadResultHistoryLength = 8;

// Smooths per-frame speech probabilities over the last kVadResultHistoryLength updates.
class VadHistory {
public:
    // Pushes a new probability and returns true when the moving average exceeds 0.8.
    bool update(float vadProbability) {
        // Drop the oldest entry and append the newest one.
        std::copy(_vadResultHistory.begin() + 1, _vadResultHistory.end(), _vadResultHistory.begin());
        _vadResultHistory.back() = vadProbability;

        const float movingAverage = std::accumulate(_vadResultHistory.begin(), _vadResultHistory.end(), 0.0f) / static_cast<float>(kVadResultHistoryLength);
        return movingAverage > 0.8f;
    }

private:
    std::array<float, kVadResultHistoryLength> _vadResultHistory{};
};

// Voice activity detector: skips near-silent frames, otherwise runs WebRTC's
// VadLevelAnalyzer on the first channel and smooths the result through VadHistory.
class CombinedVad {
public:
    CombinedVad() :
    _vadWithLevel(std::make_unique<webrtc::VadLevelAnalyzer>(500, webrtc::GetAvailableCpuFeatures())) {
    }

    bool update(webrtc::AudioBuffer *buffer) {
        if (buffer->num_channels() <= 0) {
            return _history.update(0.0f);
        }
        webrtc::AudioFrameView<float> frameView(buffer->channels(), buffer->num_channels(), buffer->num_frames());
        float peak = 0.0f;
        for (const auto &x : frameView.channel(0)) {
            peak = std::max(std::fabs(x), peak);
        }
        if (peak <= 0.01f) {
            // Treat near-silence as "no speech" without running the analyzer.
            return _history.update(0.0f);
        }

        const auto result = _vadWithLevel->AnalyzeFrame(frameView);
        return _history.update(result.speech_probability);
    }

    // Records a "no speech" observation.
    bool update() {
        return _history.update(0.0f);
    }

private:
    std::unique_ptr<webrtc::VadLevelAnalyzer> _vadWithLevel;
    VadHistory _history;
};

// Rate-limited VAD plus peak level meter for one audio stream.
// The VAD is re-evaluated once at least 400 frames have accumulated; the level is the
// peak absolute sample over >= 4400 samples divided by 8000.
class SparseVad {
public:
    SparseVad() = default;

    // Returns {level, isSpeech}; both values are held between re-evaluations.
    std::pair<float, bool> update(webrtc::AudioBuffer *buffer) {
        const size_t frameCount = buffer->num_frames();

        _sampleCount += frameCount;
        if (_sampleCount >= 400) {
            _sampleCount = 0;
            _currentValue = _vad.update(buffer);
        }

        const float *samples = buffer->channels()[0];
        for (size_t i = 0; i < frameCount; i++) {
            _peak = std::max(_peak, std::fabs(samples[i]));
        }
        _peakCount += frameCount;

        if (_peakCount >= 4400) {
            const float norm = 8000.0f;
            _currentLevel = _peak / norm;
            _peak = 0.0f;
            _peakCount = 0;
        }

        return std::make_pair(_currentLevel, _currentValue);
    }

private:
    CombinedVad _vad;
    bool _currentValue = false;
    size_t _sampleCount = 0;
    size_t _peakCount = 0;
    float _peak = 0.0f;
    float _currentLevel = 0.0f;
};

} // namespace

// Downloads broadcast segments ahead of playback, renders their video frames to the
// registered sinks and mixes their audio into a ring buffer that getAudio() drains.
// The render timer and all request callbacks are posted to the media thread. The audio
// ring buffer is the only state guarded by a mutex; getAudio() is presumably called
// from the audio device thread (TODO confirm).
class StreamingMediaContextPrivate : public std::enable_shared_from_this<StreamingMediaContextPrivate> {
public:
    explicit StreamingMediaContextPrivate(StreamingMediaContext::StreamingMediaContextArguments &&arguments) :
    _threads(std::move(arguments.threads)),
    _requestCurrentTime(std::move(arguments.requestCurrentTime)),
    _requestAudioBroadcastPart(std::move(arguments.requestAudioBroadcastPart)),
    _requestVideoBroadcastPart(std::move(arguments.requestVideoBroadcastPart)),
    _updateAudioLevel(std::move(arguments.updateAudioLevel)),
    _audioRingBuffer(kAudioDataRingBufferMaxSize),
    _audioFrameCombiner(false),
    _platformContext(std::move(arguments.platformContext)) {
    }

    ~StreamingMediaContextPrivate() = default;

    // Starts the render loop immediately.
    void start() {
        beginRenderTimer(0);
    }

    // Schedules render() on the media thread after `timeoutMs`, re-arming itself every
    // kRenderIntervalMs for as long as this object is alive.
    void beginRenderTimer(int timeoutMs) {
        const auto weak = std::weak_ptr<StreamingMediaContextPrivate>(shared_from_this());
        _threads->getMediaThread()->PostDelayedTask(RTC_FROM_HERE, [weak]() {
            const auto strong = weak.lock();
            if (!strong) {
                return;
            }

            strong->render();
            strong->beginRenderTimer(kRenderIntervalMs);
        }, timeoutMs);
    }

    // One render tick: renders the head segment (when playback may proceed), then tops
    // up and advances segment downloads.
    void render() {
        renderHeadSegment(rtc::TimeMillis());
        requestSegmentsIfNeeded();
        checkPendingSegments();
    }

    // Feeds one 10 ms mono chunk of `ssrc` into its SparseVad and reports the result
    // through the updateAudioLevel callback (no-op when no callback is set).
    void processAudioLevel(uint32_t ssrc, std::vector<int16_t> const &samples) {
        if (!_updateAudioLevel) {
            return;
        }
        // StreamConfig(kSampleRate, 1) describes a 10 ms frame, so CopyFrom presumably reads
        // kSamplesPer10ms samples; skip shorter chunks instead of reading past the vector.
        if (samples.size() < kSamplesPer10ms) {
            return;
        }

        webrtc::AudioBuffer buffer(kSampleRate, 1, kSampleRate, 1, kSampleRate, 1);
        webrtc::StreamConfig config(kSampleRate, 1);
        buffer.CopyFrom(samples.data(), config);

        auto &vad = _audioVadMap[ssrc];
        if (!vad) {
            vad = std::make_unique<SparseVad>();
        }
        const auto vadResult = vad->update(&buffer);

        _updateAudioLevel(ssrc, vadResult.first, vadResult.second);
    }

    // Fills `audio_samples` with `num_samples` frames of `num_channels` interleaved audio.
    // The buffered mono signal is duplicated into every channel; frames the ring buffer
    // cannot supply are zero-filled.
    // NOTE(review): samples_per_sec is not checked against the 48 kHz buffer — confirm
    // callers always request 48 kHz.
    void getAudio(int16_t *audio_samples, const size_t num_samples, const size_t num_channels, const uint32_t samples_per_sec) {
        int16_t *buffer = audio_samples;
        if (num_channels != 1) {
            if (_tempAudioBuffer.size() < num_samples) {
                _tempAudioBuffer.resize(num_samples);
            }
            buffer = _tempAudioBuffer.data();
        }

        size_t readSamples = 0;
        {
            webrtc::MutexLock lock(&_audioDataMutex);
            readSamples = _audioRingBuffer.read(buffer, num_samples);
        }

        if (num_channels != 1) {
            for (size_t sampleIndex = 0; sampleIndex < readSamples; sampleIndex++) {
                for (size_t channelIndex = 0; channelIndex < num_channels; channelIndex++) {
                    audio_samples[sampleIndex * num_channels + channelIndex] = _tempAudioBuffer[sampleIndex];
                }
            }
        }
        if (readSamples < num_samples) {
            std::memset(audio_samples + readSamples * num_channels, 0, (num_samples - readSamples) * num_channels * sizeof(int16_t));
        }
    }

    // Total duration in milliseconds of the segments ready for playback.
    int64_t getAvailableBufferDuration() const {
        return std::accumulate(_availableSegments.begin(), _availableSegments.end(), int64_t{0}, [](int64_t total, std::shared_ptr<MediaSegment> const &segment) {
            return total + segment->duration;
        });
    }

    // Cancels every in-flight part request and forgets all pending segments.
    void discardAllPendingSegments() {
        for (const auto &pendingSegment : _pendingSegments) {
            for (const auto &part : pendingSegment->parts) {
                if (part->task) {
                    part->task->cancel();
                }
            }
        }
        _pendingSegments.clear();
    }

    // Queues new pending segments until the ready plus requested duration exceeds
    // _segmentBufferDuration. While the request timeline is unknown
    // (_nextSegmentTimestamp == 0) only a single probe segment is queued.
    void requestSegmentsIfNeeded() {
        while (!hasEnoughBufferedOrRequestedSegments()) {
            const int64_t segmentTimestamp = _nextSegmentTimestamp;
            _pendingSegments.push_back(makePendingSegment(segmentTimestamp));

            if (segmentTimestamp == 0) {
                break;
            }
            _nextSegmentTimestamp += _segmentDuration;
        }
    }

    // Re-downloads a not-yet-playing video segment at the quality currently requested for
    // its endpoint. A matching in-flight update is kept; a different one is cancelled.
    void requestPendingVideoQualityUpdate(std::shared_ptr<VideoSegment> segment, int64_t timestamp) {
        if (segment->isPlaying) {
            return;
        }
        const auto segmentEndpointId = segment->part->getActiveEndpointId();
        if (!segmentEndpointId) {
            return;
        }

        // Only the active channel describing this segment's endpoint is relevant; taking
        // an arbitrary channel would replace this video with another participant's.
        absl::optional<int32_t> updatedChannelId;
        absl::optional<VideoChannelDescription::Quality> updatedQuality;
        for (const auto &videoChannel : _activeVideoChannels) {
            if (segmentEndpointId != videoChannel.endpoint) {
                continue;
            }
            const auto channelIdIt = _currentEndpointMapping.find(videoChannel.endpoint);
            if (channelIdIt == _currentEndpointMapping.end()) {
                continue;
            }
            updatedChannelId = channelIdIt->second + 1;
            updatedQuality = videoChannel.quality;
        }

        if (!updatedChannelId || !updatedQuality) {
            return;
        }

        if (segment->pendingVideoQualityUpdatePart) {
            if (const auto videoData = absl::get_if<PendingVideoSegmentData>(&segment->pendingVideoQualityUpdatePart->typeData)) {
                if (videoData->channelId == updatedChannelId.value() && videoData->quality == updatedQuality.value()) {
                    return;
                }
            }
            cancelPendingVideoQualityUpdate(segment);
        }

        auto video = std::make_shared<PendingMediaSegmentPart>();
        video->typeData = PendingVideoSegmentData(updatedChannelId.value(), updatedQuality.value());
        video->minRequestTimestamp = 0;
        segment->pendingVideoQualityUpdatePart = video;

        const auto weak = std::weak_ptr<StreamingMediaContextPrivate>(shared_from_this());
        const auto weakSegment = std::weak_ptr<VideoSegment>(segment);
        beginPartTask(video, timestamp, [weak, weakSegment]() {
            const auto strong = weak.lock();
            if (!strong) {
                return;
            }
            const auto strongSegment = weakSegment.lock();
            if (!strongSegment) {
                return;
            }
            const auto pendingPart = strongSegment->pendingVideoQualityUpdatePart;
            if (!pendingPart) {
                return;
            }

            if (pendingPart->result) {
                strongSegment->part = std::make_shared<VideoStreamingPart>(std::move(pendingPart->result->data));
                // Record the quality now on screen so later setActiveVideoChannels() calls
                // with the same quality do not trigger another download.
                if (const auto videoData = absl::get_if<PendingVideoSegmentData>(&pendingPart->typeData)) {
                    strongSegment->quality = videoData->quality;
                }
            }
            strongSegment->pendingVideoQualityUpdatePart.reset();
        });
    }

    // Cancels and forgets the in-flight quality update of `segment`, if any.
    void cancelPendingVideoQualityUpdate(std::shared_ptr<VideoSegment> segment) {
        if (!segment->pendingVideoQualityUpdatePart) {
            return;
        }

        if (segment->pendingVideoQualityUpdatePart->task) {
            segment->pendingVideoQualityUpdatePart->task->cancel();
        }

        segment->pendingVideoQualityUpdatePart.reset();
    }

    // Drives the download state machine:
    //  - requests every part that has neither a result nor an in-flight task; parts with
    //    a minRequestTimestamp (previously "not ready") are retried only for the head
    //    segment and only after that time has passed;
    //  - moves the head pending segment to _availableSegments once all its parts are done;
    //  - schedules a delayed re-check for the earliest deferred retry.
    void checkPendingSegments() {
        constexpr int64_t kNoDelayedRequest = std::numeric_limits<int32_t>::max();

        const int64_t absoluteTimestamp = rtc::TimeMillis();
        int64_t minDelayedRequestTimeout = kNoDelayedRequest;
        bool shouldRequestMoreSegments = false;

        size_t i = 0;
        while (i < _pendingSegments.size()) {
            const auto pendingSegment = _pendingSegments[i];
            const bool isHeadSegment = (i == 0);

            bool allPartsDone = true;
            for (const auto &part : pendingSegment->parts) {
                if (part->result) {
                    continue;
                }
                allPartsDone = false;
                if (part->task) {
                    continue;
                }

                if (part->minRequestTimestamp != 0) {
                    if (!isHeadSegment) {
                        continue;
                    }
                    if (part->minRequestTimestamp > absoluteTimestamp) {
                        minDelayedRequestTimeout = std::min(minDelayedRequestTimeout, part->minRequestTimestamp - absoluteTimestamp);
                        continue;
                    }
                }

                startPartRequest(*part, pendingSegment->timestamp, makeSegmentPartResultHandler(pendingSegment, part));
            }

            if (allPartsDone && isHeadSegment) {
                _availableSegments.push_back(assembleMediaSegment(*pendingSegment));
                shouldRequestMoreSegments = true;
                _pendingSegments.erase(_pendingSegments.begin());
                // The next pending segment is now the head; examine index 0 again.
                continue;
            }
            i++;
        }

        if (minDelayedRequestTimeout < kNoDelayedRequest) {
            const auto weak = std::weak_ptr<StreamingMediaContextPrivate>(shared_from_this());
            const auto delayMs = static_cast<int32_t>(std::max<int64_t>(minDelayedRequestTimeout, kMinDelayedCheckIntervalMs));
            _threads->getMediaThread()->PostDelayedTask(RTC_FROM_HERE, [weak]() {
                const auto strong = weak.lock();
                if (!strong) {
                    return;
                }

                strong->checkPendingSegments();
            }, delayMs);
        }

        if (shouldRequestMoreSegments) {
            requestSegmentsIfNeeded();
        }
    }

    // Requests `part` for `segmentTimestamp`. On the media thread it stores the payload on
    // success (other statuses leave result empty) and then always calls `completion`,
    // unless this object or the part no longer exists.
    void beginPartTask(std::shared_ptr<PendingMediaSegmentPart> part, int64_t segmentTimestamp, std::function<void()> completion) {
        const auto weak = std::weak_ptr<StreamingMediaContextPrivate>(shared_from_this());
        const auto weakPart = std::weak_ptr<PendingMediaSegmentPart>(part);

        std::function<void(BroadcastPart &&)> handleResult = [weak, weakPart, threads = _threads, completion](BroadcastPart &&broadcastPart) {
            threads->getMediaThread()->PostTask(RTC_FROM_HERE, [weak, weakPart, broadcastPart = std::move(broadcastPart), completion]() mutable {
                const auto strong = weak.lock();
                if (!strong) {
                    return;
                }
                const auto pendingPart = weakPart.lock();
                if (!pendingPart) {
                    return;
                }

                pendingPart->task.reset();

                switch (broadcastPart.status) {
                    case BroadcastPart::Status::Success: {
                        pendingPart->result = std::make_shared<PendingMediaSegmentPartResult>(std::move(broadcastPart.data));
                        break;
                    }
                    case BroadcastPart::Status::NotReady:
                    case BroadcastPart::Status::ResyncNeeded: {
                        break;
                    }
                    default: {
                        RTC_FATAL() << "Unknown part.status";
                        break;
                    }
                }

                completion();
            });
        };

        startPartRequest(*part, segmentTimestamp, handleResult);
    }

    // Stores the per-ssrc output gain applied when mixing audio.
    void setVolume(uint32_t ssrc, double volume) {
        _volumeBySsrc[ssrc] = volume;
    }

    // Replaces the set of requested video channels and schedules quality re-downloads
    // for already-downloaded segments whose quality no longer matches.
    void setActiveVideoChannels(std::vector<VideoChannelDescription> const &videoChannels) {
        _activeVideoChannels = videoChannels;

        for (const auto &updatedVideoChannel : _activeVideoChannels) {
            for (const auto &segment : _availableSegments) {
                for (const auto &video : segment->video) {
                    if (video->part->getActiveEndpointId() == updatedVideoChannel.endpoint) {
                        if (video->quality != updatedVideoChannel.quality) {
                            requestPendingVideoQualityUpdate(video, segment->timestamp);
                        }
                    }
                }
            }
        }
    }

    // Registers a (weakly held) sink for frames of `endpointId`.
    void addVideoSink(std::string const &endpointId, std::weak_ptr<rtc::VideoSinkInterface<webrtc::VideoFrame>> sink) {
        _videoSinks[endpointId].push_back(std::move(sink));
    }

private:
    // Renders the first available segment at the current playback position and drops it
    // once its duration has elapsed. When no segment is ready the playback clock is reset
    // and rendering pauses until _segmentBufferDuration + _segmentDuration ms are buffered.
    void renderHeadSegment(int64_t absoluteTimestamp) {
        if (_waitForBufferredMillisecondsBeforeRendering) {
            if (getAvailableBufferDuration() < _waitForBufferredMillisecondsBeforeRendering.value()) {
                return;
            }
            _waitForBufferredMillisecondsBeforeRendering = absl::nullopt;
        }

        if (_availableSegments.empty()) {
            _playbackReferenceTimestamp = 0;
            _waitForBufferredMillisecondsBeforeRendering = _segmentBufferDuration + _segmentDuration;
            return;
        }

        if (_playbackReferenceTimestamp == 0) {
            _playbackReferenceTimestamp = absoluteTimestamp;
        }
        const double relativeTimestamp = static_cast<double>(absoluteTimestamp - _playbackReferenceTimestamp) / 1000.0;

        // Holding a copy keeps the segment alive after it is erased below.
        const auto segment = _availableSegments[0];
        const double segmentDuration = static_cast<double>(segment->duration) / 1000.0;

        renderVideoFrames(*segment, relativeTimestamp);
        if (segment->audio) {
            mixAudioIntoRingBuffer(*segment->audio);
        }

        if (relativeTimestamp >= segmentDuration) {
            finishHeadSegment(*segment);
        }
    }

    // Marks each video of `segment` as playing and delivers any new frame at
    // `relativeTimestamp` (seconds into the segment) to the endpoint's live sinks.
    void renderVideoFrames(MediaSegment const &segment, double relativeTimestamp) {
        for (const auto &videoSegment : segment.video) {
            videoSegment->isPlaying = true;
            // A segment that is on screen must not be swapped out by a quality update.
            cancelPendingVideoQualityUpdate(videoSegment);

            const auto frame = videoSegment->part->getFrameAtRelativeTimestamp(relativeTimestamp);
            if (!frame || videoSegment->lastFramePts == frame->pts) {
                continue;
            }
            videoSegment->lastFramePts = frame->pts;
            videoSegment->_displayedFrames += 1;

            const auto sinkList = _videoSinks.find(frame->endpointId);
            if (sinkList == _videoSinks.end()) {
                continue;
            }
            for (const auto &weakSink : sinkList->second) {
                if (const auto sink = weakSink.lock()) {
                    sink->OnFrame(frame->frame);
                }
            }
        }
    }

    // Pulls 10 ms chunks from `audio`, applies per-ssrc volume, mixes the channels to mono
    // and writes the result to the ring buffer while it has room for another chunk.
    void mixAudioIntoRingBuffer(AudioStreamingPart &audio) {
        const auto hasRoomFor10ms = [this] {
            webrtc::MutexLock lock(&_audioDataMutex);
            return _audioRingBuffer.availableForWriting() >= kSamplesPer10ms;
        };

        while (hasRoomFor10ms()) {
            auto audioChannels = audio.get10msPerChannel();
            if (audioChannels.empty()) {
                break;
            }

            // ownedFrames keeps the frames alive; audioFrames is the view FrameCombiner takes.
            std::vector<std::unique_ptr<webrtc::AudioFrame>> ownedFrames;
            std::vector<webrtc::AudioFrame *> audioFrames;
            ownedFrames.reserve(audioChannels.size());
            audioFrames.reserve(audioChannels.size());

            for (const auto &audioChannel : audioChannels) {
                auto frame = std::make_unique<webrtc::AudioFrame>();
                frame->UpdateFrame(0, audioChannel.pcmData.data(), audioChannel.pcmData.size(), kSampleRate, webrtc::AudioFrame::SpeechType::kNormalSpeech, webrtc::AudioFrame::VADActivity::kVadActive);

                const auto volumeIt = _volumeBySsrc.find(audioChannel.ssrc);
                if (volumeIt != _volumeBySsrc.end()) {
                    const double outputGain = volumeIt->second;
                    if (outputGain < 0.99f || outputGain > 1.01f) {
                        webrtc::AudioFrameOperations::ScaleWithSat(static_cast<float>(outputGain), frame.get());
                    }
                }

                audioFrames.push_back(frame.get());
                ownedFrames.push_back(std::move(frame));
                processAudioLevel(audioChannel.ssrc, audioChannel.pcmData);
            }

            webrtc::AudioFrame frameOut;
            _audioFrameCombiner.Combine(audioFrames, 1, kSampleRate, audioFrames.size(), &frameOut);

            webrtc::MutexLock lock(&_audioDataMutex);
            _audioRingBuffer.write(frameOut.data(), frameOut.samples_per_channel());
        }
    }

    // Advances the playback clock past the head segment, logs media that was never
    // rendered, and removes the segment.
    void finishHeadSegment(MediaSegment const &segment) {
        _playbackReferenceTimestamp += segment.duration;

        if (segment.audio && segment.audio->getRemainingMilliseconds() > 0) {
            RTC_LOG(LS_INFO) << "render: discarding " << segment.audio->getRemainingMilliseconds() << " ms of audio at the end of a segment";
        }
        if (!segment.video.empty() && segment.video[0]->part->getActiveEndpointId()) {
            RTC_LOG(LS_INFO) << "render: discarding video frames at the end of a segment (displayed " << segment.video[0]->_displayedFrames << " frames)";
        }

        _availableSegments.erase(_availableSegments.begin());
    }

    // True when no more segments should be queued right now.
    bool hasEnoughBufferedOrRequestedSegments() const {
        if (_nextSegmentTimestamp == 0) {
            return !_pendingSegments.empty();
        }
        const int64_t availableAndRequestedSegmentsDuration = getAvailableBufferDuration() + static_cast<int64_t>(_pendingSegments.size()) * _segmentDuration;
        return availableAndRequestedSegmentsDuration > _segmentBufferDuration;
    }

    // Builds a pending segment at `timestamp` with one audio part and one video part per
    // active video channel whose endpoint appears in the current endpoint mapping.
    std::shared_ptr<PendingMediaSegment> makePendingSegment(int64_t timestamp) const {
        auto pendingSegment = std::make_shared<PendingMediaSegment>();
        pendingSegment->timestamp = timestamp;

        auto audio = std::make_shared<PendingMediaSegmentPart>();
        audio->typeData = PendingAudioSegmentData();
        audio->minRequestTimestamp = 0;
        pendingSegment->parts.push_back(std::move(audio));

        for (const auto &videoChannel : _activeVideoChannels) {
            const auto channelIdIt = _currentEndpointMapping.find(videoChannel.endpoint);
            if (channelIdIt == _currentEndpointMapping.end()) {
                continue;
            }
            // NOTE(review): mapping values are offset by 1 to form the requested channel id;
            // presumably the mapping is 0-based and request ids 1-based — confirm.
            const int32_t channelId = channelIdIt->second + 1;

            auto video = std::make_shared<PendingMediaSegmentPart>();
            video->typeData = PendingVideoSegmentData(channelId, videoChannel.quality);
            video->minRequestTimestamp = 0;
            pendingSegment->parts.push_back(std::move(video));
        }

        return pendingSegment;
    }

    // Issues the audio or video request matching `part`'s type and stores the task on it.
    void startPartRequest(PendingMediaSegmentPart &part, int64_t segmentTimestamp, std::function<void(BroadcastPart &&)> const &handleResult) {
        if (absl::holds_alternative<PendingAudioSegmentData>(part.typeData)) {
            part.task = _requestAudioBroadcastPart(_platformContext, segmentTimestamp, _segmentDuration, handleResult);
        } else if (const auto videoData = absl::get_if<PendingVideoSegmentData>(&part.typeData)) {
            part.task = _requestVideoBroadcastPart(_platformContext, segmentTimestamp, _segmentDuration, videoData->channelId, videoData->quality, handleResult);
        }
    }

    // Builds the request callback for one part of a pending segment. The callback hops to
    // the media thread and is ignored if this object, the segment or the part is gone.
    std::function<void(BroadcastPart &&)> makeSegmentPartResultHandler(std::shared_ptr<PendingMediaSegment> const &segment, std::shared_ptr<PendingMediaSegmentPart> const &part) {
        const auto weak = std::weak_ptr<StreamingMediaContextPrivate>(shared_from_this());
        const auto weakSegment = std::weak_ptr<PendingMediaSegment>(segment);
        const auto weakPart = std::weak_ptr<PendingMediaSegmentPart>(part);
        const int64_t segmentTimestamp = segment->timestamp;

        return [weak, weakSegment, weakPart, threads = _threads, segmentTimestamp](BroadcastPart &&broadcastPart) {
            threads->getMediaThread()->PostTask(RTC_FROM_HERE, [weak, weakSegment, weakPart, broadcastPart = std::move(broadcastPart), segmentTimestamp]() mutable {
                const auto strong = weak.lock();
                if (!strong) {
                    return;
                }
                const auto strongSegment = weakSegment.lock();
                if (!strongSegment) {
                    return;
                }
                const auto pendingPart = weakPart.lock();
                if (!pendingPart) {
                    return;
                }

                strong->handleSegmentPartResult(*pendingPart, std::move(broadcastPart), segmentTimestamp);
            });
        };
    }

    // Applies a part response:
    //  - Success stores the payload and, on the first response, anchors the request
    //    timeline right after the returned part.
    //  - NotReady resyncs for the initial probe (timestamp 0), otherwise retries the part
    //    after kNotReadyRetryDelayMs.
    //  - ResyncNeeded always resyncs to the server time.
    void handleSegmentPartResult(PendingMediaSegmentPart &pendingPart, BroadcastPart &&broadcastPart, int64_t segmentTimestamp) {
        pendingPart.task.reset();

        switch (broadcastPart.status) {
            case BroadcastPart::Status::Success: {
                pendingPart.result = std::make_shared<PendingMediaSegmentPartResult>(std::move(broadcastPart.data));
                if (_nextSegmentTimestamp == 0) {
                    _nextSegmentTimestamp = broadcastPart.timestampMilliseconds + _segmentDuration;
                }
                checkPendingSegments();
                break;
            }
            case BroadcastPart::Status::NotReady: {
                if (segmentTimestamp == 0) {
                    resyncToServerTime(broadcastPart.responseTimestamp);
                } else {
                    pendingPart.minRequestTimestamp = rtc::TimeMillis() + kNotReadyRetryDelayMs;
                    checkPendingSegments();
                }
                break;
            }
            case BroadcastPart::Status::ResyncNeeded: {
                resyncToServerTime(broadcastPart.responseTimestamp);
                break;
            }
            default: {
                RTC_FATAL() << "Unknown part.status";
                break;
            }
        }
    }

    // Restarts downloading from the segment boundary at or below `responseTimestamp`
    // (seconds, as multiplied by 1000 here), discarding every pending request.
    void resyncToServerTime(double responseTimestamp) {
        const auto responseTimestampMilliseconds = static_cast<int64_t>(responseTimestamp * 1000.0);
        _nextSegmentTimestamp = (responseTimestampMilliseconds / _segmentDuration) * _segmentDuration;
        discardAllPendingSegments();
        requestSegmentsIfNeeded();
        checkPendingSegments();
    }

    std::shared_ptr<Threads> _threads;
    std::function<std::shared_ptr<BroadcastPartTask>(std::function<void(int64_t)>)> _requestCurrentTime;
    std::function<std::shared_ptr<BroadcastPartTask>(std::shared_ptr<PlatformContext>, int64_t, int64_t, std::function<void(BroadcastPart &&)>)> _requestAudioBroadcastPart;
    std::function<std::shared_ptr<BroadcastPartTask>(std::shared_ptr<PlatformContext>, int64_t, int64_t, int32_t, VideoChannelDescription::Quality, std::function<void(BroadcastPart &&)>)> _requestVideoBroadcastPart;
    std::function<void(uint32_t, float, bool)> _updateAudioLevel;

    // Segment length and target read-ahead, in milliseconds.
    const int _segmentDuration = 1000;
    const int _segmentBufferDuration = 2000;

    // Start of the next segment to request; 0 until the request timeline is established.
    int64_t _nextSegmentTimestamp = 0;

    absl::optional<int> _waitForBufferredMillisecondsBeforeRendering;
    std::vector<std::shared_ptr<MediaSegment>> _availableSegments;
    std::vector<std::shared_ptr<PendingMediaSegment>> _pendingSegments;

    // rtc::TimeMillis() at which the head segment started playing; 0 when stopped.
    int64_t _playbackReferenceTimestamp = 0;

    // Guards _audioRingBuffer, which render() fills and getAudio() drains.
    webrtc::Mutex _audioDataMutex;
    SampleRingBuffer _audioRingBuffer;
    std::vector<int16_t> _tempAudioBuffer;
    webrtc::FrameCombiner _audioFrameCombiner;
    std::map<uint32_t, std::unique_ptr<SparseVad>> _audioVadMap;

    std::map<uint32_t, double> _volumeBySsrc;
    std::vector<VideoChannelDescription> _activeVideoChannels;
    std::map<std::string, std::vector<std::weak_ptr<rtc::VideoSinkInterface<webrtc::VideoFrame>>>> _videoSinks;
    std::map<std::string, int32_t> _currentEndpointMapping;
    std::shared_ptr<PlatformContext> _platformContext;
};

StreamingMediaContext::StreamingMediaContext(StreamingMediaContextArguments &&arguments) {
    _private = std::make_shared<StreamingMediaContextPrivate>(std::move(arguments));
    _private->start();
}

StreamingMediaContext::~StreamingMediaContext() {
}

void StreamingMediaContext::setActiveVideoChannels(std::vector<VideoChannelDescription> const &videoChannels) {
    _private->setActiveVideoChannels(videoChannels);
}

void StreamingMediaContext::setVolume(uint32_t ssrc, double volume) {
    _private->setVolume(ssrc, volume);
}

void StreamingMediaContext::addVideoSink(std::string const &endpointId, std::weak_ptr<rtc::VideoSinkInterface<webrtc::VideoFrame>> sink) {
    _private->addVideoSink(endpointId, sink);
}

void StreamingMediaContext::getAudio(int16_t *audio_samples, const size_t num_samples, const size_t num_channels, const uint32_t samples_per_sec) {
    _private->getAudio(audio_samples, num_samples, num_channels, samples_per_sec);
}

} // namespace tgcalls