Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 298 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 298 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 298 : GstObject *result = GST_OBJECT_CAST(element);
33 298 : if (parent)
34 : {
35 149 : if (GST_IS_BIN(parent))
36 : {
37 149 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 149 : gst_object_unref(parent);
40 : }
41 :
42 298 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 143 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 143 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 145 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 141 : return false;
73 : }
74 : } // namespace
75 :
76 264 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 264 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 264 : m_sinkPad = baseSink->priv->m_sinkPad;
80 264 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 264 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 264 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 264 : if (m_caps)
87 141 : gst_caps_unref(m_caps);
88 264 : clearBuffersUnlocked();
89 : }
90 :
91 422 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 422 : m_isFlushOngoing = true;
94 422 : m_needDataCondVariable.notify_all();
95 450 : while (!m_samples.empty())
96 : {
97 28 : GstSample *sample = m_samples.front();
98 28 : m_samples.pop();
99 28 : gst_sample_unref(sample);
100 : }
101 422 : }
102 :
103 137 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
104 : {
105 137 : std::unique_lock lock{m_sinkMutex};
106 137 : m_sourceId = sourceId;
107 : }
108 :
109 3 : void PullModePlaybackDelegate::handleEos()
110 : {
111 3 : GstState currentState = GST_STATE(m_sink);
112 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
113 : {
114 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
115 : gst_element_state_get_name(currentState));
116 :
117 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
118 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
119 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
120 1 : g_error_free(gError);
121 : }
122 2 : else if (!m_isFlushOngoing)
123 : {
124 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
125 : }
126 : else
127 : {
128 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
129 : }
130 3 : }
131 :
132 2 : void PullModePlaybackDelegate::handleFlushCompleted()
133 : {
134 2 : GST_INFO_OBJECT(m_sink, "Flush completed");
135 2 : std::unique_lock<std::mutex> lock(m_flushMutex);
136 2 : m_flushCondVariable.notify_all();
137 : }
138 :
139 37 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
140 : {
141 37 : GstState current = GST_STATE(m_sink);
142 37 : GstState next = GST_STATE_NEXT(m_sink);
143 37 : GstState pending = GST_STATE_PENDING(m_sink);
144 37 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
145 :
146 37 : GST_DEBUG_OBJECT(m_sink,
147 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
148 : "pending state: %s, last return state %s",
149 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
150 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
151 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
152 :
153 37 : if (m_isStateCommitNeeded)
154 : {
155 37 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
156 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
157 : {
158 36 : GST_STATE(m_sink) = next;
159 36 : GST_STATE_NEXT(m_sink) = postNext;
160 36 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
161 36 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
162 :
163 36 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
164 :
165 36 : gst_element_post_message(m_sink,
166 36 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
167 36 : postAsyncDone();
168 : }
169 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
170 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
171 : next == GST_STATE_PLAYING)
172 : {
173 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
174 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
175 : }
176 : }
177 37 : }
178 :
179 837 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
180 : {
181 837 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
182 837 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
183 837 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
184 : gst_element_state_get_name(next_state));
185 :
186 837 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
187 837 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
188 :
189 837 : switch (transition)
190 : {
191 264 : case GST_STATE_CHANGE_NULL_TO_READY:
192 264 : if (!m_sinkPad)
193 : {
194 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
195 0 : return GST_STATE_CHANGE_FAILURE;
196 : }
197 264 : if (!m_rialtoControlClient->waitForRunning())
198 : {
199 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
200 0 : return GST_STATE_CHANGE_FAILURE;
201 : }
202 264 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
203 264 : break;
204 145 : case GST_STATE_CHANGE_READY_TO_PAUSED:
205 : {
206 145 : if (!client)
207 : {
208 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
209 0 : return GST_STATE_CHANGE_FAILURE;
210 : }
211 :
212 145 : m_isFlushOngoing = false;
213 :
214 145 : StateChangeResult result = client->pause(m_sourceId);
215 145 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
216 : {
217 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
218 145 : if (result == StateChangeResult::NOT_ATTACHED)
219 : {
220 145 : postAsyncStart();
221 : }
222 145 : status = GST_STATE_CHANGE_ASYNC;
223 : }
224 :
225 145 : break;
226 : }
227 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
228 : {
229 8 : if (!client)
230 : {
231 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
232 0 : return GST_STATE_CHANGE_FAILURE;
233 : }
234 :
235 8 : StateChangeResult result = client->play(m_sourceId);
236 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
237 : {
238 8 : status = GST_STATE_CHANGE_ASYNC;
239 : }
240 0 : else if (result == StateChangeResult::NOT_ATTACHED)
241 : {
242 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
243 0 : return GST_STATE_CHANGE_FAILURE;
244 : }
245 :
246 8 : break;
247 : }
248 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
249 : {
250 7 : if (!client)
251 : {
252 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
253 0 : return GST_STATE_CHANGE_FAILURE;
254 : }
255 :
256 7 : StateChangeResult result = client->pause(m_sourceId);
257 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
258 : {
259 7 : status = GST_STATE_CHANGE_ASYNC;
260 : }
261 0 : else if (result == StateChangeResult::NOT_ATTACHED)
262 : {
263 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
264 0 : return GST_STATE_CHANGE_FAILURE;
265 : }
266 :
267 7 : break;
268 : }
269 145 : case GST_STATE_CHANGE_PAUSED_TO_READY:
270 145 : if (!client)
271 : {
272 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
273 0 : return GST_STATE_CHANGE_FAILURE;
274 : }
275 :
276 145 : if (m_isStateCommitNeeded)
277 : {
278 124 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
279 124 : postAsyncDone();
280 : }
281 :
282 145 : client->removeSource(m_sourceId);
283 : {
284 145 : std::lock_guard<std::mutex> lock(m_sinkMutex);
285 145 : clearBuffersUnlocked();
286 145 : m_sourceAttached = false;
287 : }
288 145 : break;
289 264 : case GST_STATE_CHANGE_READY_TO_NULL:
290 : // Playback will be stopped once all sources are finished and ref count
291 : // of the media pipeline object reaches 0
292 264 : m_mediaPlayerManager.releaseMediaPlayerClient();
293 264 : m_rialtoControlClient->removeControlBackend();
294 264 : break;
295 4 : default:
296 4 : break;
297 : }
298 :
299 837 : return status;
300 : }
301 :
302 2 : void PullModePlaybackDelegate::handleError(const std::string &message, gint code)
303 : {
304 2 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message.c_str())};
305 4 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
306 2 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
307 2 : g_error_free(gError);
308 : }
309 :
310 297 : void PullModePlaybackDelegate::postAsyncStart()
311 : {
312 297 : m_isStateCommitNeeded = true;
313 297 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
314 : }
315 :
316 160 : void PullModePlaybackDelegate::postAsyncDone()
317 : {
318 160 : m_isStateCommitNeeded = false;
319 160 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
320 : }
321 :
322 309 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
323 : {
324 309 : switch (type)
325 : {
326 154 : case Property::IsSinglePathStream:
327 : {
328 154 : std::lock_guard<std::mutex> lock(m_sinkMutex);
329 154 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
330 154 : break;
331 : }
332 154 : case Property::NumberOfStreams:
333 : {
334 154 : std::lock_guard<std::mutex> lock(m_sinkMutex);
335 154 : m_numOfStreams = g_value_get_int(value);
336 154 : break;
337 : }
338 1 : case Property::HasDrm:
339 : {
340 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
341 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
342 1 : break;
343 : }
344 0 : default:
345 : {
346 0 : break;
347 : }
348 : }
349 309 : }
350 :
351 5 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
352 : {
353 5 : switch (type)
354 : {
355 1 : case Property::IsSinglePathStream:
356 : {
357 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
358 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
359 1 : break;
360 : }
361 1 : case Property::NumberOfStreams:
362 : {
363 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
364 1 : g_value_set_int(value, m_numOfStreams);
365 1 : break;
366 : }
367 1 : case Property::HasDrm:
368 : {
369 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
370 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
371 1 : break;
372 : }
373 2 : case Property::Stats:
374 : {
375 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
376 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
377 2 : if (!client)
378 : {
379 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
380 1 : break;
381 : }
382 :
383 1 : guint64 totalVideoFrames{0};
384 1 : guint64 droppedVideoFrames{0};
385 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
386 : {
387 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
388 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
389 1 : g_value_set_pointer(value, stats);
390 : }
391 : else
392 : {
393 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
394 : }
395 3 : }
396 : default:
397 : {
398 1 : break;
399 : }
400 : }
401 5 : }
402 :
403 22 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
404 : {
405 22 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
406 22 : switch (GST_QUERY_TYPE(query))
407 : {
408 1 : case GST_QUERY_SEEKING:
409 : {
410 : GstFormat fmt;
411 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
412 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
413 1 : return TRUE;
414 : }
415 5 : case GST_QUERY_POSITION:
416 : {
417 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
418 5 : if (!client)
419 : {
420 1 : return FALSE;
421 : }
422 :
423 : GstFormat fmt;
424 4 : gst_query_parse_position(query, &fmt, NULL);
425 4 : switch (fmt)
426 : {
427 3 : case GST_FORMAT_TIME:
428 : {
429 3 : gint64 position = client->getPosition(m_sourceId);
430 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
431 3 : if (position < 0)
432 : {
433 2 : return FALSE;
434 : }
435 :
436 1 : gst_query_set_position(query, fmt, position);
437 1 : break;
438 : }
439 1 : default:
440 1 : break;
441 : }
442 2 : return TRUE;
443 5 : }
444 2 : case GST_QUERY_SEGMENT:
445 : {
446 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
447 2 : GstFormat format{m_lastSegment.format};
448 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
449 2 : gint64 stop{0};
450 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
451 : {
452 2 : stop = m_lastSegment.duration;
453 : }
454 : else
455 : {
456 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
457 : }
458 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
459 2 : return TRUE;
460 : }
461 14 : default:
462 14 : break;
463 : }
464 14 : return std::nullopt;
465 : }
466 :
467 29 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
468 : {
469 29 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
470 29 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
471 :
472 29 : switch (GST_EVENT_TYPE(event))
473 : {
474 13 : case GST_EVENT_SEEK:
475 : {
476 13 : gdouble rate{1.0};
477 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
478 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
479 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
480 13 : gint64 start{0}, stop{0};
481 13 : if (event)
482 : {
483 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
484 :
485 13 : if (flags & GST_SEEK_FLAG_FLUSH)
486 : {
487 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
488 : {
489 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
490 1 : gst_event_unref(event);
491 5 : return FALSE;
492 : }
493 : // Update last segment
494 8 : if (seekFormat == GST_FORMAT_TIME)
495 : {
496 7 : gboolean update{FALSE};
497 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
498 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
499 : &update);
500 : }
501 : }
502 : #if GST_CHECK_VERSION(1, 18, 0)
503 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
504 : {
505 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
506 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
507 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
508 2 : gst_event_unref(event);
509 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
510 : {
511 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
512 1 : return FALSE;
513 : }
514 1 : return TRUE;
515 : }
516 : #endif
517 : else
518 : {
519 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
520 2 : gst_event_unref(event);
521 2 : return FALSE;
522 : }
523 : }
524 8 : break;
525 : }
526 : #if GST_CHECK_VERSION(1, 18, 0)
527 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
528 : {
529 2 : double rate{0.0};
530 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
531 2 : guint32 seqnum = gst_event_get_seqnum(event);
532 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
533 :
534 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
535 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
536 : {
537 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
538 2 : m_currentInstantRateChangeSeqnum = seqnum;
539 2 : client->setPlaybackRate(rate);
540 : }
541 2 : break;
542 : }
543 : #endif
544 14 : default:
545 14 : break;
546 : }
547 :
548 24 : if (shouldForwardUpstream)
549 : {
550 24 : bool result = gst_pad_push_event(m_sinkPad, event);
551 24 : if (!result)
552 : {
553 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
554 : }
555 :
556 24 : return result;
557 : }
558 :
559 0 : gst_event_unref(event);
560 0 : return TRUE;
561 : }
562 :
563 191 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
564 : {
565 191 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
566 191 : switch (GST_EVENT_TYPE(event))
567 : {
568 7 : case GST_EVENT_SEGMENT:
569 : {
570 7 : copySegment(event);
571 7 : setSegment();
572 7 : break;
573 : }
574 3 : case GST_EVENT_EOS:
575 : {
576 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
577 3 : m_isEos = true;
578 3 : break;
579 : }
580 145 : case GST_EVENT_CAPS:
581 : {
582 : GstCaps *caps;
583 145 : gst_event_parse_caps(event, &caps);
584 : {
585 145 : std::lock_guard<std::mutex> lock(m_sinkMutex);
586 145 : if (m_caps)
587 : {
588 4 : if (!gst_caps_is_equal(caps, m_caps))
589 : {
590 1 : gst_caps_unref(m_caps);
591 1 : m_caps = gst_caps_copy(caps);
592 : }
593 : }
594 : else
595 : {
596 141 : m_caps = gst_caps_copy(caps);
597 : }
598 145 : }
599 145 : break;
600 : }
601 1 : case GST_EVENT_SINK_MESSAGE:
602 : {
603 1 : GstMessage *message = nullptr;
604 1 : gst_event_parse_sink_message(event, &message);
605 :
606 1 : if (message)
607 : {
608 1 : gst_element_post_message(m_sink, message);
609 : }
610 :
611 1 : break;
612 : }
613 8 : case GST_EVENT_CUSTOM_DOWNSTREAM:
614 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
615 : {
616 8 : if (gst_event_has_name(event, "custom-instant-rate-change"))
617 : {
618 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
619 2 : changePlaybackRate(event);
620 : }
621 8 : break;
622 : }
623 13 : case GST_EVENT_FLUSH_START:
624 : {
625 13 : startFlushing();
626 13 : break;
627 : }
628 4 : case GST_EVENT_FLUSH_STOP:
629 : {
630 4 : gboolean resetTime{FALSE};
631 4 : gst_event_parse_flush_stop(event, &resetTime);
632 :
633 4 : stopFlushing(resetTime);
634 4 : break;
635 : }
636 3 : case GST_EVENT_STREAM_COLLECTION:
637 : {
638 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
639 3 : if (!client)
640 : {
641 1 : gst_event_unref(event);
642 1 : return FALSE;
643 : }
644 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
645 2 : GstStreamCollection *streamCollection{nullptr};
646 2 : gst_event_parse_stream_collection(event, &streamCollection);
647 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
648 6 : for (guint i = 0; i < streamsSize; ++i)
649 : {
650 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
651 4 : auto type = gst_stream_get_stream_type(stream);
652 4 : if (type & GST_STREAM_TYPE_AUDIO)
653 : {
654 2 : ++audioStreams;
655 : }
656 2 : else if (type & GST_STREAM_TYPE_VIDEO)
657 : {
658 1 : ++videoStreams;
659 : }
660 1 : else if (type & GST_STREAM_TYPE_TEXT)
661 : {
662 1 : ++textStreams;
663 : }
664 : }
665 2 : gst_object_unref(streamCollection);
666 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
667 2 : client->sendAllSourcesAttachedIfPossible();
668 2 : break;
669 3 : }
670 : #if GST_CHECK_VERSION(1, 18, 0)
671 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
672 : {
673 4 : guint32 seqnum = gst_event_get_seqnum(event);
674 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
675 : {
676 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
677 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
678 2 : break;
679 : }
680 :
681 2 : m_lastInstantRateChangeSeqnum = seqnum;
682 2 : gdouble rate{0.0};
683 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
684 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
685 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
686 2 : gst_message_set_seqnum(msg, seqnum);
687 2 : gst_element_post_message(m_sink, msg);
688 2 : break;
689 : }
690 : #endif
691 3 : default:
692 3 : break;
693 : }
694 :
695 190 : gst_event_unref(event);
696 :
697 190 : return TRUE;
698 : }
699 :
700 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
701 : {
702 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
703 7 : gst_event_copy_segment(event, &m_lastSegment);
704 : }
705 :
706 7 : void PullModePlaybackDelegate::setSegment()
707 : {
708 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
709 7 : if (!client)
710 : {
711 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
712 1 : return;
713 : }
714 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
715 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
716 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
717 6 : m_segmentSet = true;
718 7 : }
719 :
720 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
721 : {
722 2 : const GstStructure *structure{gst_event_get_structure(event)};
723 2 : gdouble playbackRate{1.0};
724 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
725 : {
726 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
727 2 : if (client && m_mediaPlayerManager.hasControl())
728 : {
729 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
730 1 : client->setPlaybackRate(playbackRate);
731 : }
732 2 : }
733 : }
734 :
735 13 : void PullModePlaybackDelegate::startFlushing()
736 : {
737 13 : std::lock_guard<std::mutex> lock(m_sinkMutex);
738 13 : if (!m_isFlushOngoing)
739 : {
740 13 : GST_INFO_OBJECT(m_sink, "Starting flushing");
741 13 : if (m_isEos)
742 : {
743 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
744 2 : m_isEos = false;
745 : }
746 13 : m_isFlushOngoing = true;
747 13 : m_segmentSet = false;
748 13 : clearBuffersUnlocked();
749 : }
750 : }
751 :
752 4 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
753 : {
754 4 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
755 4 : flushServer(resetTime);
756 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
757 4 : m_isFlushOngoing = false;
758 :
759 4 : if (resetTime)
760 : {
761 4 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
762 4 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
763 : }
764 : }
765 :
766 4 : void PullModePlaybackDelegate::flushServer(bool resetTime)
767 : {
768 4 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
769 4 : if (!client)
770 : {
771 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
772 1 : return;
773 : }
774 :
775 3 : std::unique_lock<std::mutex> lock(m_flushMutex);
776 3 : GST_INFO_OBJECT(m_sink, "Flushing sink with sourceId %d", m_sourceId.load());
777 3 : client->flush(m_sourceId, resetTime);
778 3 : if (m_sourceAttached)
779 : {
780 2 : m_flushCondVariable.wait(lock);
781 : }
782 : else
783 : {
784 1 : GST_DEBUG_OBJECT(m_sink, "Skip waiting for flush finish - source not attached yet.");
785 : }
786 4 : }
787 :
788 32 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
789 : {
790 32 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
791 32 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
792 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
793 :
794 32 : std::unique_lock<std::mutex> lock(m_sinkMutex);
795 :
796 32 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
797 : {
798 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
799 1 : m_needDataCondVariable.wait(lock);
800 : }
801 :
802 32 : if (m_isFlushOngoing)
803 : {
804 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
805 3 : gst_buffer_unref(buffer);
806 3 : return GST_FLOW_FLUSHING;
807 : }
808 :
809 29 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
810 29 : if (sample)
811 29 : m_samples.push(sample);
812 : else
813 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
814 :
815 29 : gst_buffer_unref(buffer);
816 :
817 29 : return GST_FLOW_OK;
818 32 : }
819 :
820 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
821 : {
822 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
823 1 : if (!m_samples.empty())
824 : {
825 1 : GstSample *sample = m_samples.front();
826 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
827 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
828 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
829 :
830 1 : return GstRefSample{sample};
831 : }
832 :
833 0 : return GstRefSample{};
834 1 : }
835 :
836 1 : void PullModePlaybackDelegate::popSample()
837 : {
838 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
839 1 : if (!m_samples.empty())
840 : {
841 1 : gst_sample_unref(m_samples.front());
842 1 : m_samples.pop();
843 : }
844 1 : m_needDataCondVariable.notify_all();
845 : }
846 :
847 0 : bool PullModePlaybackDelegate::isEos() const
848 : {
849 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
850 0 : return m_samples.empty() && m_isEos;
851 : }
852 :
853 1 : bool PullModePlaybackDelegate::isReadyToSendData() const
854 : {
855 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
856 2 : return m_isEos || m_segmentSet;
857 1 : }
858 :
859 4 : void PullModePlaybackDelegate::lostState()
860 : {
861 4 : m_isStateCommitNeeded = true;
862 4 : gst_element_lost_state(m_sink);
863 : }
864 :
865 149 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
866 : const uint32_t maxVideoHeight)
867 : {
868 149 : GstObject *parentObject = getOldestGstBinParent(m_sink);
869 149 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
870 : {
871 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
872 3 : return false;
873 : }
874 :
875 146 : gchar *parentObjectName = gst_object_get_name(parentObject);
876 146 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
877 146 : g_free(parentObjectName);
878 :
879 146 : return setStreamsNumber(parentObject);
880 : }
881 :
882 146 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
883 : {
884 146 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
885 :
886 146 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
887 146 : if (context)
888 : {
889 3 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
890 :
891 3 : guint n_video{0}, n_audio{0}, n_text{0};
892 :
893 3 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
894 3 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
895 3 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
896 3 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
897 :
898 5 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
899 2 : n_text > std::numeric_limits<int32_t>::max())
900 : {
901 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
902 : n_text);
903 1 : gst_context_unref(context);
904 1 : return false;
905 : }
906 :
907 2 : videoStreams = n_video;
908 2 : audioStreams = n_audio;
909 2 : subtitleStreams = n_text;
910 :
911 2 : gst_context_unref(context);
912 : }
913 143 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
914 : {
915 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
916 : }
917 : else
918 : {
919 : // The default value of streams is V:1, A:1, S:0
920 : // Changing the default setting via properties is considered as DEPRECATED
921 141 : subtitleStreams = 0;
922 141 : std::lock_guard<std::mutex> lock{m_sinkMutex};
923 141 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
924 : {
925 27 : videoStreams = m_numOfStreams;
926 27 : if (m_isSinglePathStream)
927 : {
928 26 : audioStreams = 0;
929 26 : subtitleStreams = 0;
930 : }
931 : }
932 114 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
933 : {
934 104 : audioStreams = m_numOfStreams;
935 104 : if (m_isSinglePathStream)
936 : {
937 103 : videoStreams = 0;
938 103 : subtitleStreams = 0;
939 : }
940 : }
941 10 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
942 : {
943 10 : subtitleStreams = m_numOfStreams;
944 10 : if (m_isSinglePathStream)
945 : {
946 10 : videoStreams = 0;
947 10 : audioStreams = 0;
948 : }
949 : }
950 141 : }
951 :
952 145 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
953 145 : if (!client)
954 : {
955 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
956 0 : return false;
957 : }
958 :
959 145 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
960 :
961 145 : return true;
962 : }
|