Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "WorkerThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace
24 : {
25 :
26 : class FunctionTask : public firebolt::rialto::server::IPlayerTask
27 : {
28 : public:
29 4 : explicit FunctionTask(std::function<void(void)> &&callback) : m_callback(std::move(callback)) {}
30 :
31 8 : ~FunctionTask() override = default;
32 :
33 3 : void execute() const override { m_callback(); }
34 :
35 : private:
36 : std::function<void(void)> m_callback;
37 : };
38 :
39 : } // namespace
40 :
41 : namespace firebolt::rialto::server
42 : {
43 3 : std::unique_ptr<IWorkerThread> WorkerThreadFactory::createWorkerThread() const
44 : {
45 3 : std::unique_ptr<IWorkerThread> workerThread;
46 : try
47 : {
48 3 : workerThread = std::make_unique<WorkerThread>();
49 : }
50 0 : catch (const std::exception &e)
51 : {
52 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the worker thread, reason: %s", e.what());
53 : }
54 3 : return workerThread;
55 : }
56 :
57 3 : WorkerThread::WorkerThread()
58 : {
59 3 : RIALTO_SERVER_LOG_INFO("Worker thread is starting");
60 3 : m_taskThread = std::thread(&WorkerThread::taskHandler, this);
61 : }
62 :
63 6 : WorkerThread::~WorkerThread()
64 : {
65 3 : if (m_taskThread.joinable())
66 : {
67 3 : stop();
68 3 : m_taskThread.join();
69 : }
70 6 : }
71 :
72 4 : void WorkerThread::stop()
73 : {
74 4 : RIALTO_SERVER_LOG_INFO("Stopping worker thread");
75 :
76 7 : auto shutdownTask = [this]() { m_isTaskThreadActive = false; };
77 4 : enqueueTask(std::make_unique<FunctionTask>(std::move(shutdownTask)));
78 : }
79 :
80 6 : void WorkerThread::enqueueTask(std::unique_ptr<IPlayerTask> &&task)
81 : {
82 6 : if (task)
83 : {
84 6 : std::unique_lock<std::mutex> lock(m_taskMutex);
85 6 : m_taskQueue.push(std::move(task));
86 6 : m_taskCV.notify_one();
87 : }
88 : }
89 :
90 3 : void WorkerThread::taskHandler()
91 : {
92 8 : while (m_isTaskThreadActive)
93 : {
94 5 : std::unique_ptr<IPlayerTask> task = waitForTask();
95 5 : task->execute();
96 : }
97 3 : }
98 :
99 5 : std::unique_ptr<IPlayerTask> WorkerThread::waitForTask()
100 : {
101 5 : std::unique_lock<std::mutex> lock(m_taskMutex);
102 5 : if (m_taskQueue.empty())
103 : {
104 12 : m_taskCV.wait(lock, [this] { return !m_taskQueue.empty(); });
105 : }
106 5 : std::unique_ptr<IPlayerTask> task = std::move(m_taskQueue.front());
107 5 : m_taskQueue.pop();
108 10 : return task;
109 5 : }
110 : } // namespace firebolt::rialto::server
|