Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1647 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1645 : void CallInEventLoopMessage::handle()
25 : {
26 1645 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1645 : m_func();
28 1645 : m_done = true;
29 1645 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1646 : void CallInEventLoopMessage::wait()
40 : {
41 1646 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 4938 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1646 : }
44 :
45 0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 0 : void ScheduleInEventLoopMessage::handle()
48 : {
49 0 : m_func();
50 : }
51 :
52 158 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 158 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 297 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 297 : return std::make_unique<rialto::MessageQueue>();
60 : }
61 :
62 : namespace rialto
63 : {
64 352 : MessageQueue::MessageQueue() : m_running(false) {}
65 :
66 696 : MessageQueue::~MessageQueue()
67 : {
68 352 : doStop();
69 696 : }
70 :
71 351 : void MessageQueue::start()
72 : {
73 351 : if (m_running)
74 : {
75 : // queue is running
76 1 : return;
77 : }
78 350 : m_running = true;
79 350 : std::thread startThread(&MessageQueue::processMessages, this);
80 350 : m_workerThread.swap(startThread);
81 : }
82 :
83 207 : void MessageQueue::stop()
84 : {
85 207 : doStop();
86 : }
87 :
88 1 : void MessageQueue::clear()
89 : {
90 1 : doClear();
91 : }
92 :
93 1701 : std::shared_ptr<Message> MessageQueue::waitForMessage()
94 : {
95 1701 : std::unique_lock<std::mutex> lock(m_mutex);
96 3230 : while (m_queue.empty())
97 : {
98 1529 : m_condVar.wait(lock);
99 : }
100 1701 : std::shared_ptr<Message> message = m_queue.front();
101 1701 : m_queue.pop_front();
102 3402 : return message;
103 1701 : }
104 :
105 1704 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
106 : {
107 1704 : const std::lock_guard<std::mutex> lock(m_mutex);
108 1704 : if (!m_running)
109 : {
110 2 : GST_ERROR("Message queue is not running");
111 2 : return false;
112 : }
113 1702 : m_queue.push_back(msg);
114 1702 : m_condVar.notify_all();
115 :
116 1702 : return true;
117 1704 : }
118 :
119 350 : void MessageQueue::processMessages()
120 : {
121 : do
122 : {
123 1701 : std::shared_ptr<Message> message = waitForMessage();
124 1701 : message->handle();
125 1701 : } while (m_running);
126 350 : }
127 :
128 0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
129 : {
130 0 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
131 0 : if (!postMessage(message))
132 : {
133 0 : return false;
134 : }
135 :
136 0 : return true;
137 : }
138 :
139 1349 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
140 : {
141 1349 : return callInEventLoopInternal(func);
142 : }
143 :
144 1699 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
145 : {
146 1699 : if (std::this_thread::get_id() != m_workerThread.get_id())
147 : {
148 1647 : auto message = std::make_shared<CallInEventLoopMessage>(func);
149 1647 : if (!postMessage(message))
150 : {
151 1 : return false;
152 : }
153 1646 : message->wait();
154 1647 : }
155 : else
156 : {
157 52 : func();
158 : }
159 :
160 1698 : return true;
161 : }
162 :
163 559 : void MessageQueue::doStop()
164 : {
165 559 : if (!m_running)
166 : {
167 : // queue is not running
168 209 : return;
169 : }
170 350 : callInEventLoopInternal([this]() { m_running = false; });
171 :
172 350 : if (m_workerThread.joinable())
173 350 : m_workerThread.join();
174 :
175 350 : doClear();
176 : }
177 :
178 351 : void MessageQueue::doClear()
179 : {
180 351 : std::unique_lock<std::mutex> lock(m_mutex);
181 352 : while (!m_queue.empty())
182 : {
183 1 : m_queue.front()->skip();
184 1 : m_queue.pop_front();
185 : }
186 351 : }
187 : } // namespace rialto
|