Line data Source code
1 : /*
2 : * Copyright (C) 2025 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "PullModePlaybackDelegate.h"
20 : #include "ControlBackend.h"
21 : #include "GstreamerCatLog.h"
22 : #include "RialtoGStreamerMSEBaseSink.h"
23 : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 : namespace
28 : {
29 312 : GstObject *getOldestGstBinParent(GstElement *element)
30 : {
31 312 : GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
32 312 : GstObject *result = GST_OBJECT_CAST(element);
33 312 : if (parent)
34 : {
35 156 : if (GST_IS_BIN(parent))
36 : {
37 156 : result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
38 : }
39 156 : gst_object_unref(parent);
40 : }
41 :
42 312 : return result;
43 : }
44 :
45 6 : unsigned getGstPlayFlag(const char *nick)
46 : {
47 6 : GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
48 6 : GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
49 6 : return flag ? flag->value : 0;
50 : }
51 :
52 149 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
53 : {
54 149 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
55 151 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
56 2 : g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
57 : {
58 2 : g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
59 :
60 2 : if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
61 : {
62 2 : guint flags = 0;
63 2 : g_object_get(parentObject, "flags", &flags, nullptr);
64 2 : n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
65 2 : n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
66 2 : n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
67 : }
68 :
69 2 : return true;
70 : }
71 :
72 147 : return false;
73 : }
74 : } // namespace
75 :
76 283 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
77 : {
78 283 : m_sinkPad = RIALTO_MSE_BASE_SINK(sink)->priv->m_sinkPad;
79 283 : gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
80 : }
81 :
82 283 : void PullModePlaybackDelegate::createControlBackend()
83 : {
84 283 : if (!m_rialtoControlClient)
85 : {
86 283 : m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>(shared_from_this());
87 : }
88 : }
89 :
90 283 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
91 : {
92 283 : if (m_caps)
93 147 : gst_caps_unref(m_caps);
94 283 : clearBuffersUnlocked();
95 : }
96 :
97 452 : void PullModePlaybackDelegate::clearBuffersUnlocked()
98 : {
99 452 : m_isSinkFlushOngoing = true;
100 452 : m_needDataCondVariable.notify_all();
101 483 : while (!m_samples.empty())
102 : {
103 31 : GstSample *sample = m_samples.front();
104 31 : m_samples.pop();
105 31 : gst_sample_unref(sample);
106 : }
107 452 : setLastBuffer(nullptr);
108 : }
109 :
110 143 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
111 : {
112 143 : m_sourceId = sourceId;
113 : }
114 :
115 3 : void PullModePlaybackDelegate::handleEos()
116 : {
117 3 : GstState currentState = GST_STATE(m_sink);
118 3 : if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
119 : {
120 1 : GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
121 : gst_element_state_get_name(currentState));
122 :
123 1 : const char *errMessage = "Rialto sinks received EOS in non-playing state";
124 1 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
125 1 : gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
126 1 : g_error_free(gError);
127 : }
128 : else
129 : {
130 2 : std::unique_lock lock{m_sinkMutex};
131 2 : if (!m_isSinkFlushOngoing && !m_isServerFlushOngoing)
132 : {
133 1 : gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
134 : }
135 : else
136 : {
137 1 : GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
138 : }
139 2 : }
140 3 : }
141 :
142 5 : void PullModePlaybackDelegate::handleFlushCompleted()
143 : {
144 5 : GST_INFO_OBJECT(m_sink, "Flush completed");
145 5 : std::unique_lock<std::mutex> lock(m_sinkMutex);
146 5 : m_isServerFlushOngoing = false;
147 5 : m_isTimeResetOngoing = false;
148 : }
149 :
150 40 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
151 : {
152 40 : GstState current = GST_STATE(m_sink);
153 40 : GstState next = GST_STATE_NEXT(m_sink);
154 40 : GstState pending = GST_STATE_PENDING(m_sink);
155 40 : GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
156 :
157 40 : GST_DEBUG_OBJECT(m_sink,
158 : "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
159 : "pending state: %s, last return state %s",
160 : static_cast<uint32_t>(state), gst_element_state_get_name(current),
161 : gst_element_state_get_name(next), gst_element_state_get_name(pending),
162 : gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
163 :
164 40 : if (m_isStateCommitNeeded)
165 : {
166 40 : if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
167 7 : (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
168 : {
169 39 : GST_STATE(m_sink) = next;
170 39 : GST_STATE_NEXT(m_sink) = postNext;
171 39 : GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
172 39 : GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
173 :
174 39 : GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
175 :
176 39 : gst_element_post_message(m_sink,
177 39 : gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
178 39 : postAsyncDone();
179 : }
180 : /* Immediately transition to PLAYING when prerolled and PLAY is requested */
181 1 : else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
182 : next == GST_STATE_PLAYING)
183 : {
184 1 : GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
185 1 : changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
186 : }
187 : }
188 40 : }
189 :
190 888 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
191 : {
192 888 : GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
193 888 : GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
194 888 : GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
195 : gst_element_state_get_name(next_state));
196 :
197 888 : GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
198 888 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
199 :
200 888 : switch (transition)
201 : {
202 283 : case GST_STATE_CHANGE_NULL_TO_READY:
203 283 : if (!m_sinkPad)
204 : {
205 0 : GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
206 0 : return GST_STATE_CHANGE_FAILURE;
207 : }
208 283 : if (!m_rialtoControlClient->waitForRunning())
209 : {
210 0 : GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
211 0 : return GST_STATE_CHANGE_FAILURE;
212 : }
213 283 : GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
214 283 : break;
215 152 : case GST_STATE_CHANGE_READY_TO_PAUSED:
216 : {
217 152 : if (!client)
218 : {
219 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
220 0 : return GST_STATE_CHANGE_FAILURE;
221 : }
222 :
223 152 : m_isSinkFlushOngoing = false;
224 :
225 152 : StateChangeResult result = client->pause(m_sourceId);
226 152 : if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
227 : {
228 : // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
229 152 : if (result == StateChangeResult::NOT_ATTACHED)
230 : {
231 152 : postAsyncStart();
232 : }
233 152 : status = GST_STATE_CHANGE_ASYNC;
234 : }
235 :
236 152 : break;
237 : }
238 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
239 : {
240 8 : if (!client)
241 : {
242 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
243 0 : return GST_STATE_CHANGE_FAILURE;
244 : }
245 :
246 8 : StateChangeResult result = client->play(m_sourceId);
247 8 : if (result == StateChangeResult::SUCCESS_ASYNC)
248 : {
249 8 : status = GST_STATE_CHANGE_ASYNC;
250 : }
251 0 : else if (result == StateChangeResult::NOT_ATTACHED)
252 : {
253 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
254 0 : return GST_STATE_CHANGE_FAILURE;
255 : }
256 :
257 8 : break;
258 : }
259 7 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
260 : {
261 7 : if (!client)
262 : {
263 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
264 0 : return GST_STATE_CHANGE_FAILURE;
265 : }
266 :
267 7 : StateChangeResult result = client->pause(m_sourceId);
268 7 : if (result == StateChangeResult::SUCCESS_ASYNC)
269 : {
270 7 : status = GST_STATE_CHANGE_ASYNC;
271 : }
272 0 : else if (result == StateChangeResult::NOT_ATTACHED)
273 : {
274 0 : GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
275 0 : return GST_STATE_CHANGE_FAILURE;
276 : }
277 :
278 7 : break;
279 : }
280 152 : case GST_STATE_CHANGE_PAUSED_TO_READY:
281 152 : if (!client)
282 : {
283 0 : GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
284 0 : return GST_STATE_CHANGE_FAILURE;
285 : }
286 :
287 152 : if (m_isStateCommitNeeded)
288 : {
289 131 : GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
290 131 : postAsyncDone();
291 : }
292 :
293 152 : client->removeSource(m_sourceId);
294 : {
295 152 : std::lock_guard<std::mutex> lock(m_sinkMutex);
296 152 : clearBuffersUnlocked();
297 152 : m_sourceAttached = false;
298 : }
299 152 : break;
300 282 : case GST_STATE_CHANGE_READY_TO_NULL:
301 : // Playback will be stopped once all sources are finished and ref count
302 : // of the media pipeline object reaches 0
303 282 : m_mediaPlayerManager.releaseMediaPlayerClient();
304 282 : m_rialtoControlClient->removeControlBackend();
305 282 : break;
306 4 : default:
307 4 : break;
308 : }
309 :
310 888 : return status;
311 : }
312 :
313 3 : void PullModePlaybackDelegate::handleError(const std::string &message, gint code)
314 : {
315 3 : GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message.c_str())};
316 6 : gst_element_post_message(GST_ELEMENT_CAST(m_sink),
317 3 : gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message.c_str()));
318 3 : g_error_free(gError);
319 : }
320 :
321 2 : void PullModePlaybackDelegate::notifyApplicationState(firebolt::rialto::ApplicationState state)
322 : {
323 2 : if (state == firebolt::rialto::ApplicationState::UNKNOWN)
324 : {
325 1 : GST_WARNING_OBJECT(m_sink, "Rialto control sent unknown application state");
326 3 : handleError("Rialto client reached unknown application state");
327 : }
328 2 : }
329 :
330 310 : void PullModePlaybackDelegate::postAsyncStart()
331 : {
332 310 : m_isStateCommitNeeded = true;
333 310 : gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
334 : }
335 :
336 170 : void PullModePlaybackDelegate::postAsyncDone()
337 : {
338 170 : m_isStateCommitNeeded = false;
339 170 : gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
340 : }
341 :
342 328 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
343 : {
344 328 : switch (type)
345 : {
346 163 : case Property::IsSinglePathStream:
347 : {
348 163 : std::lock_guard<std::mutex> lock(m_sinkMutex);
349 163 : m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
350 163 : break;
351 : }
352 163 : case Property::NumberOfStreams:
353 : {
354 163 : std::lock_guard<std::mutex> lock(m_sinkMutex);
355 163 : m_numOfStreams = g_value_get_int(value);
356 163 : break;
357 : }
358 1 : case Property::HasDrm:
359 : {
360 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
361 1 : m_hasDrm = g_value_get_boolean(value) != FALSE;
362 1 : break;
363 : }
364 1 : case Property::EnableLastSample:
365 : {
366 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
367 1 : m_enableLastSample = g_value_get_boolean(value) != FALSE;
368 1 : if (!m_enableLastSample)
369 : {
370 0 : if (m_lastBuffer)
371 : {
372 0 : gst_buffer_unref(m_lastBuffer);
373 0 : m_lastBuffer = nullptr;
374 : }
375 : }
376 1 : break;
377 : }
378 0 : default:
379 : {
380 0 : break;
381 : }
382 : }
383 328 : }
384 :
385 11 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
386 : {
387 11 : switch (type)
388 : {
389 1 : case Property::IsSinglePathStream:
390 : {
391 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
392 1 : g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
393 1 : break;
394 : }
395 1 : case Property::NumberOfStreams:
396 : {
397 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
398 1 : g_value_set_int(value, m_numOfStreams);
399 1 : break;
400 : }
401 1 : case Property::HasDrm:
402 : {
403 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
404 1 : g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
405 1 : break;
406 : }
407 2 : case Property::Stats:
408 : {
409 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
410 2 : if (!client)
411 : {
412 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
413 1 : break;
414 : }
415 :
416 1 : guint64 totalVideoFrames{0};
417 1 : guint64 droppedVideoFrames{0};
418 1 : if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
419 : {
420 1 : GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
421 : G_TYPE_UINT64, droppedVideoFrames, nullptr)};
422 1 : g_value_set_pointer(value, stats);
423 : }
424 : else
425 : {
426 0 : GST_ERROR_OBJECT(m_sink, "No stats returned from client");
427 : }
428 2 : }
429 : case Property::EnableLastSample:
430 : {
431 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
432 3 : g_value_set_boolean(value, m_enableLastSample ? TRUE : FALSE);
433 3 : break;
434 : }
435 4 : case Property::LastSample:
436 : {
437 : // Mutex inside getLastSample function
438 4 : gst_value_take_sample(value, getLastSample());
439 4 : break;
440 : }
441 0 : default:
442 : {
443 0 : break;
444 : }
445 : }
446 11 : }
447 :
448 26 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
449 : {
450 26 : GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
451 26 : switch (GST_QUERY_TYPE(query))
452 : {
453 1 : case GST_QUERY_SEEKING:
454 : {
455 : GstFormat fmt;
456 1 : gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
457 1 : gst_query_set_seeking(query, fmt, FALSE, 0, -1);
458 1 : return TRUE;
459 : }
460 6 : case GST_QUERY_POSITION:
461 : {
462 6 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
463 6 : if (!client)
464 : {
465 1 : return FALSE;
466 : }
467 : {
468 5 : std::unique_lock<std::mutex> lock(m_sinkMutex);
469 5 : if (m_isServerFlushOngoing && m_isTimeResetOngoing)
470 : {
471 1 : GST_WARNING_OBJECT(m_sink, "Position query during server flush and time reset, returning FALSE");
472 1 : return FALSE;
473 : }
474 5 : }
475 :
476 : GstFormat fmt;
477 4 : gst_query_parse_position(query, &fmt, NULL);
478 4 : switch (fmt)
479 : {
480 3 : case GST_FORMAT_TIME:
481 : {
482 3 : gint64 position = client->getPosition(m_sourceId);
483 3 : GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
484 3 : if (position < 0)
485 : {
486 2 : return FALSE;
487 : }
488 :
489 1 : gst_query_set_position(query, fmt, position);
490 1 : break;
491 : }
492 1 : default:
493 1 : break;
494 : }
495 2 : return TRUE;
496 6 : }
497 2 : case GST_QUERY_SEGMENT:
498 : {
499 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
500 2 : GstFormat format{m_lastSegment.format};
501 2 : gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
502 2 : gint64 stop{0};
503 2 : if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
504 : {
505 2 : stop = m_lastSegment.duration;
506 : }
507 : else
508 : {
509 0 : stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
510 : }
511 2 : gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
512 2 : return TRUE;
513 : }
514 3 : case GST_QUERY_DURATION:
515 : {
516 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
517 3 : if (!client)
518 : {
519 1 : return FALSE;
520 : }
521 : GstFormat fmt;
522 2 : int64_t duration{-1};
523 2 : gst_query_parse_duration(query, &fmt, NULL);
524 2 : if (GST_FORMAT_TIME != fmt || !client->getDuration(duration))
525 : {
526 1 : return FALSE;
527 : }
528 1 : gst_query_set_duration(query, fmt, duration);
529 1 : return TRUE;
530 3 : }
531 14 : default:
532 14 : break;
533 : }
534 14 : return std::nullopt;
535 : }
536 :
537 29 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
538 : {
539 29 : GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
540 29 : bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
541 :
542 29 : switch (GST_EVENT_TYPE(event))
543 : {
544 13 : case GST_EVENT_SEEK:
545 : {
546 13 : gdouble rate{1.0};
547 13 : GstFormat seekFormat{GST_FORMAT_UNDEFINED};
548 13 : GstSeekFlags flags{GST_SEEK_FLAG_NONE};
549 13 : GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
550 13 : gint64 start{0}, stop{0};
551 13 : gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
552 :
553 13 : if (flags & GST_SEEK_FLAG_FLUSH)
554 : {
555 9 : if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
556 : {
557 1 : GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
558 1 : gst_event_unref(event);
559 5 : return FALSE;
560 : }
561 : // Update last segment
562 8 : if (seekFormat == GST_FORMAT_TIME)
563 : {
564 7 : gboolean update{FALSE};
565 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
566 7 : gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop, &update);
567 : }
568 : }
569 : #if GST_CHECK_VERSION(1, 18, 0)
570 4 : else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
571 : {
572 2 : gdouble rateMultiplier = rate / m_lastSegment.rate;
573 2 : GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
574 2 : gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
575 2 : gst_event_unref(event);
576 2 : if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
577 : {
578 1 : GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
579 1 : return FALSE;
580 : }
581 1 : return TRUE;
582 : }
583 : #endif
584 : else
585 : {
586 2 : GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
587 2 : gst_event_unref(event);
588 2 : return FALSE;
589 : }
590 8 : break;
591 : }
592 : #if GST_CHECK_VERSION(1, 18, 0)
593 2 : case GST_EVENT_INSTANT_RATE_SYNC_TIME:
594 : {
595 2 : double rate{0.0};
596 2 : GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
597 2 : guint32 seqnum = gst_event_get_seqnum(event);
598 2 : gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
599 :
600 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
601 2 : if ((client) && (m_mediaPlayerManager.hasControl()))
602 : {
603 2 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
604 2 : m_currentInstantRateChangeSeqnum = seqnum;
605 2 : client->setPlaybackRate(rate);
606 : }
607 2 : break;
608 : }
609 : #endif
610 14 : default:
611 14 : break;
612 : }
613 :
614 24 : if (shouldForwardUpstream)
615 : {
616 24 : bool result = gst_pad_push_event(m_sinkPad, event);
617 24 : if (!result)
618 : {
619 10 : GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
620 : }
621 :
622 24 : return result;
623 : }
624 :
625 0 : gst_event_unref(event);
626 0 : return TRUE;
627 : }
628 :
629 205 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
630 : {
631 205 : GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
632 205 : switch (GST_EVENT_TYPE(event))
633 : {
634 7 : case GST_EVENT_SEGMENT:
635 : {
636 7 : copySegment(event);
637 7 : setSegment();
638 7 : break;
639 : }
640 3 : case GST_EVENT_EOS:
641 : {
642 3 : std::lock_guard<std::mutex> lock(m_sinkMutex);
643 3 : m_isEos = true;
644 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
645 3 : if (client)
646 : {
647 0 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
648 : }
649 3 : break;
650 : }
651 151 : case GST_EVENT_CAPS:
652 : {
653 : GstCaps *caps;
654 151 : gst_event_parse_caps(event, &caps);
655 : {
656 151 : std::lock_guard<std::mutex> lock(m_sinkMutex);
657 151 : if (m_caps)
658 : {
659 4 : if (!gst_caps_is_equal(caps, m_caps))
660 : {
661 1 : gst_caps_unref(m_caps);
662 1 : m_caps = gst_caps_copy(caps);
663 : }
664 : }
665 : else
666 : {
667 147 : m_caps = gst_caps_copy(caps);
668 : }
669 151 : }
670 151 : break;
671 : }
672 1 : case GST_EVENT_SINK_MESSAGE:
673 : {
674 1 : GstMessage *message = nullptr;
675 1 : gst_event_parse_sink_message(event, &message);
676 :
677 1 : if (message)
678 : {
679 1 : gst_element_post_message(m_sink, message);
680 : }
681 :
682 1 : break;
683 : }
684 8 : case GST_EVENT_CUSTOM_DOWNSTREAM:
685 : case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
686 : {
687 8 : if (gst_event_has_name(event, "custom-instant-rate-change"))
688 : {
689 2 : GST_DEBUG_OBJECT(m_sink, "Change rate event received");
690 2 : changePlaybackRate(event);
691 : }
692 8 : break;
693 : }
694 17 : case GST_EVENT_FLUSH_START:
695 : {
696 17 : startFlushing();
697 17 : break;
698 : }
699 8 : case GST_EVENT_FLUSH_STOP:
700 : {
701 8 : gboolean resetTime{FALSE};
702 8 : gst_event_parse_flush_stop(event, &resetTime);
703 :
704 8 : stopFlushing(resetTime);
705 8 : break;
706 : }
707 3 : case GST_EVENT_STREAM_COLLECTION:
708 : {
709 3 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
710 3 : if (!client)
711 : {
712 1 : gst_event_unref(event);
713 1 : return FALSE;
714 : }
715 2 : int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
716 2 : GstStreamCollection *streamCollection{nullptr};
717 2 : gst_event_parse_stream_collection(event, &streamCollection);
718 2 : guint streamsSize = gst_stream_collection_get_size(streamCollection);
719 6 : for (guint i = 0; i < streamsSize; ++i)
720 : {
721 4 : auto *stream = gst_stream_collection_get_stream(streamCollection, i);
722 4 : auto type = gst_stream_get_stream_type(stream);
723 4 : if (type & GST_STREAM_TYPE_AUDIO)
724 : {
725 2 : ++audioStreams;
726 : }
727 2 : else if (type & GST_STREAM_TYPE_VIDEO)
728 : {
729 1 : ++videoStreams;
730 : }
731 1 : else if (type & GST_STREAM_TYPE_TEXT)
732 : {
733 1 : ++textStreams;
734 : }
735 : }
736 2 : gst_object_unref(streamCollection);
737 2 : client->handleStreamCollection(audioStreams, videoStreams, textStreams);
738 2 : client->sendAllSourcesAttachedIfPossible();
739 2 : break;
740 3 : }
741 : #if GST_CHECK_VERSION(1, 18, 0)
742 4 : case GST_EVENT_INSTANT_RATE_CHANGE:
743 : {
744 4 : guint32 seqnum = gst_event_get_seqnum(event);
745 7 : if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
746 : {
747 : /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
748 2 : GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
749 2 : break;
750 : }
751 :
752 2 : m_lastInstantRateChangeSeqnum = seqnum;
753 2 : gdouble rate{0.0};
754 2 : GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
755 2 : gst_event_parse_instant_rate_change(event, &rate, &flags);
756 2 : GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
757 2 : gst_message_set_seqnum(msg, seqnum);
758 2 : gst_element_post_message(m_sink, msg);
759 2 : break;
760 : }
761 : #endif
762 3 : default:
763 3 : break;
764 : }
765 :
766 204 : gst_event_unref(event);
767 :
768 204 : return TRUE;
769 : }
770 :
771 7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
772 : {
773 7 : std::lock_guard<std::mutex> lock(m_sinkMutex);
774 7 : gst_event_copy_segment(event, &m_lastSegment);
775 : }
776 :
777 7 : void PullModePlaybackDelegate::setSegment()
778 : {
779 7 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
780 7 : if (!client)
781 : {
782 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
783 1 : return;
784 : }
785 6 : const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
786 6 : int64_t position = static_cast<int64_t>(m_lastSegment.start);
787 6 : client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
788 6 : m_segmentSet = true;
789 7 : }
790 :
791 2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
792 : {
793 2 : const GstStructure *structure{gst_event_get_structure(event)};
794 2 : gdouble playbackRate{1.0};
795 2 : if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
796 : {
797 2 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
798 2 : if (client && m_mediaPlayerManager.hasControl())
799 : {
800 1 : GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
801 1 : client->setPlaybackRate(playbackRate);
802 : }
803 2 : }
804 : }
805 :
806 17 : void PullModePlaybackDelegate::startFlushing()
807 : {
808 17 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
809 17 : if (client)
810 : {
811 5 : client->getFlushAndDataSynchronizer().waitIfRequired(m_sourceId);
812 : }
813 17 : std::lock_guard<std::mutex> lock(m_sinkMutex);
814 17 : if (!m_isSinkFlushOngoing)
815 : {
816 17 : GST_INFO_OBJECT(m_sink, "Starting flushing");
817 17 : if (m_isEos)
818 : {
819 2 : GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
820 2 : m_isEos = false;
821 : }
822 17 : m_isSinkFlushOngoing = true;
823 17 : m_segmentSet = false;
824 17 : clearBuffersUnlocked();
825 : }
826 : }
827 :
828 8 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
829 : {
830 8 : GST_INFO_OBJECT(m_sink, "Stopping flushing");
831 8 : flushServer(resetTime);
832 8 : std::lock_guard<std::mutex> lock(m_sinkMutex);
833 8 : m_isSinkFlushOngoing = false;
834 :
835 8 : if (resetTime)
836 : {
837 8 : GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
838 8 : gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
839 : }
840 : }
841 :
842 8 : void PullModePlaybackDelegate::flushServer(bool resetTime)
843 : {
844 8 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
845 8 : if (!client)
846 : {
847 1 : GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
848 1 : return;
849 : }
850 :
851 : {
852 7 : std::unique_lock<std::mutex> lock(m_sinkMutex);
853 7 : m_isServerFlushOngoing = true;
854 7 : m_isTimeResetOngoing = true;
855 : }
856 7 : client->flush(m_sourceId, resetTime);
857 8 : }
858 :
859 35 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
860 : {
861 35 : constexpr size_t kMaxInternalBuffersQueueSize = 24;
862 35 : GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
863 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
864 :
865 35 : std::unique_lock<std::mutex> lock(m_sinkMutex);
866 :
867 35 : if (m_samples.size() >= kMaxInternalBuffersQueueSize)
868 : {
869 1 : GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
870 1 : m_needDataCondVariable.wait(lock);
871 : }
872 :
873 35 : if (m_isSinkFlushOngoing)
874 : {
875 3 : GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
876 3 : gst_buffer_unref(buffer);
877 3 : return GST_FLOW_FLUSHING;
878 : }
879 :
880 32 : GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
881 32 : if (sample)
882 32 : m_samples.push(sample);
883 : else
884 0 : GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
885 :
886 32 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
887 32 : if (client)
888 : {
889 28 : client->getFlushAndDataSynchronizer().notifyDataReceived(m_sourceId);
890 : }
891 :
892 32 : setLastBuffer(buffer);
893 :
894 32 : gst_buffer_unref(buffer);
895 :
896 32 : return GST_FLOW_OK;
897 35 : }
898 :
899 1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
900 : {
901 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
902 1 : if (m_isServerFlushOngoing)
903 : {
904 0 : GST_WARNING_OBJECT(m_sink, "Skip pulling buffer - flush is ongoing on server side...");
905 0 : return GstRefSample{};
906 : }
907 1 : if (!m_samples.empty())
908 : {
909 1 : GstSample *sample = m_samples.front();
910 1 : GstBuffer *buffer = gst_sample_get_buffer(sample);
911 1 : GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
912 : GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
913 :
914 1 : return GstRefSample{sample};
915 : }
916 :
917 0 : return GstRefSample{};
918 1 : }
919 :
920 1 : void PullModePlaybackDelegate::popSample()
921 : {
922 1 : std::lock_guard<std::mutex> lock(m_sinkMutex);
923 1 : if (!m_samples.empty())
924 : {
925 1 : gst_sample_unref(m_samples.front());
926 1 : m_samples.pop();
927 : }
928 1 : m_needDataCondVariable.notify_all();
929 : }
930 :
931 0 : bool PullModePlaybackDelegate::isEos() const
932 : {
933 0 : std::lock_guard<std::mutex> lock(m_sinkMutex);
934 0 : return m_samples.empty() && m_isEos;
935 : }
936 :
937 2 : bool PullModePlaybackDelegate::isReadyToSendData() const
938 : {
939 2 : std::lock_guard<std::mutex> lock(m_sinkMutex);
940 4 : return m_isEos || m_segmentSet;
941 2 : }
942 :
943 8 : void PullModePlaybackDelegate::lostState()
944 : {
945 8 : m_isStateCommitNeeded = true;
946 8 : gst_element_lost_state(m_sink);
947 : }
948 :
949 156 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
950 : const uint32_t maxVideoHeight)
951 : {
952 156 : GstObject *parentObject = getOldestGstBinParent(m_sink);
953 156 : if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight, isLiveLatencyEnabled()))
954 : {
955 3 : GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
956 3 : return false;
957 : }
958 :
959 153 : gchar *parentObjectName = gst_object_get_name(parentObject);
960 153 : GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
961 153 : g_free(parentObjectName);
962 :
963 153 : return setStreamsNumber(parentObject);
964 : }
965 :
966 153 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
967 : {
968 153 : int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
969 :
970 153 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
971 153 : if (context)
972 : {
973 4 : GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
974 :
975 4 : guint n_video{0}, n_audio{0}, n_text{0};
976 :
977 4 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
978 4 : gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
979 4 : gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
980 4 : gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
981 :
982 7 : if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
983 3 : n_text > std::numeric_limits<int32_t>::max())
984 : {
985 1 : GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
986 : n_text);
987 1 : gst_context_unref(context);
988 1 : return false;
989 : }
990 :
991 3 : videoStreams = n_video;
992 3 : audioStreams = n_audio;
993 3 : subtitleStreams = n_text;
994 :
995 3 : gst_context_unref(context);
996 : }
997 149 : else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
998 : {
999 2 : GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
1000 : }
1001 : else
1002 : {
1003 : // The default value of streams is V:1, A:1, S:0
1004 : // Changing the default setting via properties is considered as DEPRECATED
1005 147 : subtitleStreams = 0;
1006 147 : std::lock_guard<std::mutex> lock{m_sinkMutex};
1007 147 : if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
1008 : {
1009 28 : videoStreams = m_numOfStreams;
1010 28 : if (m_isSinglePathStream)
1011 : {
1012 27 : audioStreams = 0;
1013 27 : subtitleStreams = 0;
1014 : }
1015 : }
1016 119 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
1017 : {
1018 109 : audioStreams = m_numOfStreams;
1019 109 : if (m_isSinglePathStream)
1020 : {
1021 108 : videoStreams = 0;
1022 108 : subtitleStreams = 0;
1023 : }
1024 : }
1025 10 : else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
1026 : {
1027 10 : subtitleStreams = m_numOfStreams;
1028 10 : if (m_isSinglePathStream)
1029 : {
1030 10 : videoStreams = 0;
1031 10 : audioStreams = 0;
1032 : }
1033 : }
1034 147 : }
1035 :
1036 152 : std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
1037 152 : if (!client)
1038 : {
1039 0 : GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
1040 0 : return false;
1041 : }
1042 :
1043 152 : client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
1044 :
1045 152 : return true;
1046 : }
1047 :
1048 156 : bool PullModePlaybackDelegate::isLiveLatencyEnabled() const
1049 : {
1050 156 : GstContext *context = gst_element_get_context(m_sink, "streams-info");
1051 156 : if (context)
1052 : {
1053 4 : GST_DEBUG_OBJECT(m_sink, "Checking if live latency is enabled from \"streams-info\" context");
1054 :
1055 4 : gboolean isEnabled{FALSE};
1056 :
1057 4 : const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
1058 4 : gst_structure_get_boolean(streamsInfoStructure, "enable-live-latency", &isEnabled);
1059 :
1060 4 : gst_context_unref(context);
1061 4 : return isEnabled != FALSE;
1062 : }
1063 152 : return false;
1064 : }
1065 :
1066 4 : GstSample *PullModePlaybackDelegate::getLastSample() const
1067 : {
1068 4 : std::lock_guard<std::mutex> lock(m_sinkMutex);
1069 4 : if (m_enableLastSample && m_lastBuffer)
1070 : {
1071 2 : return gst_sample_new(m_lastBuffer, m_caps, &m_lastSegment, nullptr);
1072 : }
1073 2 : return nullptr;
1074 4 : }
1075 :
1076 484 : void PullModePlaybackDelegate::setLastBuffer(GstBuffer *buffer)
1077 : {
1078 484 : if (m_enableLastSample)
1079 : {
1080 3 : if (m_lastBuffer)
1081 : {
1082 2 : gst_buffer_unref(m_lastBuffer);
1083 : }
1084 3 : if (buffer)
1085 : {
1086 2 : m_lastBuffer = gst_buffer_ref(buffer);
1087 : }
1088 : else
1089 : {
1090 1 : m_lastBuffer = nullptr;
1091 : }
1092 : }
1093 484 : }
|