Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "MainThread.h"
21 : #include "RialtoServerLogging.h"
22 : #include <string>
23 : #include <utility>
24 : #include <vector>
25 :
26 : namespace firebolt::rialto::server
27 : {
28 : std::weak_ptr<IMainThread> MainThreadFactory::m_mainThread;
29 : std::mutex MainThreadFactory::m_creationMutex;
30 :
31 28 : std::shared_ptr<IMainThreadFactory> IMainThreadFactory::createFactory()
32 : {
33 28 : std::shared_ptr<IMainThreadFactory> factory;
34 : try
35 : {
36 28 : factory = std::make_shared<MainThreadFactory>();
37 : }
38 0 : catch (const std::exception &e)
39 : {
40 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the main thread factory, reason: %s", e.what());
41 : }
42 :
43 28 : return factory;
44 : }
45 :
46 4 : std::shared_ptr<IMainThread> MainThreadFactory::getMainThread() const
47 : {
48 4 : std::lock_guard<std::mutex> lock{m_creationMutex};
49 :
50 4 : std::shared_ptr<IMainThread> mainThread = m_mainThread.lock();
51 4 : if (!mainThread)
52 : {
53 : try
54 : {
55 4 : mainThread = std::make_shared<MainThread>();
56 : }
57 0 : catch (const std::exception &e)
58 : {
59 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the main thread, reason: %s", e.what());
60 : }
61 :
62 4 : m_mainThread = mainThread;
63 : }
64 :
65 8 : return mainThread;
66 4 : }
67 :
68 8 : MainThread::MainThread() : m_isMainThreadRunning{true}, m_mainThreadClientId{0}, m_nextClientId{1}
69 : {
70 8 : RIALTO_SERVER_LOG_DEBUG("MainThread is constructed");
71 8 : m_thread = std::thread(std::bind(&MainThread::mainThreadLoop, this));
72 :
73 : // Register itself
74 8 : m_registeredClients.insert(m_mainThreadClientId);
75 : }
76 :
77 8 : MainThread::~MainThread()
78 : {
79 8 : RIALTO_SERVER_LOG_DEBUG("MainThread is destructed");
80 16 : auto shutdownTask = [this]() { m_isMainThreadRunning = false; };
81 8 : enqueueTask(m_mainThreadClientId, shutdownTask);
82 8 : m_thread.join();
83 : }
84 :
85 8 : void MainThread::mainThreadLoop()
86 : {
87 44 : while (m_isMainThreadRunning)
88 : {
89 36 : const std::shared_ptr<TaskInfo> kTaskInfo = waitForTask();
90 36 : if (m_registeredClients.find(kTaskInfo->clientId) != m_registeredClients.end())
91 : {
92 34 : kTaskInfo->task();
93 : }
94 : else
95 : {
96 2 : RIALTO_SERVER_LOG_WARN("Task ignored, client '%d' not registered", kTaskInfo->clientId);
97 : }
98 :
99 36 : if (nullptr != kTaskInfo->cv)
100 : {
101 13 : std::unique_lock<std::mutex> lockTask(*(kTaskInfo->mutex));
102 13 : kTaskInfo->cv->notify_one();
103 : }
104 36 : }
105 8 : }
106 :
107 36 : const std::shared_ptr<MainThread::TaskInfo> MainThread::waitForTask()
108 : {
109 36 : std::unique_lock<std::mutex> lock(m_taskQueueMutex);
110 36 : if (m_taskQueue.empty())
111 : {
112 54 : m_taskQueueCv.wait(lock, [this] { return !m_taskQueue.empty(); });
113 : }
114 36 : const std::shared_ptr<TaskInfo> kTaskInfo = m_taskQueue.front();
115 36 : m_taskQueue.pop_front();
116 72 : return kTaskInfo;
117 36 : }
118 :
119 11 : int32_t MainThread::registerClient()
120 : {
121 11 : uint32_t clientId = m_nextClientId++;
122 :
123 11 : auto task = [&, clientId]()
124 : {
125 11 : RIALTO_SERVER_LOG_INFO("Registering client '%u'", clientId);
126 11 : m_registeredClients.insert(clientId);
127 22 : };
128 11 : enqueueTask(m_mainThreadClientId, task);
129 :
130 11 : return clientId;
131 : }
132 :
133 8 : void MainThread::unregisterClient(uint32_t clientId)
134 : {
135 8 : RIALTO_SERVER_LOG_INFO("Unregistering client '%u'", clientId);
136 8 : m_registeredClients.erase(clientId);
137 : }
138 :
139 23 : void MainThread::enqueueTask(uint32_t clientId, Task task)
140 : {
141 23 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
142 23 : newTask->clientId = clientId;
143 23 : newTask->task = task;
144 : {
145 23 : std::unique_lock<std::mutex> lock(m_taskQueueMutex);
146 23 : m_taskQueue.push_back(newTask);
147 : }
148 23 : m_taskQueueCv.notify_one();
149 : }
150 :
151 11 : void MainThread::enqueueTaskAndWait(uint32_t clientId, Task task)
152 : {
153 11 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
154 11 : newTask->clientId = clientId;
155 11 : newTask->task = task;
156 11 : newTask->mutex = std::make_unique<std::mutex>();
157 11 : newTask->cv = std::make_unique<std::condition_variable>();
158 :
159 : {
160 11 : std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
161 : {
162 11 : std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
163 11 : m_taskQueue.push_back(newTask);
164 : }
165 11 : m_taskQueueCv.notify_one();
166 :
167 11 : newTask->cv->wait(lockTask);
168 : }
169 : }
170 :
171 2 : void MainThread::enqueuePriorityTaskAndWait(uint32_t clientId, Task task)
172 : {
173 2 : std::shared_ptr<TaskInfo> newTask = std::make_shared<TaskInfo>();
174 2 : newTask->clientId = clientId;
175 2 : newTask->task = task;
176 2 : newTask->mutex = std::make_unique<std::mutex>();
177 2 : newTask->cv = std::make_unique<std::condition_variable>();
178 :
179 : {
180 2 : std::unique_lock<std::mutex> lockTask(*(newTask->mutex));
181 : {
182 2 : std::unique_lock<std::mutex> lockQueue(m_taskQueueMutex);
183 2 : m_taskQueue.push_front(newTask);
184 : }
185 2 : m_taskQueueCv.notify_one();
186 :
187 2 : newTask->cv->wait(lockTask);
188 : }
189 : }
190 : } // namespace firebolt::rialto::server
|