Multi-threaded audio streams

Under MULTITHREADED_AUDIO define.
This commit is contained in:
erorcun 2021-06-25 05:06:38 +03:00
parent 71f28c8cf5
commit ab73c2f539
4 changed files with 559 additions and 95 deletions

View File

@ -1,8 +1,6 @@
#include "common.h" #include "common.h"
#ifdef AUDIO_OAL #ifdef AUDIO_OAL
#include "stream.h"
#include "sampman.h"
#if defined _MSC_VER && !defined RE3_NO_AUTOLINK #if defined _MSC_VER && !defined RE3_NO_AUTOLINK
#ifdef AUDIO_OAL_USE_SNDFILE #ifdef AUDIO_OAL_USE_SNDFILE
@ -22,6 +20,28 @@
#include <opusfile.h> #include <opusfile.h>
#endif #endif
#include <queue>
#include <utility>
#ifdef MULTITHREADED_AUDIO
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "MusicManager.h"
#include "stream.h"
std::thread gAudioThread;
std::mutex gAudioThreadQueueMutex;
std::condition_variable gAudioThreadCv;
bool gAudioThreadTerm = false;
std::queue<CStream*> gStreamsToProcess; // values are not unique, we will handle that ourself
#else
#include "stream.h"
#endif
#include "sampman.h"
#ifndef _WIN32 #ifndef _WIN32
#include "crossplatform.h" #include "crossplatform.h"
#endif #endif
@ -39,6 +59,10 @@ class CSortStereoBuffer
{ {
uint16* PcmBuf; uint16* PcmBuf;
size_t BufSize; size_t BufSize;
//#ifdef MULTITHREADED_AUDIO
// std::mutex Mutex;
//#endif
public: public:
CSortStereoBuffer() : PcmBuf(nil), BufSize(0) {} CSortStereoBuffer() : PcmBuf(nil), BufSize(0) {}
~CSortStereoBuffer() ~CSortStereoBuffer()
@ -65,6 +89,9 @@ public:
void SortStereo(void* buf, size_t size) void SortStereo(void* buf, size_t size)
{ {
//#ifdef MULTITHREADED_AUDIO
// std::lock_guard<std::mutex> lock(Mutex);
//#endif
uint16* InBuf = (uint16*)buf; uint16* InBuf = (uint16*)buf;
uint16* OutBuf = GetBuffer(size); uint16* OutBuf = GetBuffer(size);
@ -279,6 +306,10 @@ public:
#undef CLOSE_ON_ERROR #undef CLOSE_ON_ERROR
} }
void FileOpen()
{
}
~CWavFile() ~CWavFile()
{ {
Close(); Close();
@ -289,6 +320,7 @@ public:
return m_bIsOpen; return m_bIsOpen;
} }
uint32 GetSampleSize() uint32 GetSampleSize()
{ {
return sizeof(uint16); return sizeof(uint16);
@ -405,6 +437,10 @@ public:
m_pfSound = sf_open(path, SFM_READ, &m_soundInfo); m_pfSound = sf_open(path, SFM_READ, &m_soundInfo);
} }
void FileOpen()
{
}
~CSndFile() ~CSndFile()
{ {
if ( m_pfSound ) if ( m_pfSound )
@ -464,8 +500,6 @@ public:
#endif #endif
#ifdef AUDIO_OAL_USE_MPG123 #ifdef AUDIO_OAL_USE_MPG123
// fuzzy seek eliminates stutter when playing ADF but spams errors a lot (nothing breaks though)
#define MP3_USE_FUZZY_SEEK
class CMP3File : public IDecoder class CMP3File : public IDecoder
{ {
@ -473,37 +507,51 @@ class CMP3File : public IDecoder
bool m_bOpened; bool m_bOpened;
uint32 m_nRate; uint32 m_nRate;
uint32 m_nChannels; uint32 m_nChannels;
const char* m_pPath;
bool m_bFileNotOpenedYet;
public: public:
CMP3File(const char *path) : CMP3File(const char *path) :
m_pMH(nil), m_pMH(nil),
m_bOpened(false), m_bOpened(false),
m_nRate(0), m_nRate(0),
m_nChannels(0) m_nChannels(0),
m_pPath(path),
m_bFileNotOpenedYet(false)
{ {
m_pMH = mpg123_new(nil, nil); m_pMH = mpg123_new(nil, nil);
if ( m_pMH ) if ( m_pMH )
{ {
#ifdef MP3_USE_FUZZY_SEEK mpg123_param(m_pMH, MPG123_FLAGS, MPG123_SEEKBUFFER | MPG123_GAPLESS, 0.0);
mpg123_param(m_pMH, MPG123_FLAGS, MPG123_FUZZY | MPG123_SEEKBUFFER | MPG123_GAPLESS | MPG123_QUIET, 0.0);
#endif
long rate = 0;
int channels = 0;
int encoding = 0;
m_bOpened = mpg123_open(m_pMH, path) == MPG123_OK
&& mpg123_getformat(m_pMH, &rate, &channels, &encoding) == MPG123_OK;
m_nRate = rate; m_bOpened = true;
m_nChannels = channels; m_bFileNotOpenedYet = true;
// It's possible to move this to audioFileOpsThread(), but effect isn't noticable + probably not compatible with our current cutscene audio handling
if ( IsOpened() ) #if 1
{ FileOpen();
mpg123_format_none(m_pMH); #endif
mpg123_format(m_pMH, rate, channels, encoding);
}
} }
} }
void FileOpen()
{
if(!m_bFileNotOpenedYet) return;
long rate = 0;
int channels = 0;
int encoding = 0;
m_bOpened = mpg123_open(m_pMH, m_pPath) == MPG123_OK
&& mpg123_getformat(m_pMH, &rate, &channels, &encoding) == MPG123_OK;
m_nRate = rate;
m_nChannels = channels;
if(IsOpened()) {
mpg123_format_none(m_pMH);
mpg123_format(m_pMH, rate, channels, encoding);
}
m_bFileNotOpenedYet = false;
}
~CMP3File() ~CMP3File()
{ {
if ( m_pMH ) if ( m_pMH )
@ -526,7 +574,7 @@ public:
uint32 GetSampleCount() uint32 GetSampleCount()
{ {
if ( !IsOpened() ) return 0; if ( !IsOpened() || m_bFileNotOpenedYet ) return 0;
return mpg123_length(m_pMH); return mpg123_length(m_pMH);
} }
@ -542,19 +590,19 @@ public:
void Seek(uint32 milliseconds) void Seek(uint32 milliseconds)
{ {
if ( !IsOpened() ) return; if ( !IsOpened() || m_bFileNotOpenedYet ) return;
mpg123_seek(m_pMH, ms2samples(milliseconds), SEEK_SET); mpg123_seek(m_pMH, ms2samples(milliseconds), SEEK_SET);
} }
uint32 Tell() uint32 Tell()
{ {
if ( !IsOpened() ) return 0; if ( !IsOpened() || m_bFileNotOpenedYet ) return 0;
return samples2ms(mpg123_tell(m_pMH)); return samples2ms(mpg123_tell(m_pMH));
} }
uint32 Decode(void *buffer) uint32 Decode(void *buffer)
{ {
if ( !IsOpened() ) return 0; if ( !IsOpened() || m_bFileNotOpenedYet ) return 0;
size_t size; size_t size;
int err = mpg123_read(m_pMH, (unsigned char *)buffer, GetBufferSize(), &size); int err = mpg123_read(m_pMH, (unsigned char *)buffer, GetBufferSize(), &size);
@ -685,6 +733,10 @@ public:
m_ppVagBuffers[i] = new uint8[VB_BLOCK_SIZE]; m_ppVagBuffers[i] = new uint8[VB_BLOCK_SIZE];
} }
void FileOpen()
{
}
~CVbFile() ~CVbFile()
{ {
if (m_pFile) if (m_pFile)
@ -837,6 +889,10 @@ public:
m_bOpened = true; m_bOpened = true;
} }
} }
void FileOpen()
{
}
~COpusFile() ~COpusFile()
{ {
@ -902,11 +958,183 @@ public:
}; };
#endif #endif
// For multi-thread: Someone always acquire stream's mutex before entering here
void
CStream::BuffersShouldBeFilled()
{
#ifdef MULTITHREADED_AUDIO
if (MusicManager.m_nMusicMode != MUSICMODE_CUTSCENE) {
std::queue<std::pair<ALuint, ALuint>> tempQueue;
for(int i = 0; i < NUM_STREAMBUFFERS / 2; i++) {
tempQueue.push(std::pair<ALuint, ALuint>(m_alBuffers[i * 2], m_alBuffers[i * 2 + 1]));
}
m_fillBuffers.swap(tempQueue);
FlagAsToBeProcessed();
m_bActive = true; // to allow Update() to queue the filled buffers & play
return;
}
std::queue<std::pair<ALuint, ALuint>>().swap(m_fillBuffers);
#endif
if ( FillBuffers() != 0 )
{
SetPlay(true);
}
}
// returns whether it's queued (not on multi-thread)
bool
CStream::BufferShouldBeFilledAndQueued(std::pair<ALuint, ALuint>* bufs)
{
#ifdef MULTITHREADED_AUDIO
if (MusicManager.m_nMusicMode != MUSICMODE_CUTSCENE)
m_fillBuffers.push(*bufs);
else
#endif
{
ALuint alBuffers[2] = {(*bufs).first, (*bufs).second}; // left - right
if (FillBuffer(alBuffers)) {
alSourceQueueBuffers(m_pAlSources[0], 1, &alBuffers[0]);
alSourceQueueBuffers(m_pAlSources[1], 1, &alBuffers[1]);
return true;
}
}
return false;
}
#ifdef MULTITHREADED_AUDIO
void
CStream::FlagAsToBeProcessed(bool close)
{
if (!close && MusicManager.m_nMusicMode == MUSICMODE_CUTSCENE)
return;
gAudioThreadQueueMutex.lock();
gStreamsToProcess.push(this);
gAudioThreadQueueMutex.unlock();
gAudioThreadCv.notify_one();
}
extern CStream *aStream[];
void audioFileOpsThread()
{
std::queue<CStream*> m_streamsToDelete;
do
{
CStream *stream;
{
// Just a semaphore
std::unique_lock<std::mutex> queueMutex(gAudioThreadQueueMutex);
gAudioThreadCv.wait(queueMutex, [m_streamsToDelete] { return gStreamsToProcess.size() > 0 || m_streamsToDelete.size() > 0 || gAudioThreadTerm; });
if (gAudioThreadTerm)
return;
if (!gStreamsToProcess.empty()) {
stream = gStreamsToProcess.front();
gStreamsToProcess.pop();
} else {
// End of streams. Perform deleting streams
while(!m_streamsToDelete.empty()) {
CStream *stream = m_streamsToDelete.front();
m_streamsToDelete.pop();
if (stream->m_pSoundFile) {
delete stream->m_pSoundFile;
stream->m_pSoundFile = nil;
}
if (stream->m_pBuffer) {
free(stream->m_pBuffer);
stream->m_pBuffer = nil;
}
delete stream;
}
continue;
}
}
std::unique_lock<std::mutex> lock(stream->m_mutex);
std::pair<ALuint, ALuint> buffers, *lastBufAddr;
bool insertBufsAfterCheck = false;
do {
if (stream->m_nDeleteMe == 1) {
m_streamsToDelete.push(stream);
stream->m_nDeleteMe = 2;
break;
} else if (stream->m_nDeleteMe == 2) {
break;
}
if (!stream->IsOpened())
break;
if (stream->m_bReset)
break;
// We gave up this idea for now
/*
stream->m_pSoundFile->FileOpen();
// Deffered allocation, do it now
if (stream->m_pBuffer == nil) {
stream->m_pBuffer = malloc(stream->m_pSoundFile->GetBufferSize());
ASSERT(stream->m_pBuffer != nil);
}
*/
if (stream->m_bDoSeek) {
stream->m_bDoSeek = false;
int pos = stream->m_SeekPos;
lock.unlock();
stream->m_pSoundFile->Seek(pos);
lock.lock();
continue; // let's do the checks again, make sure we didn't miss anything while Seeking
}
if (insertBufsAfterCheck) {
stream->m_queueBuffers.push(buffers);
insertBufsAfterCheck = false;
}
if (!stream->m_fillBuffers.empty()) {
lastBufAddr = &stream->m_fillBuffers.front();
buffers = *lastBufAddr;
lock.unlock();
ALuint alBuffers[2] = {buffers.first, buffers.second}; // left - right
bool filled = stream->FillBuffer(alBuffers);
lock.lock();
// Make sure queue isn't touched after we released mutex
if (!stream->m_fillBuffers.empty() && lastBufAddr == &stream->m_fillBuffers.front()) {
stream->m_fillBuffers.pop();
if (filled)
insertBufsAfterCheck = true; // Also make sure stream's properties aren't changed. So make one more pass, and push it to m_queueBuffers only if it pass checks again.
}
} else
break;
} while (true);
} while(true);
}
#endif
void CStream::Initialise() void CStream::Initialise()
{ {
#ifdef AUDIO_OAL_USE_MPG123 #ifdef AUDIO_OAL_USE_MPG123
mpg123_init(); mpg123_init();
#endif #endif
#ifdef MULTITHREADED_AUDIO
gAudioThread = std::thread(audioFileOpsThread);
#endif
} }
void CStream::Terminate() void CStream::Terminate()
@ -914,6 +1142,14 @@ void CStream::Terminate()
#ifdef AUDIO_OAL_USE_MPG123 #ifdef AUDIO_OAL_USE_MPG123
mpg123_exit(); mpg123_exit();
#endif #endif
#ifdef MULTITHREADED_AUDIO
gAudioThreadQueueMutex.lock();
gAudioThreadTerm = true;
gAudioThreadQueueMutex.unlock();
gAudioThreadCv.notify_one();
gAudioThread.join();
#endif
} }
CStream::CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBUFFERS], uint32 overrideSampleRate) : CStream::CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBUFFERS], uint32 overrideSampleRate) :
@ -922,6 +1158,11 @@ CStream::CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBU
m_pBuffer(nil), m_pBuffer(nil),
m_bPaused(false), m_bPaused(false),
m_bActive(false), m_bActive(false),
#ifdef MULTITHREADED_AUDIO
m_nDeleteMe(false),
m_bDoSeek(false),
m_SeekPos(0),
#endif
m_pSoundFile(nil), m_pSoundFile(nil),
m_bReset(false), m_bReset(false),
m_nVolume(0), m_nVolume(0),
@ -966,42 +1207,57 @@ CStream::CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBU
if ( IsOpened() ) if ( IsOpened() )
{ {
m_pBuffer = malloc(m_pSoundFile->GetBufferSize()); uint32 bufSize = m_pSoundFile->GetBufferSize();
ASSERT(m_pBuffer!=nil); if(bufSize != 0) { // Otherwise it's deferred
m_pBuffer = malloc(bufSize);
DEV("AvgSamplesPerSec: %d\n", m_pSoundFile->GetAvgSamplesPerSec()); ASSERT(m_pBuffer != nil);
DEV("SampleCount: %d\n", m_pSoundFile->GetSampleCount());
DEV("SampleRate: %d\n", m_pSoundFile->GetSampleRate()); DEV("AvgSamplesPerSec: %d\n", m_pSoundFile->GetAvgSamplesPerSec());
DEV("Channels: %d\n", m_pSoundFile->GetChannels()); DEV("SampleCount: %d\n", m_pSoundFile->GetSampleCount());
DEV("Buffer Samples: %d\n", m_pSoundFile->GetBufferSamples()); DEV("SampleRate: %d\n", m_pSoundFile->GetSampleRate());
DEV("Buffer sec: %f\n", (float(m_pSoundFile->GetBufferSamples()) / float(m_pSoundFile->GetChannels())/ float(m_pSoundFile->GetSampleRate()))); DEV("Channels: %d\n", m_pSoundFile->GetChannels());
DEV("Length MS: %02d:%02d\n", (m_pSoundFile->GetLength() / 1000) / 60, (m_pSoundFile->GetLength() / 1000) % 60); DEV("Buffer Samples: %d\n", m_pSoundFile->GetBufferSamples());
DEV("Buffer sec: %f\n", (float(m_pSoundFile->GetBufferSamples()) / float(m_pSoundFile->GetChannels())/ float(m_pSoundFile->GetSampleRate())));
DEV("Length MS: %02d:%02d\n", (m_pSoundFile->GetLength() / 1000) / 60, (m_pSoundFile->GetLength() / 1000) % 60);
}
return; return;
} }
} }
CStream::~CStream() CStream::~CStream()
{ {
Delete(); assert(!IsOpened());
} }
void CStream::Delete() void CStream::Close()
{ {
#ifdef MULTITHREADED_AUDIO
{
std::lock_guard<std::mutex> lock(m_mutex);
Stop();
ClearBuffers();
m_nDeleteMe = true;
// clearing buffer queues are not needed. after m_nDeleteMe set, this stream is ded
}
FlagAsToBeProcessed(true);
#else
Stop(); Stop();
ClearBuffers(); ClearBuffers();
if ( m_pSoundFile ) if ( m_pSoundFile )
{ {
delete m_pSoundFile; delete m_pSoundFile;
m_pSoundFile = nil; m_pSoundFile = nil;
} }
if ( m_pBuffer ) if ( m_pBuffer )
{ {
free(m_pBuffer); free(m_pBuffer);
m_pBuffer = nil; m_pBuffer = nil;
} }
#endif
} }
bool CStream::HasSource() bool CStream::HasSource()
@ -1025,6 +1281,14 @@ bool CStream::IsPlaying()
alGetSourcei(m_pAlSources[1], AL_SOURCE_STATE, &sourceState[1]); alGetSourcei(m_pAlSources[1], AL_SOURCE_STATE, &sourceState[1]);
if (sourceState[0] == AL_PLAYING || sourceState[1] == AL_PLAYING) if (sourceState[0] == AL_PLAYING || sourceState[1] == AL_PLAYING)
return true; return true;
#ifdef MULTITHREADED_AUDIO
std::lock_guard<std::mutex> lock(m_mutex);
// Streams are designed in such a way that m_fillBuffers and m_queueBuffers will be *always* filled if audio is playing, and mutex is acquired
if (!m_fillBuffers.empty() || !m_queueBuffers.emptyNts())
return true;
#endif
} }
return false; return false;
@ -1099,8 +1363,24 @@ void CStream::SetPan(uint8 nPan)
void CStream::SetPosMS(uint32 nPos) void CStream::SetPosMS(uint32 nPos)
{ {
if ( !IsOpened() ) return; if ( !IsOpened() ) return;
m_pSoundFile->Seek(nPos);
#ifdef MULTITHREADED_AUDIO
std::lock_guard<std::mutex> lock(m_mutex);
std::queue<std::pair<ALuint, ALuint>>().swap(m_fillBuffers);
tsQueue<std::pair<ALuint, ALuint>>().swapNts(m_queueBuffers); // TSness not required, second thread always access it when stream mutex acquired
if (MusicManager.m_nMusicMode != MUSICMODE_CUTSCENE) {
m_bDoSeek = true;
m_SeekPos = nPos;
} else
#endif
{
m_pSoundFile->Seek(nPos);
}
ClearBuffers(); ClearBuffers();
// adding to gStreamsToProcess not needed, someone always calls Start() / BuffersShouldBeFilled() after SetPosMS
} }
uint32 CStream::GetPosMS() uint32 CStream::GetPosMS()
@ -1108,10 +1388,16 @@ uint32 CStream::GetPosMS()
if ( !HasSource() ) return 0; if ( !HasSource() ) return 0;
if ( !IsOpened() ) return 0; if ( !IsOpened() ) return 0;
// Deferred init causes division by zero
if (m_pSoundFile->GetChannels() == 0)
return 0;
ALint offset; ALint offset;
//alGetSourcei(m_alSource, AL_SAMPLE_OFFSET, &offset); //alGetSourcei(m_alSource, AL_SAMPLE_OFFSET, &offset);
alGetSourcei(m_pAlSources[0], AL_BYTE_OFFSET, &offset); alGetSourcei(m_pAlSources[0], AL_BYTE_OFFSET, &offset);
//std::lock_guard<std::mutex> lock(m_mutex);
return m_pSoundFile->Tell() return m_pSoundFile->Tell()
- m_pSoundFile->samples2ms(m_pSoundFile->GetBufferSamples() * (NUM_STREAMBUFFERS/2-1)) / m_pSoundFile->GetChannels() - m_pSoundFile->samples2ms(m_pSoundFile->GetBufferSamples() * (NUM_STREAMBUFFERS/2-1)) / m_pSoundFile->GetChannels()
+ m_pSoundFile->samples2ms(offset/m_pSoundFile->GetSampleSize()) / m_pSoundFile->GetChannels(); + m_pSoundFile->samples2ms(offset/m_pSoundFile->GetSampleSize()) / m_pSoundFile->GetChannels();
@ -1125,6 +1411,7 @@ uint32 CStream::GetLengthMS()
bool CStream::FillBuffer(ALuint *alBuffer) bool CStream::FillBuffer(ALuint *alBuffer)
{ {
#ifndef MULTITHREADED_AUDIO
if ( !HasSource() ) if ( !HasSource() )
return false; return false;
if ( !IsOpened() ) if ( !IsOpened() )
@ -1133,7 +1420,8 @@ bool CStream::FillBuffer(ALuint *alBuffer)
return false; return false;
if ( !(alBuffer[1] != AL_NONE && alIsBuffer(alBuffer[1])) ) if ( !(alBuffer[1] != AL_NONE && alIsBuffer(alBuffer[1])) )
return false; return false;
#endif
uint32 size = m_pSoundFile->Decode(m_pBuffer); uint32 size = m_pSoundFile->Decode(m_pBuffer);
if( size == 0 ) if( size == 0 )
return false; return false;
@ -1149,6 +1437,26 @@ bool CStream::FillBuffer(ALuint *alBuffer)
return true; return true;
} }
#ifdef MULTITHREADED_AUDIO
bool CStream::QueueBuffers()
{
bool buffersQueued = false;
std::pair<ALuint, ALuint> buffers;
while (m_queueBuffers.peekPop(&buffers)) // beware: m_queueBuffers is tsQueue
{
ALuint leftBuf = buffers.first;
ALuint rightBuf = buffers.second;
alSourceQueueBuffers(m_pAlSources[0], 1, &leftBuf);
alSourceQueueBuffers(m_pAlSources[1], 1, &rightBuf);
buffersQueued = true;
}
return buffersQueued;
}
#endif
// Only used in single-threaded audio or cutscene audio
int32 CStream::FillBuffers() int32 CStream::FillBuffers()
{ {
int32 i = 0; int32 i = 0;
@ -1178,17 +1486,33 @@ void CStream::ClearBuffers()
alSourceUnqueueBuffers(m_pAlSources[1], 1, &value); alSourceUnqueueBuffers(m_pAlSources[1], 1, &value);
} }
bool CStream::Setup(bool imSureQueueIsEmpty) bool CStream::Setup(bool imSureQueueIsEmpty, bool lock)
{ {
if ( IsOpened() ) if ( IsOpened() )
{ {
alSourcei(m_pAlSources[0], AL_LOOPING, AL_FALSE); #ifdef MULTITHREADED_AUDIO
alSourcei(m_pAlSources[1], AL_LOOPING, AL_FALSE); if (lock)
m_mutex.lock();
#endif
if (!imSureQueueIsEmpty) { if (!imSureQueueIsEmpty) {
SetPlay(false); Stop();
ClearBuffers(); ClearBuffers();
} }
#ifdef MULTITHREADED_AUDIO
if (MusicManager.m_nMusicMode == MUSICMODE_CUTSCENE) {
m_pSoundFile->Seek(0);
} else {
m_bDoSeek = true;
m_SeekPos = 0;
}
if (lock)
m_mutex.unlock();
#else
m_pSoundFile->Seek(0); m_pSoundFile->Seek(0);
#endif
//SetPosition(0.0f, 0.0f, 0.0f); //SetPosition(0.0f, 0.0f, 0.0f);
SetPitch(1.0f); SetPitch(1.0f);
//SetPan(m_nPan); //SetPan(m_nPan);
@ -1241,8 +1565,12 @@ void CStream::SetPlay(bool state)
void CStream::Start() void CStream::Start()
{ {
if ( !HasSource() ) return; if ( !HasSource() ) return;
if ( FillBuffers() != 0 )
SetPlay(true); #ifdef MULTITHREADED_AUDIO
std::lock_guard<std::mutex> lock(m_mutex);
tsQueue<std::pair<ALuint, ALuint>>().swapNts(m_queueBuffers); // TSness not required, second thread always access it when stream mutex acquired
#endif
BuffersShouldBeFilled();
} }
void CStream::Stop() void CStream::Stop()
@ -1264,9 +1592,23 @@ void CStream::Update()
if ( !m_bPaused ) if ( !m_bPaused )
{ {
ALint totalBuffers[2] = { 0, 0 };
ALint buffersProcessed[2] = { 0, 0 };
bool buffersQueuedAndStarted = false;
bool buffersQueuedButNotStarted = false;
#ifdef MULTITHREADED_AUDIO
// Put it in here because we need totalBuffers after queueing to decide when to loop audio
if (m_bActive)
{
buffersQueuedAndStarted = QueueBuffers();
if(buffersQueuedAndStarted) {
SetPlay(true);
}
}
#endif
ALint totalBuffers[2] = {0, 0};
ALint buffersProcessed[2] = {0, 0};
// Relying a lot on left buffer states in here // Relying a lot on left buffer states in here
do do
@ -1278,44 +1620,66 @@ void CStream::Update()
alGetSourcei(m_pAlSources[1], AL_BUFFERS_QUEUED, &totalBuffers[1]); alGetSourcei(m_pAlSources[1], AL_BUFFERS_QUEUED, &totalBuffers[1]);
alGetSourcei(m_pAlSources[1], AL_BUFFERS_PROCESSED, &buffersProcessed[1]); alGetSourcei(m_pAlSources[1], AL_BUFFERS_PROCESSED, &buffersProcessed[1]);
} while (buffersProcessed[0] != buffersProcessed[1]); } while (buffersProcessed[0] != buffersProcessed[1]);
assert(buffersProcessed[0] == buffersProcessed[1]); assert(buffersProcessed[0] == buffersProcessed[1]);
// Correcting OpenAL concepts here: // Correcting OpenAL concepts here:
// AL_BUFFERS_QUEUED = Number of *all* buffers in queue, including processed, processing and pending // AL_BUFFERS_QUEUED = Number of *all* buffers in queue, including processed, processing and pending
// AL_BUFFERS_PROCESSED = Index of the buffer being processing right now. Buffers coming after that(have greater index) are pending buffers. // AL_BUFFERS_PROCESSED = Index of the buffer being processing right now. Buffers coming after that(have greater index) are pending buffers.
// which means: totalBuffers[0] - buffersProcessed[0] = pending buffers // which means: totalBuffers[0] - buffersProcessed[0] = pending buffers
bool buffersRefilled = false;
// We should wait queue to be cleared to loop track, because position calculation relies on queue. // We should wait queue to be cleared to loop track, because position calculation relies on queue.
if (m_nLoopCount != 1 && m_bActive && totalBuffers[0] == 0) if (m_nLoopCount != 1 && m_bActive && totalBuffers[0] == 0)
{ {
Setup(true); #ifdef MULTITHREADED_AUDIO
buffersRefilled = FillBuffers() != 0; std::lock_guard<std::mutex> lock(m_mutex);
if (m_nLoopCount != 0)
m_nLoopCount--; if (m_fillBuffers.empty() && m_queueBuffers.emptyNts()) // we already acquired stream mutex, which is enough for second thread. thus Nts variant
#endif
{
Setup(true, false);
BuffersShouldBeFilled(); // will also call SetPlay(true)
if (m_nLoopCount != 0)
m_nLoopCount--;
}
} }
else else
{ {
while( buffersProcessed[0]-- ) static std::queue<std::pair<ALuint, ALuint>> tempFillBuffer;
while ( buffersProcessed[0]-- )
{ {
ALuint buffer[2]; ALuint buffer[2];
alSourceUnqueueBuffers(m_pAlSources[0], 1, &buffer[0]); alSourceUnqueueBuffers(m_pAlSources[0], 1, &buffer[0]);
alSourceUnqueueBuffers(m_pAlSources[1], 1, &buffer[1]); alSourceUnqueueBuffers(m_pAlSources[1], 1, &buffer[1]);
if (m_bActive && FillBuffer(buffer)) if (m_bActive)
{ {
buffersRefilled = true; tempFillBuffer.push(std::pair<ALuint, ALuint>(buffer[0], buffer[1]));
alSourceQueueBuffers(m_pAlSources[0], 1, &buffer[0]);
alSourceQueueBuffers(m_pAlSources[1], 1, &buffer[1]);
} }
} }
if (m_bActive && buffersProcessed[1])
{
#ifdef MULTITHREADED_AUDIO
m_mutex.lock();
#endif
while (!tempFillBuffer.empty()) {
auto elem = tempFillBuffer.front();
tempFillBuffer.pop();
buffersQueuedButNotStarted = BufferShouldBeFilledAndQueued(&elem);
}
#ifdef MULTITHREADED_AUDIO
m_mutex.unlock();
FlagAsToBeProcessed();
#endif
}
} }
// Two reasons: 1-Source may be starved to audio and stopped itself, 2- We're already waiting it to starve and die for looping track! // Source may be starved to audio and stopped itself
if (m_bActive && (buffersRefilled || (totalBuffers[1] - buffersProcessed[1] != 0))) if (m_bActive && !buffersQueuedAndStarted && (buffersQueuedButNotStarted || (totalBuffers[1] - buffersProcessed[1] != 0)))
SetPlay(true); SetPlay(true);
} }
} }
@ -1324,28 +1688,45 @@ void CStream::ProviderInit()
{ {
if ( m_bReset ) if ( m_bReset )
{ {
if ( Setup(true) ) if ( Setup(true, false) ) // lock not needed, thread can't process streams with m_bReset set
{ {
SetPan(m_nPan); SetPan(m_nPan);
SetVolume(m_nVolume); SetVolume(m_nVolume);
SetLoopCount(m_nLoopCount); SetLoopCount(m_nLoopCount);
SetPosMS(m_nPosBeforeReset); SetPosMS(m_nPosBeforeReset);
if (m_bActive) #ifdef MULTITHREADED_AUDIO
FillBuffers(); std::unique_lock<std::mutex> lock(m_mutex);
SetPlay(m_bActive); #endif
if ( m_bPaused ) if(m_bActive)
BuffersShouldBeFilled();
if (m_bPaused)
Pause(); Pause();
m_bReset = false;
} else {
#ifdef MULTITHREADED_AUDIO
std::unique_lock<std::mutex> lock(m_mutex);
#endif
m_bReset = false;
} }
m_bReset = false;
} }
} }
void CStream::ProviderTerm() void CStream::ProviderTerm()
{ {
#ifdef MULTITHREADED_AUDIO
std::lock_guard<std::mutex> lock(m_mutex);
// unlike Close() we will reuse this stream, so clearing queues are important.
std::queue<std::pair<ALuint, ALuint>>().swap(m_fillBuffers);
tsQueue<std::pair<ALuint, ALuint>>().swapNts(m_queueBuffers); // stream mutex is already acquired, thus Nts variant
#endif
m_bReset = true; m_bReset = true;
m_nPosBeforeReset = GetPosMS(); m_nPosBeforeReset = GetPosMS();
Stop();
ClearBuffers(); ClearBuffers();
} }

