Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 302 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 302 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 302 : GstObject *result = GST_OBJECT_CAST(element);
33 302 : if (parent)
34 : {
35 151 : if (GST_IS_BIN(parent))
36 : {
37 151 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 151 : gst_object_unref(parent);
40 : }
41 :
42 302 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 145 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 145 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 147 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 143 : return false;
73 : }
74 : } // namespace
75 :
76 270 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 270 : RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
79 270 : m_sinkPad = baseSink->priv->m_sinkPad;
80 270 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
81 270 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
82 : }
83 :
84 270 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
85 : {
86 270 : if (m_caps)
87 143 : gst_caps_unref(m_caps);
88 270 : clearBuffersUnlocked();
89 : }
90 :
91 433 : void PullModePlaybackDelegate::clearBuffersUnlocked()
92 : {
93 433 : m_isSinkFlushOngoing = true;
94 433 : m_needDataCondVariable.notify_all();
95 464 : while (!m_samples.empty())
96 : {
97 31 : GstSample *sample = m_samples.front();
98 31 : m_samples.pop();
99 31 : gst_sample_unref(sample);
100 : }
101 433 : setLastBuffer(nullptr);
102 : }
103 :
104 139 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
105 : {
106 139 : m_sourceId = sourceId;
107 : }
108 :
109 3 : void PullModePlaybackDelegate::handleEos()
110 : {
111 3 : GstState currentState = GST_STATE(m_sink);
112 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
113 : {
114 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
115 : gst_element_state_get_name(currentState));
116 :
117 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
118 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
119 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
120 1 : g_error_free(gError);
121 : }
122 : else
123 : {
124 2 : std::unique_lock lock{m_sinkMutex};
125 2 : if (!m_isSinkFlushOngoing && !m_isServerFlushOngoing)
126 : {
127 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
128 : }
129 : else
130 : {
131 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
132 : }
133 2 : }
134 3 : }
135 :
136 4 : void PullModePlaybackDelegate::handleFlushCompleted()
137 : {
138 4 : GST_INFO_OBJECT(m_sink, "Flush completed");
139 4 : std::unique_lock<std::mutex> lock(m_sinkMutex);
140 4 : m_isServerFlushOngoing = false;
141 : }
142 :
143 39 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
144 : {
145 39 : GstState current = GST_STATE(m_sink);
146 39 : GstState next = GST_STATE_NEXT(m_sink);
147 39 : GstState pending = GST_STATE_PENDING(m_sink);
148 39 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
149 :
150 39 : GST_DEBUG_OBJECT(m_sink,
151 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
152 : "pending state: %s, last return state %s",
153 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
154 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
155 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
156 :
157 39 : if (m_isStateCommitNeeded)
158 : {
159 39 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
160 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
161 : {
162 38 : GST_STATE(m_sink) = next;
163 38 : GST_STATE_NEXT(m_sink) = postNext;
164 38 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
165 38 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
166 :
167 38 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
168 :
169 38 : gst_element_post_message(m_sink,
170 38 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
171 38 : postAsyncDone();
172 : }
173 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
174 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
175 : next == GST_STATE_PLAYING)
176 : {
177 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
178 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
179 : }
180 : }
181 39 : }
182 :
183 853 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
184 : {
185 853 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
186 853 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
187 853 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
188 : gst_element_state_get_name(next_state));
189 :
190 853 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
191 853 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
192 :
193 853 : switch (transition)
194 : {
195 270 : case GST_STATE_CHANGE_NULL_TO_READY:
196 270 : if (!m_sinkPad)
197 : {
198 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
199 0 : return GST_STATE_CHANGE_FAILURE;
200 : }
201 270 : if (!m_rialtoControlClient->waitForRunning())
202 : {
203 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
204 0 : return GST_STATE_CHANGE_FAILURE;
205 : }
206 270 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
207 270 : break;
208 147 : case GST_STATE_CHANGE_READY_TO_PAUSED:
209 : {
210 147 : if (!client)
211 : {
212 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
213 0 : return GST_STATE_CHANGE_FAILURE;
214 : }
215 :
216 147 : m_isSinkFlushOngoing = false;
217 :
218 147 : StateChangeResult result = client->pause(m_sourceId);
219 147 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
220 : {
221 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
222 147 : if (result == StateChangeResult::NOT_ATTACHED)
223 : {
224 147 : postAsyncStart();
225 : }
226 147 : status = GST_STATE_CHANGE_ASYNC;
227 : }
228 :
229 147 : break;
230 : }
231 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
232 : {
233 8 : if (!client)
234 : {
235 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
236 0 : return GST_STATE_CHANGE_FAILURE;
237 : }
238 :
239 8 : StateChangeResult result = client->play(m_sourceId);
240 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
241 : {
242 8 : status = GST_STATE_CHANGE_ASYNC;
243 : }
244 0 : else if (result == StateChangeResult::NOT_ATTACHED)
245 : {
246 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
247 0 : return GST_STATE_CHANGE_FAILURE;
248 : }
249 :
250 8 : break;
251 : }
252 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
253 : {
254 7 : if (!client)
255 : {
256 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
257 0 : return GST_STATE_CHANGE_FAILURE;
258 : }
259 :
260 7 : StateChangeResult result = client->pause(m_sourceId);
261 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
262 : {
263 7 : status = GST_STATE_CHANGE_ASYNC;
264 : }
265 0 : else if (result == StateChangeResult::NOT_ATTACHED)
266 : {
267 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
268 0 : return GST_STATE_CHANGE_FAILURE;
269 : }
270 :
271 7 : break;
272 : }
273 147 : case GST_STATE_CHANGE_PAUSED_TO_READY:
274 147 : if (!client)
275 : {
276 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
277 0 : return GST_STATE_CHANGE_FAILURE;
278 : }
279 :
280 147 : if (m_isStateCommitNeeded)
281 : {
282 126 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
283 126 : postAsyncDone();
284 : }
285 :
286 147 : client->removeSource(m_sourceId);
287 : {
288 147 : std::lock_guard<std::mutex> lock(m_sinkMutex);
289 147 : clearBuffersUnlocked();
290 147 : m_sourceAttached = false;
291 : }
292 147 : break;
293 270 : case GST_STATE_CHANGE_READY_TO_NULL:
294 : // Playback will be stopped once all sources are finished and ref count
295 : // of the media pipeline object reaches 0
296 270 : m_mediaPlayerManager.releaseMediaPlayerClient();
297 270 : m_rialtoControlClient->removeControlBackend();
298 270 : break;
299 4 : default:
300 4 : break;
301 : }
302 :
303 853 : return status;
304 : }
305 :
306 2 : void PullModePlaybackDelegate::handleError(const std::string &message, gint code)
307 : {
308 2 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message.c_str())};
309 4 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
310 2 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
311 2 : g_error_free(gError);
312 : }
313 :
314 301 : void PullModePlaybackDelegate::postAsyncStart()
315 : {
316 301 : m_isStateCommitNeeded = true;
317 301 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
318 : }
319 :
320 164 : void PullModePlaybackDelegate::postAsyncDone()
321 : {
322 164 : m_isStateCommitNeeded = false;
323 164 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
324 : }
325 :
326 314 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
327 : {
328 314 : switch (type)
329 : {
330 156 : case Property::IsSinglePathStream:
331 : {
332 156 : std::lock_guard<std::mutex> lock(m_sinkMutex);
333 156 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
334 156 : break;
335 : }
336 156 : case Property::NumberOfStreams:
337 : {
338 156 : std::lock_guard<std::mutex> lock(m_sinkMutex);
339 156 : m_numOfStreams = g_value_get_int(value);
340 156 : break;
341 : }
342 1 : case Property::HasDrm:
343 : {
344 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
345 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
346 1 : break;
347 : }
348 1 : case Property::EnableLastSample:
349 : {
350 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
351 1 : m_enableLastSample = g_value_get_boolean(value) != FALSE;
352 1 : if (!m_enableLastSample)
353 : {
354 0 : if (m_lastBuffer)
355 : {
356 0 : gst_buffer_unref(m_lastBuffer);
357 0 : m_lastBuffer = nullptr;
358 : }
359 : }
360 1 : break;
361 : }
362 0 : default:
363 : {
364 0 : break;
365 : }
366 : }
367 314 : }
368 :
369 11 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
370 : {
371 11 : switch (type)
372 : {
373 1 : case Property::IsSinglePathStream:
374 : {
375 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
376 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
377 1 : break;
378 : }
379 1 : case Property::NumberOfStreams:
380 : {
381 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
382 1 : g_value_set_int(value, m_numOfStreams);
383 1 : break;
384 : }
385 1 : case Property::HasDrm:
386 : {
387 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
388 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
389 1 : break;
390 : }
391 2 : case Property::Stats:
392 : {
393 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
394 2 : if (!client)
395 : {
396 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
397 1 : break;
398 : }
399 :
400 1 : guint64 totalVideoFrames{0};
401 1 : guint64 droppedVideoFrames{0};
402 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
403 : {
404 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
405 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
406 1 : g_value_set_pointer(value, stats);
407 : }
408 : else
409 : {
410 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
411 : }
412 2 : }
413 : case Property::EnableLastSample:
414 : {
415 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
416 3 : g_value_set_boolean(value, m_enableLastSample ? TRUE : FALSE);
417 3 : break;
418 : }
419 4 : case Property::LastSample:
420 : {
421 : // Mutex inside getLastSample function
422 4 : gst_value_take_sample(value, getLastSample());
423 4 : break;
424 : }
425 0 : default:
426 : {
427 0 : break;
428 : }
429 : }
430 11 : }
431 :
432 22 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
433 : {
434 22 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
435 22 : switch (GST_QUERY_TYPE(query))
436 : {
437 1 : case GST_QUERY_SEEKING:
438 : {
439 : GstFormat fmt;
440 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
441 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
442 1 : return TRUE;
443 : }
444 5 : case GST_QUERY_POSITION:
445 : {
446 5 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
447 5 : if (!client)
448 : {
449 1 : return FALSE;
450 : }
451 :
452 : GstFormat fmt;
453 4 : gst_query_parse_position(query, &fmt, NULL);
454 4 : switch (fmt)
455 : {
456 3 : case GST_FORMAT_TIME:
457 : {
458 3 : gint64 position = client->getPosition(m_sourceId);
459 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
460 3 : if (position < 0)
461 : {
462 2 : return FALSE;
463 : }
464 :
465 1 : gst_query_set_position(query, fmt, position);
466 1 : break;
467 : }
468 1 : default:
469 1 : break;
470 : }
471 2 : return TRUE;
472 5 : }
473 2 : case GST_QUERY_SEGMENT:
474 : {
475 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
476 2 : GstFormat format{m_lastSegment.format};
477 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
478 2 : gint64 stop{0};
479 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
480 : {
481 2 : stop = m_lastSegment.duration;
482 : }
483 : else
484 : {
485 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
486 : }
487 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
488 2 : return TRUE;
489 : }
490 14 : default:
491 14 : break;
492 : }
493 14 : return std::nullopt;
494 : }
495 :
496 29 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
497 : {
498 29 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
499 29 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
500 :
501 29 : switch (GST_EVENT_TYPE(event))
502 : {
503 13 : case GST_EVENT_SEEK:
504 : {
505 13 : gdouble rate{1.0};
506 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
507 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
508 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
509 13 : gint64 start{0}, stop{0};
510 13 : if (event)
511 : {
512 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
513 :
514 13 : if (flags & GST_SEEK_FLAG_FLUSH)
515 : {
516 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
517 : {
518 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
519 1 : gst_event_unref(event);
520 5 : return FALSE;
521 : }
522 : // Update last segment
523 8 : if (seekFormat == GST_FORMAT_TIME)
524 : {
525 7 : gboolean update{FALSE};
526 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
527 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
528 : &update);
529 : }
530 : }
531 : #if GST_CHECK_VERSION(1, 18, 0)
532 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
533 : {
534 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
535 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
536 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
537 2 : gst_event_unref(event);
538 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
539 : {
540 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
541 1 : return FALSE;
542 : }
543 1 : return TRUE;
544 : }
545 : #endif
546 : else
547 : {
548 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
549 2 : gst_event_unref(event);
550 2 : return FALSE;
551 : }
552 : }
553 8 : break;
554 : }
555 : #if GST_CHECK_VERSION(1, 18, 0)
556 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
557 : {
558 2 : double rate{0.0};
559 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
560 2 : guint32 seqnum = gst_event_get_seqnum(event);
561 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
562 :
563 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
564 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
565 : {
566 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
567 2 : m_currentInstantRateChangeSeqnum = seqnum;
568 2 : client->setPlaybackRate(rate);
569 : }
570 2 : break;
571 : }
572 : #endif
573 14 : default:
574 14 : break;
575 : }
576 :
577 24 : if (shouldForwardUpstream)
578 : {
579 24 : bool result = gst_pad_push_event(m_sinkPad, event);
580 24 : if (!result)
581 : {
582 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
583 : }
584 :
585 24 : return result;
586 : }
587 :
588 0 : gst_event_unref(event);
589 0 : return TRUE;
590 : }
591 :
592 199 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
593 : {
594 199 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
595 199 : switch (GST_EVENT_TYPE(event))
596 : {
597 7 : case GST_EVENT_SEGMENT:
598 : {
599 7 : copySegment(event);
600 7 : setSegment();
601 7 : break;
602 : }
603 3 : case GST_EVENT_EOS:
604 : {
605 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
606 3 : m_isEos = true;
607 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
608 3 : if (client)
609 : {
610 0 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
611 : }
612 3 : break;
613 : }
614 147 : case GST_EVENT_CAPS:
615 : {
616 : GstCaps *caps;
617 147 : gst_event_parse_caps(event, &caps);
618 : {
619 147 : std::lock_guard<std::mutex> lock(m_sinkMutex);
620 147 : if (m_caps)
621 : {
622 4 : if (!gst_caps_is_equal(caps, m_caps))
623 : {
624 1 : gst_caps_unref(m_caps);
625 1 : m_caps = gst_caps_copy(caps);
626 : }
627 : }
628 : else
629 : {
630 143 : m_caps = gst_caps_copy(caps);
631 : }
632 147 : }
633 147 : break;
634 : }
635 1 : case GST_EVENT_SINK_MESSAGE:
636 : {
637 1 : GstMessage *message = nullptr;
638 1 : gst_event_parse_sink_message(event, &message);
639 :
640 1 : if (message)
641 : {
642 1 : gst_element_post_message(m_sink, message);
643 : }
644 :
645 1 : break;
646 : }
647 8 : case GST_EVENT_CUSTOM_DOWNSTREAM:
648 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
649 : {
650 8 : if (gst_event_has_name(event, "custom-instant-rate-change"))
651 : {
652 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
653 2 : changePlaybackRate(event);
654 : }
655 8 : break;
656 : }
657 16 : case GST_EVENT_FLUSH_START:
658 : {
659 16 : startFlushing();
660 16 : break;
661 : }
662 7 : case GST_EVENT_FLUSH_STOP:
663 : {
664 7 : gboolean resetTime{FALSE};
665 7 : gst_event_parse_flush_stop(event, &resetTime);
666 :
667 7 : stopFlushing(resetTime);
668 7 : break;
669 : }
670 3 : case GST_EVENT_STREAM_COLLECTION:
671 : {
672 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
673 3 : if (!client)
674 : {
675 1 : gst_event_unref(event);
676 1 : return FALSE;
677 : }
678 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
679 2 : GstStreamCollection *streamCollection{nullptr};
680 2 : gst_event_parse_stream_collection(event, &streamCollection);
681 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
682 6 : for (guint i = 0; i < streamsSize; ++i)
683 : {
684 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
685 4 : auto type = gst_stream_get_stream_type(stream);
686 4 : if (type & GST_STREAM_TYPE_AUDIO)
687 : {
688 2 : ++audioStreams;
689 : }
690 2 : else if (type & GST_STREAM_TYPE_VIDEO)
691 : {
692 1 : ++videoStreams;
693 : }
694 1 : else if (type & GST_STREAM_TYPE_TEXT)
695 : {
696 1 : ++textStreams;
697 : }
698 : }
699 2 : gst_object_unref(streamCollection);
700 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
701 2 : client->sendAllSourcesAttachedIfPossible();
702 2 : break;
703 3 : }
704 : #if GST_CHECK_VERSION(1, 18, 0)
705 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
706 : {
707 4 : guint32 seqnum = gst_event_get_seqnum(event);
708 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
709 : {
710 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
711 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
712 2 : break;
713 : }
714 :
715 2 : m_lastInstantRateChangeSeqnum = seqnum;
716 2 : gdouble rate{0.0};
717 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
718 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
719 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
720 2 : gst_message_set_seqnum(msg, seqnum);
721 2 : gst_element_post_message(m_sink, msg);
722 2 : break;
723 : }
724 : #endif
725 3 : default:
726 3 : break;
727 : }
728 :
729 198 : gst_event_unref(event);
730 :
731 198 : return TRUE;
732 : }
733 :
734 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
735 : {
736 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
737 7 : gst_event_copy_segment(event, &m_lastSegment);
738 : }
739 :
740 7 : void PullModePlaybackDelegate::setSegment()
741 : {
742 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
743 7 : if (!client)
744 : {
745 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
746 1 : return;
747 : }
748 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
749 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
750 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
751 6 : m_segmentSet = true;
752 7 : }
753 :
754 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
755 : {
756 2 : const GstStructure *structure{gst_event_get_structure(event)};
757 2 : gdouble playbackRate{1.0};
758 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
759 : {
760 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
761 2 : if (client && m_mediaPlayerManager.hasControl())
762 : {
763 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
764 1 : client->setPlaybackRate(playbackRate);
765 : }
766 2 : }
767 : }
768 :
769 16 : void PullModePlaybackDelegate::startFlushing()
770 : {
771 16 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
772 16 : if (client)
773 : {
774 5 : client->getFlushAndDataSynchronizer().waitIfRequired(m_sourceId);
775 : }
776 16 : std::lock_guard<std::mutex> lock(m_sinkMutex);
777 16 : if (!m_isSinkFlushOngoing)
778 : {
779 16 : GST_INFO_OBJECT(m_sink, "Starting flushing");
780 16 : if (m_isEos)
781 : {
782 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
783 2 : m_isEos = false;
784 : }
785 16 : m_isSinkFlushOngoing = true;
786 16 : m_segmentSet = false;
787 16 : clearBuffersUnlocked();
788 : }
789 : }
790 :
791 7 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
792 : {
793 7 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
794 7 : flushServer(resetTime);
795 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
796 7 : m_isSinkFlushOngoing = false;
797 :
798 7 : if (resetTime)
799 : {
800 7 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
801 7 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
802 : }
803 : }
804 :
805 7 : void PullModePlaybackDelegate::flushServer(bool resetTime)
806 : {
807 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
808 7 : if (!client)
809 : {
810 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
811 1 : return;
812 : }
813 :
814 : {
815 6 : std::unique_lock<std::mutex> lock(m_sinkMutex);
816 6 : m_isServerFlushOngoing = true;
817 : }
818 6 : client->flush(m_sourceId, resetTime);
819 7 : }
820 :
821 35 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
822 : {
823 35 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
824 35 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
825 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
826 :
827 35 : std::unique_lock<std::mutex> lock(m_sinkMutex);
828 :
829 35 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
830 : {
831 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
832 1 : m_needDataCondVariable.wait(lock);
833 : }
834 :
835 35 : if (m_isSinkFlushOngoing)
836 : {
837 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
838 3 : gst_buffer_unref(buffer);
839 3 : return GST_FLOW_FLUSHING;
840 : }
841 :
842 32 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
843 32 : if (sample)
844 32 : m_samples.push(sample);
845 : else
846 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
847 :
848 32 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
849 32 : if (client)
850 : {
851 28 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
852 : }
853 :
854 32 : setLastBuffer(buffer);
855 :
856 32 : gst_buffer_unref(buffer);
857 :
858 32 : return GST_FLOW_OK;
859 35 : }
860 :
861 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
862 : {
863 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
864 1 : if (m_isServerFlushOngoing)
865 : {
866 0 : GST_WARNING_OBJECT(m_sink, "Skip pulling buffer - flush is ongoing on server side...");
867 0 : return GstRefSample{};
868 : }
869 1 : if (!m_samples.empty())
870 : {
871 1 : GstSample *sample = m_samples.front();
872 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
873 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
874 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
875 :
876 1 : return GstRefSample{sample};
877 : }
878 :
879 0 : return GstRefSample{};
880 1 : }
881 :
882 1 : void PullModePlaybackDelegate::popSample()
883 : {
884 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
885 1 : if (!m_samples.empty())
886 : {
887 1 : gst_sample_unref(m_samples.front());
888 1 : m_samples.pop();
889 : }
890 1 : m_needDataCondVariable.notify_all();
891 : }
892 :
893 0 : bool PullModePlaybackDelegate::isEos() const
894 : {
895 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
896 0 : return m_samples.empty() && m_isEos;
897 : }
898 :
899 2 : bool PullModePlaybackDelegate::isReadyToSendData() const
900 : {
901 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
902 4 : return m_isEos || m_segmentSet;
903 2 : }
904 :
905 7 : void PullModePlaybackDelegate::lostState()
906 : {
907 7 : m_isStateCommitNeeded = true;
908 7 : gst_element_lost_state(m_sink);
909 : }
910 :
911 151 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
912 : const uint32_t maxVideoHeight)
913 : {
914 151 : GstObject *parentObject = getOldestGstBinParent(m_sink);
915 151 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
916 : {
917 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
918 3 : return false;
919 : }
920 :
921 148 : gchar *parentObjectName = gst_object_get_name(parentObject);
922 148 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
923 148 : g_free(parentObjectName);
924 :
925 148 : return setStreamsNumber(parentObject);
926 : }
927 :
928 148 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
929 : {
930 148 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
931 :
932 148 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
933 148 : if (context)
934 : {
935 3 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
936 :
937 3 : guint n_video{0}, n_audio{0}, n_text{0};
938 :
939 3 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
940 3 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
941 3 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
942 3 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
943 :
944 5 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
945 2 : n_text > std::numeric_limits<int32_t>::max())
946 : {
947 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
948 : n_text);
949 1 : gst_context_unref(context);
950 1 : return false;
951 : }
952 :
953 2 : videoStreams = n_video;
954 2 : audioStreams = n_audio;
955 2 : subtitleStreams = n_text;
956 :
957 2 : gst_context_unref(context);
958 : }
959 145 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
960 : {
961 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
962 : }
963 : else
964 : {
965 : // The default value of streams is V:1, A:1, S:0
966 : // Changing the default setting via properties is considered as DEPRECATED
967 143 : subtitleStreams = 0;
968 143 : std::lock_guard<std::mutex> lock{m_sinkMutex};
969 143 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
970 : {
971 27 : videoStreams = m_numOfStreams;
972 27 : if (m_isSinglePathStream)
973 : {
974 26 : audioStreams = 0;
975 26 : subtitleStreams = 0;
976 : }
977 : }
978 116 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
979 : {
980 106 : audioStreams = m_numOfStreams;
981 106 : if (m_isSinglePathStream)
982 : {
983 105 : videoStreams = 0;
984 105 : subtitleStreams = 0;
985 : }
986 : }
987 10 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
988 : {
989 10 : subtitleStreams = m_numOfStreams;
990 10 : if (m_isSinglePathStream)
991 : {
992 10 : videoStreams = 0;
993 10 : audioStreams = 0;
994 : }
995 : }
996 143 : }
997 :
998 147 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
999 147 : if (!client)
1000 : {
1001 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
1002 0 : return false;
1003 : }
1004 :
1005 147 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
1006 :
1007 147 : return true;
1008 : }
1009 :
1010 4 : GstSample *PullModePlaybackDelegate::getLastSample() const
1011 : {
1012 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
1013 4 : if (m_enableLastSample && m_lastBuffer)
1014 : {
1015 2 : return gst_sample_new(m_lastBuffer, m_caps, &m_lastSegment, nullptr);
1016 : }
1017 2 : return nullptr;
1018 4 : }
1019 :
1020 465 : void PullModePlaybackDelegate::setLastBuffer(GstBuffer *buffer)
1021 : {
1022 465 : if (m_enableLastSample)
1023 : {
1024 3 : if (m_lastBuffer)
1025 : {
1026 2 : gst_buffer_unref(m_lastBuffer);
1027 : }
1028 3 : if (buffer)
1029 : {
1030 2 : m_lastBuffer = gst_buffer_ref(buffer);
1031 : }
1032 : else
1033 : {
1034 1 : m_lastBuffer = nullptr;
1035 : }
1036 : }
1037 465 : }
|