Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 298 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 298 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 298 : GstObject *result = GST_OBJECT_CAST(element);
33 298 : if (parent)
34 : {
35 149 : if (GST_IS_BIN(parent))
36 : {
37 149 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 149 : gst_object_unref(parent);
40 : }
41 :
42 298 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 143 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 143 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 145 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 141 : return false;
73 : }
74 : } // namespace
75 :
76 263 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 263 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 263 : m_sinkPad = baseSink->priv->m_sinkPad;
80 263 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 263 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 263 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 263 : if (m_caps)
87 141 : gst_caps_unref(m_caps);
88 263 : clearBuffersUnlocked();
89 : }
90 :
91 422 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 422 : m_isFlushOngoing = true;
94 422 : m_needDataCondVariable.notify_all();
95 450 : while (!m_samples.empty())
96 : {
97 28 : GstSample *sample = m_samples.front();
98 28 : m_samples.pop();
99 28 : gst_sample_unref(sample);
100 : }
101 422 : }
102 :
103 137 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
104 : {
105 137 : std::unique_lock lock{m_sinkMutex};
106 137 : m_sourceId = sourceId;
107 : }
108 :
109 3 : void PullModePlaybackDelegate::handleEos()
110 : {
111 3 : GstState currentState = GST_STATE(m_sink);
112 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
113 : {
114 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
115 : gst_element_state_get_name(currentState));
116 :
117 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
118 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
119 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
120 1 : g_error_free(gError);
121 : }
122 2 : else if (!m_isFlushOngoing)
123 : {
124 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
125 : }
126 : else
127 : {
128 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
129 : }
130 3 : }
131 :
132 2 : void PullModePlaybackDelegate::handleFlushCompleted()
133 : {
134 2 : GST_INFO_OBJECT(m_sink, "Flush completed");
135 2 : std::unique_lock<std::mutex> lock(m_flushMutex);
136 2 : m_flushCondVariable.notify_all();
137 : }
138 :
139 39 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
140 : {
141 39 : GstState current = GST_STATE(m_sink);
142 39 : GstState next = GST_STATE_NEXT(m_sink);
143 39 : GstState pending = GST_STATE_PENDING(m_sink);
144 39 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
145 :
146 39 : GST_DEBUG_OBJECT(m_sink,
147 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
148 : "pending state: %s, last return state %s",
149 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
150 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
151 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
152 :
153 39 : if (m_isStateCommitNeeded)
154 : {
155 39 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
156 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
157 : {
158 38 : GST_STATE(m_sink) = next;
159 38 : GST_STATE_NEXT(m_sink) = postNext;
160 38 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
161 38 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
162 :
163 38 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
164 :
165 38 : gst_element_post_message(m_sink,
166 38 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
167 38 : postAsyncDone();
168 : }
169 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
170 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
171 : next == GST_STATE_PLAYING)
172 : {
173 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
174 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
175 : }
176 : }
177 39 : }
178 :
179 836 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
180 : {
181 836 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
182 836 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
183 836 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
184 : gst_element_state_get_name(next_state));
185 :
186 836 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
187 836 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
188 :
189 836 : switch (transition)
190 : {
191 263 : case GST_STATE_CHANGE_NULL_TO_READY:
192 263 : if (!m_sinkPad)
193 : {
194 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
195 0 : return GST_STATE_CHANGE_FAILURE;
196 : }
197 263 : if (!m_rialtoControlClient->waitForRunning())
198 : {
199 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
200 0 : return GST_STATE_CHANGE_FAILURE;
201 : }
202 263 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
203 263 : break;
204 145 : case GST_STATE_CHANGE_READY_TO_PAUSED:
205 : {
206 145 : if (!client)
207 : {
208 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
209 0 : return GST_STATE_CHANGE_FAILURE;
210 : }
211 :
212 145 : m_isFlushOngoing = false;
213 :
214 145 : StateChangeResult result = client->pause(m_sourceId);
215 145 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
216 : {
217 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
218 145 : if (result == StateChangeResult::NOT_ATTACHED)
219 : {
220 145 : postAsyncStart();
221 : }
222 145 : status = GST_STATE_CHANGE_ASYNC;
223 : }
224 :
225 145 : break;
226 : }
227 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
228 : {
229 8 : if (!client)
230 : {
231 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
232 0 : return GST_STATE_CHANGE_FAILURE;
233 : }
234 :
235 8 : StateChangeResult result = client->play(m_sourceId);
236 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
237 : {
238 8 : status = GST_STATE_CHANGE_ASYNC;
239 : }
240 0 : else if (result == StateChangeResult::NOT_ATTACHED)
241 : {
242 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
243 0 : return GST_STATE_CHANGE_FAILURE;
244 : }
245 :
246 8 : break;
247 : }
248 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
249 : {
250 7 : if (!client)
251 : {
252 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
253 0 : return GST_STATE_CHANGE_FAILURE;
254 : }
255 :
256 7 : StateChangeResult result = client->pause(m_sourceId);
257 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
258 : {
259 7 : status = GST_STATE_CHANGE_ASYNC;
260 : }
261 0 : else if (result == StateChangeResult::NOT_ATTACHED)
262 : {
263 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
264 0 : return GST_STATE_CHANGE_FAILURE;
265 : }
266 :
267 7 : break;
268 : }
269 145 : case GST_STATE_CHANGE_PAUSED_TO_READY:
270 145 : if (!client)
271 : {
272 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
273 0 : return GST_STATE_CHANGE_FAILURE;
274 : }
275 :
276 145 : if (m_isStateCommitNeeded)
277 : {
278 122 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
279 122 : postAsyncDone();
280 : }
281 :
282 145 : client->removeSource(m_sourceId);
283 : {
284 145 : std::lock_guard<std::mutex> lock(m_sinkMutex);
285 145 : clearBuffersUnlocked();
286 145 : m_sourceAttached = false;
287 : }
288 145 : break;
289 263 : case GST_STATE_CHANGE_READY_TO_NULL:
290 : // Playback will be stopped once all sources are finished and ref count
291 : // of the media pipeline object reaches 0
292 263 : m_mediaPlayerManager.releaseMediaPlayerClient();
293 263 : m_rialtoControlClient->removeControlBackend();
294 263 : break;
295 5 : default:
296 5 : break;
297 : }
298 :
299 836 : return status;
300 : }
301 :
302 2 : void PullModePlaybackDelegate::handleError(const char *message, gint code)
303 : {
304 2 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message)};
305 2 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message));
306 2 : g_error_free(gError);
307 : }
308 :
309 297 : void PullModePlaybackDelegate::postAsyncStart()
310 : {
311 297 : m_isStateCommitNeeded = true;
312 297 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
313 : }
314 :
315 160 : void PullModePlaybackDelegate::postAsyncDone()
316 : {
317 160 : m_isStateCommitNeeded = false;
318 160 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
319 : }
320 :
321 309 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
322 : {
323 309 : switch (type)
324 : {
325 154 : case Property::IsSinglePathStream:
326 : {
327 154 : std::lock_guard<std::mutex> lock(m_sinkMutex);
328 154 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
329 154 : break;
330 : }
331 154 : case Property::NumberOfStreams:
332 : {
333 154 : std::lock_guard<std::mutex> lock(m_sinkMutex);
334 154 : m_numOfStreams = g_value_get_int(value);
335 154 : break;
336 : }
337 1 : case Property::HasDrm:
338 : {
339 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
340 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
341 1 : break;
342 : }
343 0 : default:
344 : {
345 0 : break;
346 : }
347 : }
348 309 : }
349 :
350 5 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
351 : {
352 5 : switch (type)
353 : {
354 1 : case Property::IsSinglePathStream:
355 : {
356 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
357 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
358 1 : break;
359 : }
360 1 : case Property::NumberOfStreams:
361 : {
362 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
363 1 : g_value_set_int(value, m_numOfStreams);
364 1 : break;
365 : }
366 1 : case Property::HasDrm:
367 : {
368 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
369 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
370 1 : break;
371 : }
372 2 : case Property::Stats:
373 : {
374 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
375 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
376 2 : if (!client)
377 : {
378 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
379 1 : break;
380 : }
381 :
382 1 : guint64 totalVideoFrames{0};
383 1 : guint64 droppedVideoFrames{0};
384 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
385 : {
386 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
387 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
388 1 : g_value_set_pointer(value, stats);
389 : }
390 : else
391 : {
392 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
393 : }
394 3 : }
395 : default:
396 : {
397 1 : break;
398 : }
399 : }
400 5 : }
401 :
402 21 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
403 : {
404 21 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
405 21 : switch (GST_QUERY_TYPE(query))
406 : {
407 1 : case GST_QUERY_SEEKING:
408 : {
409 : GstFormat fmt;
410 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
411 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
412 1 : return TRUE;
413 : }
414 5 : case GST_QUERY_POSITION:
415 : {
416 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
417 5 : if (!client)
418 : {
419 1 : return FALSE;
420 : }
421 :
422 : GstFormat fmt;
423 4 : gst_query_parse_position(query, &fmt, NULL);
424 4 : switch (fmt)
425 : {
426 3 : case GST_FORMAT_TIME:
427 : {
428 3 : gint64 position = client->getPosition(m_sourceId);
429 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
430 3 : if (position < 0)
431 : {
432 2 : return FALSE;
433 : }
434 :
435 1 : gst_query_set_position(query, fmt, position);
436 1 : break;
437 : }
438 1 : default:
439 1 : break;
440 : }
441 2 : return TRUE;
442 5 : }
443 2 : case GST_QUERY_SEGMENT:
444 : {
445 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
446 2 : GstFormat format{m_lastSegment.format};
447 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
448 2 : gint64 stop{0};
449 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
450 : {
451 2 : stop = m_lastSegment.duration;
452 : }
453 : else
454 : {
455 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
456 : }
457 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
458 2 : return TRUE;
459 : }
460 13 : default:
461 13 : break;
462 : }
463 13 : return std::nullopt;
464 : }
465 :
466 28 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
467 : {
468 28 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
469 28 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
470 :
471 28 : switch (GST_EVENT_TYPE(event))
472 : {
473 13 : case GST_EVENT_SEEK:
474 : {
475 13 : gdouble rate{1.0};
476 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
477 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
478 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
479 13 : gint64 start{0}, stop{0};
480 13 : if (event)
481 : {
482 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
483 :
484 13 : if (flags & GST_SEEK_FLAG_FLUSH)
485 : {
486 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
487 : {
488 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
489 1 : gst_event_unref(event);
490 5 : return FALSE;
491 : }
492 : // Update last segment
493 8 : if (seekFormat == GST_FORMAT_TIME)
494 : {
495 7 : gboolean update{FALSE};
496 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
497 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
498 : &update);
499 : }
500 : }
501 : #if GST_CHECK_VERSION(1, 18, 0)
502 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
503 : {
504 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
505 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
506 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
507 2 : gst_event_unref(event);
508 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
509 : {
510 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
511 1 : return FALSE;
512 : }
513 1 : return TRUE;
514 : }
515 : #endif
516 : else
517 : {
518 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
519 2 : gst_event_unref(event);
520 2 : return FALSE;
521 : }
522 : }
523 8 : break;
524 : }
525 : #if GST_CHECK_VERSION(1, 18, 0)
526 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
527 : {
528 2 : double rate{0.0};
529 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
530 2 : guint32 seqnum = gst_event_get_seqnum(event);
531 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
532 :
533 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
534 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
535 : {
536 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
537 2 : m_currentInstantRateChangeSeqnum = seqnum;
538 2 : client->setPlaybackRate(rate);
539 : }
540 2 : break;
541 : }
542 : #endif
543 13 : default:
544 13 : break;
545 : }
546 :
547 23 : if (shouldForwardUpstream)
548 : {
549 23 : bool result = gst_pad_push_event(m_sinkPad, event);
550 23 : if (!result)
551 : {
552 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
553 : }
554 :
555 23 : return result;
556 : }
557 :
558 0 : gst_event_unref(event);
559 0 : return TRUE;
560 : }
561 :
562 194 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
563 : {
564 194 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
565 194 : switch (GST_EVENT_TYPE(event))
566 : {
567 7 : case GST_EVENT_SEGMENT:
568 : {
569 7 : copySegment(event);
570 7 : setSegment();
571 7 : break;
572 : }
573 3 : case GST_EVENT_EOS:
574 : {
575 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
576 3 : m_isEos = true;
577 3 : break;
578 : }
579 145 : case GST_EVENT_CAPS:
580 : {
581 : GstCaps *caps;
582 145 : gst_event_parse_caps(event, &caps);
583 : {
584 145 : std::lock_guard<std::mutex> lock(m_sinkMutex);
585 145 : if (m_caps)
586 : {
587 4 : if (!gst_caps_is_equal(caps, m_caps))
588 : {
589 1 : gst_caps_unref(m_caps);
590 1 : m_caps = gst_caps_copy(caps);
591 : }
592 : }
593 : else
594 : {
595 141 : m_caps = gst_caps_copy(caps);
596 : }
597 145 : }
598 145 : break;
599 : }
600 1 : case GST_EVENT_SINK_MESSAGE:
601 : {
602 1 : GstMessage *message = nullptr;
603 1 : gst_event_parse_sink_message(event, &message);
604 :
605 1 : if (message)
606 : {
607 1 : gst_element_post_message(m_sink, message);
608 : }
609 :
610 1 : break;
611 : }
612 10 : case GST_EVENT_CUSTOM_DOWNSTREAM:
613 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
614 : {
615 10 : if (gst_event_has_name(event, "custom-instant-rate-change"))
616 : {
617 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
618 2 : changePlaybackRate(event);
619 : }
620 10 : break;
621 : }
622 14 : case GST_EVENT_FLUSH_START:
623 : {
624 14 : startFlushing();
625 14 : break;
626 : }
627 4 : case GST_EVENT_FLUSH_STOP:
628 : {
629 4 : gboolean resetTime{FALSE};
630 4 : gst_event_parse_flush_stop(event, &resetTime);
631 :
632 4 : stopFlushing(resetTime);
633 4 : break;
634 : }
635 3 : case GST_EVENT_STREAM_COLLECTION:
636 : {
637 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
638 3 : if (!client)
639 : {
640 1 : gst_event_unref(event);
641 1 : return FALSE;
642 : }
643 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
644 2 : GstStreamCollection *streamCollection{nullptr};
645 2 : gst_event_parse_stream_collection(event, &streamCollection);
646 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
647 6 : for (guint i = 0; i < streamsSize; ++i)
648 : {
649 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
650 4 : auto type = gst_stream_get_stream_type(stream);
651 4 : if (type & GST_STREAM_TYPE_AUDIO)
652 : {
653 2 : ++audioStreams;
654 : }
655 2 : else if (type & GST_STREAM_TYPE_VIDEO)
656 : {
657 1 : ++videoStreams;
658 : }
659 1 : else if (type & GST_STREAM_TYPE_TEXT)
660 : {
661 1 : ++textStreams;
662 : }
663 : }
664 2 : gst_object_unref(streamCollection);
665 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
666 2 : client->sendAllSourcesAttachedIfPossible();
667 2 : break;
668 3 : }
669 : #if GST_CHECK_VERSION(1, 18, 0)
670 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
671 : {
672 4 : guint32 seqnum = gst_event_get_seqnum(event);
673 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
674 : {
675 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
676 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
677 2 : break;
678 : }
679 :
680 2 : m_lastInstantRateChangeSeqnum = seqnum;
681 2 : gdouble rate{0.0};
682 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
683 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
684 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
685 2 : gst_message_set_seqnum(msg, seqnum);
686 2 : gst_element_post_message(m_sink, msg);
687 2 : break;
688 : }
689 : #endif
690 3 : default:
691 3 : break;
692 : }
693 :
694 193 : gst_event_unref(event);
695 :
696 193 : return TRUE;
697 : }
698 :
699 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
700 : {
701 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
702 7 : gst_event_copy_segment(event, &m_lastSegment);
703 : }
704 :
705 7 : void PullModePlaybackDelegate::setSegment()
706 : {
707 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
708 7 : if (!client)
709 : {
710 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
711 1 : return;
712 : }
713 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
714 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
715 : {
716 6 : std::unique_lock lock{m_sinkMutex};
717 6 : m_initialPositionSet = true;
718 6 : if (m_queuedOffset)
719 : {
720 1 : position = m_queuedOffset.value();
721 1 : m_queuedOffset.reset();
722 : }
723 6 : }
724 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
725 7 : }
726 :
727 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
728 : {
729 2 : const GstStructure *structure{gst_event_get_structure(event)};
730 2 : gdouble playbackRate{1.0};
731 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
732 : {
733 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
734 2 : if (client && m_mediaPlayerManager.hasControl())
735 : {
736 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
737 1 : client->setPlaybackRate(playbackRate);
738 : }
739 2 : }
740 : }
741 :
742 14 : void PullModePlaybackDelegate::startFlushing()
743 : {
744 14 : std::lock_guard<std::mutex> lock(m_sinkMutex);
745 14 : if (!m_isFlushOngoing)
746 : {
747 14 : GST_INFO_OBJECT(m_sink, "Starting flushing");
748 14 : if (m_isEos)
749 : {
750 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
751 2 : m_isEos = false;
752 : }
753 14 : m_isFlushOngoing = true;
754 : // We expect to receive a new gst segment after flush
755 14 : m_initialPositionSet = false;
756 14 : clearBuffersUnlocked();
757 : }
758 : }
759 :
760 4 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
761 : {
762 4 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
763 4 : flushServer(resetTime);
764 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
765 4 : m_isFlushOngoing = false;
766 :
767 4 : if (resetTime)
768 : {
769 4 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
770 4 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
771 : }
772 : }
773 :
774 4 : void PullModePlaybackDelegate::flushServer(bool resetTime)
775 : {
776 4 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
777 4 : if (!client)
778 : {
779 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
780 1 : return;
781 : }
782 :
783 3 : std::unique_lock<std::mutex> lock(m_flushMutex);
784 3 : GST_INFO_OBJECT(m_sink, "Flushing sink with sourceId %d", m_sourceId.load());
785 3 : client->flush(m_sourceId, resetTime);
786 3 : if (m_sourceAttached)
787 : {
788 2 : m_flushCondVariable.wait(lock);
789 : }
790 : else
791 : {
792 1 : GST_DEBUG_OBJECT(m_sink, "Skip waiting for flush finish - source not attached yet.");
793 : }
794 4 : }
795 :
796 32 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
797 : {
798 32 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
799 32 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
800 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
801 :
802 32 : std::unique_lock<std::mutex> lock(m_sinkMutex);
803 :
804 32 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
805 : {
806 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
807 1 : m_needDataCondVariable.wait(lock);
808 : }
809 :
810 32 : if (m_isFlushOngoing)
811 : {
812 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
813 3 : gst_buffer_unref(buffer);
814 3 : return GST_FLOW_FLUSHING;
815 : }
816 :
817 29 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
818 29 : if (sample)
819 29 : m_samples.push(sample);
820 : else
821 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
822 :
823 29 : gst_buffer_unref(buffer);
824 :
825 29 : return GST_FLOW_OK;
826 32 : }
827 :
828 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
829 : {
830 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
831 1 : if (!m_samples.empty())
832 : {
833 1 : GstSample *sample = m_samples.front();
834 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
835 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
836 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
837 :
838 1 : return GstRefSample{sample};
839 : }
840 :
841 0 : return GstRefSample{};
842 1 : }
843 :
844 1 : void PullModePlaybackDelegate::popSample()
845 : {
846 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
847 1 : if (!m_samples.empty())
848 : {
849 1 : gst_sample_unref(m_samples.front());
850 1 : m_samples.pop();
851 : }
852 1 : m_needDataCondVariable.notify_all();
853 : }
854 :
855 0 : bool PullModePlaybackDelegate::isEos() const
856 : {
857 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
858 0 : return m_samples.empty() && m_isEos;
859 : }
860 :
861 4 : void PullModePlaybackDelegate::lostState()
862 : {
863 4 : m_isStateCommitNeeded = true;
864 4 : gst_element_lost_state(m_sink);
865 : }
866 :
867 149 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
868 : const uint32_t maxVideoHeight)
869 : {
870 149 : GstObject *parentObject = getOldestGstBinParent(m_sink);
871 149 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
872 : {
873 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
874 3 : return false;
875 : }
876 :
877 146 : gchar *parentObjectName = gst_object_get_name(parentObject);
878 146 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
879 146 : g_free(parentObjectName);
880 :
881 146 : return setStreamsNumber(parentObject);
882 : }
883 :
884 146 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
885 : {
886 146 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
887 :
888 146 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
889 146 : if (context)
890 : {
891 3 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
892 :
893 3 : guint n_video{0}, n_audio{0}, n_text{0};
894 :
895 3 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
896 3 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
897 3 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
898 3 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
899 :
900 5 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
901 2 : n_text > std::numeric_limits<int32_t>::max())
902 : {
903 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
904 : n_text);
905 1 : gst_context_unref(context);
906 1 : return false;
907 : }
908 :
909 2 : videoStreams = n_video;
910 2 : audioStreams = n_audio;
911 2 : subtitleStreams = n_text;
912 :
913 2 : gst_context_unref(context);
914 : }
915 143 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
916 : {
917 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
918 : }
919 : else
920 : {
921 : // The default value of streams is V:1, A:1, S:0
922 : // Changing the default setting via properties is considered as DEPRECATED
923 141 : subtitleStreams = 0;
924 141 : std::lock_guard<std::mutex> lock{m_sinkMutex};
925 141 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
926 : {
927 27 : videoStreams = m_numOfStreams;
928 27 : if (m_isSinglePathStream)
929 : {
930 26 : audioStreams = 0;
931 26 : subtitleStreams = 0;
932 : }
933 : }
934 114 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
935 : {
936 103 : audioStreams = m_numOfStreams;
937 103 : if (m_isSinglePathStream)
938 : {
939 102 : videoStreams = 0;
940 102 : subtitleStreams = 0;
941 : }
942 : }
943 11 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
944 : {
945 11 : subtitleStreams = m_numOfStreams;
946 11 : if (m_isSinglePathStream)
947 : {
948 11 : videoStreams = 0;
949 11 : audioStreams = 0;
950 : }
951 : }
952 141 : }
953 :
954 145 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
955 145 : if (!client)
956 : {
957 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
958 0 : return false;
959 : }
960 :
961 145 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
962 :
963 145 : return true;
964 : }
|