LCOV - code coverage report
Current view: top level - media/server/gstplayer/source - WorkerThread.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 95.7 % 47 45
Test Date: 2025-10-07 14:22:52 Functions: 100.0 % 14 14

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "WorkerThread.h"
      21              : #include "RialtoServerLogging.h"
      22              : 
      23              : namespace
      24              : {
      25              : 
      26              : class FunctionTask : public firebolt::rialto::server::IPlayerTask
      27              : {
      28              : public:
      29            8 :     explicit FunctionTask(std::function<void(void)> &&callback) : m_callback(std::move(callback)) {}
      30              : 
      31           16 :     ~FunctionTask() override = default;
      32              : 
      33            4 :     void execute() const override { m_callback(); }
      34              : 
      35              : private:
      36              :     std::function<void(void)> m_callback;
      37              : };
      38              : 
      39              : } // namespace
      40              : 
      41              : namespace firebolt::rialto::server
      42              : {
      43            4 : std::unique_ptr<IWorkerThread> WorkerThreadFactory::createWorkerThread() const
      44              : {
      45            4 :     std::unique_ptr<IWorkerThread> workerThread;
      46              :     try
      47              :     {
      48            4 :         workerThread = std::make_unique<WorkerThread>();
      49              :     }
      50            0 :     catch (const std::exception &e)
      51              :     {
      52            0 :         RIALTO_SERVER_LOG_ERROR("Failed to create the worker thread, reason: %s", e.what());
      53              :     }
      54            4 :     return workerThread;
      55              : }
      56              : 
      57            4 : WorkerThread::WorkerThread()
      58              : {
      59            4 :     RIALTO_SERVER_LOG_INFO("Worker thread is starting");
      60            4 :     m_taskThread = std::thread(&WorkerThread::taskHandler, this);
      61              : }
      62              : 
      63            8 : WorkerThread::~WorkerThread()
      64              : {
      65            4 :     stop();
      66            4 :     join();
      67            8 : }
      68              : 
      69            8 : void WorkerThread::stop()
      70              : {
      71            8 :     RIALTO_SERVER_LOG_INFO("Stopping worker thread");
      72              : 
      73           12 :     auto shutdownTask = [this]() { m_isTaskThreadActive = false; };
      74            8 :     enqueueTask(std::make_unique<FunctionTask>(std::move(shutdownTask)));
      75              : }
      76              : 
      77            6 : void WorkerThread::join()
      78              : {
      79            6 :     if (m_taskThread.joinable())
      80              :     {
      81            4 :         m_taskThread.join();
      82              :     }
      83            6 : }
      84              : 
      85           13 : void WorkerThread::enqueueTask(std::unique_ptr<IPlayerTask> &&task)
      86              : {
      87           13 :     if (task)
      88              :     {
      89           13 :         std::unique_lock<std::mutex> lock(m_taskMutex);
      90           13 :         m_taskQueue.push_back(std::move(task));
      91           13 :         m_taskCV.notify_one();
      92              :     }
      93              : }
      94              : 
      95            1 : void WorkerThread::enqueuePriorityTask(std::unique_ptr<IPlayerTask> &&task)
      96              : {
      97            1 :     if (task)
      98              :     {
      99            1 :         std::unique_lock<std::mutex> lock(m_taskMutex);
     100            1 :         m_taskQueue.push_front(std::move(task));
     101            1 :         m_taskCV.notify_one();
     102              :     }
     103              : }
     104              : 
     105            4 : void WorkerThread::taskHandler()
     106              : {
     107           14 :     while (m_isTaskThreadActive)
     108              :     {
     109           10 :         std::unique_ptr<IPlayerTask> task = waitForTask();
     110           10 :         task->execute();
     111              :     }
     112            4 : }
     113              : 
     114           10 : std::unique_ptr<IPlayerTask> WorkerThread::waitForTask()
     115              : {
     116           10 :     std::unique_lock<std::mutex> lock(m_taskMutex);
     117           10 :     if (m_taskQueue.empty())
     118              :     {
     119           18 :         m_taskCV.wait(lock, [this] { return !m_taskQueue.empty(); });
     120              :     }
     121           10 :     std::unique_ptr<IPlayerTask> task = std::move(m_taskQueue.front());
     122           10 :     m_taskQueue.pop_front();
     123           20 :     return task;
     124           10 : }
     125              : } // namespace firebolt::rialto::server
        

Generated by: LCOV version 2.0-1