Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "MainThread.h"
21 : #include "RialtoServerLogging.h"
22 : #include <string>
23 : #include <utility>
24 : #include <vector>
25 :
26 : namespace firebolt::rialto::server
27 : {
28 : std::weak_ptr<IMainThread> MainThreadFactory::m_mainThread;
29 : std::mutex MainThreadFactory::m_creationMutex;
30 :
31 28 : std::shared_ptr<IMainThreadFactory> IMainThreadFactory::createFactory()
32 : {
33 28 : std::shared_ptr<IMainThreadFactory> factory;
34 : try
35 : {
36 28 : factory = std::make_shared<MainThreadFactory>();
37 : }
38 0 : catch (const std::exception &e)
39 : {
40 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the main thread factory, reason: %s", e.what());
41 : }
42 :
43 28 : return factory;
44 : }
45 :
46 4 : std::shared_ptr<IMainThread> MainThreadFactory::getMainThread() const
47 : {
48 4 : std::lock_guard<std::mutex> lock{m_creationMutex};
49 :
50 4 : std::shared_ptr<IMainThread> mainThread = m_mainThread.lock();
51 4 : if (!mainThread)
52 : {
53 : try
54 : {
55 4 : mainThread = std::make_shared<MainThread>();
56 : }
57 0 : catch (const std::exception &e)
58 : {
59 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the main thread, reason: %s", e.what());
60 : }
61 :
62 4 : m_mainThread = mainThread;
63 : }
64 :
65 8 : return mainThread;
66 4 : }
67 :
68 8 : MainThread::MainThread() : m_isMainThreadRunning{true}, m_mainThreadClientId{0}, m_nextClientId{1}
69 : {
70 8 : RIALTO_SERVER_LOG_DEBUG("MainThread is constructed");
71 8 : m_thread = std::thread(std::bind(&MainThread::mainThreadLoop, this));
72 :
73 : // Register itself
74 8 : m_registeredClients.insert(m_mainThreadClientId);
75 : }
76 :
77 8 : MainThread::~MainThread()
78 : {
79 8 : RIALTO_SERVER_LOG_DEBUG("MainThread is destructed");
80 16 : auto shutdownTask = [this]() { m_isMainThreadRunning = false; };
81 8 : enqueueTask(m_mainThreadClientId, shutdownTask);
82 8 : m_thread.join();
83 : }
84 :
85 8 : void MainThread::mainThreadLoop()
86 : {
87 44 : while (m_isMainThreadRunning)
88 : {
89 36 : const std::shared_ptr<TaskInfo> kTaskInfo = waitForTask();
90 36 : if (m_registeredClients.find(kTaskInfo->clientId) != m_registeredClients.end())
91 : {
92 34 : kTaskInfo->task();
93 : }
94 : else
95 : {
96 2 : RIALTO_SERVER_LOG_WARN("Task ignored, client '%d' not registered", kTaskInfo->clientId);
97 : }
98 :
99 36 : if (nullptr != kTaskInfo->cv)
100 : {
101 13 : std::unique_lock<std::mutex> lockTask(*(kTaskInfo->mutex));
102 13 : kTaskInfo->done = true;
103 13 : kTaskInfo->cv->notify_one();
104 : }
105 36 : }
106 8 : }
107 :
108 36 : const std::shared_ptr<MainThread::TaskInfo> MainThread::waitForTask()
109 : {
110 36 : std::unique_lock<std::mutex> lock(m_taskQueueMutex);
111 36 : if (m_taskQueue.empty())
112 : {
113 54 : m_taskQueueCv.wait(lock, [this] { return !m_taskQueue.empty(); });
114 : }
115 36 : const auto kTaskInfo = m_taskQueue.front();
116 36 : m_taskQueue.pop_front();
117 72 : return kTaskInfo;
118 36 : }
119 :
120 11 : int32_t MainThread::registerClient()
121 : {
122 11 : uint32_t clientId = m_nextClientId++;
123 :
124 11 : auto task = [&, clientId]()
125 : {
126 11 : RIALTO_SERVER_LOG_INFO("Registering client '%u'", clientId);
127 11 : m_registeredClients.insert(clientId);
128 22 : };
129 11 : enqueueTask(m_mainThreadClientId, task);
130 :
131 11 : return clientId;
132 : }
133 :
134 8 : void MainThread::unregisterClient(uint32_t clientId)
135 : {
136 8 : RIALTO_SERVER_LOG_INFO("Unregistering client '%u'", clientId);
137 8 : m_registeredClients.erase(clientId);
138 : }
139 :
140 23 : void MainThread::enqueueTask(uint32_t clientId, const Task &task)
141 : {
142 23 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
143 23 : newTask->clientId = clientId;
144 23 : newTask->task = task;
145 : {
146 23 : std::unique_lock<std::mutex> lock(m_taskQueueMutex);
147 23 : m_taskQueue.push_back(std::move(newTask));
148 : }
149 23 : m_taskQueueCv.notify_one();
150 : }
151 :
152 11 : void MainThread::enqueueTaskAndWait(uint32_t clientId, const Task &task)
153 : {
154 11 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
155 11 : newTask->clientId = clientId;
156 11 : newTask->task = task;
157 11 : newTask->mutex = std::make_unique<std::mutex>();
158 11 : newTask->cv = std::make_unique<std::condition_variable>();
159 :
160 : {
161 11 : std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
162 : {
163 11 : std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
164 11 : m_taskQueue.push_back(newTask);
165 : }
166 11 : m_taskQueueCv.notify_one();
167 :
168 33 : newTask->cv->wait(lockTask, [&] { return newTask->done; });
169 11 : }
170 : }
171 :
172 2 : void MainThread::enqueuePriorityTaskAndWait(uint32_t clientId, const Task &task)
173 : {
174 2 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
175 2 : newTask->clientId = clientId;
176 2 : newTask->task = task;
177 2 : newTask->mutex = std::make_unique<std::mutex>();
178 2 : newTask->cv = std::make_unique<std::condition_variable>();
179 :
180 : {
181 2 : std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
182 : {
183 2 : std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
184 2 : m_taskQueue.push_front(newTask);
185 : }
186 2 : m_taskQueueCv.notify_one();
187 :
188 6 : newTask->cv->wait(lockTask, [&] { return newTask->done; });
189 2 : }
190 : }
191 : } // namespace firebolt::rialto::server
|