Line data Source code
1 : /*
2 : * Copyright (C) 2022 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #define USE_GLIB 1
20 :
21 : #include <cstring>
22 : #include <limits>
23 :
24 : #include <gst/gst.h>
25 :
26 : #include "ControlBackend.h"
27 : #include "GStreamerUtils.h"
28 : #include "IClientLogControl.h"
29 : #include "IMediaPipeline.h"
30 : #include "LogToGstHandler.h"
31 : #include "RialtoGStreamerMSEBaseSink.h"
32 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
33 :
34 : GST_DEBUG_CATEGORY_STATIC(RialtoMSEBaseSinkDebug);
35 : #define GST_CAT_DEFAULT RialtoMSEBaseSinkDebug
36 :
37 : #define rialto_mse_base_sink_parent_class parent_class
38 3242 : G_DEFINE_TYPE_WITH_CODE(RialtoMSEBaseSink, rialto_mse_base_sink, GST_TYPE_ELEMENT,
39 : G_ADD_PRIVATE(RialtoMSEBaseSink)
40 : GST_DEBUG_CATEGORY_INIT(RialtoMSEBaseSinkDebug, "rialtomsebasesink", 0,
41 : "rialto mse base sink"));
42 :
43 : enum
44 : {
45 : PROP_0,
46 : PROP_IS_SINGLE_PATH_STREAM,
47 : PROP_N_STREAMS,
48 : PROP_HAS_DRM,
49 : PROP_STATS,
50 : PROP_LAST
51 : };
52 :
53 : enum
54 : {
55 : SIGNAL_UNDERFLOW,
56 : SIGNAL_LAST
57 : };
58 :
59 : static guint g_signals[SIGNAL_LAST] = {0};
60 :
61 265 : void rialto_mse_base_sink_initialise_delegate(RialtoMSEBaseSink *sink, const std::shared_ptr<IPlaybackDelegate> &delegate)
62 : {
63 265 : std::unique_lock lock{sink->priv->m_sinkMutex};
64 265 : sink->priv->m_delegate = delegate;
65 :
66 265 : for (auto &[type, value] : sink->priv->m_queuedProperties)
67 : {
68 0 : delegate->setProperty(type, &value);
69 0 : g_value_unset(&value);
70 : }
71 265 : sink->priv->m_queuedProperties.clear();
72 : }
73 :
74 1766 : static std::shared_ptr<IPlaybackDelegate> rialto_mse_base_sink_get_delegate(RialtoMSEBaseSink *sink)
75 : {
76 1766 : std::unique_lock lock{sink->priv->m_sinkMutex};
77 1766 : if (!sink->priv->m_delegate)
78 : {
79 0 : GST_ERROR_OBJECT(sink, "Sink delegate not initialized");
80 : }
81 3532 : return sink->priv->m_delegate;
82 1766 : }
83 :
84 174 : void rialto_mse_base_async_start(RialtoMSEBaseSink *sink)
85 : {
86 174 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
87 : {
88 174 : delegate->postAsyncStart();
89 : }
90 : }
91 :
92 3 : static void rialto_mse_base_sink_eos_handler(RialtoMSEBaseSink *sink)
93 : {
94 3 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
95 : {
96 3 : delegate->handleEos();
97 : }
98 : }
99 :
100 3 : static void rialto_mse_base_sink_error_handler(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackError error)
101 : {
102 3 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
103 : {
104 3 : delegate->handleError(error);
105 : }
106 : }
107 :
108 57 : static void rialto_mse_base_sink_rialto_state_changed_handler(RialtoMSEBaseSink *sink,
109 : firebolt::rialto::PlaybackState state)
110 : {
111 57 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
112 : {
113 57 : delegate->handleStateChanged(state);
114 : }
115 : }
116 :
117 2 : static void rialto_mse_base_sink_flush_completed_handler(RialtoMSEBaseSink *sink)
118 : {
119 2 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
120 : {
121 2 : delegate->handleFlushCompleted();
122 : }
123 : }
124 :
125 23 : static gboolean rialto_mse_base_sink_send_event(GstElement *element, GstEvent *event)
126 : {
127 23 : if (auto delegate = rialto_mse_base_sink_get_delegate(RIALTO_MSE_BASE_SINK(element)))
128 : {
129 23 : return delegate->handleSendEvent(event);
130 : }
131 0 : return FALSE;
132 : }
133 :
134 188 : gboolean rialto_mse_base_sink_event(GstPad *pad, GstObject *parent, GstEvent *event)
135 : {
136 188 : if (auto delegate = rialto_mse_base_sink_get_delegate(RIALTO_MSE_BASE_SINK(parent)))
137 : {
138 188 : return delegate->handleEvent(event);
139 : }
140 0 : return FALSE;
141 : }
142 :
143 32 : GstFlowReturn rialto_mse_base_sink_chain(GstPad *pad, GstObject *parent, GstBuffer *buf)
144 : {
145 32 : if (auto delegate = rialto_mse_base_sink_get_delegate(RIALTO_MSE_BASE_SINK(parent)))
146 : {
147 32 : return delegate->handleBuffer(buf);
148 : }
149 0 : return GST_FLOW_ERROR;
150 : }
151 :
152 5 : GstRefSample rialto_mse_base_sink_get_front_sample(RialtoMSEBaseSink *sink)
153 : {
154 5 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
155 : {
156 5 : return delegate->getFrontSample();
157 : }
158 0 : return GstRefSample{};
159 : }
160 :
161 2 : void rialto_mse_base_sink_pop_sample(RialtoMSEBaseSink *sink)
162 : {
163 2 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
164 : {
165 2 : delegate->popSample();
166 : }
167 : }
168 :
169 3 : bool rialto_mse_base_sink_is_eos(RialtoMSEBaseSink *sink)
170 : {
171 3 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
172 : {
173 3 : return delegate->isEos();
174 : }
175 0 : return false;
176 : }
177 :
178 7 : void rialto_mse_base_sink_lost_state(RialtoMSEBaseSink *sink)
179 : {
180 7 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
181 : {
182 7 : delegate->lostState();
183 : }
184 : }
185 :
186 3 : static void rialto_mse_base_sink_qos_handle(GstElement *element, uint64_t processed, uint64_t dropped)
187 : {
188 3 : if (auto delegate = rialto_mse_base_sink_get_delegate(RIALTO_MSE_BASE_SINK(element)))
189 : {
190 3 : delegate->handleQos(processed, dropped);
191 : }
192 : }
193 :
194 17 : static gboolean rialto_mse_base_sink_query(GstElement *element, GstQuery *query)
195 : {
196 17 : RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
197 17 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
198 : {
199 17 : std::optional<gboolean> result{delegate->handleQuery(query)};
200 17 : if (result.has_value())
201 : {
202 8 : return result.value();
203 : }
204 9 : GstElement *parent = GST_ELEMENT(&sink->parent);
205 9 : return GST_ELEMENT_CLASS(parent_class)->query(parent, query);
206 17 : }
207 0 : return FALSE;
208 : }
209 :
210 824 : static GstStateChangeReturn rialto_mse_base_sink_change_state(GstElement *element, GstStateChange transition)
211 : {
212 824 : RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
213 824 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
214 : {
215 824 : GstStateChangeReturn status = delegate->changeState(transition);
216 824 : if (GST_STATE_CHANGE_FAILURE != status)
217 : {
218 820 : GstStateChangeReturn result = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
219 820 : if (G_UNLIKELY(result == GST_STATE_CHANGE_FAILURE))
220 : {
221 0 : GST_WARNING_OBJECT(sink, "State change failed");
222 0 : return result;
223 : }
224 820 : else if (result == GST_STATE_CHANGE_ASYNC)
225 : {
226 0 : return GST_STATE_CHANGE_ASYNC;
227 : }
228 : }
229 824 : return status;
230 : }
231 0 : return GST_STATE_CHANGE_FAILURE;
232 : }
233 :
234 49 : void rialto_mse_base_sink_handle_get_property(RialtoMSEBaseSink *sink, const IPlaybackDelegate::Property &property,
235 : GValue *value)
236 : {
237 49 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
238 : {
239 49 : delegate->getProperty(property, value);
240 : }
241 : else // Copy queued value if present
242 : {
243 0 : std::unique_lock lock{sink->priv->m_sinkMutex};
244 0 : if (sink->priv->m_queuedProperties.find(property) != sink->priv->m_queuedProperties.end())
245 : {
246 0 : g_value_copy(&sink->priv->m_queuedProperties[property], value);
247 : }
248 49 : }
249 : }
250 :
251 374 : void rialto_mse_base_sink_handle_set_property(RialtoMSEBaseSink *sink, const IPlaybackDelegate::Property &property,
252 : const GValue *value)
253 : {
254 374 : if (auto delegate = rialto_mse_base_sink_get_delegate(sink))
255 : {
256 374 : delegate->setProperty(property, value);
257 : }
258 : else
259 : {
260 0 : std::unique_lock lock{sink->priv->m_sinkMutex};
261 0 : sink->priv->m_queuedProperties[property] = G_VALUE_INIT;
262 0 : g_value_init(&(sink->priv->m_queuedProperties[property]), G_VALUE_TYPE(value));
263 0 : g_value_copy(value, &(sink->priv->m_queuedProperties[property]));
264 374 : }
265 : }
266 :
267 5 : static void rialto_mse_base_sink_get_property(GObject *object, guint propId, GValue *value, GParamSpec *pspec)
268 : {
269 5 : switch (propId)
270 : {
271 1 : case PROP_IS_SINGLE_PATH_STREAM:
272 : // Set default value if it can't be acquired
273 1 : g_value_set_boolean(value, FALSE);
274 1 : rialto_mse_base_sink_handle_get_property(RIALTO_MSE_BASE_SINK(object),
275 1 : IPlaybackDelegate::Property::IsSinglePathStream, value);
276 1 : break;
277 1 : case PROP_N_STREAMS:
278 : // Set default value if it can't be acquired
279 1 : g_value_set_int(value, 1);
280 1 : rialto_mse_base_sink_handle_get_property(RIALTO_MSE_BASE_SINK(object),
281 1 : IPlaybackDelegate::Property::NumberOfStreams, value);
282 1 : break;
283 1 : case PROP_HAS_DRM:
284 : // Set default value if it can't be acquired
285 1 : g_value_set_boolean(value, TRUE);
286 1 : rialto_mse_base_sink_handle_get_property(RIALTO_MSE_BASE_SINK(object), IPlaybackDelegate::Property::HasDrm,
287 : value);
288 1 : break;
289 2 : case PROP_STATS:
290 2 : rialto_mse_base_sink_handle_get_property(RIALTO_MSE_BASE_SINK(object), IPlaybackDelegate::Property::Stats, value);
291 2 : break;
292 0 : default:
293 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propId, pspec);
294 0 : break;
295 : }
296 5 : }
297 :
298 303 : static void rialto_mse_base_sink_set_property(GObject *object, guint propId, const GValue *value, GParamSpec *pspec)
299 : {
300 303 : switch (propId)
301 : {
302 151 : case PROP_IS_SINGLE_PATH_STREAM:
303 151 : rialto_mse_base_sink_handle_set_property(RIALTO_MSE_BASE_SINK(object),
304 151 : IPlaybackDelegate::Property::IsSinglePathStream, value);
305 151 : break;
306 151 : case PROP_N_STREAMS:
307 151 : rialto_mse_base_sink_handle_set_property(RIALTO_MSE_BASE_SINK(object),
308 151 : IPlaybackDelegate::Property::NumberOfStreams, value);
309 151 : break;
310 1 : case PROP_HAS_DRM:
311 1 : rialto_mse_base_sink_handle_set_property(RIALTO_MSE_BASE_SINK(object), IPlaybackDelegate::Property::HasDrm,
312 : value);
313 1 : break;
314 0 : default:
315 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propId, pspec);
316 0 : break;
317 : }
318 303 : }
319 :
320 56 : void rialto_mse_base_handle_rialto_server_state_changed(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackState state)
321 : {
322 56 : if (sink->priv->m_callbacks.stateChangedCallback)
323 : {
324 56 : sink->priv->m_callbacks.stateChangedCallback(state);
325 : }
326 : }
327 :
328 3 : void rialto_mse_base_handle_rialto_server_eos(RialtoMSEBaseSink *sink)
329 : {
330 3 : if (sink->priv->m_callbacks.eosCallback)
331 : {
332 3 : sink->priv->m_callbacks.eosCallback();
333 : }
334 : }
335 :
336 2 : void rialto_mse_base_handle_rialto_server_completed_flush(RialtoMSEBaseSink *sink)
337 : {
338 2 : if (sink->priv->m_callbacks.flushCompletedCallback)
339 : {
340 2 : sink->priv->m_callbacks.flushCompletedCallback();
341 : }
342 : }
343 :
344 3 : void rialto_mse_base_handle_rialto_server_sent_qos(RialtoMSEBaseSink *sink, uint64_t processed, uint64_t dropped)
345 : {
346 3 : if (sink->priv->m_callbacks.qosCallback)
347 : {
348 3 : sink->priv->m_callbacks.qosCallback(processed, dropped);
349 : }
350 : }
351 :
352 2 : void rialto_mse_base_handle_rialto_server_error(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackError error)
353 : {
354 2 : if (sink->priv->m_callbacks.errorCallback)
355 : {
356 2 : sink->priv->m_callbacks.errorCallback(error);
357 : }
358 : }
359 :
360 1 : void rialto_mse_base_handle_rialto_server_sent_buffer_underflow(RialtoMSEBaseSink *sink)
361 : {
362 1 : GST_WARNING_OBJECT(sink, "Sending underflow signal");
363 : // send 2 last parameters just to be compatible with RDK's buffer-underflow-callback signal signature
364 1 : g_signal_emit(G_OBJECT(sink), g_signals[SIGNAL_UNDERFLOW], 0, 0, nullptr);
365 : }
366 :
367 265 : bool rialto_mse_base_sink_initialise_sinkpad(RialtoMSEBaseSink *sink)
368 : {
369 : GstPadTemplate *pad_template =
370 265 : gst_element_class_get_pad_template(GST_ELEMENT_CLASS(G_OBJECT_GET_CLASS(sink)), "sink");
371 265 : if (!pad_template)
372 : {
373 0 : GST_ERROR_OBJECT(sink, "Could not find sink pad template");
374 0 : return false;
375 : }
376 :
377 265 : GstPad *sinkPad = gst_pad_new_from_template(pad_template, "sink");
378 265 : if (!sinkPad)
379 : {
380 0 : GST_ERROR_OBJECT(sink, "Could not create sinkpad");
381 0 : return false;
382 : }
383 :
384 265 : gst_element_add_pad(GST_ELEMENT_CAST(sink), sinkPad);
385 265 : sink->priv->m_sinkPad = sinkPad;
386 :
387 265 : return true;
388 : }
389 :
390 265 : static void rialto_mse_base_sink_init(RialtoMSEBaseSink *sink)
391 : {
392 265 : GST_INFO_OBJECT(sink, "Init: %" GST_PTR_FORMAT, sink);
393 265 : sink->priv = static_cast<RialtoMSEBaseSinkPrivate *>(rialto_mse_base_sink_get_instance_private(sink));
394 265 : new (sink->priv) RialtoMSEBaseSinkPrivate();
395 :
396 265 : RialtoGStreamerMSEBaseSinkCallbacks callbacks;
397 265 : callbacks.eosCallback = std::bind(rialto_mse_base_sink_eos_handler, sink);
398 265 : callbacks.flushCompletedCallback = std::bind(rialto_mse_base_sink_flush_completed_handler, sink);
399 : callbacks.stateChangedCallback =
400 265 : std::bind(rialto_mse_base_sink_rialto_state_changed_handler, sink, std::placeholders::_1);
401 265 : callbacks.errorCallback = std::bind(rialto_mse_base_sink_error_handler, sink, std::placeholders::_1);
402 265 : callbacks.qosCallback = std::bind(rialto_mse_base_sink_qos_handle, GST_ELEMENT_CAST(sink), std::placeholders::_1,
403 265 : std::placeholders::_2);
404 265 : sink->priv->m_callbacks = callbacks;
405 265 : GST_OBJECT_FLAG_SET(sink, GST_ELEMENT_FLAG_SINK);
406 : }
407 :
408 265 : static void rialto_mse_base_sink_finalize(GObject *object)
409 : {
410 265 : RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(object);
411 265 : RialtoMSEBaseSinkPrivate *priv = sink->priv;
412 265 : GST_INFO_OBJECT(sink, "Finalize: %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, sink, priv);
413 :
414 265 : priv->~RialtoMSEBaseSinkPrivate();
415 265 : GST_CALL_PARENT(G_OBJECT_CLASS, finalize, (object));
416 : }
417 :
418 1 : static void rialto_mse_base_sink_class_init(RialtoMSEBaseSinkClass *klass)
419 : {
420 : std::shared_ptr<firebolt::rialto::IClientLogHandler> logToGstHandler =
421 1 : std::make_shared<firebolt::rialto::LogToGstHandler>();
422 1 : if (!firebolt::rialto::IClientLogControlFactory::createFactory()->createClientLogControl().registerLogHandler(logToGstHandler,
423 : true))
424 : {
425 0 : GST_ERROR("Unable to preRegister log handler");
426 : }
427 :
428 1 : GObjectClass *gobjectClass = G_OBJECT_CLASS(klass);
429 1 : GstElementClass *elementClass = GST_ELEMENT_CLASS(klass);
430 :
431 1 : gst_element_class_set_metadata(elementClass, "Rialto MSE base sink", "Generic", "A sink for Rialto", "Sky");
432 :
433 1 : gobjectClass->finalize = rialto_mse_base_sink_finalize;
434 1 : gobjectClass->get_property = rialto_mse_base_sink_get_property;
435 1 : gobjectClass->set_property = rialto_mse_base_sink_set_property;
436 1 : elementClass->query = rialto_mse_base_sink_query;
437 1 : elementClass->send_event = rialto_mse_base_sink_send_event;
438 1 : elementClass->change_state = rialto_mse_base_sink_change_state;
439 :
440 1 : g_signals[SIGNAL_UNDERFLOW] = g_signal_new("buffer-underflow-callback", G_TYPE_FROM_CLASS(klass),
441 : (GSignalFlags)(G_SIGNAL_RUN_LAST), 0, nullptr, nullptr,
442 : g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE, 2, G_TYPE_UINT,
443 : G_TYPE_POINTER);
444 :
445 1 : g_object_class_install_property(gobjectClass, PROP_IS_SINGLE_PATH_STREAM,
446 : g_param_spec_boolean("single-path-stream", "single path stream",
447 : "is single path stream", FALSE, GParamFlags(G_PARAM_READWRITE)));
448 :
449 1 : g_object_class_install_property(gobjectClass, PROP_N_STREAMS,
450 : g_param_spec_int("streams-number", "streams number", "streams number", 1, G_MAXINT,
451 : 1, GParamFlags(G_PARAM_READWRITE)));
452 :
453 1 : g_object_class_install_property(gobjectClass, PROP_HAS_DRM,
454 : g_param_spec_boolean("has-drm", "has drm", "has drm", TRUE,
455 : GParamFlags(G_PARAM_READWRITE)));
456 1 : g_object_class_install_property(gobjectClass, PROP_STATS,
457 : g_param_spec_pointer("stats", NULL, "pointer to a gst_structure",
458 : GParamFlags(G_PARAM_READABLE)));
459 : }
|