LCOV - code coverage report
Current view: top level - source - MessageQueue.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 90.7 % 86 78
Test Date: 2025-06-24 14:11:58 Functions: 87.0 % 23 20

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2023 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "MessageQueue.h"
      20              : #include "GstreamerCatLog.h"
      21              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      22         1464 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
      23              : 
      24         1462 : void CallInEventLoopMessage::handle()
      25              : {
      26         1462 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      27         1462 :     m_func();
      28         1462 :     m_done = true;
      29         1462 :     m_callInEventLoopCondVar.notify_all();
      30              : }
      31              : 
      32            1 : void CallInEventLoopMessage::skip()
      33              : {
      34            1 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      35            1 :     m_done = true;
      36            1 :     m_callInEventLoopCondVar.notify_all();
      37              : }
      38              : 
      39         1463 : void CallInEventLoopMessage::wait()
      40              : {
      41         1463 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      42         4389 :     m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
      43         1463 : }
      44              : 
      45            0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
      46              : 
      47            0 : void ScheduleInEventLoopMessage::handle()
      48              : {
      49            0 :     m_func();
      50              : }
      51              : 
      52          146 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
      53              : {
      54          146 :     return std::make_shared<MessageQueueFactory>();
      55              : }
      56              : 
      57          273 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
      58              : {
      59          273 :     return std::make_unique<MessageQueue>();
      60              : }
      61              : 
      62          307 : MessageQueue::MessageQueue() : m_running(false) {}
      63              : 
      64          606 : MessageQueue::~MessageQueue()
      65              : {
      66          307 :     doStop();
      67          606 : }
      68              : 
      69          306 : void MessageQueue::start()
      70              : {
      71          306 :     if (m_running)
      72              :     {
      73              :         // queue is running
      74            1 :         return;
      75              :     }
      76          305 :     m_running = true;
      77          305 :     std::thread startThread(&MessageQueue::processMessages, this);
      78          305 :     m_workerThread.swap(startThread);
      79              : }
      80              : 
      81          175 : void MessageQueue::stop()
      82              : {
      83          175 :     doStop();
      84              : }
      85              : 
      86            1 : void MessageQueue::clear()
      87              : {
      88            1 :     doClear();
      89              : }
      90              : 
      91         1465 : std::shared_ptr<Message> MessageQueue::waitForMessage()
      92              : {
      93         1465 :     std::unique_lock<std::mutex> lock(m_mutex);
      94         2779 :     while (m_queue.empty())
      95              :     {
      96         1314 :         m_condVar.wait(lock);
      97              :     }
      98         1465 :     std::shared_ptr<Message> message = m_queue.front();
      99         1465 :     m_queue.pop_front();
     100         2930 :     return message;
     101         1465 : }
     102              : 
     103         1468 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
     104              : {
     105         1468 :     const std::lock_guard<std::mutex> lock(m_mutex);
     106         1468 :     if (!m_running)
     107              :     {
     108            2 :         GST_ERROR("Message queue is not running");
     109            2 :         return false;
     110              :     }
     111         1466 :     m_queue.push_back(msg);
     112         1466 :     m_condVar.notify_all();
     113              : 
     114         1466 :     return true;
     115         1468 : }
     116              : 
     117          305 : void MessageQueue::processMessages()
     118              : {
     119              :     do
     120              :     {
     121         1465 :         std::shared_ptr<Message> message = waitForMessage();
     122         1465 :         message->handle();
     123         1465 :     } while (m_running);
     124          305 : }
     125              : 
     126            0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
     127              : {
     128            0 :     auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
     129            0 :     if (!postMessage(message))
     130              :     {
     131            0 :         return false;
     132              :     }
     133              : 
     134            0 :     return true;
     135              : }
     136              : 
     137         1168 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
     138              : {
     139         1168 :     return callInEventLoopInternal(func);
     140              : }
     141              : 
     142         1473 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
     143              : {
     144         1473 :     if (std::this_thread::get_id() != m_workerThread.get_id())
     145              :     {
     146         1464 :         auto message = std::make_shared<CallInEventLoopMessage>(func);
     147         1464 :         if (!postMessage(message))
     148              :         {
     149            1 :             return false;
     150              :         }
     151         1463 :         message->wait();
     152         1464 :     }
     153              :     else
     154              :     {
     155            9 :         func();
     156              :     }
     157              : 
     158         1472 :     return true;
     159              : }
     160              : 
     161          482 : void MessageQueue::doStop()
     162              : {
     163          482 :     if (!m_running)
     164              :     {
     165              :         // queue is not running
     166          177 :         return;
     167              :     }
     168          305 :     callInEventLoopInternal([this]() { m_running = false; });
     169              : 
     170          305 :     if (m_workerThread.joinable())
     171          305 :         m_workerThread.join();
     172              : 
     173          305 :     doClear();
     174              : }
     175              : 
     176          306 : void MessageQueue::doClear()
     177              : {
     178          306 :     std::unique_lock<std::mutex> lock(m_mutex);
     179          307 :     while (!m_queue.empty())
     180              :     {
     181            1 :         m_queue.front()->skip();
     182            1 :         m_queue.pop_front();
     183              :     }
     184          306 : }
        

Generated by: LCOV version 2.0-1