Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1685 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1682 : void CallInEventLoopMessage::handle()
25 : {
26 1682 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1682 : m_func();
28 1682 : m_done = true;
29 1682 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1683 : void CallInEventLoopMessage::wait()
40 : {
41 1683 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 5048 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1682 : }
44 :
45 1 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 1 : void ScheduleInEventLoopMessage::handle()
48 : {
49 1 : m_func();
50 : }
51 :
52 162 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 162 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 304 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 304 : return std::make_unique<rialto::MessageQueue>();
60 : }
61 :
62 : namespace rialto
63 : {
64 361 : MessageQueue::MessageQueue() : m_running(false), m_acceptingMessages{false} {}
65 :
66 712 : MessageQueue::~MessageQueue()
67 : {
68 361 : doStop();
69 712 : }
70 :
71 360 : void MessageQueue::start()
72 : {
73 360 : if (m_running)
74 : {
75 : // queue is running
76 1 : return;
77 : }
78 359 : m_running = true;
79 359 : m_acceptingMessages = true;
80 359 : std::thread startThread(&MessageQueue::processMessages, this);
81 359 : m_workerThread.swap(startThread);
82 : }
83 :
84 213 : void MessageQueue::stop()
85 : {
86 213 : doStop();
87 : }
88 :
89 1 : void MessageQueue::clear()
90 : {
91 1 : doClear();
92 : }
93 :
94 1739 : std::shared_ptr<Message> MessageQueue::waitForMessage()
95 : {
96 1739 : std::unique_lock<std::mutex> lock(m_mutex);
97 3300 : while (m_queue.empty())
98 : {
99 1561 : m_condVar.wait(lock);
100 : }
101 1739 : std::shared_ptr<Message> message = m_queue.front();
102 1739 : m_queue.pop_front();
103 3478 : return message;
104 1739 : }
105 :
106 1386 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
107 : {
108 1386 : const std::lock_guard<std::mutex> lock(m_mutex);
109 1386 : if (!m_running || !m_acceptingMessages)
110 : {
111 3 : GST_ERROR("Message queue is not running or not accepting messages");
112 3 : return false;
113 : }
114 1383 : m_queue.push_back(msg);
115 1383 : m_condVar.notify_all();
116 :
117 1383 : return true;
118 1386 : }
119 :
120 359 : void MessageQueue::processMessages()
121 : {
122 : do
123 : {
124 1739 : std::shared_ptr<Message> message = waitForMessage();
125 1739 : message->handle();
126 1739 : } while (m_running);
127 359 : }
128 :
129 1 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
130 : {
131 1 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
132 1 : if (!postMessage(message))
133 : {
134 0 : return false;
135 : }
136 :
137 1 : return true;
138 : }
139 :
140 1380 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
141 : {
142 1380 : return callInEventLoopInternal(func);
143 : }
144 :
145 1380 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
146 : {
147 1380 : if (std::this_thread::get_id() != m_workerThread.get_id())
148 : {
149 1328 : auto message = std::make_shared<CallInEventLoopMessage>(func);
150 1328 : if (!postMessage(message))
151 : {
152 2 : return false;
153 : }
154 1326 : message->wait();
155 1328 : }
156 : else
157 : {
158 52 : func();
159 : }
160 :
161 1378 : return true;
162 : }
163 :
164 574 : void MessageQueue::doStop()
165 : {
166 574 : if (!m_running)
167 : {
168 : // queue is not running
169 215 : return;
170 : }
171 359 : if (std::this_thread::get_id() == m_workerThread.get_id())
172 : {
173 2 : m_acceptingMessages = false;
174 2 : m_running = false;
175 2 : m_workerThread.detach();
176 : }
177 : else
178 : {
179 714 : auto message = std::make_shared<CallInEventLoopMessage>([this]() { m_running = false; });
180 : {
181 357 : const std::lock_guard<std::mutex> lock(m_mutex);
182 357 : m_acceptingMessages = false;
183 357 : m_queue.push_back(message);
184 357 : m_condVar.notify_all();
185 : }
186 357 : message->wait();
187 :
188 357 : if (m_workerThread.joinable())
189 357 : m_workerThread.join();
190 : }
191 :
192 359 : doClear();
193 : }
194 :
195 360 : void MessageQueue::doClear()
196 : {
197 360 : std::unique_lock<std::mutex> lock(m_mutex);
198 361 : while (!m_queue.empty())
199 : {
200 1 : m_queue.front()->skip();
201 1 : m_queue.pop_front();
202 : }
203 360 : }
204 : } // namespace rialto
|