Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1483 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1481 : void CallInEventLoopMessage::handle()
25 : {
26 1481 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1481 : m_func();
28 1481 : m_done = true;
29 1481 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1482 : void CallInEventLoopMessage::wait()
40 : {
41 1482 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 4446 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1482 : }
44 :
45 0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 0 : void ScheduleInEventLoopMessage::handle()
48 : {
49 0 : m_func();
50 : }
51 :
52 151 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 151 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 283 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 283 : return std::make_unique<MessageQueue>();
60 : }
61 :
62 317 : MessageQueue::MessageQueue() : m_running(false) {}
63 :
64 626 : MessageQueue::~MessageQueue()
65 : {
66 317 : doStop();
67 626 : }
68 :
69 316 : void MessageQueue::start()
70 : {
71 316 : if (m_running)
72 : {
73 : // queue is running
74 1 : return;
75 : }
76 315 : m_running = true;
77 315 : std::thread startThread(&MessageQueue::processMessages, this);
78 315 : m_workerThread.swap(startThread);
79 : }
80 :
81 180 : void MessageQueue::stop()
82 : {
83 180 : doStop();
84 : }
85 :
86 1 : void MessageQueue::clear()
87 : {
88 1 : doClear();
89 : }
90 :
91 1520 : std::shared_ptr<Message> MessageQueue::waitForMessage()
92 : {
93 1520 : std::unique_lock<std::mutex> lock(m_mutex);
94 2875 : while (m_queue.empty())
95 : {
96 1355 : m_condVar.wait(lock);
97 : }
98 1520 : std::shared_ptr<Message> message = m_queue.front();
99 1520 : m_queue.pop_front();
100 3040 : return message;
101 1520 : }
102 :
103 1523 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
104 : {
105 1523 : const std::lock_guard<std::mutex> lock(m_mutex);
106 1523 : if (!m_running)
107 : {
108 2 : GST_ERROR("Message queue is not running");
109 2 : return false;
110 : }
111 1521 : m_queue.push_back(msg);
112 1521 : m_condVar.notify_all();
113 :
114 1521 : return true;
115 1523 : }
116 :
117 315 : void MessageQueue::processMessages()
118 : {
119 : do
120 : {
121 1520 : std::shared_ptr<Message> message = waitForMessage();
122 1520 : message->handle();
123 1520 : } while (m_running);
124 315 : }
125 :
126 0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
127 : {
128 0 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
129 0 : if (!postMessage(message))
130 : {
131 0 : return false;
132 : }
133 :
134 0 : return true;
135 : }
136 :
137 1207 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
138 : {
139 1207 : return callInEventLoopInternal(func);
140 : }
141 :
142 1522 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
143 : {
144 1522 : if (std::this_thread::get_id() != m_workerThread.get_id())
145 : {
146 1483 : auto message = std::make_shared<CallInEventLoopMessage>(func);
147 1483 : if (!postMessage(message))
148 : {
149 1 : return false;
150 : }
151 1482 : message->wait();
152 1483 : }
153 : else
154 : {
155 39 : func();
156 : }
157 :
158 1521 : return true;
159 : }
160 :
161 497 : void MessageQueue::doStop()
162 : {
163 497 : if (!m_running)
164 : {
165 : // queue is not running
166 182 : return;
167 : }
168 315 : callInEventLoopInternal([this]() { m_running = false; });
169 :
170 315 : if (m_workerThread.joinable())
171 315 : m_workerThread.join();
172 :
173 315 : doClear();
174 : }
175 :
176 316 : void MessageQueue::doClear()
177 : {
178 316 : std::unique_lock<std::mutex> lock(m_mutex);
179 317 : while (!m_queue.empty())
180 : {
181 1 : m_queue.front()->skip();
182 1 : m_queue.pop_front();
183 : }
184 316 : }
|