View File

@ -11,6 +11,7 @@ public:
virtual ~IDecoder() { } virtual ~IDecoder() { }
virtual bool IsOpened() = 0; virtual bool IsOpened() = 0;
virtual void FileOpen() = 0;
virtual uint32 GetSampleSize() = 0; virtual uint32 GetSampleSize() = 0;
virtual uint32 GetSampleCount() = 0; virtual uint32 GetSampleCount() = 0;
@ -48,12 +49,70 @@ public:
uint32 GetLength() uint32 GetLength()
{ {
FileOpen(); // abort deferred init, we need length now - game has to cache audio file sizes
return float(GetSampleCount()) * 1000.0f / float(GetSampleRate()); return float(GetSampleCount()) * 1000.0f / float(GetSampleRate());
} }
virtual uint32 Decode(void *buffer) = 0; virtual uint32 Decode(void *buffer) = 0;
}; };
#ifdef MULTITHREADED_AUDIO
template <typename T> class tsQueue
{
public:
tsQueue() : count(0) { }
void push(const T &value)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(value);
count++;
}
bool peekPop(T *retVal)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (count == 0)
return false;
*retVal = m_queue.front();
m_queue.pop();
count--;
return true;
}
void swapNts(tsQueue<T> &replaceWith)
{
m_queue.swap(replaceWith.m_queue);
replaceWith.count = count;
}
/*
void swapTs(tsQueue<T> &replaceWith)
{
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock2(replaceWith.m_mutex);
swapNts(replaceWith);
}
*/
bool emptyNts()
{
return count == 0;
}
/*
bool emptyTs()
{
std::lock_guard<std::mutex> lock(m_mutex);
return emptyNts();
}
*/
std::queue<T> m_queue;
int count;
mutable std::mutex m_mutex;
};
#endif
class CStream class CStream
{ {
char m_aFilename[128]; char m_aFilename[128];
@ -63,6 +122,16 @@ class CStream
bool m_bPaused; bool m_bPaused;
bool m_bActive; bool m_bActive;
public:
#ifdef MULTITHREADED_AUDIO
std::mutex m_mutex;
std::queue<std::pair<ALuint, ALuint>> m_fillBuffers; // left and right buffer
tsQueue<std::pair<ALuint, ALuint>> m_queueBuffers;
bool m_bDoSeek;
uint32 m_SeekPos;
uint8 m_nDeleteMe; // 1: add to delete list 2: already on delete list
#endif
void *m_pBuffer; void *m_pBuffer;
bool m_bReset; bool m_bReset;
@ -72,7 +141,14 @@ class CStream
int32 m_nLoopCount; int32 m_nLoopCount;
IDecoder *m_pSoundFile; IDecoder *m_pSoundFile;
void BuffersShouldBeFilled(); // all
bool BufferShouldBeFilledAndQueued(std::pair<ALuint, ALuint>*); // two (left-right)
#ifdef MULTITHREADED_AUDIO
void FlagAsToBeProcessed(bool close = false);
bool QueueBuffers();
#endif
bool HasSource(); bool HasSource();
void SetPosition(int i, float x, float y, float z); void SetPosition(int i, float x, float y, float z);
void SetPitch(float pitch); void SetPitch(float pitch);
@ -81,15 +157,15 @@ class CStream
void SetPlay(bool state); void SetPlay(bool state);
bool FillBuffer(ALuint *alBuffer); bool FillBuffer(ALuint *alBuffer);
int32 FillBuffers(); int32 FillBuffers();
void ClearBuffers(); void ClearBuffers();
public: //public:
static void Initialise(); static void Initialise();
static void Terminate(); static void Terminate();
CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBUFFERS], uint32 overrideSampleRate = 32000); CStream(char *filename, ALuint *sources, ALuint (&buffers)[NUM_STREAMBUFFERS], uint32 overrideSampleRate = 32000);
~CStream(); ~CStream();
void Delete(); void Close();
bool IsOpened(); bool IsOpened();
bool IsPlaying(); bool IsPlaying();
@ -100,12 +176,11 @@ public:
uint32 GetPosMS(); uint32 GetPosMS();
uint32 GetLengthMS(); uint32 GetLengthMS();
bool Setup(bool imSureQueueIsEmpty = false); bool Setup(bool imSureQueueIsEmpty = false, bool lock = true);
void Start(); void Start();
void Stop(); void Stop();
void Update(void); void Update(void);
void SetLoopCount(int32); void SetLoopCount(int32);
void ProviderInit(); void ProviderInit();
void ProviderTerm(); void ProviderTerm();

