NekoX/TMessagesProj/jni/libtgvoip3/MessageThread.cpp

188 lines
5.1 KiB
C++

//
// Created by Grishka on 17.06.2018.
//
#include "logging.h"
#include "MessageThread.h"
#include "VoIPController.h"
#include <cassert>
#include <cmath>
#include <cstdint>
#include <ctime>
#ifndef _WIN32
#include <sys/time.h>
#endif
using namespace tgvoip;
MessageThread::MessageThread()
: Thread(std::bind(&MessageThread::Run, this))
, m_running(true)
{
SetName("MessageThread");
#ifdef _WIN32
#if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP
event = CreateEvent(nullptr, false, false, nullptr);
#else
event = CreateEventEx(nullptr, nullptr, 0, EVENT_ALL_ACCESS);
#endif
#else
::pthread_cond_init(&cond, nullptr);
#endif
}
MessageThread::~MessageThread()
{
Stop();
#ifdef _WIN32
CloseHandle(event);
#else
::pthread_cond_destroy(&cond);
#endif
}
void MessageThread::Stop()
{
if (m_running)
{
m_running = false;
#ifdef _WIN32
SetEvent(event);
#else
::pthread_cond_signal(&cond);
#endif
Join();
}
}
void MessageThread::Run()
{
m_queueMutex.Lock();
while (m_running)
{
double currentTime = VoIPController::GetCurrentTime();
double waitTimeout;
{
MutexGuard lock(m_queueAccessMutex);
waitTimeout = m_queue.empty() ? std::numeric_limits<double>::max() : (m_queue.begin()->deliverAt - currentTime);
}
if (waitTimeout > 0.0)
{
#ifdef _WIN32
queueMutex.Unlock();
DWORD actualWaitTimeout = waitTimeout == DBL_MAX ? INFINITE : ((DWORD)round(waitTimeout * 1000.0));
#if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP
WaitForSingleObject(event, actualWaitTimeout);
#else
WaitForSingleObjectEx(event, actualWaitTimeout, false);
#endif
// we don't really care if a context switch happens here and anything gets added to the queue by another thread
// since any new no-delay messages will get delivered on this iteration anyway
queueMutex.Lock();
#else
if (waitTimeout != std::numeric_limits<double>::max())
{
struct timeval now;
struct timespec timeout;
gettimeofday(&now, nullptr);
waitTimeout += now.tv_sec;
waitTimeout += (now.tv_usec / 1000000.0);
timeout.tv_sec = static_cast<std::time_t>(std::floor(waitTimeout));
timeout.tv_nsec = static_cast<decltype(timeout.tv_nsec)>((waitTimeout - std::floor(waitTimeout)) * 1000 * 1000 * 1000.0);
::pthread_cond_timedwait(&cond, m_queueMutex.NativeHandle(), &timeout);
}
else
{
::pthread_cond_wait(&cond, m_queueMutex.NativeHandle());
}
#endif
}
if (!m_running)
{
m_queueMutex.Unlock();
return;
}
currentTime = VoIPController::GetCurrentTime();
std::vector<Message> messagesToDeliverNow;
{
MutexGuard lock(m_queueAccessMutex);
auto msgsToDeliverNowBegin = m_queue.begin();
auto msgsToDeliverNowEnd = m_queue.upper_bound(Message{ .id = 0, .deliverAt = currentTime, .interval = 0, .func = nullptr });
for (auto it = msgsToDeliverNowBegin; it != msgsToDeliverNowEnd; it = m_queue.erase(it))
messagesToDeliverNow.emplace_back(*it);
}
for (Message& message : messagesToDeliverNow)
{
m_cancelCurrent = false;
if (message.deliverAt == 0.0)
message.deliverAt = VoIPController::GetCurrentTime();
if (message.func != nullptr)
message.func();
if (!m_cancelCurrent && message.interval > 0.0)
{
message.deliverAt += message.interval;
InsertMessageInternal(message);
}
}
}
m_queueMutex.Unlock();
}
std::uint32_t MessageThread::Post(std::function<void()> func, double delay, double interval)
{
assert(delay >= 0);
Message message;
double currentTime = VoIPController::GetCurrentTime();
{
std::lock_guard<std::mutex> lock(m_mutexLastMessageID);
message = { m_lastMessageID++, delay == 0.0 ? 0.0 : (currentTime + delay), interval, std::move(func) };
}
InsertMessageInternal(message);
if (!IsCurrent())
{
#ifdef _WIN32
SetEvent(event);
#else
::pthread_cond_signal(&cond);
#endif
}
return message.id;
}
bool MessageThread::Message::operator<(const MessageThread::Message& other) const
{
return std::tie(deliverAt, id) < std::tie(other.deliverAt, other.id);
}
void MessageThread::InsertMessageInternal(const MessageThread::Message& message)
{
MutexGuard lock(m_queueAccessMutex);
m_queue.emplace(message);
}
void MessageThread::Cancel(std::uint32_t id)
{
MutexGuard lock(m_queueAccessMutex);
for (auto it = m_queue.begin(); it != m_queue.end(); ++it)
{
if (it->id == id)
{
m_queue.erase(it);
break;
}
}
}
void MessageThread::CancelSelf()
{
assert(IsCurrent());
m_cancelCurrent = true;
}