LCOV - code coverage report
Current view: top level - media/server/main/source - MainThread.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 95.2 % 84 80
Test Date: 2025-02-18 13:13:53 Functions: 100.0 % 14 14

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "MainThread.h"
      21              : #include "RialtoServerLogging.h"
      22              : #include <string>
      23              : #include <utility>
      24              : #include <vector>
      25              : 
      26              : namespace firebolt::rialto::server
      27              : {
      28              : std::weak_ptr<IMainThread> MainThreadFactory::m_mainThread;
      29              : std::mutex MainThreadFactory::m_creationMutex;
      30              : 
      31           28 : std::shared_ptr<IMainThreadFactory> IMainThreadFactory::createFactory()
      32              : {
      33           28 :     std::shared_ptr<IMainThreadFactory> factory;
      34              :     try
      35              :     {
      36           28 :         factory = std::make_shared<MainThreadFactory>();
      37              :     }
      38            0 :     catch (const std::exception &e)
      39              :     {
      40            0 :         RIALTO_SERVER_LOG_ERROR("Failed to create the main thread factory, reason: %s", e.what());
      41              :     }
      42              : 
      43           28 :     return factory;
      44              : }
      45              : 
      46            4 : std::shared_ptr<IMainThread> MainThreadFactory::getMainThread() const
      47              : {
      48            4 :     std::lock_guard<std::mutex> lock{m_creationMutex};
      49              : 
      50            4 :     std::shared_ptr<IMainThread> mainThread = m_mainThread.lock();
      51            4 :     if (!mainThread)
      52              :     {
      53              :         try
      54              :         {
      55            4 :             mainThread = std::make_shared<MainThread>();
      56              :         }
      57            0 :         catch (const std::exception &e)
      58              :         {
      59            0 :             RIALTO_SERVER_LOG_ERROR("Failed to create the main thread, reason: %s", e.what());
      60              :         }
      61              : 
      62            4 :         m_mainThread = mainThread;
      63              :     }
      64              : 
      65            8 :     return mainThread;
      66            4 : }
      67              : 
      68            8 : MainThread::MainThread() : m_isMainThreadRunning{true}, m_mainThreadClientId{0}, m_nextClientId{1}
      69              : {
      70            8 :     RIALTO_SERVER_LOG_DEBUG("MainThread is constructed");
      71            8 :     m_thread = std::thread(std::bind(&MainThread::mainThreadLoop, this));
      72              : 
      73              :     // Register itself
      74            8 :     m_registeredClients.insert(m_mainThreadClientId);
      75              : }
      76              : 
      77            8 : MainThread::~MainThread()
      78              : {
      79            8 :     RIALTO_SERVER_LOG_DEBUG("MainThread is destructed");
      80           16 :     auto shutdownTask = [this]() { m_isMainThreadRunning = false; };
      81            8 :     enqueueTask(m_mainThreadClientId, shutdownTask);
      82            8 :     m_thread.join();
      83              : }
      84              : 
      85            8 : void MainThread::mainThreadLoop()
      86              : {
      87           44 :     while (m_isMainThreadRunning)
      88              :     {
      89           36 :         const std::shared_ptr<TaskInfo> kTaskInfo = waitForTask();
      90           36 :         if (m_registeredClients.find(kTaskInfo->clientId) != m_registeredClients.end())
      91              :         {
      92           34 :             kTaskInfo->task();
      93              :         }
      94              :         else
      95              :         {
      96            2 :             RIALTO_SERVER_LOG_WARN("Task ignored, client '%d' not registered", kTaskInfo->clientId);
      97              :         }
      98              : 
      99           36 :         if (nullptr != kTaskInfo->cv)
     100              :         {
     101           13 :             std::unique_lock<std::mutex> lockTask(*(kTaskInfo->mutex));
     102           13 :             kTaskInfo->cv->notify_one();
     103              :         }
     104           36 :     }
     105            8 : }
     106              : 
     107           36 : const std::shared_ptr<MainThread::TaskInfo> MainThread::waitForTask()
     108              : {
     109           36 :     std::unique_lock<std::mutex> lock(m_taskQueueMutex);
     110           36 :     if (m_taskQueue.empty())
     111              :     {
     112           54 :         m_taskQueueCv.wait(lock, [this] { return !m_taskQueue.empty(); });
     113              :     }
     114           36 :     const std::shared_ptr<TaskInfo> kTaskInfo = m_taskQueue.front();
     115           36 :     m_taskQueue.pop_front();
     116           72 :     return kTaskInfo;
     117           36 : }
     118              : 
     119           11 : int32_t MainThread::registerClient()
     120              : {
     121           11 :     uint32_t clientId = m_nextClientId++;
     122              : 
     123           11 :     auto task = [&, clientId]()
     124              :     {
     125           11 :         RIALTO_SERVER_LOG_INFO("Registering client '%u'", clientId);
     126           11 :         m_registeredClients.insert(clientId);
     127           22 :     };
     128           11 :     enqueueTask(m_mainThreadClientId, task);
     129              : 
     130           11 :     return clientId;
     131              : }
     132              : 
     133            8 : void MainThread::unregisterClient(uint32_t clientId)
     134              : {
     135            8 :     RIALTO_SERVER_LOG_INFO("Unregistering client '%u'", clientId);
     136            8 :     m_registeredClients.erase(clientId);
     137              : }
     138              : 
     139           23 : void MainThread::enqueueTask(uint32_t clientId, Task task)
     140              : {
     141           23 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     142           23 :     newTask->clientId = clientId;
     143           23 :     newTask->task = task;
     144              :     {
     145           23 :         std::unique_lock<std::mutex> lock(m_taskQueueMutex);
     146           23 :         m_taskQueue.push_back(newTask);
     147              :     }
     148           23 :     m_taskQueueCv.notify_one();
     149              : }
     150              : 
     151           11 : void MainThread::enqueueTaskAndWait(uint32_t clientId, Task task)
     152              : {
     153           11 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     154           11 :     newTask->clientId = clientId;
     155           11 :     newTask->task = task;
     156           11 :     newTask->mutex = std::make_unique<std::mutex>();
     157           11 :     newTask->cv = std::make_unique<std::condition_variable>();
     158              : 
     159              :     {
     160           11 :         std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
     161              :         {
     162           11 :             std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
     163           11 :             m_taskQueue.push_back(newTask);
     164              :         }
     165           11 :         m_taskQueueCv.notify_one();
     166              : 
     167           11 :         newTask->cv->wait(lockTask);
     168              :     }
     169              : }
     170              : 
     171            2 : void MainThread::enqueuePriorityTaskAndWait(uint32_t clientId, Task task)
     172              : {
     173            2 :     std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
     174            2 :     newTask->clientId = clientId;
     175            2 :     newTask->task = task;
     176            2 :     newTask->mutex = std::make_unique<std::mutex>();
     177            2 :     newTask->cv = std::make_unique<std::condition_variable>();
     178              : 
     179              :     {
     180            2 :         std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
     181              :         {
     182            2 :             std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
     183            2 :             m_taskQueue.push_front(newTask);
     184              :         }
     185            2 :         m_taskQueueCv.notify_one();
     186              : 
     187            2 :         newTask->cv->wait(lockTask);
     188              :     }
     189              : }
     190              : } // namespace firebolt::rialto::server
        

Generated by: LCOV version 2.0-1