LCOV - code coverage report
Current view: top level - source - MessageQueue.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 90.7 % 86 78
Test Date: 2025-08-04 11:40:41 Functions: 87.0 % 23 20

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2023 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "MessageQueue.h"
      20              : #include "GstreamerCatLog.h"
      21              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      22         1483 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
      23              : 
      24         1481 : void CallInEventLoopMessage::handle()
      25              : {
      26         1481 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      27         1481 :     m_func();
      28         1481 :     m_done = true;
      29         1481 :     m_callInEventLoopCondVar.notify_all();
      30              : }
      31              : 
      32            1 : void CallInEventLoopMessage::skip()
      33              : {
      34            1 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      35            1 :     m_done = true;
      36            1 :     m_callInEventLoopCondVar.notify_all();
      37              : }
      38              : 
      39         1482 : void CallInEventLoopMessage::wait()
      40              : {
      41         1482 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      42         4446 :     m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
      43         1482 : }
      44              : 
      45            0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
      46              : 
      47            0 : void ScheduleInEventLoopMessage::handle()
      48              : {
      49            0 :     m_func();
      50              : }
      51              : 
      52          151 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
      53              : {
      54          151 :     return std::make_shared<MessageQueueFactory>();
      55              : }
      56              : 
      57          283 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
      58              : {
      59          283 :     return std::make_unique<MessageQueue>();
      60              : }
      61              : 
      62          317 : MessageQueue::MessageQueue() : m_running(false) {}
      63              : 
      64          626 : MessageQueue::~MessageQueue()
      65              : {
      66          317 :     doStop();
      67          626 : }
      68              : 
      69          316 : void MessageQueue::start()
      70              : {
      71          316 :     if (m_running)
      72              :     {
      73              :         // queue is running
      74            1 :         return;
      75              :     }
      76          315 :     m_running = true;
      77          315 :     std::thread startThread(&MessageQueue::processMessages, this);
      78          315 :     m_workerThread.swap(startThread);
      79              : }
      80              : 
      81          180 : void MessageQueue::stop()
      82              : {
      83          180 :     doStop();
      84              : }
      85              : 
      86            1 : void MessageQueue::clear()
      87              : {
      88            1 :     doClear();
      89              : }
      90              : 
      91         1520 : std::shared_ptr<Message> MessageQueue::waitForMessage()
      92              : {
      93         1520 :     std::unique_lock<std::mutex> lock(m_mutex);
      94         2875 :     while (m_queue.empty())
      95              :     {
      96         1355 :         m_condVar.wait(lock);
      97              :     }
      98         1520 :     std::shared_ptr<Message> message = m_queue.front();
      99         1520 :     m_queue.pop_front();
     100         3040 :     return message;
     101         1520 : }
     102              : 
     103         1523 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
     104              : {
     105         1523 :     const std::lock_guard<std::mutex> lock(m_mutex);
     106         1523 :     if (!m_running)
     107              :     {
     108            2 :         GST_ERROR("Message queue is not running");
     109            2 :         return false;
     110              :     }
     111         1521 :     m_queue.push_back(msg);
     112         1521 :     m_condVar.notify_all();
     113              : 
     114         1521 :     return true;
     115         1523 : }
     116              : 
     117          315 : void MessageQueue::processMessages()
     118              : {
     119              :     do
     120              :     {
     121         1520 :         std::shared_ptr<Message> message = waitForMessage();
     122         1520 :         message->handle();
     123         1520 :     } while (m_running);
     124          315 : }
     125              : 
     126            0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
     127              : {
     128            0 :     auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
     129            0 :     if (!postMessage(message))
     130              :     {
     131            0 :         return false;
     132              :     }
     133              : 
     134            0 :     return true;
     135              : }
     136              : 
     137         1207 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
     138              : {
     139         1207 :     return callInEventLoopInternal(func);
     140              : }
     141              : 
     142         1522 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
     143              : {
     144         1522 :     if (std::this_thread::get_id() != m_workerThread.get_id())
     145              :     {
     146         1483 :         auto message = std::make_shared<CallInEventLoopMessage>(func);
     147         1483 :         if (!postMessage(message))
     148              :         {
     149            1 :             return false;
     150              :         }
     151         1482 :         message->wait();
     152         1483 :     }
     153              :     else
     154              :     {
     155           39 :         func();
     156              :     }
     157              : 
     158         1521 :     return true;
     159              : }
     160              : 
     161          497 : void MessageQueue::doStop()
     162              : {
     163          497 :     if (!m_running)
     164              :     {
     165              :         // queue is not running
     166          182 :         return;
     167              :     }
     168          315 :     callInEventLoopInternal([this]() { m_running = false; });
     169              : 
     170          315 :     if (m_workerThread.joinable())
     171          315 :         m_workerThread.join();
     172              : 
     173          315 :     doClear();
     174              : }
     175              : 
     176          316 : void MessageQueue::doClear()
     177              : {
     178          316 :     std::unique_lock<std::mutex> lock(m_mutex);
     179          317 :     while (!m_queue.empty())
     180              :     {
     181            1 :         m_queue.front()->skip();
     182            1 :         m_queue.pop_front();
     183              :     }
     184          316 : }
        

Generated by: LCOV version 2.0-1