LCOV - code coverage report
Current view: top level - source - MessageQueue.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 90.7 % 86 78
Test Date: 2025-10-17 10:59:19 Functions: 87.0 % 23 20

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2023 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "MessageQueue.h"
      20              : #include "GstreamerCatLog.h"
      21              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      22         1635 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
      23              : 
      24         1633 : void CallInEventLoopMessage::handle()
      25              : {
      26         1633 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      27         1633 :     m_func();
      28         1633 :     m_done = true;
      29         1633 :     m_callInEventLoopCondVar.notify_all();
      30              : }
      31              : 
      32            1 : void CallInEventLoopMessage::skip()
      33              : {
      34            1 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      35            1 :     m_done = true;
      36            1 :     m_callInEventLoopCondVar.notify_all();
      37              : }
      38              : 
      39         1634 : void CallInEventLoopMessage::wait()
      40              : {
      41         1634 :     std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
      42         4902 :     m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
      43         1634 : }
      44              : 
      45            0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
      46              : 
      47            0 : void ScheduleInEventLoopMessage::handle()
      48              : {
      49            0 :     m_func();
      50              : }
      51              : 
      52          156 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
      53              : {
      54          156 :     return std::make_shared<MessageQueueFactory>();
      55              : }
      56              : 
      57          293 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
      58              : {
      59          293 :     return std::make_unique<rialto::MessageQueue>();
      60              : }
      61              : 
      62              : namespace rialto
      63              : {
      64          348 : MessageQueue::MessageQueue() : m_running(false) {}
      65              : 
      66          688 : MessageQueue::~MessageQueue()
      67              : {
      68          348 :     doStop();
      69          688 : }
      70              : 
      71          349 : void MessageQueue::start()
      72              : {
      73          349 :     if (m_running)
      74              :     {
      75              :         // queue is running
      76            1 :         return;
      77              :     }
      78          348 :     m_running = true;
      79          348 :     std::thread startThread(&MessageQueue::processMessages, this);
      80          348 :     m_workerThread.swap(startThread);
      81              : }
      82              : 
      83          207 : void MessageQueue::stop()
      84              : {
      85          207 :     doStop();
      86              : }
      87              : 
      88            1 : void MessageQueue::clear()
      89              : {
      90            1 :     doClear();
      91              : }
      92              : 
      93         1682 : std::shared_ptr<Message> MessageQueue::waitForMessage()
      94              : {
      95         1682 :     std::unique_lock<std::mutex> lock(m_mutex);
      96         3192 :     while (m_queue.empty())
      97              :     {
      98         1510 :         m_condVar.wait(lock);
      99              :     }
     100         1682 :     std::shared_ptr<Message> message = m_queue.front();
     101         1682 :     m_queue.pop_front();
     102         3364 :     return message;
     103         1682 : }
     104              : 
     105         1685 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
     106              : {
     107         1685 :     const std::lock_guard<std::mutex> lock(m_mutex);
     108         1685 :     if (!m_running)
     109              :     {
     110            2 :         GST_ERROR("Message queue is not running");
     111            2 :         return false;
     112              :     }
     113         1683 :     m_queue.push_back(msg);
     114         1683 :     m_condVar.notify_all();
     115              : 
     116         1683 :     return true;
     117         1685 : }
     118              : 
     119          348 : void MessageQueue::processMessages()
     120              : {
     121              :     do
     122              :     {
     123         1682 :         std::shared_ptr<Message> message = waitForMessage();
     124         1682 :         message->handle();
     125         1682 :     } while (m_running);
     126          348 : }
     127              : 
     128            0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
     129              : {
     130            0 :     auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
     131            0 :     if (!postMessage(message))
     132              :     {
     133            0 :         return false;
     134              :     }
     135              : 
     136            0 :     return true;
     137              : }
     138              : 
     139         1334 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
     140              : {
     141         1334 :     return callInEventLoopInternal(func);
     142              : }
     143              : 
     144         1682 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
     145              : {
     146         1682 :     if (std::this_thread::get_id() != m_workerThread.get_id())
     147              :     {
     148         1635 :         auto message = std::make_shared<CallInEventLoopMessage>(func);
     149         1635 :         if (!postMessage(message))
     150              :         {
     151            1 :             return false;
     152              :         }
     153         1634 :         message->wait();
     154         1635 :     }
     155              :     else
     156              :     {
     157           47 :         func();
     158              :     }
     159              : 
     160         1681 :     return true;
     161              : }
     162              : 
     163          555 : void MessageQueue::doStop()
     164              : {
     165          555 :     if (!m_running)
     166              :     {
     167              :         // queue is not running
     168          207 :         return;
     169              :     }
     170          348 :     callInEventLoopInternal([this]() { m_running = false; });
     171              : 
     172          348 :     if (m_workerThread.joinable())
     173          348 :         m_workerThread.join();
     174              : 
     175          348 :     doClear();
     176              : }
     177              : 
     178          349 : void MessageQueue::doClear()
     179              : {
     180          349 :     std::unique_lock<std::mutex> lock(m_mutex);
     181          350 :     while (!m_queue.empty())
     182              :     {
     183            1 :         m_queue.front()->skip();
     184            1 :         m_queue.pop_front();
     185              :     }
     186          349 : }
     187              : } // namespace rialto
        

Generated by: LCOV version 2.0-1