View File

@ -34,6 +34,12 @@
#include "oal/oal_utils.h" #include "oal/oal_utils.h"
#include "oal/aldlist.h" #include "oal/aldlist.h"
#include "oal/channel.h" #include "oal/channel.h"
#include <utility>
#ifdef MULTITHREADED_AUDIO
#include <mutex>
#include <queue>
#endif
#include "oal/stream.h" #include "oal/stream.h"
#include "AudioManager.h" #include "AudioManager.h"
@ -520,7 +526,7 @@ _FindMP3s(void)
if (aStream[0] && aStream[0]->IsOpened()) if (aStream[0] && aStream[0]->IsOpened())
{ {
total_ms = aStream[0]->GetLengthMS(); total_ms = aStream[0]->GetLengthMS();
delete aStream[0]; aStream[0]->Close();
aStream[0] = NULL; aStream[0] = NULL;
OutputDebugString(fd.cFileName); OutputDebugString(fd.cFileName);
@ -578,7 +584,7 @@ _FindMP3s(void)
if (aStream[0] && aStream[0]->IsOpened()) if (aStream[0] && aStream[0]->IsOpened())
{ {
total_ms = aStream[0]->GetLengthMS(); total_ms = aStream[0]->GetLengthMS();
delete aStream[0]; aStream[0]->Close();
aStream[0] = NULL; aStream[0] = NULL;
pList->pNext = new tMP3Entry; pList->pNext = new tMP3Entry;
@ -732,6 +738,7 @@ cSampleManager::Initialise(void)
return TRUE; return TRUE;
EFXInit(); EFXInit();
CStream::Initialise(); CStream::Initialise();
{ {
@ -890,7 +897,7 @@ cSampleManager::Initialise(void)
if(aStream[0] && aStream[0]->IsOpened()) { if(aStream[0] && aStream[0]->IsOpened()) {
uint32 tatalms = aStream[0]->GetLengthMS(); uint32 tatalms = aStream[0]->GetLengthMS();
delete aStream[0]; aStream[0]->Close();
aStream[0] = NULL; aStream[0] = NULL;
nStreamLength[i] = tatalms; nStreamLength[i] = tatalms;
@ -939,7 +946,7 @@ cSampleManager::Initialise(void)
nStreamPan[i] = 63; nStreamPan[i] = 63;
} }
} }
{ {
_bSampmanInitialised = TRUE; _bSampmanInitialised = TRUE;
@ -1025,7 +1032,7 @@ cSampleManager::Terminate(void)
CStream *stream = aStream[i]; CStream *stream = aStream[i];
if (stream) if (stream)
{ {
delete stream; stream->Close();
aStream[i] = NULL; aStream[i] = NULL;
} }
} }
@ -1607,7 +1614,7 @@ cSampleManager::PreloadStreamedFile(uint8 nFile, uint8 nStream)
{ {
if ( aStream[nStream] ) if ( aStream[nStream] )
{ {
delete aStream[nStream]; aStream[nStream]->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
@ -1619,7 +1626,7 @@ cSampleManager::PreloadStreamedFile(uint8 nFile, uint8 nStream)
aStream[nStream] = stream; aStream[nStream] = stream;
if ( !stream->Setup() ) if ( !stream->Setup() )
{ {
delete stream; stream->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
} }
@ -1666,7 +1673,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
if ( aStream[nStream] ) if ( aStream[nStream] )
{ {
delete aStream[nStream]; aStream[nStream]->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
if ( nFile == STREAMED_SOUND_RADIO_MP3_PLAYER ) if ( nFile == STREAMED_SOUND_RADIO_MP3_PLAYER )
@ -1697,7 +1704,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
return TRUE; return TRUE;
} else { } else {
delete stream; stream->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
return FALSE; return FALSE;
@ -1721,7 +1728,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
_bIsMp3Active = TRUE; _bIsMp3Active = TRUE;
return TRUE; return TRUE;
} else { } else {
delete aStream[nStream]; aStream[nStream]->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
// fall through, start playing from another song // fall through, start playing from another song
@ -1753,7 +1760,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
return TRUE; return TRUE;
} else { } else {
delete stream; stream->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
return FALSE; return FALSE;
@ -1775,7 +1782,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
#endif #endif
return TRUE; return TRUE;
} else { } else {
delete aStream[nStream]; aStream[nStream]->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
@ -1800,7 +1807,7 @@ cSampleManager::StartStreamedFile(uint8 nFile, uint32 nPos, uint8 nStream)
return TRUE; return TRUE;
} else { } else {
delete stream; stream->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
} }
return FALSE; return FALSE;
@ -1815,7 +1822,7 @@ cSampleManager::StopStreamedFile(uint8 nStream)
if ( stream ) if ( stream )
{ {
delete stream; stream->Close();
aStream[nStream] = NULL; aStream[nStream] = NULL;
if ( nStream == 0 ) if ( nStream == 0 )

View File

@ -399,6 +399,7 @@ enum Config {
//#define PS2_AUDIO_PATHS // changes audio paths for cutscenes and radio to PS2 paths (needs vbdec on MSS builds) //#define PS2_AUDIO_PATHS // changes audio paths for cutscenes and radio to PS2 paths (needs vbdec on MSS builds)
//#define AUDIO_OAL_USE_SNDFILE // use libsndfile to decode WAVs instead of our internal decoder //#define AUDIO_OAL_USE_SNDFILE // use libsndfile to decode WAVs instead of our internal decoder
#define AUDIO_OAL_USE_MPG123 // use mpg123 to support mp3 files #define AUDIO_OAL_USE_MPG123 // use mpg123 to support mp3 files
#define MULTITHREADED_AUDIO
#define PAUSE_RADIO_IN_FRONTEND // pause radio when game is paused #define PAUSE_RADIO_IN_FRONTEND // pause radio when game is paused
#ifdef AUDIO_OPUS #ifdef AUDIO_OPUS