Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1696 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1693 : void CallInEventLoopMessage::handle()
25 : {
26 1693 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1693 : m_func();
28 1693 : m_done = true;
29 1693 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1694 : void CallInEventLoopMessage::wait()
40 : {
41 1694 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 5082 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1694 : }
44 :
45 1 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 1 : void ScheduleInEventLoopMessage::handle()
48 : {
49 1 : m_func();
50 : }
51 :
52 163 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 163 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 306 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 306 : return std::make_unique<rialto::MessageQueue>();
60 : }
61 :
62 : namespace rialto
63 : {
64 363 : MessageQueue::MessageQueue() : m_running(false), m_acceptingMessages{false} {}
65 :
66 716 : MessageQueue::~MessageQueue()
67 : {
68 363 : doStop();
69 716 : }
70 :
71 362 : void MessageQueue::start()
72 : {
73 362 : if (m_running)
74 : {
75 : // queue is running
76 1 : return;
77 : }
78 361 : m_running = true;
79 361 : m_acceptingMessages = true;
80 361 : std::thread startThread(&MessageQueue::processMessages, this);
81 361 : m_workerThread.swap(startThread);
82 : }
83 :
84 214 : void MessageQueue::stop()
85 : {
86 214 : doStop();
87 : }
88 :
89 1 : void MessageQueue::clear()
90 : {
91 1 : doClear();
92 : }
93 :
94 1752 : std::shared_ptr<Message> MessageQueue::waitForMessage()
95 : {
96 1752 : std::unique_lock<std::mutex> lock(m_mutex);
97 3327 : while (m_queue.empty())
98 : {
99 1575 : m_condVar.wait(lock);
100 : }
101 1752 : std::shared_ptr<Message> message = m_queue.front();
102 1752 : m_queue.pop_front();
103 3504 : return message;
104 1752 : }
105 :
106 1397 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
107 : {
108 1397 : const std::lock_guard<std::mutex> lock(m_mutex);
109 1397 : if (!m_running || !m_acceptingMessages)
110 : {
111 3 : GST_ERROR("Message queue is not running or not accepting messages");
112 3 : return false;
113 : }
114 1394 : m_queue.push_back(msg);
115 1394 : m_condVar.notify_all();
116 :
117 1394 : return true;
118 1397 : }
119 :
120 361 : void MessageQueue::processMessages()
121 : {
122 : do
123 : {
124 1752 : std::shared_ptr<Message> message = waitForMessage();
125 1752 : message->handle();
126 1752 : } while (m_running);
127 361 : }
128 :
129 1 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
130 : {
131 1 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
132 1 : if (!postMessage(message))
133 : {
134 0 : return false;
135 : }
136 :
137 1 : return true;
138 : }
139 :
140 1391 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
141 : {
142 1391 : return callInEventLoopInternal(func);
143 : }
144 :
145 1391 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
146 : {
147 1391 : if (std::this_thread::get_id() != m_workerThread.get_id())
148 : {
149 1337 : auto message = std::make_shared<CallInEventLoopMessage>(func);
150 1337 : if (!postMessage(message))
151 : {
152 2 : return false;
153 : }
154 1335 : message->wait();
155 1337 : }
156 : else
157 : {
158 54 : func();
159 : }
160 :
161 1389 : return true;
162 : }
163 :
164 577 : void MessageQueue::doStop()
165 : {
166 577 : if (!m_running)
167 : {
168 : // queue is not running
169 216 : return;
170 : }
171 361 : if (std::this_thread::get_id() == m_workerThread.get_id())
172 : {
173 2 : m_acceptingMessages = false;
174 2 : m_running = false;
175 2 : m_workerThread.detach();
176 : }
177 : else
178 : {
179 718 : auto message = std::make_shared<CallInEventLoopMessage>([this]() { m_running = false; });
180 : {
181 359 : const std::lock_guard<std::mutex> lock(m_mutex);
182 359 : m_acceptingMessages = false;
183 359 : m_queue.push_back(message);
184 359 : m_condVar.notify_all();
185 : }
186 359 : message->wait();
187 :
188 359 : if (m_workerThread.joinable())
189 359 : m_workerThread.join();
190 : }
191 :
192 361 : doClear();
193 : }
194 :
195 362 : void MessageQueue::doClear()
196 : {
197 362 : std::unique_lock<std::mutex> lock(m_mutex);
198 363 : while (!m_queue.empty())
199 : {
200 1 : m_queue.front()->skip();
201 1 : m_queue.pop_front();
202 : }
203 362 : }
204 : } // namespace rialto
|