Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2023 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "GstWebAudioPlayer.h"
21 : #include "GstDispatcherThread.h"
22 : #include "RialtoServerLogging.h"
23 : #include "WorkerThread.h"
24 : #include "tasks/webAudio/WebAudioPlayerTaskFactory.h"
25 : #include <stdexcept>
26 :
27 : namespace
28 : {
29 : constexpr uint32_t kMaxWriteBufferTimeoutMs{2000};
30 : } // namespace
31 :
32 : namespace firebolt::rialto::server
33 : {
34 : std::weak_ptr<IGstWebAudioPlayerFactory> GstWebAudioPlayerFactory::m_factory;
35 :
36 25 : std::shared_ptr<IGstWebAudioPlayerFactory> IGstWebAudioPlayerFactory::getFactory()
37 : {
38 25 : std::shared_ptr<IGstWebAudioPlayerFactory> factory = GstWebAudioPlayerFactory::m_factory.lock();
39 :
40 25 : if (!factory)
41 : {
42 : try
43 : {
44 25 : factory = std::make_shared<GstWebAudioPlayerFactory>();
45 : }
46 0 : catch (const std::exception &e)
47 : {
48 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the gstreamer player factory, reason: %s", e.what());
49 : }
50 :
51 25 : GstWebAudioPlayerFactory::m_factory = factory;
52 : }
53 :
54 25 : return factory;
55 : }
56 :
57 1 : std::unique_ptr<IGstWebAudioPlayer> GstWebAudioPlayerFactory::createGstWebAudioPlayer(IGstWebAudioPlayerClient *client,
58 : const uint32_t priority)
59 : {
60 1 : std::unique_ptr<IGstWebAudioPlayer> gstPlayer;
61 :
62 : try
63 : {
64 1 : auto gstWrapperFactory = firebolt::rialto::wrappers::IGstWrapperFactory::getFactory();
65 1 : auto glibWrapperFactory = firebolt::rialto::wrappers::IGlibWrapperFactory::getFactory();
66 1 : std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> gstWrapper;
67 1 : std::shared_ptr<firebolt::rialto::wrappers::IGlibWrapper> glibWrapper;
68 1 : if ((!gstWrapperFactory) || (!(gstWrapper = gstWrapperFactory->getGstWrapper())))
69 : {
70 0 : throw std::runtime_error("Cannot create GstWrapper");
71 : }
72 1 : if ((!glibWrapperFactory) || (!(glibWrapper = glibWrapperFactory->getGlibWrapper())))
73 : {
74 0 : throw std::runtime_error("Cannot create GlibWrapper");
75 : }
76 2 : gstPlayer = std::make_unique<GstWebAudioPlayer>(client, priority, gstWrapper, glibWrapper,
77 2 : IGstInitialiser::instance(), IGstSrcFactory::getFactory(),
78 2 : std::make_unique<WebAudioPlayerTaskFactory>(client, gstWrapper,
79 : glibWrapper),
80 2 : std::make_unique<WorkerThreadFactory>(),
81 3 : std::make_unique<GstDispatcherThreadFactory>());
82 1 : }
83 0 : catch (const std::exception &e)
84 : {
85 0 : RIALTO_SERVER_LOG_ERROR("Failed to create the gstreamer player, reason: %s", e.what());
86 : }
87 :
88 1 : return gstPlayer;
89 : }
90 :
91 33 : GstWebAudioPlayer::GstWebAudioPlayer(IGstWebAudioPlayerClient *client, const uint32_t priority,
92 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper,
93 : const std::shared_ptr<firebolt::rialto::wrappers::IGlibWrapper> &glibWrapper,
94 : const IGstInitialiser &gstInitialiser,
95 : const std::shared_ptr<IGstSrcFactory> &gstSrcFactory,
96 : std::unique_ptr<IWebAudioPlayerTaskFactory> taskFactory,
97 : std::unique_ptr<IWorkerThreadFactory> workerThreadFactory,
98 33 : std::unique_ptr<IGstDispatcherThreadFactory> gstDispatcherThreadFactory)
99 33 : : m_gstPlayerClient(client), m_gstWrapper{gstWrapper}, m_glibWrapper{glibWrapper},
100 66 : m_taskFactory{std::move(taskFactory)}
101 : {
102 33 : RIALTO_SERVER_LOG_DEBUG("GstWebAudioPlayer is constructed.");
103 :
104 33 : gstInitialiser.waitForInitialisation();
105 :
106 33 : if ((!gstSrcFactory) || (!(m_context.gstSrc = gstSrcFactory->getGstSrc())))
107 : {
108 1 : throw std::runtime_error("Cannot create GstSrc");
109 : }
110 :
111 : // Ensure that rialtosrc has been initalised
112 32 : m_context.gstSrc->initSrc();
113 :
114 : // Start task thread
115 32 : if ((!workerThreadFactory) || (!(m_workerThread = workerThreadFactory->createWorkerThread())))
116 : {
117 1 : throw std::runtime_error("Failed to create the worker thread");
118 : }
119 :
120 31 : if (!initWebAudioPipeline(priority))
121 : {
122 12 : termWebAudioPipeline();
123 12 : resetWorkerThread();
124 12 : throw std::runtime_error("Failed to initalise the pipeline");
125 : }
126 :
127 38 : if ((!gstDispatcherThreadFactory) ||
128 19 : (!(m_gstDispatcherThread =
129 38 : gstDispatcherThreadFactory->createGstDispatcherThread(*this, m_context.pipeline, m_gstWrapper))))
130 : {
131 1 : termWebAudioPipeline();
132 1 : resetWorkerThread();
133 1 : throw std::runtime_error("Failed to create the dispatcher thread");
134 : }
135 153 : }
136 :
137 36 : GstWebAudioPlayer::~GstWebAudioPlayer()
138 : {
139 18 : RIALTO_SERVER_LOG_DEBUG("GstWebAudioPlayer is destructed.");
140 :
141 18 : m_gstDispatcherThread.reset();
142 :
143 18 : resetWorkerThread();
144 :
145 18 : termWebAudioPipeline();
146 36 : }
147 :
148 31 : bool GstWebAudioPlayer::initWebAudioPipeline(const uint32_t priority)
149 : {
150 31 : m_context.pipeline = m_gstWrapper->gstPipelineNew(std::string("webaudiopipeline" + std::to_string(priority)).c_str());
151 31 : if (!m_context.pipeline)
152 : {
153 1 : RIALTO_SERVER_LOG_ERROR("Failed to create the webaudiopipeline");
154 1 : return false;
155 : }
156 :
157 30 : RIALTO_SERVER_LOG_MIL("RialtoServer's webaudio pipeline constructed");
158 :
159 : // Create and initalise appsrc
160 30 : m_context.source = m_gstWrapper->gstElementFactoryMake("appsrc", "audsrc");
161 30 : if (!m_context.source)
162 : {
163 1 : RIALTO_SERVER_LOG_ERROR("Failed to create the appsrc");
164 1 : return false;
165 : }
166 29 : m_gstWrapper->gstAppSrcSetMaxBytes(GST_APP_SRC(m_context.source), kMaxWebAudioBytes);
167 29 : m_glibWrapper->gObjectSet(m_context.source, "format", GST_FORMAT_TIME, nullptr);
168 :
169 : // Perform sink specific initalisation
170 29 : GstPluginFeature *feature = nullptr;
171 29 : GstRegistry *reg = m_gstWrapper->gstRegistryGet();
172 29 : if (!reg)
173 : {
174 1 : RIALTO_SERVER_LOG_ERROR("Failed get the gst registry");
175 1 : return false;
176 : }
177 :
178 28 : GstElement *sink = nullptr;
179 28 : if (nullptr != (feature = m_gstWrapper->gstRegistryLookupFeature(reg, "amlhalasink")))
180 : {
181 : // LLama
182 3 : RIALTO_SERVER_LOG_INFO("Use amlhalasink");
183 3 : sink = createAmlhalaSink();
184 3 : m_gstWrapper->gstObjectUnref(feature);
185 : }
186 25 : else if (nullptr != (feature = m_gstWrapper->gstRegistryLookupFeature(reg, "rtkaudiosink")))
187 : {
188 : // XiOne
189 5 : RIALTO_SERVER_LOG_INFO("Use rtkaudiosink");
190 5 : sink = createRtkAudioSink();
191 5 : m_gstWrapper->gstObjectUnref(feature);
192 : }
193 : else
194 : {
195 20 : RIALTO_SERVER_LOG_INFO("Use autoaudiosink");
196 20 : sink = createAutoSink();
197 : }
198 :
199 28 : if (sink)
200 : {
201 25 : return linkElementsToSrc(sink);
202 : }
203 : else
204 : {
205 3 : m_gstWrapper->gstObjectUnref(m_context.source);
206 3 : m_context.source = nullptr;
207 : }
208 3 : return false;
209 : }
210 :
211 3 : GstElement *GstWebAudioPlayer::createAmlhalaSink()
212 : {
213 3 : GstElement *sink = m_gstWrapper->gstElementFactoryMake("amlhalasink", "webaudiosink");
214 3 : if (!sink)
215 : {
216 1 : RIALTO_SERVER_LOG_ERROR("Failed create the amlhalasink");
217 1 : return nullptr;
218 : }
219 2 : m_glibWrapper->gObjectSet(G_OBJECT(sink), "direct-mode", FALSE, NULL);
220 :
221 2 : return sink;
222 : }
223 :
224 5 : GstElement *GstWebAudioPlayer::createRtkAudioSink()
225 : {
226 5 : GstElement *sink = m_gstWrapper->gstElementFactoryMake("rtkaudiosink", "webaudiosink");
227 5 : if (!sink)
228 : {
229 1 : RIALTO_SERVER_LOG_ERROR("Failed create the rtkaudiosink");
230 1 : return nullptr;
231 : }
232 4 : m_glibWrapper->gObjectSet(G_OBJECT(sink), "media-tunnel", FALSE, NULL);
233 4 : m_glibWrapper->gObjectSet(G_OBJECT(sink), "audio-service", TRUE, NULL);
234 :
235 4 : return sink;
236 : }
237 :
238 20 : GstElement *GstWebAudioPlayer::createAutoSink()
239 : {
240 20 : GstElement *sink = m_gstWrapper->gstElementFactoryMake("autoaudiosink", "webaudiosink");
241 20 : if (!sink)
242 : {
243 1 : RIALTO_SERVER_LOG_ERROR("Failed create the autoaudiosink");
244 1 : return nullptr;
245 : }
246 :
247 19 : return sink;
248 : }
249 :
250 : // NOTE:-
251 : // This method hands the responsibility for the destruction of both { "sink", "m_context.source" }
252 : // over to the pipeline (or if handover wasn't possible, it will unref)
253 25 : bool GstWebAudioPlayer::linkElementsToSrc(GstElement *sink)
254 : {
255 25 : bool status{true};
256 :
257 25 : GstElement *convert{nullptr};
258 25 : GstElement *resample{nullptr};
259 25 : GstElement *volume{nullptr};
260 25 : GstElement *queue{nullptr};
261 :
262 25 : convert = m_gstWrapper->gstElementFactoryMake("audioconvert", NULL);
263 25 : if (!convert)
264 : {
265 1 : RIALTO_SERVER_LOG_ERROR("Failed create the audioconvert");
266 1 : status = false;
267 : }
268 :
269 25 : if (status)
270 : {
271 24 : resample = m_gstWrapper->gstElementFactoryMake("audioresample", NULL);
272 24 : if (!resample)
273 : {
274 1 : RIALTO_SERVER_LOG_ERROR("Failed create the audioresample");
275 1 : status = false;
276 : }
277 : }
278 :
279 25 : if (status)
280 : {
281 23 : volume = m_gstWrapper->gstElementFactoryMake("volume", NULL);
282 23 : if (!volume)
283 : {
284 1 : RIALTO_SERVER_LOG_ERROR("Failed create the volume");
285 1 : status = false;
286 : }
287 : }
288 25 : if (status)
289 : {
290 22 : queue = m_gstWrapper->gstElementFactoryMake("queue", NULL);
291 22 : if (!queue)
292 : {
293 1 : RIALTO_SERVER_LOG_ERROR("Failed create the queue");
294 1 : status = false;
295 : }
296 : else
297 : {
298 21 : constexpr guint kWebAudioQueueSize{8192};
299 21 : m_glibWrapper->gObjectSet(queue, "max-size-bytes", kWebAudioQueueSize, nullptr);
300 : }
301 : }
302 :
303 25 : std::queue<GstElement *> elementsToAdd;
304 25 : elementsToAdd.push(m_context.source);
305 25 : if (convert)
306 24 : elementsToAdd.push(convert);
307 25 : if (resample)
308 23 : elementsToAdd.push(resample);
309 25 : if (volume)
310 22 : elementsToAdd.push(volume);
311 25 : if (queue)
312 21 : elementsToAdd.push(queue);
313 25 : elementsToAdd.push(sink);
314 :
315 25 : if (status)
316 : {
317 : // Add elements to the pipeline
318 21 : GstBin *pipelineBin = GST_BIN(m_context.pipeline);
319 146 : while (!elementsToAdd.empty())
320 : {
321 126 : if (m_gstWrapper->gstBinAdd(pipelineBin, elementsToAdd.front()))
322 125 : elementsToAdd.pop();
323 : else
324 : {
325 1 : RIALTO_SERVER_LOG_ERROR("Failed to add element to the bin");
326 1 : status = false;
327 1 : break;
328 : }
329 : }
330 : }
331 :
332 25 : if (status)
333 : {
334 20 : if ((!m_gstWrapper->gstElementLink(m_context.source, convert)) ||
335 19 : (!m_gstWrapper->gstElementLink(convert, resample)) || (!m_gstWrapper->gstElementLink(resample, volume)) ||
336 39 : (!m_gstWrapper->gstElementLink(volume, queue)) || (!m_gstWrapper->gstElementLink(queue, sink)))
337 : {
338 1 : RIALTO_SERVER_LOG_ERROR("Failed to link elements");
339 1 : status = false;
340 : }
341 20 : m_context.gstVolumeElement = GST_STREAM_VOLUME(volume);
342 : }
343 :
344 25 : if (!status)
345 : {
346 : // Unref anything that wasn't added to the pipeline
347 21 : while (!elementsToAdd.empty())
348 : {
349 15 : m_gstWrapper->gstObjectUnref(elementsToAdd.front());
350 15 : elementsToAdd.pop();
351 : }
352 6 : m_context.source = nullptr;
353 : }
354 :
355 25 : return status;
356 : }
357 :
358 31 : void GstWebAudioPlayer::termWebAudioPipeline()
359 : {
360 31 : if (m_context.pipeline)
361 : {
362 30 : m_taskFactory->createStop(*this)->execute();
363 30 : GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(m_context.pipeline));
364 30 : if (bus)
365 : {
366 29 : m_gstWrapper->gstBusSetSyncHandler(bus, nullptr, nullptr, nullptr);
367 29 : m_gstWrapper->gstObjectUnref(bus);
368 : }
369 :
370 30 : m_gstWrapper->gstObjectUnref(m_context.pipeline);
371 : }
372 31 : RIALTO_SERVER_LOG_MIL("RialtoServer's webaudio pipeline terminated.");
373 : }
374 :
375 31 : void GstWebAudioPlayer::resetWorkerThread()
376 : {
377 31 : m_workerThread->enqueueTask(m_taskFactory->createShutdown(*this));
378 31 : m_workerThread->join();
379 31 : m_workerThread.reset();
380 : }
381 :
382 1 : void GstWebAudioPlayer::setCaps(const std::string &audioMimeType, std::weak_ptr<const WebAudioConfig> config)
383 : {
384 1 : m_workerThread->enqueueTask(m_taskFactory->createSetCaps(m_context, audioMimeType, config));
385 : }
386 :
387 1 : void GstWebAudioPlayer::play()
388 : {
389 1 : m_workerThread->enqueueTask(m_taskFactory->createPlay(*this));
390 : }
391 :
392 1 : void GstWebAudioPlayer::pause()
393 : {
394 1 : m_workerThread->enqueueTask(m_taskFactory->createPause(*this));
395 : }
396 :
397 1 : void GstWebAudioPlayer::setVolume(double volume)
398 : {
399 1 : m_workerThread->enqueueTask(m_taskFactory->createSetVolume(m_context, volume));
400 : }
401 :
402 1 : bool GstWebAudioPlayer::getVolume(double &volume)
403 : {
404 : // Must be called on the main thread, otherwise the pipeline can be destroyed during the query.
405 1 : volume = m_gstWrapper->gstStreamVolumeGetVolume(m_context.gstVolumeElement, GST_STREAM_VOLUME_FORMAT_LINEAR);
406 1 : return true;
407 : }
408 :
409 2 : uint32_t GstWebAudioPlayer::writeBuffer(uint8_t *mainPtr, uint32_t mainLength, uint8_t *wrapPtr, uint32_t wrapLength)
410 : {
411 : // Must block and wait for the data to be written from the shared buffer.
412 2 : std::unique_lock<std::mutex> lock(m_context.writeBufferMutex);
413 2 : m_workerThread->enqueueTask(m_taskFactory->createWriteBuffer(m_context, mainPtr, mainLength, wrapPtr, wrapLength));
414 2 : std::cv_status status = m_context.writeBufferCond.wait_for(lock, std::chrono::milliseconds(kMaxWriteBufferTimeoutMs));
415 2 : if (std::cv_status::timeout == status)
416 : {
417 1 : RIALTO_SERVER_LOG_ERROR("Timed out writing to the gstreamer buffers");
418 1 : return 0;
419 : }
420 : else
421 : {
422 1 : return m_context.lastBytesWritten;
423 : }
424 2 : }
425 :
426 1 : void GstWebAudioPlayer::setEos()
427 : {
428 1 : m_workerThread->enqueueTask(m_taskFactory->createEos(m_context));
429 : }
430 :
431 1 : uint64_t GstWebAudioPlayer::getQueuedBytes()
432 : {
433 : // Must be called on the main thread, otherwise the pipeline can be destroyed during the query.
434 1 : return m_gstWrapper->gstAppSrcGetCurrentLevelBytes(GST_APP_SRC(m_context.source));
435 : }
436 :
437 3 : bool GstWebAudioPlayer::changePipelineState(GstState newState)
438 : {
439 3 : if (m_gstWrapper->gstElementSetState(m_context.pipeline, newState) == GST_STATE_CHANGE_FAILURE)
440 : {
441 1 : RIALTO_SERVER_LOG_ERROR("Change state failed - Gstreamer returned an error");
442 1 : if (m_gstPlayerClient)
443 1 : m_gstPlayerClient->notifyState(WebAudioPlayerState::FAILURE);
444 1 : return false;
445 : }
446 2 : return true;
447 : }
448 :
449 2 : void GstWebAudioPlayer::stopWorkerThread()
450 : {
451 2 : if (m_workerThread)
452 : {
453 2 : m_workerThread->stop();
454 : }
455 : }
456 :
457 1 : void GstWebAudioPlayer::handleBusMessage(GstMessage *message)
458 : {
459 1 : if (m_workerThread)
460 : {
461 1 : m_workerThread->enqueueTask(m_taskFactory->createHandleBusMessage(m_context, *this, message));
462 : }
463 : }
464 :
465 1 : void GstWebAudioPlayer::ping(std::unique_ptr<IHeartbeatHandler> &&heartbeatHandler)
466 : {
467 1 : if (m_workerThread)
468 : {
469 1 : m_workerThread->enqueueTask(m_taskFactory->createPing(std::move(heartbeatHandler)));
470 : }
471 : }
472 :
473 : }; // namespace firebolt::rialto::server
|