Line data Source code
1 : /*
2 : * Copyright (C) 2023 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "MessageQueue.h"
20 : #include "GstreamerCatLog.h"
21 : #define GST_CAT_DEFAULT rialtoGStreamerCat
22 1635 : CallInEventLoopMessage::CallInEventLoopMessage(const std::function<void()> &func) : m_func(func), m_done{false} {}
23 :
24 1633 : void CallInEventLoopMessage::handle()
25 : {
26 1633 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
27 1633 : m_func();
28 1633 : m_done = true;
29 1633 : m_callInEventLoopCondVar.notify_all();
30 : }
31 :
32 1 : void CallInEventLoopMessage::skip()
33 : {
34 1 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
35 1 : m_done = true;
36 1 : m_callInEventLoopCondVar.notify_all();
37 : }
38 :
39 1634 : void CallInEventLoopMessage::wait()
40 : {
41 1634 : std::unique_lock<std::mutex> lock(m_callInEventLoopMutex);
42 4902 : m_callInEventLoopCondVar.wait(lock, [this]() { return m_done; });
43 1634 : }
44 :
45 0 : ScheduleInEventLoopMessage::ScheduleInEventLoopMessage(const std::function<void()> &func) : m_func(func) {}
46 :
47 0 : void ScheduleInEventLoopMessage::handle()
48 : {
49 0 : m_func();
50 : }
51 :
52 156 : std::shared_ptr<IMessageQueueFactory> IMessageQueueFactory::createFactory()
53 : {
54 156 : return std::make_shared<MessageQueueFactory>();
55 : }
56 :
57 293 : std::unique_ptr<IMessageQueue> MessageQueueFactory::createMessageQueue() const
58 : {
59 293 : return std::make_unique<rialto::MessageQueue>();
60 : }
61 :
62 : namespace rialto
63 : {
64 348 : MessageQueue::MessageQueue() : m_running(false) {}
65 :
66 688 : MessageQueue::~MessageQueue()
67 : {
68 348 : doStop();
69 688 : }
70 :
71 349 : void MessageQueue::start()
72 : {
73 349 : if (m_running)
74 : {
75 : // queue is running
76 1 : return;
77 : }
78 348 : m_running = true;
79 348 : std::thread startThread(&MessageQueue::processMessages, this);
80 348 : m_workerThread.swap(startThread);
81 : }
82 :
83 207 : void MessageQueue::stop()
84 : {
85 207 : doStop();
86 : }
87 :
88 1 : void MessageQueue::clear()
89 : {
90 1 : doClear();
91 : }
92 :
93 1682 : std::shared_ptr<Message> MessageQueue::waitForMessage()
94 : {
95 1682 : std::unique_lock<std::mutex> lock(m_mutex);
96 3192 : while (m_queue.empty())
97 : {
98 1510 : m_condVar.wait(lock);
99 : }
100 1682 : std::shared_ptr<Message> message = m_queue.front();
101 1682 : m_queue.pop_front();
102 3364 : return message;
103 1682 : }
104 :
105 1685 : bool MessageQueue::postMessage(const std::shared_ptr<Message> &msg)
106 : {
107 1685 : const std::lock_guard<std::mutex> lock(m_mutex);
108 1685 : if (!m_running)
109 : {
110 2 : GST_ERROR("Message queue is not running");
111 2 : return false;
112 : }
113 1683 : m_queue.push_back(msg);
114 1683 : m_condVar.notify_all();
115 :
116 1683 : return true;
117 1685 : }
118 :
119 348 : void MessageQueue::processMessages()
120 : {
121 : do
122 : {
123 1682 : std::shared_ptr<Message> message = waitForMessage();
124 1682 : message->handle();
125 1682 : } while (m_running);
126 348 : }
127 :
128 0 : bool MessageQueue::scheduleInEventLoop(const std::function<void()> &func)
129 : {
130 0 : auto message = std::make_shared<ScheduleInEventLoopMessage>(func);
131 0 : if (!postMessage(message))
132 : {
133 0 : return false;
134 : }
135 :
136 0 : return true;
137 : }
138 :
139 1334 : bool MessageQueue::callInEventLoop(const std::function<void()> &func)
140 : {
141 1334 : return callInEventLoopInternal(func);
142 : }
143 :
144 1682 : bool MessageQueue::callInEventLoopInternal(const std::function<void()> &func)
145 : {
146 1682 : if (std::this_thread::get_id() != m_workerThread.get_id())
147 : {
148 1635 : auto message = std::make_shared<CallInEventLoopMessage>(func);
149 1635 : if (!postMessage(message))
150 : {
151 1 : return false;
152 : }
153 1634 : message->wait();
154 1635 : }
155 : else
156 : {
157 47 : func();
158 : }
159 :
160 1681 : return true;
161 : }
162 :
163 555 : void MessageQueue::doStop()
164 : {
165 555 : if (!m_running)
166 : {
167 : // queue is not running
168 207 : return;
169 : }
170 348 : callInEventLoopInternal([this]() { m_running = false; });
171 :
172 348 : if (m_workerThread.joinable())
173 348 : m_workerThread.join();
174 :
175 348 : doClear();
176 : }
177 :
178 349 : void MessageQueue::doClear()
179 : {
180 349 : std::unique_lock<std::mutex> lock(m_mutex);
181 350 : while (!m_queue.empty())
182 : {
183 1 : m_queue.front()->skip();
184 1 : m_queue.pop_front();
185 : }
186 349 : }
187 : } // namespace rialto
|