Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1464 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1462 : void CallInEventLoopMessage::handle()
25 : {
26 1462 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1462 : m_func();
28 1462 : m_done = true;
29 1462 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1463 : void CallInEventLoopMessage::wait()
40 : {
41 1463 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 4389 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1463 : }
44 :
45 0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 0 : void ScheduleInEventLoopMessage::handle()
48 : {
49 0 : m_func();
50 : }
51 :
52 146 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 146 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 273 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 273 : return std::make_unique<MessageQueue>();
60 : }
61 :
62 307 : MessageQueue::MessageQueue() : m_running(false) {}
63 :
64 606 : MessageQueue::~MessageQueue()
65 : {
66 307 : doStop();
67 606 : }
68 :
69 306 : void MessageQueue::start()
70 : {
71 306 : if (m_running)
72 : {
73 : // queue is running
74 1 : return;
75 : }
76 305 : m_running = true;
77 305 : std::thread startThread(&MessageQueue::processMessages, this);
78 305 : m_workerThread.swap(startThread);
79 : }
80 :
81 175 : void MessageQueue::stop()
82 : {
83 175 : doStop();
84 : }
85 :
86 1 : void MessageQueue::clear()
87 : {
88 1 : doClear();
89 : }
90 :
91 1465 : std::shared_ptr<Message> MessageQueue::waitForMessage()
92 : {
93 1465 : std::unique_lock<std::mutex> lock(m_mutex);
94 2779 : while (m_queue.empty())
95 : {
96 1314 : m_condVar.wait(lock);
97 : }
98 1465 : std::shared_ptr<Message> message = m_queue.front();
99 1465 : m_queue.pop_front();
100 2930 : return message;
101 1465 : }
102 :
103 1468 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
104 : {
105 1468 : const std::lock_guard<std::mutex> lock(m_mutex);
106 1468 : if (!m_running)
107 : {
108 2 : GST_ERROR("Message queue is not running");
109 2 : return false;
110 : }
111 1466 : m_queue.push_back(msg);
112 1466 : m_condVar.notify_all();
113 :
114 1466 : return true;
115 1468 : }
116 :
117 305 : void MessageQueue::processMessages()
118 : {
119 : do
120 : {
121 1465 : std::shared_ptr<Message> message = waitForMessage();
122 1465 : message->handle();
123 1465 : } while (m_running);
124 305 : }
125 :
126 0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
127 : {
128 0 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
129 0 : if (!postMessage(message))
130 : {
131 0 : return false;
132 : }
133 :
134 0 : return true;
135 : }
136 :
137 1168 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
138 : {
139 1168 : return callInEventLoopInternal(func);
140 : }
141 :
142 1473 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
143 : {
144 1473 : if (std::this_thread::get_id() != m_workerThread.get_id())
145 : {
146 1464 : auto message = std::make_shared<CallInEventLoopMessage>(func);
147 1464 : if (!postMessage(message))
148 : {
149 1 : return false;
150 : }
151 1463 : message->wait();
152 1464 : }
153 : else
154 : {
155 9 : func();
156 : }
157 :
158 1472 : return true;
159 : }
160 :
161 482 : void MessageQueue::doStop()
162 : {
163 482 : if (!m_running)
164 : {
165 : // queue is not running
166 177 : return;
167 : }
168 305 : callInEventLoopInternal([this]() { m_running = false; });
169 :
170 305 : if (m_workerThread.joinable())
171 305 : m_workerThread.join();
172 :
173 305 : doClear();
174 : }
175 :
176 306 : void MessageQueue::doClear()
177 : {
178 306 : std::unique_lock<std::mutex> lock(m_mutex);
179 307 : while (!m_queue.empty())
180 : {
181 1 : m_queue.front()->skip();
182 1 : m_queue.pop_front();
183 : }
184 306 : }
|