Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1535 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1533 : void CallInEventLoopMessage::handle()
25 : {
26 1533 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1533 : m_func();
28 1533 : m_done = true;
29 1533 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1534 : void CallInEventLoopMessage::wait()
40 : {
41 1534 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 4602 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1534 : }
44 :
45 0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 0 : void ScheduleInEventLoopMessage::handle()
48 : {
49 0 : m_func();
50 : }
51 :
52 156 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 156 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 293 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 293 : return std::make_unique<MessageQueue>();
60 : }
61 :
62 327 : MessageQueue::MessageQueue() : m_running(false) {}
63 :
64 646 : MessageQueue::~MessageQueue()
65 : {
66 327 : doStop();
67 646 : }
68 :
69 328 : void MessageQueue::start()
70 : {
71 328 : if (m_running)
72 : {
73 : // queue is running
74 1 : return;
75 : }
76 327 : m_running = true;
77 327 : std::thread startThread(&MessageQueue::processMessages, this);
78 327 : m_workerThread.swap(startThread);
79 : }
80 :
81 186 : void MessageQueue::stop()
82 : {
83 186 : doStop();
84 : }
85 :
86 1 : void MessageQueue::clear()
87 : {
88 1 : doClear();
89 : }
90 :
91 1584 : std::shared_ptr<Message> MessageQueue::waitForMessage()
92 : {
93 1584 : std::unique_lock<std::mutex> lock(m_mutex);
94 2999 : while (m_queue.empty())
95 : {
96 1415 : m_condVar.wait(lock);
97 : }
98 1584 : std::shared_ptr<Message> message = m_queue.front();
99 1584 : m_queue.pop_front();
100 3168 : return message;
101 1584 : }
102 :
103 1587 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
104 : {
105 1587 : const std::lock_guard<std::mutex> lock(m_mutex);
106 1587 : if (!m_running)
107 : {
108 2 : GST_ERROR("Message queue is not running");
109 2 : return false;
110 : }
111 1585 : m_queue.push_back(msg);
112 1585 : m_condVar.notify_all();
113 :
114 1585 : return true;
115 1587 : }
116 :
117 327 : void MessageQueue::processMessages()
118 : {
119 : do
120 : {
121 1584 : std::shared_ptr<Message> message = waitForMessage();
122 1584 : message->handle();
123 1584 : } while (m_running);
124 327 : }
125 :
126 0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
127 : {
128 0 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
129 0 : if (!postMessage(message))
130 : {
131 0 : return false;
132 : }
133 :
134 0 : return true;
135 : }
136 :
137 1257 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
138 : {
139 1257 : return callInEventLoopInternal(func);
140 : }
141 :
142 1584 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
143 : {
144 1584 : if (std::this_thread::get_id() != m_workerThread.get_id())
145 : {
146 1535 : auto message = std::make_shared<CallInEventLoopMessage>(func);
147 1535 : if (!postMessage(message))
148 : {
149 1 : return false;
150 : }
151 1534 : message->wait();
152 1535 : }
153 : else
154 : {
155 49 : func();
156 : }
157 :
158 1583 : return true;
159 : }
160 :
161 513 : void MessageQueue::doStop()
162 : {
163 513 : if (!m_running)
164 : {
165 : // queue is not running
166 186 : return;
167 : }
168 327 : callInEventLoopInternal([this]() { m_running = false; });
169 :
170 327 : if (m_workerThread.joinable())
171 327 : m_workerThread.join();
172 :
173 327 : doClear();
174 : }
175 :
176 328 : void MessageQueue::doClear()
177 : {
178 328 : std::unique_lock<std::mutex> lock(m_mutex);
179 329 : while (!m_queue.empty())
180 : {
181 1 : m_queue.front()->skip();
182 1 : m_queue.pop_front();
183 : }
184 328 : }
|