Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 302 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 302 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 302 : GstObject *result = GST_OBJECT_CAST(element);
33 302 : if (parent)
34 : {
35 151 : if (GST_IS_BIN(parent))
36 : {
37 151 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 151 : gst_object_unref(parent);
40 : }
41 :
42 302 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 145 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 145 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 147 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 143 : return false;
73 : }
74 : } // namespace
75 :
76 270 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 270 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 270 : m_sinkPad = baseSink->priv->m_sinkPad;
80 270 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 270 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 270 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 270 : if (m_caps)
87 143 : gst_caps_unref(m_caps);
88 270 : clearBuffersUnlocked();
89 : }
90 :
91 433 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 433 : m_isSinkFlushOngoing = true;
94 433 : m_needDataCondVariable.notify_all();
95 464 : while (!m_samples.empty())
96 : {
97 31 : GstSample *sample = m_samples.front();
98 31 : m_samples.pop();
99 31 : gst_sample_unref(sample);
100 : }
101 433 : setLastBuffer(nullptr);
102 : }
103 :
104 139 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
105 : {
106 139 : std::unique_lock lock{m_sinkMutex};
107 139 : m_sourceId = sourceId;
108 : }
109 :
110 3 : void PullModePlaybackDelegate::handleEos()
111 : {
112 3 : GstState currentState = GST_STATE(m_sink);
113 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
114 : {
115 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
116 : gst_element_state_get_name(currentState));
117 :
118 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
119 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
120 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
121 1 : g_error_free(gError);
122 : }
123 : else
124 : {
125 2 : std::unique_lock lock{m_sinkMutex};
126 2 : if (!m_isSinkFlushOngoing && !m_isServerFlushOngoing)
127 : {
128 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
129 : }
130 : else
131 : {
132 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
133 : }
134 2 : }
135 3 : }
136 :
137 4 : void PullModePlaybackDelegate::handleFlushCompleted()
138 : {
139 4 : GST_INFO_OBJECT(m_sink, "Flush completed");
140 4 : std::unique_lock<std::mutex> lock(m_sinkMutex);
141 4 : m_isServerFlushOngoing = false;
142 : }
143 :
144 39 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
145 : {
146 39 : GstState current = GST_STATE(m_sink);
147 39 : GstState next = GST_STATE_NEXT(m_sink);
148 39 : GstState pending = GST_STATE_PENDING(m_sink);
149 39 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
150 :
151 39 : GST_DEBUG_OBJECT(m_sink,
152 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
153 : "pending state: %s, last return state %s",
154 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
155 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
156 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
157 :
158 39 : if (m_isStateCommitNeeded)
159 : {
160 39 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
161 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
162 : {
163 38 : GST_STATE(m_sink) = next;
164 38 : GST_STATE_NEXT(m_sink) = postNext;
165 38 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
166 38 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
167 :
168 38 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
169 :
170 38 : gst_element_post_message(m_sink,
171 38 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
172 38 : postAsyncDone();
173 : }
174 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
175 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
176 : next == GST_STATE_PLAYING)
177 : {
178 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
179 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
180 : }
181 : }
182 39 : }
183 :
184 853 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
185 : {
186 853 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
187 853 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
188 853 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
189 : gst_element_state_get_name(next_state));
190 :
191 853 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
192 853 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
193 :
194 853 : switch (transition)
195 : {
196 270 : case GST_STATE_CHANGE_NULL_TO_READY:
197 270 : if (!m_sinkPad)
198 : {
199 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
200 0 : return GST_STATE_CHANGE_FAILURE;
201 : }
202 270 : if (!m_rialtoControlClient->waitForRunning())
203 : {
204 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
205 0 : return GST_STATE_CHANGE_FAILURE;
206 : }
207 270 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
208 270 : break;
209 147 : case GST_STATE_CHANGE_READY_TO_PAUSED:
210 : {
211 147 : if (!client)
212 : {
213 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
214 0 : return GST_STATE_CHANGE_FAILURE;
215 : }
216 :
217 147 : m_isSinkFlushOngoing = false;
218 :
219 147 : StateChangeResult result = client->pause(m_sourceId);
220 147 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
221 : {
222 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
223 147 : if (result == StateChangeResult::NOT_ATTACHED)
224 : {
225 147 : postAsyncStart();
226 : }
227 147 : status = GST_STATE_CHANGE_ASYNC;
228 : }
229 :
230 147 : break;
231 : }
232 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
233 : {
234 8 : if (!client)
235 : {
236 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
237 0 : return GST_STATE_CHANGE_FAILURE;
238 : }
239 :
240 8 : StateChangeResult result = client->play(m_sourceId);
241 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
242 : {
243 8 : status = GST_STATE_CHANGE_ASYNC;
244 : }
245 0 : else if (result == StateChangeResult::NOT_ATTACHED)
246 : {
247 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
248 0 : return GST_STATE_CHANGE_FAILURE;
249 : }
250 :
251 8 : break;
252 : }
253 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
254 : {
255 7 : if (!client)
256 : {
257 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
258 0 : return GST_STATE_CHANGE_FAILURE;
259 : }
260 :
261 7 : StateChangeResult result = client->pause(m_sourceId);
262 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
263 : {
264 7 : status = GST_STATE_CHANGE_ASYNC;
265 : }
266 0 : else if (result == StateChangeResult::NOT_ATTACHED)
267 : {
268 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
269 0 : return GST_STATE_CHANGE_FAILURE;
270 : }
271 :
272 7 : break;
273 : }
274 147 : case GST_STATE_CHANGE_PAUSED_TO_READY:
275 147 : if (!client)
276 : {
277 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
278 0 : return GST_STATE_CHANGE_FAILURE;
279 : }
280 :
281 147 : if (m_isStateCommitNeeded)
282 : {
283 126 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
284 126 : postAsyncDone();
285 : }
286 :
287 147 : client->removeSource(m_sourceId);
288 : {
289 147 : std::lock_guard<std::mutex> lock(m_sinkMutex);
290 147 : clearBuffersUnlocked();
291 147 : m_sourceAttached = false;
292 : }
293 147 : break;
294 270 : case GST_STATE_CHANGE_READY_TO_NULL:
295 : // Playback will be stopped once all sources are finished and ref count
296 : // of the media pipeline object reaches 0
297 270 : m_mediaPlayerManager.releaseMediaPlayerClient();
298 270 : m_rialtoControlClient->removeControlBackend();
299 270 : break;
300 4 : default:
301 4 : break;
302 : }
303 :
304 853 : return status;
305 : }
306 :
307 2 : void PullModePlaybackDelegate::handleError(const std::string &message, gint code)
308 : {
309 2 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message.c_str())};
310 4 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
311 2 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
312 2 : g_error_free(gError);
313 : }
314 :
315 301 : void PullModePlaybackDelegate::postAsyncStart()
316 : {
317 301 : m_isStateCommitNeeded = true;
318 301 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
319 : }
320 :
321 164 : void PullModePlaybackDelegate::postAsyncDone()
322 : {
323 164 : m_isStateCommitNeeded = false;
324 164 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
325 : }
326 :
327 314 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
328 : {
329 314 : switch (type)
330 : {
331 156 : case Property::IsSinglePathStream:
332 : {
333 156 : std::lock_guard<std::mutex> lock(m_sinkMutex);
334 156 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
335 156 : break;
336 : }
337 156 : case Property::NumberOfStreams:
338 : {
339 156 : std::lock_guard<std::mutex> lock(m_sinkMutex);
340 156 : m_numOfStreams = g_value_get_int(value);
341 156 : break;
342 : }
343 1 : case Property::HasDrm:
344 : {
345 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
346 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
347 1 : break;
348 : }
349 1 : case Property::EnableLastSample:
350 : {
351 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
352 1 : m_enableLastSample = g_value_get_boolean(value) != FALSE;
353 1 : if (!m_enableLastSample)
354 : {
355 0 : if (m_lastBuffer)
356 : {
357 0 : gst_buffer_unref(m_lastBuffer);
358 0 : m_lastBuffer = nullptr;
359 : }
360 : }
361 1 : break;
362 : }
363 0 : default:
364 : {
365 0 : break;
366 : }
367 : }
368 314 : }
369 :
370 11 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
371 : {
372 11 : switch (type)
373 : {
374 1 : case Property::IsSinglePathStream:
375 : {
376 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
377 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
378 1 : break;
379 : }
380 1 : case Property::NumberOfStreams:
381 : {
382 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
383 1 : g_value_set_int(value, m_numOfStreams);
384 1 : break;
385 : }
386 1 : case Property::HasDrm:
387 : {
388 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
389 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
390 1 : break;
391 : }
392 2 : case Property::Stats:
393 : {
394 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
395 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
396 2 : if (!client)
397 : {
398 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
399 1 : break;
400 : }
401 :
402 1 : guint64 totalVideoFrames{0};
403 1 : guint64 droppedVideoFrames{0};
404 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
405 : {
406 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
407 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
408 1 : g_value_set_pointer(value, stats);
409 : }
410 : else
411 : {
412 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
413 : }
414 3 : }
415 : case Property::EnableLastSample:
416 : {
417 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
418 3 : g_value_set_boolean(value, m_enableLastSample ? TRUE : FALSE);
419 3 : break;
420 : }
421 4 : case Property::LastSample:
422 : {
423 : // Mutex inside getLastSample function
424 4 : gst_value_take_sample(value, getLastSample());
425 4 : break;
426 : }
427 0 : default:
428 : {
429 0 : break;
430 : }
431 : }
432 11 : }
433 :
434 22 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
435 : {
436 22 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
437 22 : switch (GST_QUERY_TYPE(query))
438 : {
439 1 : case GST_QUERY_SEEKING:
440 : {
441 : GstFormat fmt;
442 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
443 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
444 1 : return TRUE;
445 : }
446 5 : case GST_QUERY_POSITION:
447 : {
448 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
449 5 : if (!client)
450 : {
451 1 : return FALSE;
452 : }
453 :
454 : GstFormat fmt;
455 4 : gst_query_parse_position(query, &fmt, NULL);
456 4 : switch (fmt)
457 : {
458 3 : case GST_FORMAT_TIME:
459 : {
460 3 : gint64 position = client->getPosition(m_sourceId);
461 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
462 3 : if (position < 0)
463 : {
464 2 : return FALSE;
465 : }
466 :
467 1 : gst_query_set_position(query, fmt, position);
468 1 : break;
469 : }
470 1 : default:
471 1 : break;
472 : }
473 2 : return TRUE;
474 5 : }
475 2 : case GST_QUERY_SEGMENT:
476 : {
477 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
478 2 : GstFormat format{m_lastSegment.format};
479 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
480 2 : gint64 stop{0};
481 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
482 : {
483 2 : stop = m_lastSegment.duration;
484 : }
485 : else
486 : {
487 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
488 : }
489 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
490 2 : return TRUE;
491 : }
492 14 : default:
493 14 : break;
494 : }
495 14 : return std::nullopt;
496 : }
497 :
498 29 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
499 : {
500 29 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
501 29 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
502 :
503 29 : switch (GST_EVENT_TYPE(event))
504 : {
505 13 : case GST_EVENT_SEEK:
506 : {
507 13 : gdouble rate{1.0};
508 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
509 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
510 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
511 13 : gint64 start{0}, stop{0};
512 13 : if (event)
513 : {
514 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
515 :
516 13 : if (flags & GST_SEEK_FLAG_FLUSH)
517 : {
518 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
519 : {
520 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
521 1 : gst_event_unref(event);
522 5 : return FALSE;
523 : }
524 : // Update last segment
525 8 : if (seekFormat == GST_FORMAT_TIME)
526 : {
527 7 : gboolean update{FALSE};
528 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
529 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
530 : &update);
531 : }
532 : }
533 : #if GST_CHECK_VERSION(1, 18, 0)
534 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
535 : {
536 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
537 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
538 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
539 2 : gst_event_unref(event);
540 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
541 : {
542 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
543 1 : return FALSE;
544 : }
545 1 : return TRUE;
546 : }
547 : #endif
548 : else
549 : {
550 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
551 2 : gst_event_unref(event);
552 2 : return FALSE;
553 : }
554 : }
555 8 : break;
556 : }
557 : #if GST_CHECK_VERSION(1, 18, 0)
558 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
559 : {
560 2 : double rate{0.0};
561 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
562 2 : guint32 seqnum = gst_event_get_seqnum(event);
563 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
564 :
565 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
566 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
567 : {
568 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
569 2 : m_currentInstantRateChangeSeqnum = seqnum;
570 2 : client->setPlaybackRate(rate);
571 : }
572 2 : break;
573 : }
574 : #endif
575 14 : default:
576 14 : break;
577 : }
578 :
579 24 : if (shouldForwardUpstream)
580 : {
581 24 : bool result = gst_pad_push_event(m_sinkPad, event);
582 24 : if (!result)
583 : {
584 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
585 : }
586 :
587 24 : return result;
588 : }
589 :
590 0 : gst_event_unref(event);
591 0 : return TRUE;
592 : }
593 :
594 199 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
595 : {
596 199 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
597 199 : switch (GST_EVENT_TYPE(event))
598 : {
599 7 : case GST_EVENT_SEGMENT:
600 : {
601 7 : copySegment(event);
602 7 : setSegment();
603 7 : break;
604 : }
605 3 : case GST_EVENT_EOS:
606 : {
607 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
608 3 : m_isEos = true;
609 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
610 3 : if (client)
611 : {
612 0 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
613 : }
614 3 : break;
615 : }
616 147 : case GST_EVENT_CAPS:
617 : {
618 : GstCaps *caps;
619 147 : gst_event_parse_caps(event, &caps);
620 : {
621 147 : std::lock_guard<std::mutex> lock(m_sinkMutex);
622 147 : if (m_caps)
623 : {
624 4 : if (!gst_caps_is_equal(caps, m_caps))
625 : {
626 1 : gst_caps_unref(m_caps);
627 1 : m_caps = gst_caps_copy(caps);
628 : }
629 : }
630 : else
631 : {
632 143 : m_caps = gst_caps_copy(caps);
633 : }
634 147 : }
635 147 : break;
636 : }
637 1 : case GST_EVENT_SINK_MESSAGE:
638 : {
639 1 : GstMessage *message = nullptr;
640 1 : gst_event_parse_sink_message(event, &message);
641 :
642 1 : if (message)
643 : {
644 1 : gst_element_post_message(m_sink, message);
645 : }
646 :
647 1 : break;
648 : }
649 8 : case GST_EVENT_CUSTOM_DOWNSTREAM:
650 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
651 : {
652 8 : if (gst_event_has_name(event, "custom-instant-rate-change"))
653 : {
654 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
655 2 : changePlaybackRate(event);
656 : }
657 8 : break;
658 : }
659 16 : case GST_EVENT_FLUSH_START:
660 : {
661 16 : startFlushing();
662 16 : break;
663 : }
664 7 : case GST_EVENT_FLUSH_STOP:
665 : {
666 7 : gboolean resetTime{FALSE};
667 7 : gst_event_parse_flush_stop(event, &resetTime);
668 :
669 7 : stopFlushing(resetTime);
670 7 : break;
671 : }
672 3 : case GST_EVENT_STREAM_COLLECTION:
673 : {
674 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
675 3 : if (!client)
676 : {
677 1 : gst_event_unref(event);
678 1 : return FALSE;
679 : }
680 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
681 2 : GstStreamCollection *streamCollection{nullptr};
682 2 : gst_event_parse_stream_collection(event, &streamCollection);
683 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
684 6 : for (guint i = 0; i < streamsSize; ++i)
685 : {
686 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
687 4 : auto type = gst_stream_get_stream_type(stream);
688 4 : if (type & GST_STREAM_TYPE_AUDIO)
689 : {
690 2 : ++audioStreams;
691 : }
692 2 : else if (type & GST_STREAM_TYPE_VIDEO)
693 : {
694 1 : ++videoStreams;
695 : }
696 1 : else if (type & GST_STREAM_TYPE_TEXT)
697 : {
698 1 : ++textStreams;
699 : }
700 : }
701 2 : gst_object_unref(streamCollection);
702 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
703 2 : client->sendAllSourcesAttachedIfPossible();
704 2 : break;
705 3 : }
706 : #if GST_CHECK_VERSION(1, 18, 0)
707 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
708 : {
709 4 : guint32 seqnum = gst_event_get_seqnum(event);
710 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
711 : {
712 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
713 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
714 2 : break;
715 : }
716 :
717 2 : m_lastInstantRateChangeSeqnum = seqnum;
718 2 : gdouble rate{0.0};
719 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
720 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
721 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
722 2 : gst_message_set_seqnum(msg, seqnum);
723 2 : gst_element_post_message(m_sink, msg);
724 2 : break;
725 : }
726 : #endif
727 3 : default:
728 3 : break;
729 : }
730 :
731 198 : gst_event_unref(event);
732 :
733 198 : return TRUE;
734 : }
735 :
736 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
737 : {
738 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
739 7 : gst_event_copy_segment(event, &m_lastSegment);
740 : }
741 :
742 7 : void PullModePlaybackDelegate::setSegment()
743 : {
744 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
745 7 : if (!client)
746 : {
747 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
748 1 : return;
749 : }
750 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
751 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
752 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
753 6 : m_segmentSet = true;
754 7 : }
755 :
756 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
757 : {
758 2 : const GstStructure *structure{gst_event_get_structure(event)};
759 2 : gdouble playbackRate{1.0};
760 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
761 : {
762 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
763 2 : if (client && m_mediaPlayerManager.hasControl())
764 : {
765 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
766 1 : client->setPlaybackRate(playbackRate);
767 : }
768 2 : }
769 : }
770 :
771 16 : void PullModePlaybackDelegate::startFlushing()
772 : {
773 16 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
774 16 : if (client)
775 : {
776 5 : client->getFlushAndDataSynchronizer().waitIfRequired(m_sourceId);
777 : }
778 16 : std::lock_guard<std::mutex> lock(m_sinkMutex);
779 16 : if (!m_isSinkFlushOngoing)
780 : {
781 16 : GST_INFO_OBJECT(m_sink, "Starting flushing");
782 16 : if (m_isEos)
783 : {
784 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
785 2 : m_isEos = false;
786 : }
787 16 : m_isSinkFlushOngoing = true;
788 16 : m_segmentSet = false;
789 16 : clearBuffersUnlocked();
790 : }
791 : }
792 :
793 7 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
794 : {
795 7 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
796 7 : flushServer(resetTime);
797 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
798 7 : m_isSinkFlushOngoing = false;
799 :
800 7 : if (resetTime)
801 : {
802 7 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
803 7 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
804 : }
805 : }
806 :
807 7 : void PullModePlaybackDelegate::flushServer(bool resetTime)
808 : {
809 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
810 7 : if (!client)
811 : {
812 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
813 1 : return;
814 : }
815 :
816 : {
817 6 : std::unique_lock<std::mutex> lock(m_sinkMutex);
818 6 : m_isServerFlushOngoing = true;
819 : }
820 6 : client->flush(m_sourceId, resetTime);
821 7 : }
822 :
823 35 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
824 : {
825 35 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
826 35 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
827 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
828 :
829 35 : std::unique_lock<std::mutex> lock(m_sinkMutex);
830 :
831 35 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
832 : {
833 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
834 1 : m_needDataCondVariable.wait(lock);
835 : }
836 :
837 35 : if (m_isSinkFlushOngoing)
838 : {
839 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
840 3 : gst_buffer_unref(buffer);
841 3 : return GST_FLOW_FLUSHING;
842 : }
843 :
844 32 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
845 32 : if (sample)
846 32 : m_samples.push(sample);
847 : else
848 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
849 :
850 32 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
851 32 : if (client)
852 : {
853 28 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
854 : }
855 :
856 32 : setLastBuffer(buffer);
857 :
858 32 : gst_buffer_unref(buffer);
859 :
860 32 : return GST_FLOW_OK;
861 35 : }
862 :
863 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
864 : {
865 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
866 1 : if (m_isServerFlushOngoing)
867 : {
868 0 : GST_WARNING_OBJECT(m_sink, "Skip pulling buffer - flush is ongoing on server side...");
869 0 : return GstRefSample{};
870 : }
871 1 : if (!m_samples.empty())
872 : {
873 1 : GstSample *sample = m_samples.front();
874 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
875 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
876 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
877 :
878 1 : return GstRefSample{sample};
879 : }
880 :
881 0 : return GstRefSample{};
882 1 : }
883 :
884 1 : void PullModePlaybackDelegate::popSample()
885 : {
886 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
887 1 : if (!m_samples.empty())
888 : {
889 1 : gst_sample_unref(m_samples.front());
890 1 : m_samples.pop();
891 : }
892 1 : m_needDataCondVariable.notify_all();
893 : }
894 :
895 0 : bool PullModePlaybackDelegate::isEos() const
896 : {
897 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
898 0 : return m_samples.empty() && m_isEos;
899 : }
900 :
901 2 : bool PullModePlaybackDelegate::isReadyToSendData() const
902 : {
903 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
904 4 : return m_isEos || m_segmentSet;
905 2 : }
906 :
907 7 : void PullModePlaybackDelegate::lostState()
908 : {
909 7 : m_isStateCommitNeeded = true;
910 7 : gst_element_lost_state(m_sink);
911 : }
912 :
913 151 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
914 : const uint32_t maxVideoHeight)
915 : {
916 151 : GstObject *parentObject = getOldestGstBinParent(m_sink);
917 151 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
918 : {
919 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
920 3 : return false;
921 : }
922 :
923 148 : gchar *parentObjectName = gst_object_get_name(parentObject);
924 148 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
925 148 : g_free(parentObjectName);
926 :
927 148 : return setStreamsNumber(parentObject);
928 : }
929 :
930 148 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
931 : {
932 148 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
933 :
934 148 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
935 148 : if (context)
936 : {
937 3 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
938 :
939 3 : guint n_video{0}, n_audio{0}, n_text{0};
940 :
941 3 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
942 3 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
943 3 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
944 3 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
945 :
946 5 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
947 2 : n_text > std::numeric_limits<int32_t>::max())
948 : {
949 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
950 : n_text);
951 1 : gst_context_unref(context);
952 1 : return false;
953 : }
954 :
955 2 : videoStreams = n_video;
956 2 : audioStreams = n_audio;
957 2 : subtitleStreams = n_text;
958 :
959 2 : gst_context_unref(context);
960 : }
961 145 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
962 : {
963 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
964 : }
965 : else
966 : {
967 : // The default value of streams is V:1, A:1, S:0
968 : // Changing the default setting via properties is considered as DEPRECATED
969 143 : subtitleStreams = 0;
970 143 : std::lock_guard<std::mutex> lock{m_sinkMutex};
971 143 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
972 : {
973 27 : videoStreams = m_numOfStreams;
974 27 : if (m_isSinglePathStream)
975 : {
976 26 : audioStreams = 0;
977 26 : subtitleStreams = 0;
978 : }
979 : }
980 116 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
981 : {
982 106 : audioStreams = m_numOfStreams;
983 106 : if (m_isSinglePathStream)
984 : {
985 105 : videoStreams = 0;
986 105 : subtitleStreams = 0;
987 : }
988 : }
989 10 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
990 : {
991 10 : subtitleStreams = m_numOfStreams;
992 10 : if (m_isSinglePathStream)
993 : {
994 10 : videoStreams = 0;
995 10 : audioStreams = 0;
996 : }
997 : }
998 143 : }
999 :
1000 147 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
1001 147 : if (!client)
1002 : {
1003 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
1004 0 : return false;
1005 : }
1006 :
1007 147 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
1008 :
1009 147 : return true;
1010 : }
1011 :
1012 4 : GstSample *PullModePlaybackDelegate::getLastSample() const
1013 : {
1014 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
1015 4 : if (m_enableLastSample && m_lastBuffer)
1016 : {
1017 2 : return gst_sample_new(m_lastBuffer, m_caps, &m_lastSegment, nullptr);
1018 : }
1019 2 : return nullptr;
1020 4 : }
1021 :
1022 465 : void PullModePlaybackDelegate::setLastBuffer(GstBuffer *buffer)
1023 : {
1024 465 : if (m_enableLastSample)
1025 : {
1026 3 : if (m_lastBuffer)
1027 : {
1028 2 : gst_buffer_unref(m_lastBuffer);
1029 : }
1030 3 : if (buffer)
1031 : {
1032 2 : m_lastBuffer = gst_buffer_ref(buffer);
1033 : }
1034 : else
1035 : {
1036 1 : m_lastBuffer = nullptr;
1037 : }
1038 : }
1039 465 : }
|