Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "WorkerThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace
24 : {
25 :
26 : class FunctionTask : public firebolt::rialto::server::IPlayerTask
27 : {
28 : public:
29 8 : explicit FunctionTask(std::function<void(void)> &&callback) : m_callback(std::move(callback)) {}
30 :
31 16 : ~FunctionTask() override = default;
32 :
33 4 : void execute() const override { m_callback(); }
34 :
35 : private:
36 : std::function<void(void)> m_callback;
37 : };
38 :
39 : } // namespace
40 :
41 : namespace firebolt::rialto::server
42 : {
43 4 : std::unique_ptr<IWorkerThread> WorkerThreadFactory::createWorkerThread() const
44 : {
45 4 : std::unique_ptr<IWorkerThread> workerThread;
46 : try
47 : {
48 4 : workerThread = std::make_unique<WorkerThread>();
49 : }
50 0 : catch (const std::exception &e)
51 : {
52 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the worker thread, reason: %s", e.what());
53 : }
54 4 : return workerThread;
55 : }
56 :
57 4 : WorkerThread::WorkerThread()
58 : {
59 4 : RIALTO_SERVER_LOG_INFO("Worker thread is starting");
60 4 : m_taskThread = std::thread(&WorkerThread::taskHandler, this);
61 : }
62 :
63 8 : WorkerThread::~WorkerThread()
64 : {
65 4 : stop();
66 4 : join();
67 8 : }
68 :
69 8 : void WorkerThread::stop()
70 : {
71 8 : RIALTO_SERVER_LOG_INFO("Stopping worker thread");
72 :
73 12 : auto shutdownTask = [this]() { m_isTaskThreadActive = false; };
74 8 : enqueueTask(std::make_unique<FunctionTask>(std::move(shutdownTask)));
75 : }
76 :
77 6 : void WorkerThread::join()
78 : {
79 6 : if (m_taskThread.joinable())
80 : {
81 4 : m_taskThread.join();
82 : }
83 6 : }
84 :
85 13 : void WorkerThread::enqueueTask(std::unique_ptr<IPlayerTask> &&task)
86 : {
87 13 : if (task)
88 : {
89 13 : std::unique_lock<std::mutex> lock(m_taskMutex);
90 13 : m_taskQueue.push_back(std::move(task));
91 13 : m_taskCV.notify_one();
92 : }
93 : }
94 :
95 1 : void WorkerThread::enqueuePriorityTask(std::unique_ptr<IPlayerTask> &&task)
96 : {
97 1 : if (task)
98 : {
99 1 : std::unique_lock<std::mutex> lock(m_taskMutex);
100 1 : m_taskQueue.push_front(std::move(task));
101 1 : m_taskCV.notify_one();
102 : }
103 : }
104 :
105 4 : void WorkerThread::taskHandler()
106 : {
107 14 : while (m_isTaskThreadActive)
108 : {
109 10 : std::unique_ptr<IPlayerTask> task = waitForTask();
110 10 : task->execute();
111 : }
112 4 : }
113 :
114 10 : std::unique_ptr<IPlayerTask> WorkerThread::waitForTask()
115 : {
116 10 : std::unique_lock<std::mutex> lock(m_taskMutex);
117 10 : if (m_taskQueue.empty())
118 : {
119 18 : m_taskCV.wait(lock, [this] { return !m_taskQueue.empty(); });
120 : }
121 10 : std::unique_ptr<IPlayerTask> task = std::move(m_taskQueue.front());
122 10 : m_taskQueue.pop_front();
123 20 : return task;
124 10 : }
125 : } // namespace firebolt::rialto::server
|