Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 310 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 310 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 310 : GstObject *result = GST_OBJECT_CAST(element);
33 310 : if (parent)
34 : {
35 155 : if (GST_IS_BIN(parent))
36 : {
37 155 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 155 : gst_object_unref(parent);
40 : }
41 :
42 310 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 148 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 148 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 150 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 146 : return false;
73 : }
74 : } // namespace
75 :
76 276 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 276 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 276 : m_sinkPad = baseSink->priv->m_sinkPad;
80 276 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 276 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 276 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 276 : if (m_caps)
87 146 : gst_caps_unref(m_caps);
88 276 : clearBuffersUnlocked();
89 : }
90 :
91 443 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 443 : m_isSinkFlushOngoing = true;
94 443 : m_needDataCondVariable.notify_all();
95 474 : while (!m_samples.empty())
96 : {
97 31 : GstSample *sample = m_samples.front();
98 31 : m_samples.pop();
99 31 : gst_sample_unref(sample);
100 : }
101 443 : setLastBuffer(nullptr);
102 : }
103 :
104 142 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
105 : {
106 142 : m_sourceId = sourceId;
107 : }
108 :
109 3 : void PullModePlaybackDelegate::handleEos()
110 : {
111 3 : GstState currentState = GST_STATE(m_sink);
112 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
113 : {
114 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
115 : gst_element_state_get_name(currentState));
116 :
117 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
118 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
119 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
120 1 : g_error_free(gError);
121 : }
122 : else
123 : {
124 2 : std::unique_lock lock{m_sinkMutex};
125 2 : if (!m_isSinkFlushOngoing && !m_isServerFlushOngoing)
126 : {
127 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
128 : }
129 : else
130 : {
131 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
132 : }
133 2 : }
134 3 : }
135 :
136 4 : void PullModePlaybackDelegate::handleFlushCompleted()
137 : {
138 4 : GST_INFO_OBJECT(m_sink, "Flush completed");
139 4 : std::unique_lock<std::mutex> lock(m_sinkMutex);
140 4 : m_isServerFlushOngoing = false;
141 : }
142 :
143 39 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
144 : {
145 39 : GstState current = GST_STATE(m_sink);
146 39 : GstState next = GST_STATE_NEXT(m_sink);
147 39 : GstState pending = GST_STATE_PENDING(m_sink);
148 39 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
149 :
150 39 : GST_DEBUG_OBJECT(m_sink,
151 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
152 : "pending state: %s, last return state %s",
153 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
154 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
155 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
156 :
157 39 : if (m_isStateCommitNeeded)
158 : {
159 39 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
160 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
161 : {
162 38 : GST_STATE(m_sink) = next;
163 38 : GST_STATE_NEXT(m_sink) = postNext;
164 38 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
165 38 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
166 :
167 38 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
168 :
169 38 : gst_element_post_message(m_sink,
170 38 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
171 38 : postAsyncDone();
172 : }
173 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
174 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
175 : next == GST_STATE_PLAYING)
176 : {
177 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
178 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
179 : }
180 : }
181 39 : }
182 :
183 873 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
184 : {
185 873 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
186 873 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
187 873 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
188 : gst_element_state_get_name(next_state));
189 :
190 873 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
191 873 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
192 :
193 873 : switch (transition)
194 : {
195 276 : case GST_STATE_CHANGE_NULL_TO_READY:
196 276 : if (!m_sinkPad)
197 : {
198 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
199 0 : return GST_STATE_CHANGE_FAILURE;
200 : }
201 276 : if (!m_rialtoControlClient->waitForRunning())
202 : {
203 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
204 0 : return GST_STATE_CHANGE_FAILURE;
205 : }
206 276 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
207 276 : break;
208 151 : case GST_STATE_CHANGE_READY_TO_PAUSED:
209 : {
210 151 : if (!client)
211 : {
212 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
213 0 : return GST_STATE_CHANGE_FAILURE;
214 : }
215 :
216 151 : m_isSinkFlushOngoing = false;
217 :
218 151 : StateChangeResult result = client->pause(m_sourceId);
219 151 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
220 : {
221 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
222 151 : if (result == StateChangeResult::NOT_ATTACHED)
223 : {
224 151 : postAsyncStart();
225 : }
226 151 : status = GST_STATE_CHANGE_ASYNC;
227 : }
228 :
229 151 : break;
230 : }
231 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
232 : {
233 8 : if (!client)
234 : {
235 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
236 0 : return GST_STATE_CHANGE_FAILURE;
237 : }
238 :
239 8 : StateChangeResult result = client->play(m_sourceId);
240 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
241 : {
242 8 : status = GST_STATE_CHANGE_ASYNC;
243 : }
244 0 : else if (result == StateChangeResult::NOT_ATTACHED)
245 : {
246 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
247 0 : return GST_STATE_CHANGE_FAILURE;
248 : }
249 :
250 8 : break;
251 : }
252 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
253 : {
254 7 : if (!client)
255 : {
256 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
257 0 : return GST_STATE_CHANGE_FAILURE;
258 : }
259 :
260 7 : StateChangeResult result = client->pause(m_sourceId);
261 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
262 : {
263 7 : status = GST_STATE_CHANGE_ASYNC;
264 : }
265 0 : else if (result == StateChangeResult::NOT_ATTACHED)
266 : {
267 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
268 0 : return GST_STATE_CHANGE_FAILURE;
269 : }
270 :
271 7 : break;
272 : }
273 151 : case GST_STATE_CHANGE_PAUSED_TO_READY:
274 151 : if (!client)
275 : {
276 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
277 0 : return GST_STATE_CHANGE_FAILURE;
278 : }
279 :
280 151 : if (m_isStateCommitNeeded)
281 : {
282 130 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
283 130 : postAsyncDone();
284 : }
285 :
286 151 : client->removeSource(m_sourceId);
287 : {
288 151 : std::lock_guard<std::mutex> lock(m_sinkMutex);
289 151 : clearBuffersUnlocked();
290 151 : m_sourceAttached = false;
291 : }
292 151 : break;
293 276 : case GST_STATE_CHANGE_READY_TO_NULL:
294 : // Playback will be stopped once all sources are finished and ref count
295 : // of the media pipeline object reaches 0
296 276 : m_mediaPlayerManager.releaseMediaPlayerClient();
297 276 : m_rialtoControlClient->removeControlBackend();
298 276 : break;
299 4 : default:
300 4 : break;
301 : }
302 :
303 873 : return status;
304 : }
305 :
306 2 : void PullModePlaybackDelegate::handleError(const std::string &message, gint code)
307 : {
308 2 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message.c_str())};
309 4 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
310 2 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
311 2 : g_error_free(gError);
312 : }
313 :
314 308 : void PullModePlaybackDelegate::postAsyncStart()
315 : {
316 308 : m_isStateCommitNeeded = true;
317 308 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
318 : }
319 :
320 168 : void PullModePlaybackDelegate::postAsyncDone()
321 : {
322 168 : m_isStateCommitNeeded = false;
323 168 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
324 : }
325 :
326 320 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
327 : {
328 320 : switch (type)
329 : {
330 159 : case Property::IsSinglePathStream:
331 : {
332 159 : std::lock_guard<std::mutex> lock(m_sinkMutex);
333 159 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
334 159 : break;
335 : }
336 159 : case Property::NumberOfStreams:
337 : {
338 159 : std::lock_guard<std::mutex> lock(m_sinkMutex);
339 159 : m_numOfStreams = g_value_get_int(value);
340 159 : break;
341 : }
342 1 : case Property::HasDrm:
343 : {
344 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
345 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
346 1 : break;
347 : }
348 1 : case Property::EnableLastSample:
349 : {
350 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
351 1 : m_enableLastSample = g_value_get_boolean(value) != FALSE;
352 1 : if (!m_enableLastSample)
353 : {
354 0 : if (m_lastBuffer)
355 : {
356 0 : gst_buffer_unref(m_lastBuffer);
357 0 : m_lastBuffer = nullptr;
358 : }
359 : }
360 1 : break;
361 : }
362 0 : default:
363 : {
364 0 : break;
365 : }
366 : }
367 320 : }
368 :
369 11 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
370 : {
371 11 : switch (type)
372 : {
373 1 : case Property::IsSinglePathStream:
374 : {
375 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
376 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
377 1 : break;
378 : }
379 1 : case Property::NumberOfStreams:
380 : {
381 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
382 1 : g_value_set_int(value, m_numOfStreams);
383 1 : break;
384 : }
385 1 : case Property::HasDrm:
386 : {
387 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
388 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
389 1 : break;
390 : }
391 2 : case Property::Stats:
392 : {
393 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
394 2 : if (!client)
395 : {
396 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
397 1 : break;
398 : }
399 :
400 1 : guint64 totalVideoFrames{0};
401 1 : guint64 droppedVideoFrames{0};
402 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
403 : {
404 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
405 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
406 1 : g_value_set_pointer(value, stats);
407 : }
408 : else
409 : {
410 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
411 : }
412 2 : }
413 : case Property::EnableLastSample:
414 : {
415 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
416 3 : g_value_set_boolean(value, m_enableLastSample ? TRUE : FALSE);
417 3 : break;
418 : }
419 4 : case Property::LastSample:
420 : {
421 : // Mutex inside getLastSample function
422 4 : gst_value_take_sample(value, getLastSample());
423 4 : break;
424 : }
425 0 : default:
426 : {
427 0 : break;
428 : }
429 : }
430 11 : }
431 :
432 25 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
433 : {
434 25 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
435 25 : switch (GST_QUERY_TYPE(query))
436 : {
437 1 : case GST_QUERY_SEEKING:
438 : {
439 : GstFormat fmt;
440 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
441 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
442 1 : return TRUE;
443 : }
444 5 : case GST_QUERY_POSITION:
445 : {
446 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
447 5 : if (!client)
448 : {
449 1 : return FALSE;
450 : }
451 :
452 : GstFormat fmt;
453 4 : gst_query_parse_position(query, &fmt, NULL);
454 4 : switch (fmt)
455 : {
456 3 : case GST_FORMAT_TIME:
457 : {
458 3 : gint64 position = client->getPosition(m_sourceId);
459 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
460 3 : if (position < 0)
461 : {
462 2 : return FALSE;
463 : }
464 :
465 1 : gst_query_set_position(query, fmt, position);
466 1 : break;
467 : }
468 1 : default:
469 1 : break;
470 : }
471 2 : return TRUE;
472 5 : }
473 2 : case GST_QUERY_SEGMENT:
474 : {
475 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
476 2 : GstFormat format{m_lastSegment.format};
477 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
478 2 : gint64 stop{0};
479 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
480 : {
481 2 : stop = m_lastSegment.duration;
482 : }
483 : else
484 : {
485 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
486 : }
487 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
488 2 : return TRUE;
489 : }
490 3 : case GST_QUERY_DURATION:
491 : {
492 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
493 3 : if (!client)
494 : {
495 1 : return FALSE;
496 : }
497 : GstFormat fmt;
498 2 : int64_t duration{-1};
499 2 : gst_query_parse_duration(query, &fmt, NULL);
500 2 : if (GST_FORMAT_TIME != fmt || !client->getDuration(duration))
501 : {
502 1 : return FALSE;
503 : }
504 1 : gst_query_set_duration(query, fmt, duration);
505 1 : return TRUE;
506 3 : }
507 14 : default:
508 14 : break;
509 : }
510 14 : return std::nullopt;
511 : }
512 :
513 29 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
514 : {
515 29 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
516 29 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
517 :
518 29 : switch (GST_EVENT_TYPE(event))
519 : {
520 13 : case GST_EVENT_SEEK:
521 : {
522 13 : gdouble rate{1.0};
523 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
524 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
525 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
526 13 : gint64 start{0}, stop{0};
527 13 : if (event)
528 : {
529 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
530 :
531 13 : if (flags & GST_SEEK_FLAG_FLUSH)
532 : {
533 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
534 : {
535 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
536 1 : gst_event_unref(event);
537 5 : return FALSE;
538 : }
539 : // Update last segment
540 8 : if (seekFormat == GST_FORMAT_TIME)
541 : {
542 7 : gboolean update{FALSE};
543 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
544 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
545 : &update);
546 : }
547 : }
548 : #if GST_CHECK_VERSION(1, 18, 0)
549 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
550 : {
551 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
552 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
553 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
554 2 : gst_event_unref(event);
555 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
556 : {
557 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
558 1 : return FALSE;
559 : }
560 1 : return TRUE;
561 : }
562 : #endif
563 : else
564 : {
565 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
566 2 : gst_event_unref(event);
567 2 : return FALSE;
568 : }
569 : }
570 8 : break;
571 : }
572 : #if GST_CHECK_VERSION(1, 18, 0)
573 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
574 : {
575 2 : double rate{0.0};
576 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
577 2 : guint32 seqnum = gst_event_get_seqnum(event);
578 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
579 :
580 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
581 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
582 : {
583 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
584 2 : m_currentInstantRateChangeSeqnum = seqnum;
585 2 : client->setPlaybackRate(rate);
586 : }
587 2 : break;
588 : }
589 : #endif
590 14 : default:
591 14 : break;
592 : }
593 :
594 24 : if (shouldForwardUpstream)
595 : {
596 24 : bool result = gst_pad_push_event(m_sinkPad, event);
597 24 : if (!result)
598 : {
599 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
600 : }
601 :
602 24 : return result;
603 : }
604 :
605 0 : gst_event_unref(event);
606 0 : return TRUE;
607 : }
608 :
609 202 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
610 : {
611 202 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
612 202 : switch (GST_EVENT_TYPE(event))
613 : {
614 7 : case GST_EVENT_SEGMENT:
615 : {
616 7 : copySegment(event);
617 7 : setSegment();
618 7 : break;
619 : }
620 3 : case GST_EVENT_EOS:
621 : {
622 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
623 3 : m_isEos = true;
624 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
625 3 : if (client)
626 : {
627 0 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
628 : }
629 3 : break;
630 : }
631 150 : case GST_EVENT_CAPS:
632 : {
633 : GstCaps *caps;
634 150 : gst_event_parse_caps(event, &caps);
635 : {
636 150 : std::lock_guard<std::mutex> lock(m_sinkMutex);
637 150 : if (m_caps)
638 : {
639 4 : if (!gst_caps_is_equal(caps, m_caps))
640 : {
641 1 : gst_caps_unref(m_caps);
642 1 : m_caps = gst_caps_copy(caps);
643 : }
644 : }
645 : else
646 : {
647 146 : m_caps = gst_caps_copy(caps);
648 : }
649 150 : }
650 150 : break;
651 : }
652 1 : case GST_EVENT_SINK_MESSAGE:
653 : {
654 1 : GstMessage *message = nullptr;
655 1 : gst_event_parse_sink_message(event, &message);
656 :
657 1 : if (message)
658 : {
659 1 : gst_element_post_message(m_sink, message);
660 : }
661 :
662 1 : break;
663 : }
664 8 : case GST_EVENT_CUSTOM_DOWNSTREAM:
665 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
666 : {
667 8 : if (gst_event_has_name(event, "custom-instant-rate-change"))
668 : {
669 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
670 2 : changePlaybackRate(event);
671 : }
672 8 : break;
673 : }
674 16 : case GST_EVENT_FLUSH_START:
675 : {
676 16 : startFlushing();
677 16 : break;
678 : }
679 7 : case GST_EVENT_FLUSH_STOP:
680 : {
681 7 : gboolean resetTime{FALSE};
682 7 : gst_event_parse_flush_stop(event, &resetTime);
683 :
684 7 : stopFlushing(resetTime);
685 7 : break;
686 : }
687 3 : case GST_EVENT_STREAM_COLLECTION:
688 : {
689 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
690 3 : if (!client)
691 : {
692 1 : gst_event_unref(event);
693 1 : return FALSE;
694 : }
695 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
696 2 : GstStreamCollection *streamCollection{nullptr};
697 2 : gst_event_parse_stream_collection(event, &streamCollection);
698 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
699 6 : for (guint i = 0; i < streamsSize; ++i)
700 : {
701 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
702 4 : auto type = gst_stream_get_stream_type(stream);
703 4 : if (type & GST_STREAM_TYPE_AUDIO)
704 : {
705 2 : ++audioStreams;
706 : }
707 2 : else if (type & GST_STREAM_TYPE_VIDEO)
708 : {
709 1 : ++videoStreams;
710 : }
711 1 : else if (type & GST_STREAM_TYPE_TEXT)
712 : {
713 1 : ++textStreams;
714 : }
715 : }
716 2 : gst_object_unref(streamCollection);
717 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
718 2 : client->sendAllSourcesAttachedIfPossible();
719 2 : break;
720 3 : }
721 : #if GST_CHECK_VERSION(1, 18, 0)
722 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
723 : {
724 4 : guint32 seqnum = gst_event_get_seqnum(event);
725 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
726 : {
727 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
728 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
729 2 : break;
730 : }
731 :
732 2 : m_lastInstantRateChangeSeqnum = seqnum;
733 2 : gdouble rate{0.0};
734 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
735 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
736 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
737 2 : gst_message_set_seqnum(msg, seqnum);
738 2 : gst_element_post_message(m_sink, msg);
739 2 : break;
740 : }
741 : #endif
742 3 : default:
743 3 : break;
744 : }
745 :
746 201 : gst_event_unref(event);
747 :
748 201 : return TRUE;
749 : }
750 :
751 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
752 : {
753 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
754 7 : gst_event_copy_segment(event, &m_lastSegment);
755 : }
756 :
757 7 : void PullModePlaybackDelegate::setSegment()
758 : {
759 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
760 7 : if (!client)
761 : {
762 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
763 1 : return;
764 : }
765 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
766 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
767 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
768 6 : m_segmentSet = true;
769 7 : }
770 :
771 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
772 : {
773 2 : const GstStructure *structure{gst_event_get_structure(event)};
774 2 : gdouble playbackRate{1.0};
775 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
776 : {
777 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
778 2 : if (client && m_mediaPlayerManager.hasControl())
779 : {
780 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
781 1 : client->setPlaybackRate(playbackRate);
782 : }
783 2 : }
784 : }
785 :
786 16 : void PullModePlaybackDelegate::startFlushing()
787 : {
788 16 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
789 16 : if (client)
790 : {
791 5 : client->getFlushAndDataSynchronizer().waitIfRequired(m_sourceId);
792 : }
793 16 : std::lock_guard<std::mutex> lock(m_sinkMutex);
794 16 : if (!m_isSinkFlushOngoing)
795 : {
796 16 : GST_INFO_OBJECT(m_sink, "Starting flushing");
797 16 : if (m_isEos)
798 : {
799 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
800 2 : m_isEos = false;
801 : }
802 16 : m_isSinkFlushOngoing = true;
803 16 : m_segmentSet = false;
804 16 : clearBuffersUnlocked();
805 : }
806 : }
807 :
808 7 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
809 : {
810 7 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
811 7 : flushServer(resetTime);
812 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
813 7 : m_isSinkFlushOngoing = false;
814 :
815 7 : if (resetTime)
816 : {
817 7 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
818 7 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
819 : }
820 : }
821 :
822 7 : void PullModePlaybackDelegate::flushServer(bool resetTime)
823 : {
824 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
825 7 : if (!client)
826 : {
827 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
828 1 : return;
829 : }
830 :
831 : {
832 6 : std::unique_lock<std::mutex> lock(m_sinkMutex);
833 6 : m_isServerFlushOngoing = true;
834 : }
835 6 : client->flush(m_sourceId, resetTime);
836 7 : }
837 :
838 35 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
839 : {
840 35 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
841 35 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
842 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
843 :
844 35 : std::unique_lock<std::mutex> lock(m_sinkMutex);
845 :
846 35 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
847 : {
848 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
849 1 : m_needDataCondVariable.wait(lock);
850 : }
851 :
852 35 : if (m_isSinkFlushOngoing)
853 : {
854 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
855 3 : gst_buffer_unref(buffer);
856 3 : return GST_FLOW_FLUSHING;
857 : }
858 :
859 32 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
860 32 : if (sample)
861 32 : m_samples.push(sample);
862 : else
863 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
864 :
865 32 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
866 32 : if (client)
867 : {
868 28 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
869 : }
870 :
871 32 : setLastBuffer(buffer);
872 :
873 32 : gst_buffer_unref(buffer);
874 :
875 32 : return GST_FLOW_OK;
876 35 : }
877 :
878 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
879 : {
880 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
881 1 : if (m_isServerFlushOngoing)
882 : {
883 0 : GST_WARNING_OBJECT(m_sink, "Skip pulling buffer - flush is ongoing on server side...");
884 0 : return GstRefSample{};
885 : }
886 1 : if (!m_samples.empty())
887 : {
888 1 : GstSample *sample = m_samples.front();
889 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
890 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
891 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
892 :
893 1 : return GstRefSample{sample};
894 : }
895 :
896 0 : return GstRefSample{};
897 1 : }
898 :
899 1 : void PullModePlaybackDelegate::popSample()
900 : {
901 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
902 1 : if (!m_samples.empty())
903 : {
904 1 : gst_sample_unref(m_samples.front());
905 1 : m_samples.pop();
906 : }
907 1 : m_needDataCondVariable.notify_all();
908 : }
909 :
910 0 : bool PullModePlaybackDelegate::isEos() const
911 : {
912 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
913 0 : return m_samples.empty() && m_isEos;
914 : }
915 :
916 2 : bool PullModePlaybackDelegate::isReadyToSendData() const
917 : {
918 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
919 4 : return m_isEos || m_segmentSet;
920 2 : }
921 :
922 7 : void PullModePlaybackDelegate::lostState()
923 : {
924 7 : m_isStateCommitNeeded = true;
925 7 : gst_element_lost_state(m_sink);
926 : }
927 :
928 155 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
929 : const uint32_t maxVideoHeight)
930 : {
931 155 : GstObject *parentObject = getOldestGstBinParent(m_sink);
932 155 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight, isLiveLatencyEnabled()))
933 : {
934 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
935 3 : return false;
936 : }
937 :
938 152 : gchar *parentObjectName = gst_object_get_name(parentObject);
939 152 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
940 152 : g_free(parentObjectName);
941 :
942 152 : return setStreamsNumber(parentObject);
943 : }
944 :
945 152 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
946 : {
947 152 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
948 :
949 152 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
950 152 : if (context)
951 : {
952 4 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
953 :
954 4 : guint n_video{0}, n_audio{0}, n_text{0};
955 :
956 4 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
957 4 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
958 4 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
959 4 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
960 :
961 7 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
962 3 : n_text > std::numeric_limits<int32_t>::max())
963 : {
964 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
965 : n_text);
966 1 : gst_context_unref(context);
967 1 : return false;
968 : }
969 :
970 3 : videoStreams = n_video;
971 3 : audioStreams = n_audio;
972 3 : subtitleStreams = n_text;
973 :
974 3 : gst_context_unref(context);
975 : }
976 148 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
977 : {
978 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
979 : }
980 : else
981 : {
982 : // The default value of streams is V:1, A:1, S:0
983 : // Changing the default setting via properties is considered as DEPRECATED
984 146 : subtitleStreams = 0;
985 146 : std::lock_guard<std::mutex> lock{m_sinkMutex};
986 146 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
987 : {
988 28 : videoStreams = m_numOfStreams;
989 28 : if (m_isSinglePathStream)
990 : {
991 27 : audioStreams = 0;
992 27 : subtitleStreams = 0;
993 : }
994 : }
995 118 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
996 : {
997 108 : audioStreams = m_numOfStreams;
998 108 : if (m_isSinglePathStream)
999 : {
1000 107 : videoStreams = 0;
1001 107 : subtitleStreams = 0;
1002 : }
1003 : }
1004 10 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
1005 : {
1006 10 : subtitleStreams = m_numOfStreams;
1007 10 : if (m_isSinglePathStream)
1008 : {
1009 10 : videoStreams = 0;
1010 10 : audioStreams = 0;
1011 : }
1012 : }
1013 146 : }
1014 :
1015 151 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
1016 151 : if (!client)
1017 : {
1018 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
1019 0 : return false;
1020 : }
1021 :
1022 151 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
1023 :
1024 151 : return true;
1025 : }
1026 :
1027 155 : bool PullModePlaybackDelegate::isLiveLatencyEnabled() const
1028 : {
1029 155 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
1030 155 : if (context)
1031 : {
1032 4 : GST_DEBUG_OBJECT(m_sink, "Checking if live latency is enabled from \"streams-info\" context");
1033 :
1034 4 : gboolean isEnabled{FALSE};
1035 :
1036 4 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
1037 4 : gst_structure_get_boolean(streamsInfoStructure, "enable-live-latency", &isEnabled);
1038 :
1039 4 : gst_context_unref(context);
1040 4 : return isEnabled != FALSE;
1041 : }
1042 151 : return false;
1043 : }
1044 :
1045 4 : GstSample *PullModePlaybackDelegate::getLastSample() const
1046 : {
1047 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
1048 4 : if (m_enableLastSample && m_lastBuffer)
1049 : {
1050 2 : return gst_sample_new(m_lastBuffer, m_caps, &m_lastSegment, nullptr);
1051 : }
1052 2 : return nullptr;
1053 4 : }
1054 :
1055 475 : void PullModePlaybackDelegate::setLastBuffer(GstBuffer *buffer)
1056 : {
1057 475 : if (m_enableLastSample)
1058 : {
1059 3 : if (m_lastBuffer)
1060 : {
1061 2 : gst_buffer_unref(m_lastBuffer);
1062 : }
1063 3 : if (buffer)
1064 : {
1065 2 : m_lastBuffer = gst_buffer_ref(buffer);
1066 : }
1067 : else
1068 : {
1069 1 : m_lastBuffer = nullptr;
1070 : }
1071 : }
1072 475 : }
|