Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "WorkerThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace
24 : {
25 :
26 : class FunctionTask : public firebolt::rialto::server::IPlayerTask
27 : {
28 : public:
29 6 : explicit FunctionTask(std::function<void(void)> &&callback) : m_callback(std::move(callback)) {}
30 :
31 12 : ~FunctionTask() override = default;
32 :
33 3 : void execute() const override { m_callback(); }
34 :
35 : private:
36 : std::function<void(void)> m_callback;
37 : };
38 :
39 : } // namespace
40 :
41 : namespace firebolt::rialto::server
42 : {
43 3 : std::unique_ptr<IWorkerThread> WorkerThreadFactory::createWorkerThread() const
44 : {
45 3 : std::unique_ptr<IWorkerThread> workerThread;
46 : try
47 : {
48 3 : workerThread = std::make_unique<WorkerThread>();
49 : }
50 0 : catch (const std::exception &e)
51 : {
52 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the worker thread, reason: %s", e.what());
53 : }
54 3 : return workerThread;
55 : }
56 :
57 3 : WorkerThread::WorkerThread()
58 : {
59 3 : RIALTO_SERVER_LOG_INFO("Worker thread is starting");
60 3 : m_taskThread = std::thread(&WorkerThread::taskHandler, this);
61 : }
62 :
63 6 : WorkerThread::~WorkerThread()
64 : {
65 3 : stop();
66 3 : join();
67 6 : }
68 :
69 6 : void WorkerThread::stop()
70 : {
71 6 : RIALTO_SERVER_LOG_INFO("Stopping worker thread");
72 :
73 9 : auto shutdownTask = [this]() { m_isTaskThreadActive = false; };
74 6 : enqueueTask(std::make_unique<FunctionTask>(std::move(shutdownTask)));
75 : }
76 :
77 5 : void WorkerThread::join()
78 : {
79 5 : if (m_taskThread.joinable())
80 : {
81 3 : m_taskThread.join();
82 : }
83 5 : }
84 :
85 10 : void WorkerThread::enqueueTask(std::unique_ptr<IPlayerTask> &&task)
86 : {
87 10 : if (task)
88 : {
89 10 : std::unique_lock<std::mutex> lock(m_taskMutex);
90 10 : m_taskQueue.push(std::move(task));
91 10 : m_taskCV.notify_one();
92 : }
93 : }
94 :
95 3 : void WorkerThread::taskHandler()
96 : {
97 10 : while (m_isTaskThreadActive)
98 : {
99 7 : std::unique_ptr<IPlayerTask> task = waitForTask();
100 7 : task->execute();
101 : }
102 3 : }
103 :
104 7 : std::unique_ptr<IPlayerTask> WorkerThread::waitForTask()
105 : {
106 7 : std::unique_lock<std::mutex> lock(m_taskMutex);
107 7 : if (m_taskQueue.empty())
108 : {
109 12 : m_taskCV.wait(lock, [this] { return !m_taskQueue.empty(); });
110 : }
111 7 : std::unique_ptr<IPlayerTask> task = std::move(m_taskQueue.front());
112 7 : m_taskQueue.pop();
113 14 : return task;
114 7 : }
115 : } // namespace firebolt::rialto::server
|