LCOV - code coverage report
Current view: top level - source - MessageQueue.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 99.0 % 96 95
Test Date: 2026-05-12 09:11:14 Functions: 100.0 % 23 23

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2023 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "MessageQueue.h"
      20              : #include "GstreamerCatLog.h"
      21              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      22         1685 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
      23              : 
      24         1682 : void CallInEventLoopMessage::handle()
      25              : {
      26         1682 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      27         1682 :     m_func();
      28         1682 :     m_done = true;
      29         1682 :     m_callInEventLoopCondVar.notify_all();
      30              : }
      31              : 
      32            1 : void CallInEventLoopMessage::skip()
      33              : {
      34            1 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      35            1 :     m_done = true;
      36            1 :     m_callInEventLoopCondVar.notify_all();
      37              : }
      38              : 
      39         1683 : void CallInEventLoopMessage::wait()
      40              : {
      41         1683 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      42         5048 :     m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
      43         1682 : }
      44              : 
      45            1 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
      46              : 
      47            1 : void ScheduleInEventLoopMessage::handle()
      48              : {
      49            1 :     m_func();
      50              : }
      51              : 
      52          162 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
      53              : {
      54          162 :     return std::make_shared<MessageQueueFactory>();
      55              : }
      56              : 
      57          304 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
      58              : {
      59          304 :     return std::make_unique<rialto::MessageQueue>();
      60              : }
      61              : 
      62              : namespace rialto
      63              : {
      64          361 : MessageQueue::MessageQueue() : m_running(false), m_acceptingMessages{false} {}
      65              : 
      66          712 : MessageQueue::~MessageQueue()
      67              : {
      68          361 :     doStop();
      69          712 : }
      70              : 
      71          360 : void MessageQueue::start()
      72              : {
      73          360 :     if (m_running)
      74              :     {
      75              :         // queue is running
      76            1 :         return;
      77              :     }
      78          359 :     m_running = true;
      79          359 :     m_acceptingMessages = true;
      80          359 :     std::thread startThread(&MessageQueue::processMessages, this);
      81          359 :     m_workerThread.swap(startThread);
      82              : }
      83              : 
      84          213 : void MessageQueue::stop()
      85              : {
      86          213 :     doStop();
      87              : }
      88              : 
      89            1 : void MessageQueue::clear()
      90              : {
      91            1 :     doClear();
      92              : }
      93              : 
      94         1739 : std::shared_ptr<Message> MessageQueue::waitForMessage()
      95              : {
      96         1739 :     std::unique_lock<std::mutex> lock(m_mutex);
      97         3300 :     while (m_queue.empty())
      98              :     {
      99         1561 :         m_condVar.wait(lock);
     100              :     }
     101         1739 :     std::shared_ptr<Message> message = m_queue.front();
     102         1739 :     m_queue.pop_front();
     103         3478 :     return message;
     104         1739 : }
     105              : 
     106         1386 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
     107              : {
     108         1386 :     const std::lock_guard<std::mutex> lock(m_mutex);
     109         1386 :     if (!m_running || !m_acceptingMessages)
     110              :     {
     111            3 :         GST_ERROR("Message queue is not running or not accepting messages");
     112            3 :         return false;
     113              :     }
     114         1383 :     m_queue.push_back(msg);
     115         1383 :     m_condVar.notify_all();
     116              : 
     117         1383 :     return true;
     118         1386 : }
     119              : 
     120          359 : void MessageQueue::processMessages()
     121              : {
     122              :     do
     123              :     {
     124         1739 :         std::shared_ptr<Message> message = waitForMessage();
     125         1739 :         message->handle();
     126         1739 :     } while (m_running);
     127          359 : }
     128              : 
     129            1 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
     130              : {
     131            1 :     auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
     132            1 :     if (!postMessage(message))
     133              :     {
     134            0 :         return false;
     135              :     }
     136              : 
     137            1 :     return true;
     138              : }
     139              : 
     140         1380 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
     141              : {
     142         1380 :     return callInEventLoopInternal(func);
     143              : }
     144              : 
     145         1380 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
     146              : {
     147         1380 :     if (std::this_thread::get_id() != m_workerThread.get_id())
     148              :     {
     149         1328 :         auto message = std::make_shared<CallInEventLoopMessage>(func);
     150         1328 :         if (!postMessage(message))
     151              :         {
     152            2 :             return false;
     153              :         }
     154         1326 :         message->wait();
     155         1328 :     }
     156              :     else
     157              :     {
     158           52 :         func();
     159              :     }
     160              : 
     161         1378 :     return true;
     162              : }
     163              : 
     164          574 : void MessageQueue::doStop()
     165              : {
     166          574 :     if (!m_running)
     167              :     {
     168              :         // queue is not running
     169          215 :         return;
     170              :     }
     171          359 :     if (std::this_thread::get_id() == m_workerThread.get_id())
     172              :     {
     173            2 :         m_acceptingMessages = false;
     174            2 :         m_running = false;
     175            2 :         m_workerThread.detach();
     176              :     }
     177              :     else
     178              :     {
     179          714 :         auto message = std::make_shared<CallInEventLoopMessage>([this]() { m_running = false; });
     180              :         {
     181          357 :             const std::lock_guard<std::mutex> lock(m_mutex);
     182          357 :             m_acceptingMessages = false;
     183          357 :             m_queue.push_back(message);
     184          357 :             m_condVar.notify_all();
     185              :         }
     186          357 :         message->wait();
     187              : 
     188          357 :         if (m_workerThread.joinable())
     189          357 :             m_workerThread.join();
     190              :     }
     191              : 
     192          359 :     doClear();
     193              : }
     194              : 
     195          360 : void MessageQueue::doClear()
     196              : {
     197          360 :     std::unique_lock<std::mutex> lock(m_mutex);
     198          361 :     while (!m_queue.empty())
     199              :     {
     200            1 :         m_queue.front()->skip();
     201            1 :         m_queue.pop_front();
     202              :     }
     203          360 : }
     204              : } // namespace rialto
        

Generated by: LCOV version 2.0-1