Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 288 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 288 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 288 : GstObject *result = GST_OBJECT_CAST(element);
33 288 : if (parent)
34 : {
35 144 : if (GST_IS_BIN(parent))
36 : {
37 144 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 144 : gst_object_unref(parent);
40 : }
41 :
42 288 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 138 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 138 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 140 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 136 : return false;
73 : }
74 : } // namespace
75 :
76 262 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 262 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 262 : m_sinkPad = baseSink->priv->m_sinkPad;
80 262 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 262 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 262 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 262 : if (m_caps)
87 136 : gst_caps_unref(m_caps);
88 262 : clearBuffersUnlocked();
89 : }
90 :
91 415 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 415 : m_isFlushOngoing = true;
94 415 : m_needDataCondVariable.notify_all();
95 443 : while (!m_samples.empty())
96 : {
97 28 : GstSample *sample = m_samples.front();
98 28 : m_samples.pop();
99 28 : gst_sample_unref(sample);
100 : }
101 415 : }
102 :
103 187 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
104 : {
105 187 : std::unique_lock lock{m_sinkMutex};
106 187 : m_sourceId = sourceId;
107 : }
108 :
109 3 : void PullModePlaybackDelegate::handleEos()
110 : {
111 3 : GstState currentState = GST_STATE(m_sink);
112 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
113 : {
114 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
115 : gst_element_state_get_name(currentState));
116 :
117 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
118 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
119 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
120 1 : g_error_free(gError);
121 : }
122 2 : else if (!m_isFlushOngoing)
123 : {
124 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
125 : }
126 : else
127 : {
128 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
129 : }
130 3 : }
131 :
132 2 : void PullModePlaybackDelegate::handleFlushCompleted()
133 : {
134 2 : GST_INFO_OBJECT(m_sink, "Flush completed");
135 2 : std::unique_lock<std::mutex> lock(m_flushMutex);
136 2 : m_flushCondVariable.notify_all();
137 : }
138 :
139 57 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
140 : {
141 57 : GstState current = GST_STATE(m_sink);
142 57 : GstState next = GST_STATE_NEXT(m_sink);
143 57 : GstState pending = GST_STATE_PENDING(m_sink);
144 57 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
145 :
146 57 : GST_DEBUG_OBJECT(m_sink,
147 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
148 : "pending state: %s, last return state %s",
149 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
150 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
151 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
152 :
153 57 : if (m_isStateCommitNeeded)
154 : {
155 53 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
156 11 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
157 : {
158 34 : GST_STATE(m_sink) = next;
159 34 : GST_STATE_NEXT(m_sink) = postNext;
160 34 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
161 34 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
162 :
163 34 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
164 :
165 34 : gst_element_post_message(m_sink,
166 34 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
167 34 : postAsyncDone();
168 : }
169 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
170 19 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
171 : next == GST_STATE_PLAYING)
172 : {
173 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
174 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
175 : }
176 : }
177 57 : }
178 :
179 821 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
180 : {
181 821 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
182 821 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
183 821 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
184 : gst_element_state_get_name(next_state));
185 :
186 821 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
187 821 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
188 :
189 821 : switch (transition)
190 : {
191 262 : case GST_STATE_CHANGE_NULL_TO_READY:
192 262 : if (!m_sinkPad)
193 : {
194 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
195 0 : return GST_STATE_CHANGE_FAILURE;
196 : }
197 262 : if (!m_rialtoControlClient->waitForRunning())
198 : {
199 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
200 0 : return GST_STATE_CHANGE_FAILURE;
201 : }
202 262 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
203 262 : break;
204 140 : case GST_STATE_CHANGE_READY_TO_PAUSED:
205 : {
206 140 : if (!client)
207 : {
208 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
209 0 : return GST_STATE_CHANGE_FAILURE;
210 : }
211 :
212 140 : m_isFlushOngoing = false;
213 :
214 140 : StateChangeResult result = client->pause(m_sourceId);
215 140 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
216 : {
217 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
218 140 : if (result == StateChangeResult::NOT_ATTACHED)
219 : {
220 140 : postAsyncStart();
221 : }
222 140 : status = GST_STATE_CHANGE_ASYNC;
223 : }
224 :
225 140 : break;
226 : }
227 6 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
228 : {
229 6 : if (!client)
230 : {
231 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
232 0 : return GST_STATE_CHANGE_FAILURE;
233 : }
234 :
235 6 : StateChangeResult result = client->play(m_sourceId);
236 6 : if (result == StateChangeResult::SUCCESS_ASYNC)
237 : {
238 6 : status = GST_STATE_CHANGE_ASYNC;
239 : }
240 0 : else if (result == StateChangeResult::NOT_ATTACHED)
241 : {
242 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
243 0 : return GST_STATE_CHANGE_FAILURE;
244 : }
245 :
246 6 : break;
247 : }
248 6 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
249 : {
250 6 : if (!client)
251 : {
252 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
253 0 : return GST_STATE_CHANGE_FAILURE;
254 : }
255 :
256 6 : StateChangeResult result = client->pause(m_sourceId);
257 6 : if (result == StateChangeResult::SUCCESS_ASYNC)
258 : {
259 6 : status = GST_STATE_CHANGE_ASYNC;
260 : }
261 0 : else if (result == StateChangeResult::NOT_ATTACHED)
262 : {
263 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
264 0 : return GST_STATE_CHANGE_FAILURE;
265 : }
266 :
267 6 : break;
268 : }
269 140 : case GST_STATE_CHANGE_PAUSED_TO_READY:
270 140 : if (!client)
271 : {
272 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
273 0 : return GST_STATE_CHANGE_FAILURE;
274 : }
275 :
276 140 : if (m_isStateCommitNeeded)
277 : {
278 118 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
279 118 : postAsyncDone();
280 : }
281 :
282 140 : client->removeSource(m_sourceId);
283 : {
284 140 : std::lock_guard<std::mutex> lock(m_sinkMutex);
285 140 : clearBuffersUnlocked();
286 140 : m_sourceAttached = false;
287 : }
288 140 : break;
289 262 : case GST_STATE_CHANGE_READY_TO_NULL:
290 : // Playback will be stopped once all sources are finished and ref count
291 : // of the media pipeline object reaches 0
292 262 : m_mediaPlayerManager.releaseMediaPlayerClient();
293 262 : m_rialtoControlClient->removeControlBackend();
294 262 : break;
295 5 : default:
296 5 : break;
297 : }
298 :
299 821 : return status;
300 : }
301 :
302 3 : void PullModePlaybackDelegate::handleError(firebolt::rialto::PlaybackError error)
303 : {
304 3 : GError *gError = nullptr;
305 3 : std::string message;
306 3 : switch (error)
307 : {
308 1 : case firebolt::rialto::PlaybackError::DECRYPTION:
309 : {
310 1 : message = "Rialto dropped a frame that failed to decrypt";
311 1 : gError = g_error_new_literal(GST_STREAM_ERROR, GST_STREAM_ERROR_DECRYPT, message.c_str());
312 1 : break;
313 : }
314 2 : case firebolt::rialto::PlaybackError::UNKNOWN:
315 : default:
316 : {
317 2 : message = "Rialto server playback failed";
318 2 : gError = g_error_new_literal(GST_STREAM_ERROR, 0, message.c_str());
319 2 : break;
320 : }
321 : }
322 6 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
323 3 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
324 3 : g_error_free(gError);
325 : }
326 :
327 314 : void PullModePlaybackDelegate::postAsyncStart()
328 : {
329 314 : m_isStateCommitNeeded = true;
330 314 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
331 : }
332 :
333 152 : void PullModePlaybackDelegate::postAsyncDone()
334 : {
335 152 : m_isStateCommitNeeded = false;
336 152 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
337 : }
338 :
339 303 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
340 : {
341 303 : switch (type)
342 : {
343 151 : case Property::IsSinglePathStream:
344 : {
345 151 : std::lock_guard<std::mutex> lock(m_sinkMutex);
346 151 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
347 151 : break;
348 : }
349 151 : case Property::NumberOfStreams:
350 : {
351 151 : std::lock_guard<std::mutex> lock(m_sinkMutex);
352 151 : m_numOfStreams = g_value_get_int(value);
353 151 : break;
354 : }
355 1 : case Property::HasDrm:
356 : {
357 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
358 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
359 1 : break;
360 : }
361 0 : default:
362 : {
363 0 : break;
364 : }
365 : }
366 303 : }
367 :
368 5 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
369 : {
370 5 : switch (type)
371 : {
372 1 : case Property::IsSinglePathStream:
373 : {
374 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
375 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
376 1 : break;
377 : }
378 1 : case Property::NumberOfStreams:
379 : {
380 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
381 1 : g_value_set_int(value, m_numOfStreams);
382 1 : break;
383 : }
384 1 : case Property::HasDrm:
385 : {
386 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
387 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
388 1 : break;
389 : }
390 2 : case Property::Stats:
391 : {
392 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
393 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
394 2 : if (!client)
395 : {
396 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
397 1 : break;
398 : }
399 :
400 1 : guint64 totalVideoFrames{0};
401 1 : guint64 droppedVideoFrames{0};
402 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
403 : {
404 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
405 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
406 1 : g_value_set_pointer(value, stats);
407 : }
408 : else
409 : {
410 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
411 : }
412 3 : }
413 : default:
414 : {
415 1 : break;
416 : }
417 : }
418 5 : }
419 :
420 17 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
421 : {
422 17 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
423 17 : switch (GST_QUERY_TYPE(query))
424 : {
425 1 : case GST_QUERY_SEEKING:
426 : {
427 : GstFormat fmt;
428 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
429 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
430 1 : return TRUE;
431 : }
432 5 : case GST_QUERY_POSITION:
433 : {
434 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
435 5 : if (!client)
436 : {
437 1 : return FALSE;
438 : }
439 :
440 : GstFormat fmt;
441 4 : gst_query_parse_position(query, &fmt, NULL);
442 4 : switch (fmt)
443 : {
444 3 : case GST_FORMAT_TIME:
445 : {
446 3 : gint64 position = client->getPosition(m_sourceId);
447 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
448 3 : if (position < 0)
449 : {
450 2 : return FALSE;
451 : }
452 :
453 1 : gst_query_set_position(query, fmt, position);
454 1 : break;
455 : }
456 1 : default:
457 1 : break;
458 : }
459 2 : return TRUE;
460 5 : }
461 2 : case GST_QUERY_SEGMENT:
462 : {
463 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
464 2 : GstFormat format{m_lastSegment.format};
465 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
466 2 : gint64 stop{0};
467 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
468 : {
469 2 : stop = m_lastSegment.duration;
470 : }
471 : else
472 : {
473 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
474 : }
475 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
476 2 : return TRUE;
477 : }
478 9 : default:
479 9 : break;
480 : }
481 9 : return std::nullopt;
482 : }
483 :
484 23 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
485 : {
486 23 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
487 23 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
488 :
489 23 : switch (GST_EVENT_TYPE(event))
490 : {
491 12 : case GST_EVENT_SEEK:
492 : {
493 12 : gdouble rate{1.0};
494 12 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
495 12 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
496 12 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
497 12 : gint64 start{0}, stop{0};
498 12 : if (event)
499 : {
500 12 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
501 :
502 12 : if (flags & GST_SEEK_FLAG_FLUSH)
503 : {
504 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
505 : {
506 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
507 1 : gst_event_unref(event);
508 4 : return FALSE;
509 : }
510 : // Update last segment
511 8 : if (seekFormat == GST_FORMAT_TIME)
512 : {
513 7 : gboolean update{FALSE};
514 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
515 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
516 : &update);
517 : }
518 : }
519 : #if GST_CHECK_VERSION(1, 18, 0)
520 3 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
521 : {
522 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
523 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
524 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
525 2 : gst_event_unref(event);
526 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
527 : {
528 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
529 1 : return FALSE;
530 : }
531 1 : return TRUE;
532 : }
533 : #endif
534 : else
535 : {
536 1 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
537 1 : gst_event_unref(event);
538 1 : return FALSE;
539 : }
540 : }
541 8 : break;
542 : }
543 : #if GST_CHECK_VERSION(1, 18, 0)
544 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
545 : {
546 2 : double rate{0.0};
547 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
548 2 : guint32 seqnum = gst_event_get_seqnum(event);
549 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
550 :
551 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
552 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
553 : {
554 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
555 2 : m_currentInstantRateChangeSeqnum = seqnum;
556 2 : client->setPlaybackRate(rate);
557 : }
558 2 : break;
559 : }
560 : #endif
561 9 : default:
562 9 : break;
563 : }
564 :
565 19 : if (shouldForwardUpstream)
566 : {
567 19 : bool result = gst_pad_push_event(m_sinkPad, event);
568 19 : if (!result)
569 : {
570 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
571 : }
572 :
573 19 : return result;
574 : }
575 :
576 0 : gst_event_unref(event);
577 0 : return TRUE;
578 : }
579 :
580 188 : gboolean PullModePlaybackDelegate::handleEvent(GstEvent *event)
581 : {
582 188 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
583 188 : switch (GST_EVENT_TYPE(event))
584 : {
585 7 : case GST_EVENT_SEGMENT:
586 : {
587 7 : copySegment(event);
588 7 : setSegment();
589 7 : break;
590 : }
591 4 : case GST_EVENT_EOS:
592 : {
593 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
594 4 : m_isEos = true;
595 4 : break;
596 : }
597 140 : case GST_EVENT_CAPS:
598 : {
599 : GstCaps *caps;
600 140 : gst_event_parse_caps(event, &caps);
601 : {
602 140 : std::lock_guard<std::mutex> lock(m_sinkMutex);
603 140 : if (m_caps)
604 : {
605 4 : if (!gst_caps_is_equal(caps, m_caps))
606 : {
607 1 : gst_caps_unref(m_caps);
608 1 : m_caps = gst_caps_copy(caps);
609 : }
610 : }
611 : else
612 : {
613 136 : m_caps = gst_caps_copy(caps);
614 : }
615 140 : }
616 140 : break;
617 : }
618 1 : case GST_EVENT_SINK_MESSAGE:
619 : {
620 1 : GstMessage *message = nullptr;
621 1 : gst_event_parse_sink_message(event, &message);
622 :
623 1 : if (message)
624 : {
625 1 : gst_element_post_message(m_sink, message);
626 : }
627 :
628 1 : break;
629 : }
630 10 : case GST_EVENT_CUSTOM_DOWNSTREAM:
631 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
632 : {
633 10 : if (gst_event_has_name(event, "custom-instant-rate-change"))
634 : {
635 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
636 2 : changePlaybackRate(event);
637 : }
638 10 : break;
639 : }
640 13 : case GST_EVENT_FLUSH_START:
641 : {
642 13 : startFlushing();
643 13 : break;
644 : }
645 3 : case GST_EVENT_FLUSH_STOP:
646 : {
647 3 : gboolean resetTime{FALSE};
648 3 : gst_event_parse_flush_stop(event, &resetTime);
649 :
650 3 : stopFlushing(resetTime);
651 3 : break;
652 : }
653 3 : case GST_EVENT_STREAM_COLLECTION:
654 : {
655 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
656 3 : if (!client)
657 : {
658 1 : gst_event_unref(event);
659 1 : return FALSE;
660 : }
661 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
662 2 : GstStreamCollection *streamCollection{nullptr};
663 2 : gst_event_parse_stream_collection(event, &streamCollection);
664 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
665 6 : for (guint i = 0; i < streamsSize; ++i)
666 : {
667 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
668 4 : auto type = gst_stream_get_stream_type(stream);
669 4 : if (type & GST_STREAM_TYPE_AUDIO)
670 : {
671 2 : ++audioStreams;
672 : }
673 2 : else if (type & GST_STREAM_TYPE_VIDEO)
674 : {
675 1 : ++videoStreams;
676 : }
677 1 : else if (type & GST_STREAM_TYPE_TEXT)
678 : {
679 1 : ++textStreams;
680 : }
681 : }
682 2 : gst_object_unref(streamCollection);
683 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
684 2 : client->sendAllSourcesAttachedIfPossible();
685 2 : break;
686 3 : }
687 : #if GST_CHECK_VERSION(1, 18, 0)
688 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
689 : {
690 4 : guint32 seqnum = gst_event_get_seqnum(event);
691 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
692 : {
693 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
694 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
695 2 : break;
696 : }
697 :
698 2 : m_lastInstantRateChangeSeqnum = seqnum;
699 2 : gdouble rate{0.0};
700 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
701 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
702 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
703 2 : gst_message_set_seqnum(msg, seqnum);
704 2 : gst_element_post_message(m_sink, msg);
705 2 : break;
706 : }
707 : #endif
708 3 : default:
709 3 : break;
710 : }
711 :
712 187 : gst_event_unref(event);
713 :
714 187 : return TRUE;
715 : }
716 :
717 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
718 : {
719 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
720 7 : gst_event_copy_segment(event, &m_lastSegment);
721 : }
722 :
723 7 : void PullModePlaybackDelegate::setSegment()
724 : {
725 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
726 7 : if (!client)
727 : {
728 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
729 1 : return;
730 : }
731 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
732 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
733 : {
734 6 : std::unique_lock lock{m_sinkMutex};
735 6 : m_initialPositionSet = true;
736 6 : if (m_queuedOffset)
737 : {
738 1 : position = m_queuedOffset.value();
739 1 : m_queuedOffset.reset();
740 : }
741 6 : }
742 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
743 7 : }
744 :
745 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
746 : {
747 2 : const GstStructure *structure{gst_event_get_structure(event)};
748 2 : gdouble playbackRate{1.0};
749 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
750 : {
751 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
752 2 : if (client && m_mediaPlayerManager.hasControl())
753 : {
754 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
755 1 : client->setPlaybackRate(playbackRate);
756 : }
757 2 : }
758 : }
759 :
760 13 : void PullModePlaybackDelegate::startFlushing()
761 : {
762 13 : std::lock_guard<std::mutex> lock(m_sinkMutex);
763 13 : if (!m_isFlushOngoing)
764 : {
765 13 : GST_INFO_OBJECT(m_sink, "Starting flushing");
766 13 : if (m_isEos)
767 : {
768 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
769 2 : m_isEos = false;
770 : }
771 13 : m_isFlushOngoing = true;
772 : // We expect to receive a new gst segment after flush
773 13 : m_initialPositionSet = false;
774 13 : clearBuffersUnlocked();
775 : }
776 : }
777 :
778 3 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
779 : {
780 3 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
781 3 : flushServer(resetTime);
782 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
783 3 : m_isFlushOngoing = false;
784 :
785 3 : if (resetTime)
786 : {
787 3 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
788 3 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
789 : }
790 : }
791 :
792 3 : void PullModePlaybackDelegate::flushServer(bool resetTime)
793 : {
794 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
795 3 : if (!client)
796 : {
797 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
798 1 : return;
799 : }
800 :
801 2 : std::unique_lock<std::mutex> lock(m_flushMutex);
802 2 : GST_INFO_OBJECT(m_sink, "Flushing sink with sourceId %d", m_sourceId.load());
803 2 : client->flush(m_sourceId, resetTime);
804 2 : if (m_sourceAttached)
805 : {
806 1 : m_flushCondVariable.wait(lock);
807 : }
808 : else
809 : {
810 1 : GST_DEBUG_OBJECT(m_sink, "Skip waiting for flush finish - source not attached yet.");
811 : }
812 3 : }
813 :
814 32 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
815 : {
816 32 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
817 32 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
818 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
819 :
820 32 : std::unique_lock<std::mutex> lock(m_sinkMutex);
821 :
822 32 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
823 : {
824 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
825 1 : m_needDataCondVariable.wait(lock);
826 : }
827 :
828 32 : if (m_isFlushOngoing)
829 : {
830 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
831 3 : gst_buffer_unref(buffer);
832 3 : return GST_FLOW_FLUSHING;
833 : }
834 :
835 29 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
836 29 : if (sample)
837 29 : m_samples.push(sample);
838 : else
839 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
840 :
841 29 : gst_buffer_unref(buffer);
842 :
843 29 : return GST_FLOW_OK;
844 32 : }
845 :
846 2 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
847 : {
848 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
849 2 : if (!m_samples.empty())
850 : {
851 0 : GstSample *sample = m_samples.front();
852 0 : GstBuffer *buffer = gst_sample_get_buffer(sample);
853 0 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
854 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
855 :
856 0 : return GstRefSample{sample};
857 : }
858 :
859 2 : return GstRefSample{};
860 : }
861 :
862 1 : void PullModePlaybackDelegate::popSample()
863 : {
864 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
865 1 : if (!m_samples.empty())
866 : {
867 1 : gst_sample_unref(m_samples.front());
868 1 : m_samples.pop();
869 : }
870 1 : m_needDataCondVariable.notify_all();
871 : }
872 :
873 7 : bool PullModePlaybackDelegate::isEos() const
874 : {
875 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
876 14 : return m_samples.empty() && m_isEos;
877 7 : }
878 :
879 7 : void PullModePlaybackDelegate::lostState()
880 : {
881 7 : m_isStateCommitNeeded = true;
882 7 : gst_element_lost_state(m_sink);
883 : }
884 :
885 144 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
886 : const uint32_t maxVideoHeight)
887 : {
888 144 : GstObject *parentObject = getOldestGstBinParent(m_sink);
889 144 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
890 : {
891 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
892 3 : return false;
893 : }
894 :
895 141 : gchar *parentObjectName = gst_object_get_name(parentObject);
896 141 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
897 141 : g_free(parentObjectName);
898 :
899 141 : return setStreamsNumber(parentObject);
900 : }
901 :
902 141 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
903 : {
904 141 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
905 :
906 141 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
907 141 : if (context)
908 : {
909 3 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
910 :
911 3 : guint n_video{0}, n_audio{0}, n_text{0};
912 :
913 3 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
914 3 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
915 3 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
916 3 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
917 :
918 5 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
919 2 : n_text > std::numeric_limits<int32_t>::max())
920 : {
921 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
922 : n_text);
923 1 : gst_context_unref(context);
924 1 : return false;
925 : }
926 :
927 2 : videoStreams = n_video;
928 2 : audioStreams = n_audio;
929 2 : subtitleStreams = n_text;
930 :
931 2 : gst_context_unref(context);
932 : }
933 138 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
934 : {
935 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
936 : }
937 : else
938 : {
939 : // The default value of streams is V:1, A:1, S:0
940 : // Changing the default setting via properties is considered as DEPRECATED
941 136 : subtitleStreams = 0;
942 136 : std::lock_guard<std::mutex> lock{m_sinkMutex};
943 136 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
944 : {
945 27 : videoStreams = m_numOfStreams;
946 27 : if (m_isSinglePathStream)
947 : {
948 26 : audioStreams = 0;
949 26 : subtitleStreams = 0;
950 : }
951 : }
952 109 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
953 : {
954 98 : audioStreams = m_numOfStreams;
955 98 : if (m_isSinglePathStream)
956 : {
957 97 : videoStreams = 0;
958 97 : subtitleStreams = 0;
959 : }
960 : }
961 11 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
962 : {
963 11 : subtitleStreams = m_numOfStreams;
964 11 : if (m_isSinglePathStream)
965 : {
966 11 : videoStreams = 0;
967 11 : audioStreams = 0;
968 : }
969 : }
970 136 : }
971 :
972 140 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
973 140 : if (!client)
974 : {
975 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
976 0 : return false;
977 : }
978 :
979 140 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
980 :
981 140 : return true;
982 : }
|