LCOV - code coverage report
Current view: top level - media/server/main/source - MainThread.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 95.4 % 87 83
Test Date: 2026-03-18 08:38:49 Functions: 100.0 % 16 16

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "MainThread.h"
      21              : #include "RialtoServerLogging.h"
      22              : #include <string>
      23              : #include <utility>
      24              : #include <vector>
      25              : 
      26              : namespace firebolt::rialto::server
      27              : {
      28              : std::weak_ptr<IMainThread> MainThreadFactory::m_mainThread;
      29              : std::mutex MainThreadFactory::m_creationMutex;
      30              : 
      31           28 : std::shared_ptr<IMainThreadFactory> IMainThreadFactory::createFactory()
      32              : {
      33           28 :     std::shared_ptr<IMainThreadFactory> factory;
      34              :     try
      35              :     {
      36           28 :         factory = std::make_shared<MainThreadFactory>();
      37              :     }
      38            0 :     catch (const std::exception &e)
      39              :     {
      40            0 :         RIALTO_SERVER_LOG_ERROR("Failed to create the main thread factory, reason: %s", e.what());
      41              :     }
      42              : 
      43           28 :     return factory;
      44              : }
      45              : 
      46            4 : std::shared_ptr<IMainThread> MainThreadFactory::getMainThread() const
      47              : {
      48            4 :     std::lock_guard<std::mutex> lock{m_creationMutex};
      49              : 
      50            4 :     std::shared_ptr<IMainThread> mainThread = m_mainThread.lock();
      51            4 :     if (!mainThread)
      52              :     {
      53              :         try
      54              :         {
      55            4 :             mainThread = std::make_shared<MainThread>();
      56              :         }
      57            0 :         catch (const std::exception &e)
      58              :         {
      59            0 :             RIALTO_SERVER_LOG_ERROR("Failed to create the main thread, reason: %s", e.what());
      60              :         }
      61              : 
      62            4 :         m_mainThread = mainThread;
      63              :     }
      64              : 
      65            8 :     return mainThread;
      66            4 : }
      67              : 
      68            8 : MainThread::MainThread() : m_isMainThreadRunning{true}, m_mainThreadClientId{0}, m_nextClientId{1}
      69              : {
      70            8 :     RIALTO_SERVER_LOG_DEBUG("MainThread is constructed");
      71            8 :     m_thread = std::thread(std::bind(&MainThread::mainThreadLoop, this));
      72              : 
      73              :     // Register itself
      74            8 :     m_registeredClients.insert(m_mainThreadClientId);
      75              : }
      76              : 
      77            8 : MainThread::~MainThread()
      78              : {
      79            8 :     RIALTO_SERVER_LOG_DEBUG("MainThread is destructed");
      80           16 :     auto shutdownTask = [this]() { m_isMainThreadRunning = false; };
      81            8 :     enqueueTask(m_mainThreadClientId, shutdownTask);
      82            8 :     m_thread.join();
      83              : }
      84              : 
      85            8 : void MainThread::mainThreadLoop()
      86              : {
      87           44 :     while (m_isMainThreadRunning)
      88              :     {
      89           36 :         const std::shared_ptr<TaskInfo> kTaskInfo = waitForTask();
      90           36 :         if (m_registeredClients.find(kTaskInfo->clientId) != m_registeredClients.end())
      91              :         {
      92           34 :             kTaskInfo->task();
      93              :         }
      94              :         else
      95              :         {
      96            2 :             RIALTO_SERVER_LOG_WARN("Task ignored, client '%d' not registered", kTaskInfo->clientId);
      97              :         }
      98              : 
      99           36 :         if (nullptr != kTaskInfo->cv)
     100              :         {
     101           13 :             std::unique_lock<std::mutex> lockTask(*(kTaskInfo->mutex));
     102           13 :             kTaskInfo->done = true;
     103           13 :             kTaskInfo->cv->notify_one();
     104              :         }
     105           36 :     }
     106            8 : }
     107              : 
     108           36 : const std::shared_ptr<MainThread::TaskInfo> MainThread::waitForTask()
     109              : {
     110           36 :     std::unique_lock<std::mutex> lock(m_taskQueueMutex);
     111           36 :     if (m_taskQueue.empty())
     112              :     {
     113           54 :         m_taskQueueCv.wait(lock, [this] { return !m_taskQueue.empty(); });
     114              :     }
     115           36 :     const auto kTaskInfo = m_taskQueue.front();
     116           36 :     m_taskQueue.pop_front();
     117           72 :     return kTaskInfo;
     118           36 : }
     119              : 
     120           11 : int32_t MainThread::registerClient()
     121              : {
     122           11 :     uint32_t clientId = m_nextClientId++;
     123              : 
     124           11 :     auto task = [&, clientId]()
     125              :     {
     126           11 :         RIALTO_SERVER_LOG_INFO("Registering client '%u'", clientId);
     127           11 :         m_registeredClients.insert(clientId);
     128           22 :     };
     129           11 :     enqueueTask(m_mainThreadClientId, task);
     130              : 
     131           11 :     return clientId;
     132              : }
     133              : 
     134            8 : void MainThread::unregisterClient(uint32_t clientId)
     135              : {
     136            8 :     RIALTO_SERVER_LOG_INFO("Unregistering client '%u'", clientId);
     137            8 :     m_registeredClients.erase(clientId);
     138              : }
     139              : 
     140           23 : void MainThread::enqueueTask(uint32_t clientId, const Task &task)
     141              : {
     142           23 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     143           23 :     newTask->clientId = clientId;
     144           23 :     newTask->task = task;
     145              :     {
     146           23 :         std::unique_lock<std::mutex> lock(m_taskQueueMutex);
     147           23 :         m_taskQueue.push_back(std::move(newTask));
     148              :     }
     149           23 :     m_taskQueueCv.notify_one();
     150              : }
     151              : 
     152           11 : void MainThread::enqueueTaskAndWait(uint32_t clientId, const Task &task)
     153              : {
     154           11 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     155           11 :     newTask->clientId = clientId;
     156           11 :     newTask->task = task;
     157           11 :     newTask->mutex = std::make_unique<std::mutex>();
     158           11 :     newTask->cv = std::make_unique<std::condition_variable>();
     159              : 
     160              :     {
     161           11 :         std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
     162              :         {
     163           11 :             std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
     164           11 :             m_taskQueue.push_back(newTask);
     165              :         }
     166           11 :         m_taskQueueCv.notify_one();
     167              : 
     168           33 :         newTask->cv->wait(lockTask, [&] { return newTask->done; });
     169           11 :     }
     170              : }
     171              : 
     172            2 : void MainThread::enqueuePriorityTaskAndWait(uint32_t clientId, const Task &task)
     173              : {
     174            2 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     175            2 :     newTask->clientId = clientId;
     176            2 :     newTask->task = task;
     177            2 :     newTask->mutex = std::make_unique<std::mutex>();
     178            2 :     newTask->cv = std::make_unique<std::condition_variable>();
     179              : 
     180              :     {
     181            2 :         std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
     182              :         {
     183            2 :             std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
     184            2 :             m_taskQueue.push_front(newTask);
     185              :         }
     186            2 :         m_taskQueueCv.notify_one();
     187              : 
     188            6 :         newTask->cv->wait(lockTask, [&] { return newTask->done; });
     189            2 :     }
     190              : }
     191              : } // namespace firebolt::rialto::server
        

Generated by: LCOV version 2.0-1