Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "WorkerThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace firebolt::rialto::server
24 : {
25 3 : std::unique_ptr<IWorkerThread> WorkerThreadFactory::createWorkerThread() const
26 : {
27 3 : std::unique_ptr<IWorkerThread> workerThread;
28 : try
29 : {
30 3 : workerThread = std::make_unique<WorkerThread>();
31 : }
32 0 : catch (const std::exception &e)
33 : {
34 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the worker thread, reason: %s", e.what());
35 : }
36 3 : return workerThread;
37 : }
38 :
39 3 : WorkerThread::WorkerThread()
40 : {
41 3 : RIALTO_SERVER_LOG_INFO("Worker thread is starting");
42 3 : m_taskThread = std::thread(&WorkerThread::taskHandler, this);
43 : }
44 :
45 6 : WorkerThread::~WorkerThread()
46 : {
47 3 : if (m_taskThread.joinable())
48 : {
49 1 : m_taskThread.join();
50 : }
51 6 : }
52 :
53 3 : void WorkerThread::stop()
54 : {
55 3 : RIALTO_SERVER_LOG_INFO("Stopping worker thread");
56 3 : if (m_isTaskThreadActive)
57 : {
58 3 : m_isTaskThreadActive = false;
59 : }
60 : }
61 :
62 2 : void WorkerThread::join()
63 : {
64 2 : if (m_taskThread.joinable())
65 : {
66 2 : m_taskThread.join();
67 : }
68 : }
69 :
70 4 : void WorkerThread::enqueueTask(std::unique_ptr<IPlayerTask> &&task)
71 : {
72 : {
73 4 : std::unique_lock<std::mutex> lock(m_taskMutex);
74 4 : m_taskQueue.push(std::move(task));
75 : }
76 4 : m_taskCV.notify_one();
77 : }
78 :
79 3 : void WorkerThread::taskHandler()
80 : {
81 7 : while (m_isTaskThreadActive)
82 : {
83 4 : std::unique_ptr<IPlayerTask> task = waitForTask();
84 4 : task->execute();
85 : }
86 3 : }
87 :
88 4 : std::unique_ptr<IPlayerTask> WorkerThread::waitForTask()
89 : {
90 4 : std::unique_lock<std::mutex> lock(m_taskMutex);
91 4 : if (m_taskQueue.empty())
92 : {
93 12 : m_taskCV.wait(lock, [this] { return !m_taskQueue.empty(); });
94 : }
95 4 : std::unique_ptr<IPlayerTask> task = std::move(m_taskQueue.front());
96 4 : m_taskQueue.pop();
97 8 : return task;
98 4 : }
99 : } // namespace firebolt::rialto::server
|