LCOV - code coverage report
Current view: top level - source - RialtoGStreamerMSEBaseSink.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 93.0 % 633 589
Test Date: 2025-06-24 14:11:58 Functions: 100.0 % 43 43

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2022 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #define USE_GLIB 1
      20              : 
      21              : #include <cstring>
      22              : #include <limits>
      23              : 
      24              : #include <gst/gst.h>
      25              : 
      26              : #include "ControlBackend.h"
      27              : #include "GStreamerUtils.h"
      28              : #include "IClientLogControl.h"
      29              : #include "IMediaPipeline.h"
      30              : #include "LogToGstHandler.h"
      31              : #include "RialtoGStreamerMSEBaseSink.h"
      32              : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
      33              : 
      34              : GST_DEBUG_CATEGORY_STATIC(RialtoMSEBaseSinkDebug);
      35              : #define GST_CAT_DEFAULT RialtoMSEBaseSinkDebug
      36              : 
      37              : #define rialto_mse_base_sink_parent_class parent_class
      38         2445 : G_DEFINE_TYPE_WITH_CODE(RialtoMSEBaseSink, rialto_mse_base_sink, GST_TYPE_ELEMENT,
      39              :                         G_ADD_PRIVATE(RialtoMSEBaseSink)
      40              :                             GST_DEBUG_CATEGORY_INIT(RialtoMSEBaseSinkDebug, "rialtomsebasesink", 0,
      41              :                                                     "rialto mse base sink"));
      42              : 
      43              : enum
      44              : {
      45              :     PROP_0,
      46              :     PROP_IS_SINGLE_PATH_STREAM,
      47              :     PROP_N_STREAMS,
      48              :     PROP_HAS_DRM,
      49              :     PROP_STATS,
      50              :     PROP_LAST
      51              : };
      52              : 
      53              : enum
      54              : {
      55              :     SIGNAL_UNDERFLOW,
      56              :     SIGNAL_LAST
      57              : };
      58              : 
      59              : static guint g_signals[SIGNAL_LAST] = {0};
      60              : 
      61            6 : static unsigned rialto_mse_base_sink_get_gst_play_flag(const char *nick)
      62              : {
      63            6 :     GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
      64            6 :     GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
      65            6 :     return flag ? flag->value : 0;
      66              : }
      67              : 
      68          304 : void rialto_mse_base_async_start(RialtoMSEBaseSink *sink)
      69              : {
      70          304 :     sink->priv->m_isStateCommitNeeded = true;
      71          304 :     gst_element_post_message(GST_ELEMENT_CAST(sink), gst_message_new_async_start(GST_OBJECT(sink)));
      72              : }
      73              : 
      74          148 : static void rialto_mse_base_async_done(RialtoMSEBaseSink *sink)
      75              : {
      76          148 :     sink->priv->m_isStateCommitNeeded = false;
      77          148 :     gst_element_post_message(GST_ELEMENT_CAST(sink),
      78              :                              gst_message_new_async_done(GST_OBJECT_CAST(sink), GST_CLOCK_TIME_NONE));
      79              : }
      80              : 
      81            3 : static void rialto_mse_base_sink_eos_handler(RialtoMSEBaseSink *sink)
      82              : {
      83            3 :     GstState currentState = GST_STATE(sink);
      84            3 :     if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
      85              :     {
      86            1 :         GST_ERROR_OBJECT(sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
      87              :                          gst_element_state_get_name(currentState));
      88              : 
      89            1 :         const char *errMessage = "Rialto sinks received EOS in non-playing state";
      90            1 :         GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
      91            1 :         gst_element_post_message(GST_ELEMENT_CAST(sink),
      92              :                                  gst_message_new_error(GST_OBJECT_CAST(sink), gError, errMessage));
      93            1 :         g_error_free(gError);
      94              :     }
      95            2 :     else if (!sink->priv->m_isFlushOngoing)
      96              :     {
      97            1 :         gst_element_post_message(GST_ELEMENT_CAST(sink), gst_message_new_eos(GST_OBJECT_CAST(sink)));
      98              :     }
      99              :     else
     100              :     {
     101            1 :         GST_WARNING_OBJECT(sink, "Skip sending eos message - flush is ongoing...");
     102              :     }
     103            3 : }
     104              : 
     105            3 : static void rialto_mse_base_sink_error_handler(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackError error)
     106              : {
     107            3 :     GError *gError = nullptr;
     108            3 :     std::string message;
     109            3 :     switch (error)
     110              :     {
     111            1 :     case firebolt::rialto::PlaybackError::DECRYPTION:
     112              :     {
     113            1 :         message = "Rialto dropped a frame that failed to decrypt";
     114            1 :         gError = g_error_new_literal(GST_STREAM_ERROR, GST_STREAM_ERROR_DECRYPT, message.c_str());
     115            1 :         break;
     116              :     }
     117            2 :     case firebolt::rialto::PlaybackError::UNKNOWN:
     118              :     default:
     119              :     {
     120            2 :         message = "Rialto server playback failed";
     121            2 :         gError = g_error_new_literal(GST_STREAM_ERROR, 0, message.c_str());
     122            2 :         break;
     123              :     }
     124              :     }
     125            3 :     gst_element_post_message(GST_ELEMENT_CAST(sink),
     126              :                              gst_message_new_error(GST_OBJECT_CAST(sink), gError, message.c_str()));
     127            3 :     g_error_free(gError);
     128              : }
     129              : 
     130          576 : static GstStateChangeReturn rialto_mse_base_sink_change_state(GstElement *element, GstStateChange transition)
     131              : {
     132          576 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
     133          576 :     RialtoMSEBaseSinkPrivate *priv = sink->priv;
     134              : 
     135          576 :     GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
     136          576 :     GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
     137          576 :     GST_INFO_OBJECT(sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
     138              :                     gst_element_state_get_name(next_state));
     139              : 
     140          576 :     GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
     141          576 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     142              : 
     143          576 :     switch (transition)
     144              :     {
     145          145 :     case GST_STATE_CHANGE_NULL_TO_READY:
     146          145 :         if (!priv->m_sinkPad)
     147              :         {
     148            0 :             GST_ERROR_OBJECT(sink, "Cannot start, because there's no sink pad");
     149            0 :             return GST_STATE_CHANGE_FAILURE;
     150              :         }
     151          145 :         if (!priv->m_rialtoControlClient->waitForRunning())
     152              :         {
     153            0 :             GST_ERROR_OBJECT(sink, "Control: Rialto client cannot reach running state");
     154            0 :             return GST_STATE_CHANGE_FAILURE;
     155              :         }
     156          145 :         GST_INFO_OBJECT(sink, "Control: Rialto client reached running state");
     157          145 :         break;
     158          135 :     case GST_STATE_CHANGE_READY_TO_PAUSED:
     159              :     {
     160          135 :         if (!client)
     161              :         {
     162            0 :             GST_ERROR_OBJECT(sink, "Cannot get the media player client object");
     163            0 :             return GST_STATE_CHANGE_FAILURE;
     164              :         }
     165              : 
     166          135 :         priv->m_isFlushOngoing = false;
     167              : 
     168          135 :         StateChangeResult result = client->pause(priv->m_sourceId);
     169          135 :         if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
     170              :         {
     171              :             // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
     172          135 :             if (result == StateChangeResult::NOT_ATTACHED)
     173              :             {
     174          135 :                 rialto_mse_base_async_start(sink);
     175              :             }
     176          135 :             status = GST_STATE_CHANGE_ASYNC;
     177              :         }
     178              : 
     179          135 :         break;
     180              :     }
     181            6 :     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     182              :     {
     183            6 :         if (!client)
     184              :         {
     185            0 :             GST_ERROR_OBJECT(sink, "Cannot get the media player client object");
     186            0 :             return GST_STATE_CHANGE_FAILURE;
     187              :         }
     188              : 
     189            6 :         StateChangeResult result = client->play(priv->m_sourceId);
     190            6 :         if (result == StateChangeResult::SUCCESS_ASYNC)
     191              :         {
     192            6 :             status = GST_STATE_CHANGE_ASYNC;
     193              :         }
     194            0 :         else if (result == StateChangeResult::NOT_ATTACHED)
     195              :         {
     196            0 :             GST_ERROR_OBJECT(sink, "Failed to change state to playing");
     197            0 :             return GST_STATE_CHANGE_FAILURE;
     198              :         }
     199              : 
     200            6 :         break;
     201              :     }
     202            6 :     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     203              :     {
     204            6 :         if (!client)
     205              :         {
     206            0 :             GST_ERROR_OBJECT(sink, "Cannot get the media player client object");
     207            0 :             return GST_STATE_CHANGE_FAILURE;
     208              :         }
     209              : 
     210            6 :         StateChangeResult result = client->pause(priv->m_sourceId);
     211            6 :         if (result == StateChangeResult::SUCCESS_ASYNC)
     212              :         {
     213            6 :             status = GST_STATE_CHANGE_ASYNC;
     214              :         }
     215            0 :         else if (result == StateChangeResult::NOT_ATTACHED)
     216              :         {
     217            0 :             GST_ERROR_OBJECT(sink, "Failed to change state to paused");
     218            0 :             return GST_STATE_CHANGE_FAILURE;
     219              :         }
     220              : 
     221            6 :         break;
     222              :     }
     223          135 :     case GST_STATE_CHANGE_PAUSED_TO_READY:
     224          135 :         if (!client)
     225              :         {
     226            0 :             GST_ERROR_OBJECT(sink, "Cannot get the media player client object");
     227            0 :             return GST_STATE_CHANGE_FAILURE;
     228              :         }
     229              : 
     230          135 :         if (priv->m_isStateCommitNeeded)
     231              :         {
     232          120 :             GST_DEBUG_OBJECT(sink, "Sending async_done in PAUSED->READY transition");
     233          120 :             rialto_mse_base_async_done(sink);
     234              :         }
     235              : 
     236          135 :         client->removeSource(priv->m_sourceId);
     237              :         {
     238          135 :             std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     239          135 :             priv->clearBuffersUnlocked();
     240          135 :             priv->m_sourceAttached = false;
     241              :         }
     242          135 :         break;
     243          145 :     case GST_STATE_CHANGE_READY_TO_NULL:
     244              :         // Playback will be stopped once all sources are finished and ref count
     245              :         // of the media pipeline object reaches 0
     246          145 :         priv->m_mediaPlayerManager.releaseMediaPlayerClient();
     247          145 :         priv->m_rialtoControlClient->removeControlBackend();
     248          145 :         break;
     249            4 :     default:
     250            4 :         break;
     251              :     }
     252              : 
     253          576 :     GstStateChangeReturn result = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
     254          576 :     if (G_UNLIKELY(result == GST_STATE_CHANGE_FAILURE))
     255              :     {
     256            0 :         GST_WARNING_OBJECT(sink, "State change failed");
     257            0 :         return result;
     258              :     }
     259          576 :     else if (result == GST_STATE_CHANGE_ASYNC)
     260              :     {
     261            0 :         return GST_STATE_CHANGE_ASYNC;
     262              :     }
     263              : 
     264          576 :     return status;
     265              : }
     266              : 
     267           56 : static void rialto_mse_base_sink_rialto_state_changed_handler(RialtoMSEBaseSink *sink,
     268              :                                                               firebolt::rialto::PlaybackState state)
     269              : {
     270           56 :     GstState current = GST_STATE(sink);
     271           56 :     GstState next = GST_STATE_NEXT(sink);
     272           56 :     GstState pending = GST_STATE_PENDING(sink);
     273           56 :     GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
     274              : 
     275           56 :     GST_DEBUG_OBJECT(sink,
     276              :                      "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
     277              :                      "pending state: %s, last return state %s",
     278              :                      static_cast<uint32_t>(state), gst_element_state_get_name(current),
     279              :                      gst_element_state_get_name(next), gst_element_state_get_name(pending),
     280              :                      gst_element_state_change_return_get_name(GST_STATE_RETURN(sink)));
     281              : 
     282           56 :     if (sink->priv->m_isStateCommitNeeded)
     283              :     {
     284           47 :         if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
     285           11 :             (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
     286              :         {
     287           28 :             GST_STATE(sink) = next;
     288           28 :             GST_STATE_NEXT(sink) = postNext;
     289           28 :             GST_STATE_PENDING(sink) = GST_STATE_VOID_PENDING;
     290           28 :             GST_STATE_RETURN(sink) = GST_STATE_CHANGE_SUCCESS;
     291              : 
     292           28 :             GST_INFO_OBJECT(sink, "Async state transition to state %s done", gst_element_state_get_name(next));
     293              : 
     294           28 :             gst_element_post_message(GST_ELEMENT_CAST(sink),
     295              :                                      gst_message_new_state_changed(GST_OBJECT_CAST(sink), current, next, pending));
     296           28 :             rialto_mse_base_async_done(sink);
     297              :         }
     298              :         /* Immediately transition to PLAYING when prerolled and PLAY is requested */
     299           19 :         else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
     300              :                  next == GST_STATE_PLAYING)
     301              :         {
     302            1 :             GST_INFO_OBJECT(sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
     303            1 :             rialto_mse_base_sink_change_state(GST_ELEMENT(sink), GST_STATE_CHANGE_PAUSED_TO_PLAYING);
     304              :         }
     305              :     }
     306           56 : }
     307              : 
     308            2 : static void rialto_mse_base_sink_flush_completed_handler(RialtoMSEBaseSink *sink)
     309              : {
     310            2 :     GST_INFO_OBJECT(sink, "Flush completed");
     311            2 :     std::unique_lock<std::mutex> lock(sink->priv->m_flushMutex);
     312            2 :     sink->priv->m_flushCondVariable.notify_all();
     313              : }
     314              : 
     315          266 : static void rialto_mse_base_sink_init(RialtoMSEBaseSink *sink)
     316              : {
     317          266 :     GST_INFO_OBJECT(sink, "Init: %" GST_PTR_FORMAT, sink);
     318          266 :     sink->priv = static_cast<RialtoMSEBaseSinkPrivate *>(rialto_mse_base_sink_get_instance_private(sink));
     319          266 :     new (sink->priv) RialtoMSEBaseSinkPrivate();
     320              : 
     321          266 :     sink->priv->m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
     322              : 
     323          266 :     RialtoGStreamerMSEBaseSinkCallbacks callbacks;
     324          266 :     callbacks.eosCallback = std::bind(rialto_mse_base_sink_eos_handler, sink);
     325          266 :     callbacks.flushCompletedCallback = std::bind(rialto_mse_base_sink_flush_completed_handler, sink);
     326              :     callbacks.stateChangedCallback =
     327          266 :         std::bind(rialto_mse_base_sink_rialto_state_changed_handler, sink, std::placeholders::_1);
     328          266 :     callbacks.errorCallback = std::bind(rialto_mse_base_sink_error_handler, sink, std::placeholders::_1);
     329          266 :     sink->priv->m_callbacks = callbacks;
     330          266 :     gst_segment_init(&sink->priv->m_lastSegment, GST_FORMAT_TIME);
     331          266 :     GST_OBJECT_FLAG_SET(sink, GST_ELEMENT_FLAG_SINK);
     332              : }
     333              : 
     334          266 : static void rialto_mse_base_sink_finalize(GObject *object)
     335              : {
     336          266 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(object);
     337          266 :     RialtoMSEBaseSinkPrivate *priv = sink->priv;
     338          266 :     GST_INFO_OBJECT(sink, "Finalize: %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, sink, priv);
     339              : 
     340          266 :     priv->~RialtoMSEBaseSinkPrivate();
     341          266 :     GST_CALL_PARENT(G_OBJECT_CLASS, finalize, (object));
     342              : }
     343              : 
     344            5 : static void rialto_mse_base_sink_get_property(GObject *object, guint propId, GValue *value, GParamSpec *pspec)
     345              : {
     346            5 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(object);
     347              : 
     348            5 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     349            5 :     switch (propId)
     350              :     {
     351            1 :     case PROP_IS_SINGLE_PATH_STREAM:
     352            1 :         g_value_set_boolean(value, sink->priv->m_isSinglePathStream ? TRUE : FALSE);
     353            1 :         break;
     354            1 :     case PROP_N_STREAMS:
     355            1 :         g_value_set_int(value, sink->priv->m_numOfStreams);
     356            1 :         break;
     357            1 :     case PROP_HAS_DRM:
     358            1 :         g_value_set_boolean(value, sink->priv->m_hasDrm);
     359            1 :         break;
     360            2 :     case PROP_STATS:
     361              :     {
     362            2 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     363            2 :         if (!client)
     364              :         {
     365            1 :             GST_ERROR_OBJECT(sink, "Could not get the media player client");
     366            1 :             return;
     367              :         }
     368              : 
     369              :         guint64 totalVideoFrames;
     370              :         guint64 droppedVideoFrames;
     371            1 :         if (client->getStats(sink->priv->m_sourceId, totalVideoFrames, droppedVideoFrames))
     372              :         {
     373            1 :             GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
     374              :                                                   G_TYPE_UINT64, droppedVideoFrames, nullptr)};
     375            1 :             g_value_set_pointer(value, stats);
     376              :         }
     377              :         else
     378              :         {
     379            0 :             GST_ERROR_OBJECT(sink, "No stats returned from client");
     380              :         }
     381            2 :     }
     382            1 :     break;
     383            0 :     default:
     384            0 :         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propId, pspec);
     385            0 :         break;
     386              :     }
     387            5 : }
     388              : 
     389          293 : static void rialto_mse_base_sink_set_property(GObject *object, guint propId, const GValue *value, GParamSpec *pspec)
     390              : {
     391          293 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(object);
     392              : 
     393          293 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     394          293 :     switch (propId)
     395              :     {
     396          146 :     case PROP_IS_SINGLE_PATH_STREAM:
     397          146 :         sink->priv->m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
     398          146 :         break;
     399          146 :     case PROP_N_STREAMS:
     400          146 :         sink->priv->m_numOfStreams = g_value_get_int(value);
     401          146 :         break;
     402            1 :     case PROP_HAS_DRM:
     403            1 :         sink->priv->m_hasDrm = g_value_get_boolean(value) != FALSE;
     404            1 :         break;
     405            0 :     default:
     406            0 :         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propId, pspec);
     407            0 :         break;
     408              :     }
     409          293 : }
     410              : 
     411           16 : static gboolean rialto_mse_base_sink_query(GstElement *element, GstQuery *query)
     412              : {
     413           16 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
     414           16 :     GST_DEBUG_OBJECT(sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
     415           16 :     switch (GST_QUERY_TYPE(query))
     416              :     {
     417            1 :     case GST_QUERY_SEEKING:
     418              :     {
     419              :         GstFormat fmt;
     420            1 :         gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
     421            1 :         gst_query_set_seeking(query, fmt, FALSE, 0, -1);
     422            1 :         return TRUE;
     423              :     }
     424            5 :     case GST_QUERY_POSITION:
     425              :     {
     426            5 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     427            5 :         if (!client)
     428              :         {
     429            1 :             return FALSE;
     430              :         }
     431              : 
     432              :         GstFormat fmt;
     433            4 :         gst_query_parse_position(query, &fmt, NULL);
     434            4 :         switch (fmt)
     435              :         {
     436            3 :         case GST_FORMAT_TIME:
     437              :         {
     438            3 :             gint64 position = client->getPosition(sink->priv->m_sourceId);
     439            3 :             GST_DEBUG_OBJECT(sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
     440            3 :             if (position < 0)
     441              :             {
     442            2 :                 return FALSE;
     443              :             }
     444              : 
     445            1 :             gst_query_set_position(query, fmt, position);
     446            1 :             break;
     447              :         }
     448            1 :         default:
     449            1 :             break;
     450              :         }
     451            2 :         return TRUE;
     452            5 :     }
     453           10 :     default:
     454           10 :         break;
     455              :     }
     456              : 
     457           10 :     GstElement *parent = GST_ELEMENT(&sink->parent);
     458           10 :     return GST_ELEMENT_CLASS(parent_class)->query(parent, query);
     459              : }
     460              : 
     461            2 : static void rialto_mse_base_sink_change_playback_rate(RialtoMSEBaseSink *sink, GstEvent *event)
     462              : {
     463            2 :     const GstStructure *structure{gst_event_get_structure(event)};
     464            2 :     gdouble playbackRate{1.0};
     465            2 :     if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
     466              :     {
     467            2 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     468            2 :         if ((client) && (sink->priv->m_mediaPlayerManager.hasControl()))
     469              :         {
     470            1 :             GST_DEBUG_OBJECT(sink, "Instant playback rate change: %.2f", playbackRate);
     471            1 :             client->setPlaybackRate(playbackRate);
     472              :         }
     473            2 :     }
     474              : }
     475              : 
     476            3 : static void rialto_mse_base_sink_flush_server(RialtoMSEBaseSink *sink, bool resetTime)
     477              : {
     478            3 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     479            3 :     if (!client)
     480              :     {
     481            1 :         GST_ERROR_OBJECT(sink, "Could not get the media player client");
     482            1 :         return;
     483              :     }
     484              : 
     485            2 :     std::unique_lock<std::mutex> lock(sink->priv->m_flushMutex);
     486            2 :     GST_INFO_OBJECT(sink, "Flushing sink with sourceId %d", sink->priv->m_sourceId.load());
     487            2 :     client->flush(sink->priv->m_sourceId, resetTime);
     488            2 :     if (sink->priv->m_sourceAttached)
     489              :     {
     490            1 :         sink->priv->m_flushCondVariable.wait(lock);
     491              :     }
     492              :     else
     493              :     {
     494            1 :         GST_DEBUG_OBJECT(sink, "Skip waiting for flush finish - source not attached yet.");
     495              :     }
     496            3 : }
     497              : 
     498            3 : static void rialto_mse_base_sink_flush_start(RialtoMSEBaseSink *sink)
     499              : {
     500            3 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     501            3 :     if (!sink->priv->m_isFlushOngoing)
     502              :     {
     503            3 :         GST_INFO_OBJECT(sink, "Starting flushing");
     504            3 :         if (sink->priv->m_isEos)
     505              :         {
     506            2 :             GST_DEBUG_OBJECT(sink, "Flush will clear EOS state.");
     507            2 :             sink->priv->m_isEos = false;
     508              :         }
     509            3 :         sink->priv->m_isFlushOngoing = true;
     510              :         // We expect to receive a new gst segment after flush
     511            3 :         sink->priv->m_initialPositionSet = false;
     512            3 :         sink->priv->clearBuffersUnlocked();
     513              :     }
     514              : }
     515              : 
     516            3 : static void rialto_mse_base_sink_flush_stop(RialtoMSEBaseSink *sink, bool resetTime)
     517              : {
     518            3 :     GST_INFO_OBJECT(sink, "Stopping flushing");
     519            3 :     rialto_mse_base_sink_flush_server(sink, resetTime);
     520            3 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     521            3 :     sink->priv->m_isFlushOngoing = false;
     522              : 
     523            3 :     if (resetTime)
     524              :     {
     525            3 :         GST_DEBUG_OBJECT(sink, "sending reset_time message");
     526            3 :         gst_element_post_message(GST_ELEMENT_CAST(sink), gst_message_new_reset_time(GST_OBJECT_CAST(sink), 0));
     527              :     }
     528              : }
     529              : 
     530            5 : static void rialto_mse_base_sink_set_segment(RialtoMSEBaseSink *sink)
     531              : {
     532            5 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     533            5 :     if (!client)
     534              :     {
     535            1 :         GST_ERROR_OBJECT(sink, "Could not get the media player client");
     536            1 :         return;
     537              :     }
     538            4 :     const bool kResetTime{sink->priv->m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
     539            4 :     int64_t position = static_cast<int64_t>(sink->priv->m_lastSegment.start);
     540              :     {
     541            4 :         std::unique_lock lock{sink->priv->m_sinkMutex};
     542            4 :         sink->priv->m_initialPositionSet = true;
     543            4 :         if (sink->priv->m_queuedOffset)
     544              :         {
     545            1 :             position = sink->priv->m_queuedOffset.value();
     546            1 :             sink->priv->m_queuedOffset.reset();
     547              :         }
     548            4 :     }
     549            8 :     client->setSourcePosition(sink->priv->m_sourceId, position, kResetTime, sink->priv->m_lastSegment.applied_rate,
     550            4 :                               sink->priv->m_lastSegment.stop);
     551            5 : }
     552              : 
     553           22 : static gboolean rialto_mse_base_sink_send_event(GstElement *element, GstEvent *event)
     554              : {
     555           22 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
     556           22 :     GST_DEBUG_OBJECT(sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
     557           22 :     bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
     558              : 
     559           22 :     switch (GST_EVENT_TYPE(event))
     560              :     {
     561           11 :     case GST_EVENT_SEEK:
     562              :     {
     563           11 :         gdouble rate{1.0};
     564              :         GstFormat seekFormat;
     565           11 :         GstSeekFlags flags{GST_SEEK_FLAG_NONE};
     566              :         GstSeekType startType, stopType;
     567              :         gint64 start, stop;
     568           11 :         if (event)
     569              :         {
     570           11 :             gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
     571              : 
     572           11 :             if (flags & GST_SEEK_FLAG_FLUSH)
     573              :             {
     574            8 :                 if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
     575              :                 {
     576            1 :                     GST_ERROR_OBJECT(sink, "GST_SEEK_TYPE_END seek is not supported");
     577            1 :                     gst_event_unref(event);
     578            4 :                     return FALSE;
     579              :                 }
     580              :             }
     581              : #if GST_CHECK_VERSION(1, 18, 0)
     582            3 :             else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
     583              :             {
     584            2 :                 gdouble rateMultiplier = rate / sink->priv->m_lastSegment.rate;
     585            2 :                 GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
     586            2 :                 gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
     587            2 :                 gst_event_unref(event);
     588            2 :                 if (gst_pad_send_event(sink->priv->m_sinkPad, rateChangeEvent) != TRUE)
     589              :                 {
     590            1 :                     GST_ERROR_OBJECT(sink, "Sending instant rate change failed.");
     591            1 :                     return FALSE;
     592              :                 }
     593            1 :                 return TRUE;
     594              :             }
     595              : #endif
     596              :             else
     597              :             {
     598            1 :                 GST_WARNING_OBJECT(sink, "Seek with flags 0x%X is not supported", flags);
     599            1 :                 gst_event_unref(event);
     600            1 :                 return FALSE;
     601              :             }
     602              :         }
     603            7 :         break;
     604              :     }
     605              : #if GST_CHECK_VERSION(1, 18, 0)
     606            1 :     case GST_EVENT_INSTANT_RATE_SYNC_TIME:
     607              :     {
     608            1 :         double rate{0.0};
     609            1 :         GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
     610            1 :         guint32 seqnum = gst_event_get_seqnum(event);
     611            1 :         gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
     612              : 
     613            1 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     614            1 :         if ((client) && (sink->priv->m_mediaPlayerManager.hasControl()))
     615              :         {
     616            1 :             GST_DEBUG_OBJECT(sink, "Instant playback rate change: %.2f", rate);
     617            1 :             sink->priv->currentInstantRateChangeSeqnum = seqnum;
     618            1 :             client->setPlaybackRate(rate);
     619              :         }
     620            1 :         break;
     621              :     }
     622              : #endif
     623           10 :     default:
     624           10 :         break;
     625              :     }
     626              : 
     627           18 :     if (shouldForwardUpstream)
     628              :     {
     629           18 :         bool result = gst_pad_push_event(sink->priv->m_sinkPad, event);
     630           18 :         if (!result)
     631              :         {
     632            8 :             GST_DEBUG_OBJECT(sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
     633              :         }
     634              : 
     635           18 :         return result;
     636              :     }
     637              : 
     638            0 :     gst_event_unref(event);
     639            0 :     return TRUE;
     640              : }
     641              : 
     642            5 : static void rialto_mse_base_sink_copy_segment(RialtoMSEBaseSink *sink, GstEvent *event)
     643              : {
     644            5 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     645            5 :     gst_event_copy_segment(event, &sink->priv->m_lastSegment);
     646              : }
     647              : 
     648            1 : static void rialto_mse_base_sink_class_init(RialtoMSEBaseSinkClass *klass)
     649              : {
     650              :     std::shared_ptr<firebolt::rialto::IClientLogHandler> logToGstHandler =
     651            1 :         std::make_shared<firebolt::rialto::LogToGstHandler>();
     652              : 
     653            1 :     if (!firebolt::rialto::IClientLogControlFactory::createFactory()->createClientLogControl().registerLogHandler(logToGstHandler,
     654              :                                                                                                                   true))
     655              :     {
     656            0 :         GST_ERROR("Unable to preRegister log handler");
     657              :     }
     658              : 
     659            1 :     GObjectClass *gobjectClass = G_OBJECT_CLASS(klass);
     660            1 :     GstElementClass *elementClass = GST_ELEMENT_CLASS(klass);
     661              : 
     662            1 :     gst_element_class_set_metadata(elementClass, "Rialto MSE base sink", "Generic", "A sink for Rialto", "Sky");
     663              : 
     664            1 :     gobjectClass->finalize = rialto_mse_base_sink_finalize;
     665            1 :     gobjectClass->get_property = rialto_mse_base_sink_get_property;
     666            1 :     gobjectClass->set_property = rialto_mse_base_sink_set_property;
     667            1 :     elementClass->query = rialto_mse_base_sink_query;
     668            1 :     elementClass->send_event = rialto_mse_base_sink_send_event;
     669            1 :     elementClass->change_state = rialto_mse_base_sink_change_state;
     670              : 
     671            1 :     g_signals[SIGNAL_UNDERFLOW] = g_signal_new("buffer-underflow-callback", G_TYPE_FROM_CLASS(klass),
     672              :                                                (GSignalFlags)(G_SIGNAL_RUN_LAST), 0, nullptr, nullptr,
     673              :                                                g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE, 2, G_TYPE_UINT,
     674              :                                                G_TYPE_POINTER);
     675              : 
     676            1 :     g_object_class_install_property(gobjectClass, PROP_IS_SINGLE_PATH_STREAM,
     677              :                                     g_param_spec_boolean("single-path-stream", "single path stream",
     678              :                                                          "is single path stream", FALSE, GParamFlags(G_PARAM_READWRITE)));
     679              : 
     680            1 :     g_object_class_install_property(gobjectClass, PROP_N_STREAMS,
     681              :                                     g_param_spec_int("streams-number", "streams number", "streams number", 1, G_MAXINT,
     682              :                                                      1, GParamFlags(G_PARAM_READWRITE)));
     683              : 
     684            1 :     g_object_class_install_property(gobjectClass, PROP_HAS_DRM,
     685              :                                     g_param_spec_boolean("has-drm", "has drm", "has drm", TRUE,
     686              :                                                          GParamFlags(G_PARAM_READWRITE)));
     687            1 :     g_object_class_install_property(gobjectClass, PROP_STATS,
     688              :                                     g_param_spec_pointer("stats", NULL, "pointer to a gst_structure",
     689              :                                                          GParamFlags(G_PARAM_READABLE)));
     690              : }
     691              : 
     692            3 : GstFlowReturn rialto_mse_base_sink_chain(GstPad *pad, GstObject *parent, GstBuffer *buf)
     693              : {
     694            3 :     size_t MAX_INTERNAL_BUFFERS_QUEUE_SIZE = 24;
     695            3 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(parent);
     696            3 :     GST_LOG_OBJECT(sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buf, GST_TIME_ARGS(GST_BUFFER_PTS(buf)));
     697              : 
     698            3 :     std::unique_lock<std::mutex> lock(sink->priv->m_sinkMutex);
     699              : 
     700            3 :     if (sink->priv->m_samples.size() >= MAX_INTERNAL_BUFFERS_QUEUE_SIZE)
     701              :     {
     702            1 :         GST_DEBUG_OBJECT(sink, "Waiting for more space in buffers queue\n");
     703            1 :         sink->priv->m_needDataCondVariable.wait(lock);
     704              :     }
     705              : 
     706            3 :     if (sink->priv->m_isFlushOngoing)
     707              :     {
     708            1 :         GST_DEBUG_OBJECT(sink, "Discarding buffer which was received during flushing");
     709            1 :         gst_buffer_unref(buf);
     710            1 :         return GST_FLOW_FLUSHING;
     711              :     }
     712              : 
     713            2 :     GstSample *sample = gst_sample_new(buf, sink->priv->m_caps, &sink->priv->m_lastSegment, nullptr);
     714            2 :     if (sample)
     715            2 :         sink->priv->m_samples.push(sample);
     716              :     else
     717            0 :         GST_ERROR_OBJECT(sink, "Failed to create a sample");
     718              : 
     719            2 :     gst_buffer_unref(buf);
     720              : 
     721            2 :     return GST_FLOW_OK;
     722            3 : }
     723              : 
     724          266 : bool rialto_mse_base_sink_initialise_sinkpad(RialtoMSEBaseSink *sink)
     725              : {
     726              :     GstPadTemplate *pad_template =
     727          266 :         gst_element_class_get_pad_template(GST_ELEMENT_CLASS(G_OBJECT_GET_CLASS(sink)), "sink");
     728          266 :     if (!pad_template)
     729              :     {
     730            0 :         GST_ERROR_OBJECT(sink, "Could not find sink pad template");
     731            0 :         return false;
     732              :     }
     733              : 
     734          266 :     GstPad *sinkPad = gst_pad_new_from_template(pad_template, "sink");
     735          266 :     if (!sinkPad)
     736              :     {
     737            0 :         GST_ERROR_OBJECT(sink, "Could not create sinkpad");
     738            0 :         return false;
     739              :     }
     740              : 
     741          266 :     gst_element_add_pad(GST_ELEMENT_CAST(sink), sinkPad);
     742          266 :     sink->priv->m_sinkPad = sinkPad;
     743              : 
     744          266 :     return true;
     745              : }
     746              : 
     747          166 : bool rialto_mse_base_sink_event(GstPad *pad, GstObject *parent, GstEvent *event)
     748              : {
     749          166 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(parent);
     750          166 :     GST_DEBUG_OBJECT(sink, "handling event %" GST_PTR_FORMAT, event);
     751          166 :     switch (GST_EVENT_TYPE(event))
     752              :     {
     753            5 :     case GST_EVENT_SEGMENT:
     754              :     {
     755            5 :         rialto_mse_base_sink_copy_segment(sink, event);
     756            5 :         rialto_mse_base_sink_set_segment(sink);
     757            5 :         break;
     758              :     }
     759            1 :     case GST_EVENT_EOS:
     760              :     {
     761            1 :         std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     762            1 :         sink->priv->m_isEos = true;
     763            1 :         break;
     764              :     }
     765          135 :     case GST_EVENT_CAPS:
     766              :     {
     767              :         GstCaps *caps;
     768          135 :         gst_event_parse_caps(event, &caps);
     769              :         {
     770          135 :             std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     771          135 :             if (sink->priv->m_caps)
     772              :             {
     773            4 :                 if (!gst_caps_is_equal(caps, sink->priv->m_caps))
     774              :                 {
     775            1 :                     gst_caps_unref(sink->priv->m_caps);
     776            1 :                     sink->priv->m_caps = gst_caps_copy(caps);
     777              :                 }
     778              :             }
     779              :             else
     780              :             {
     781          131 :                 sink->priv->m_caps = gst_caps_copy(caps);
     782              :             }
     783          135 :         }
     784              : 
     785          135 :         break;
     786              :     }
     787            1 :     case GST_EVENT_SINK_MESSAGE:
     788              :     {
     789            1 :         GstMessage *message = nullptr;
     790            1 :         gst_event_parse_sink_message(event, &message);
     791              : 
     792            1 :         if (message)
     793              :         {
     794            1 :             gst_element_post_message(GST_ELEMENT_CAST(sink), message);
     795              :         }
     796              : 
     797            1 :         break;
     798              :     }
     799            9 :     case GST_EVENT_CUSTOM_DOWNSTREAM:
     800              :     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
     801              :     {
     802            9 :         if (gst_event_has_name(event, "custom-instant-rate-change"))
     803              :         {
     804            2 :             GST_DEBUG_OBJECT(sink, "Change rate event received");
     805            2 :             rialto_mse_base_sink_change_playback_rate(sink, event);
     806              :         }
     807            9 :         break;
     808              :     }
     809            3 :     case GST_EVENT_FLUSH_START:
     810              :     {
     811            3 :         rialto_mse_base_sink_flush_start(sink);
     812            3 :         break;
     813              :     }
     814            3 :     case GST_EVENT_FLUSH_STOP:
     815              :     {
     816              :         gboolean reset_time;
     817            3 :         gst_event_parse_flush_stop(event, &reset_time);
     818              : 
     819            3 :         rialto_mse_base_sink_flush_stop(sink, reset_time);
     820            3 :         break;
     821              :     }
     822            3 :     case GST_EVENT_STREAM_COLLECTION:
     823              :     {
     824            3 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
     825            3 :         if (!client)
     826              :         {
     827            1 :             gst_event_unref(event);
     828            1 :             return FALSE;
     829              :         }
     830            2 :         int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
     831              :         GstStreamCollection *streamCollection;
     832            2 :         gst_event_parse_stream_collection(event, &streamCollection);
     833            2 :         guint streamsSize = gst_stream_collection_get_size(streamCollection);
     834            6 :         for (guint i = 0; i < streamsSize; ++i)
     835              :         {
     836            4 :             auto *stream = gst_stream_collection_get_stream(streamCollection, i);
     837            4 :             auto type = gst_stream_get_stream_type(stream);
     838            4 :             if (type & GST_STREAM_TYPE_AUDIO)
     839              :             {
     840            2 :                 ++audioStreams;
     841              :             }
     842            2 :             else if (type & GST_STREAM_TYPE_VIDEO)
     843              :             {
     844            1 :                 ++videoStreams;
     845              :             }
     846            1 :             else if (type & GST_STREAM_TYPE_TEXT)
     847              :             {
     848            1 :                 ++textStreams;
     849              :             }
     850              :         }
     851            2 :         gst_object_unref(streamCollection);
     852            2 :         client->handleStreamCollection(audioStreams, videoStreams, textStreams);
     853            2 :         client->sendAllSourcesAttachedIfPossible();
     854            2 :         break;
     855            3 :     }
     856              : #if GST_CHECK_VERSION(1, 18, 0)
     857            3 :     case GST_EVENT_INSTANT_RATE_CHANGE:
     858              :     {
     859            3 :         guint32 seqnum = gst_event_get_seqnum(event);
     860            5 :         if (sink->priv->lastInstantRateChangeSeqnum == seqnum ||
     861            4 :             sink->priv->currentInstantRateChangeSeqnum.load() == seqnum)
     862              :         {
     863              :             /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
     864            2 :             GST_DEBUG_OBJECT(sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
     865            2 :             break;
     866              :         }
     867              : 
     868            1 :         sink->priv->lastInstantRateChangeSeqnum = seqnum;
     869            1 :         gdouble rate{0.0};
     870            1 :         GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
     871            1 :         gst_event_parse_instant_rate_change(event, &rate, &flags);
     872            1 :         GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(sink), rate);
     873            1 :         gst_message_set_seqnum(msg, seqnum);
     874            1 :         gst_element_post_message(GST_ELEMENT_CAST(sink), msg);
     875            1 :         break;
     876              :     }
     877              : #endif
     878            3 :     default:
     879            3 :         break;
     880              :     }
     881              : 
     882          165 :     gst_event_unref(event);
     883              : 
     884          165 :     return TRUE;
     885              : }
     886              : 
     887            5 : GstRefSample rialto_mse_base_sink_get_front_sample(RialtoMSEBaseSink *sink)
     888              : {
     889            5 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     890            5 :     if (!sink->priv->m_samples.empty())
     891              :     {
     892            3 :         GstSample *sample = sink->priv->m_samples.front();
     893            3 :         GstBuffer *buffer = gst_sample_get_buffer(sample);
     894            3 :         GST_LOG_OBJECT(sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
     895              :                        GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
     896              : 
     897            3 :         return GstRefSample{sample};
     898              :     }
     899              : 
     900            2 :     return GstRefSample{};
     901            5 : }
     902              : 
     903            3 : void rialto_mse_base_sink_pop_sample(RialtoMSEBaseSink *sink)
     904              : {
     905            3 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     906            3 :     sink->priv->m_needDataCondVariable.notify_all();
     907            3 :     if (!sink->priv->m_samples.empty())
     908              :     {
     909            3 :         gst_sample_unref(sink->priv->m_samples.front());
     910            3 :         sink->priv->m_samples.pop();
     911              :     }
     912              : }
     913              : 
     914            2 : bool rialto_mse_base_sink_is_eos(RialtoMSEBaseSink *sink)
     915              : {
     916            2 :     std::lock_guard<std::mutex> lock(sink->priv->m_sinkMutex);
     917            4 :     return sink->priv->m_samples.empty() && sink->priv->m_isEos;
     918            2 : }
     919              : 
     920           55 : void rialto_mse_base_handle_rialto_server_state_changed(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackState state)
     921              : {
     922           55 :     if (sink->priv->m_callbacks.stateChangedCallback)
     923              :     {
     924           55 :         sink->priv->m_callbacks.stateChangedCallback(state);
     925              :     }
     926              : }
     927              : 
     928            3 : void rialto_mse_base_handle_rialto_server_eos(RialtoMSEBaseSink *sink)
     929              : {
     930            3 :     if (sink->priv->m_callbacks.eosCallback)
     931              :     {
     932            3 :         sink->priv->m_callbacks.eosCallback();
     933              :     }
     934              : }
     935              : 
     936            2 : void rialto_mse_base_handle_rialto_server_completed_flush(RialtoMSEBaseSink *sink)
     937              : {
     938            2 :     if (sink->priv->m_callbacks.flushCompletedCallback)
     939              :     {
     940            2 :         sink->priv->m_callbacks.flushCompletedCallback();
     941              :     }
     942              : }
     943              : 
     944            3 : void rialto_mse_base_handle_rialto_server_sent_qos(RialtoMSEBaseSink *sink, uint64_t processed, uint64_t dropped)
     945              : {
     946            3 :     if (sink->priv->m_callbacks.qosCallback)
     947              :     {
     948            3 :         sink->priv->m_callbacks.qosCallback(processed, dropped);
     949              :     }
     950              : }
     951              : 
     952            2 : void rialto_mse_base_handle_rialto_server_error(RialtoMSEBaseSink *sink, firebolt::rialto::PlaybackError error)
     953              : {
     954            2 :     if (sink->priv->m_callbacks.errorCallback)
     955              :     {
     956            2 :         sink->priv->m_callbacks.errorCallback(error);
     957              :     }
     958              : }
     959              : 
     960            1 : void rialto_mse_base_handle_rialto_server_sent_buffer_underflow(RialtoMSEBaseSink *sink)
     961              : {
     962            1 :     GST_WARNING_OBJECT(sink, "Sending underflow signal");
     963              :     // send 2 last parameters just to be compatible with RDK's buffer-underflow-callback signal signature
     964            1 :     g_signal_emit(G_OBJECT(sink), g_signals[SIGNAL_UNDERFLOW], 0, 0, nullptr);
     965              : }
     966              : 
     967          278 : GstObject *rialto_mse_base_get_oldest_gst_bin_parent(GstElement *element)
     968              : {
     969          278 :     GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
     970          278 :     GstObject *result = GST_OBJECT_CAST(element);
     971          278 :     if (parent)
     972              :     {
     973          139 :         if (GST_IS_BIN(parent))
     974              :         {
     975          139 :             result = rialto_mse_base_get_oldest_gst_bin_parent(GST_ELEMENT_CAST(parent));
     976              :         }
     977          139 :         gst_object_unref(parent);
     978              :     }
     979              : 
     980          278 :     return result;
     981              : }
     982              : 
     983          123 : std::shared_ptr<firebolt::rialto::CodecData> rialto_mse_base_sink_get_codec_data(RialtoMSEBaseSink *sink,
     984              :                                                                                  const GstStructure *structure)
     985              : {
     986          123 :     const GValue *codec_data = gst_structure_get_value(structure, "codec_data");
     987          123 :     if (codec_data)
     988              :     {
     989            2 :         GstBuffer *buf = gst_value_get_buffer(codec_data);
     990            2 :         if (buf)
     991              :         {
     992            1 :             GstMappedBuffer mappedBuf(buf, GST_MAP_READ);
     993            1 :             if (mappedBuf)
     994              :             {
     995            1 :                 auto codecData = std::make_shared<firebolt::rialto::CodecData>();
     996            1 :                 codecData->data = std::vector<std::uint8_t>(mappedBuf.data(), mappedBuf.data() + mappedBuf.size());
     997            1 :                 codecData->type = firebolt::rialto::CodecDataType::BUFFER;
     998            1 :                 return codecData;
     999              :             }
    1000              :             else
    1001              :             {
    1002            0 :                 GST_ERROR_OBJECT(sink, "Failed to read codec_data");
    1003            0 :                 return nullptr;
    1004              :             }
    1005            1 :         }
    1006            1 :         const gchar *str = g_value_get_string(codec_data);
    1007            1 :         if (str)
    1008              :         {
    1009            1 :             auto codecData = std::make_shared<firebolt::rialto::CodecData>();
    1010            1 :             codecData->data = std::vector<std::uint8_t>(str, str + std::strlen(str));
    1011            1 :             codecData->type = firebolt::rialto::CodecDataType::STRING;
    1012            1 :             return codecData;
    1013              :         }
    1014              :     }
    1015              : 
    1016          121 :     return nullptr;
    1017              : }
    1018              : 
    1019          123 : firebolt::rialto::StreamFormat rialto_mse_base_sink_get_stream_format(RialtoMSEBaseSink *sink,
    1020              :                                                                       const GstStructure *structure)
    1021              : {
    1022          123 :     const gchar *streamFormat = gst_structure_get_string(structure, "stream-format");
    1023          123 :     firebolt::rialto::StreamFormat format = firebolt::rialto::StreamFormat::UNDEFINED;
    1024          123 :     if (streamFormat)
    1025              :     {
    1026              :         static const std::unordered_map<std::string, firebolt::rialto::StreamFormat> stringToStreamFormatMap =
    1027            0 :             {{"raw", firebolt::rialto::StreamFormat::RAW},
    1028            0 :              {"avc", firebolt::rialto::StreamFormat::AVC},
    1029            0 :              {"byte-stream", firebolt::rialto::StreamFormat::BYTE_STREAM},
    1030            0 :              {"hvc1", firebolt::rialto::StreamFormat::HVC1},
    1031           12 :              {"hev1", firebolt::rialto::StreamFormat::HEV1}};
    1032              : 
    1033            5 :         auto strToStreamFormatIt = stringToStreamFormatMap.find(streamFormat);
    1034            5 :         if (strToStreamFormatIt != stringToStreamFormatMap.end())
    1035              :         {
    1036            5 :             format = strToStreamFormatIt->second;
    1037              :         }
    1038              :     }
    1039              : 
    1040          123 :     return format;
    1041            1 : }
    1042              : 
    1043          123 : firebolt::rialto::SegmentAlignment rialto_mse_base_sink_get_segment_alignment(RialtoMSEBaseSink *sink,
    1044              :                                                                               const GstStructure *s)
    1045              : {
    1046          123 :     const gchar *alignment = gst_structure_get_string(s, "alignment");
    1047          123 :     if (alignment)
    1048              :     {
    1049            3 :         GST_DEBUG_OBJECT(sink, "Alignment found %s", alignment);
    1050            3 :         if (strcmp(alignment, "au") == 0)
    1051              :         {
    1052            1 :             return firebolt::rialto::SegmentAlignment::AU;
    1053              :         }
    1054            2 :         else if (strcmp(alignment, "nal") == 0)
    1055              :         {
    1056            1 :             return firebolt::rialto::SegmentAlignment::NAL;
    1057              :         }
    1058              :     }
    1059              : 
    1060          121 :     return firebolt::rialto::SegmentAlignment::UNDEFINED;
    1061              : }
    1062              : 
    1063            4 : bool rialto_mse_base_sink_get_dv_profile(RialtoMSEBaseSink *sink, const GstStructure *s, uint32_t &dvProfile)
    1064              : {
    1065            4 :     gboolean isDolbyVisionEnabled = false;
    1066            4 :     if (gst_structure_get_boolean(s, "dovi-stream", &isDolbyVisionEnabled) && isDolbyVisionEnabled)
    1067              :     {
    1068            1 :         if (gst_structure_get_uint(s, "dv_profile", &dvProfile))
    1069              :         {
    1070            1 :             return true;
    1071              :         }
    1072              :     }
    1073            3 :     return false;
    1074              : }
    1075              : 
    1076            7 : void rialto_mse_base_sink_lost_state(RialtoMSEBaseSink *sink)
    1077              : {
    1078            7 :     sink->priv->m_isStateCommitNeeded = true;
    1079            7 :     gst_element_lost_state(GST_ELEMENT_CAST(sink));
    1080              : }
    1081              : 
    1082          133 : static bool rialto_mse_base_sink_get_n_streams_from_parent(GstObject *parentObject, gint &n_video, gint &n_audio,
    1083              :                                                            gint &n_text)
    1084              : {
    1085          133 :     if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
    1086          135 :         g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
    1087            2 :         g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
    1088              :     {
    1089            2 :         g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
    1090              : 
    1091            2 :         if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
    1092              :         {
    1093            2 :             guint flags = 0;
    1094            2 :             g_object_get(parentObject, "flags", &flags, nullptr);
    1095            2 :             n_video = flags & rialto_mse_base_sink_get_gst_play_flag("video") ? n_video : 0;
    1096            2 :             n_audio = flags & rialto_mse_base_sink_get_gst_play_flag("audio") ? n_audio : 0;
    1097            2 :             n_text = flags & rialto_mse_base_sink_get_gst_play_flag("text") ? n_text : 0;
    1098              :         }
    1099              : 
    1100            2 :         return true;
    1101              :     }
    1102              : 
    1103          131 :     return false;
    1104              : }
    1105              : 
    1106          136 : static bool rialto_mse_base_sink_set_streams_number(RialtoMSEBaseSink *sink, GstObject *parentObject)
    1107              : {
    1108          136 :     RialtoMSEBaseSinkPrivate *priv = sink->priv;
    1109          136 :     int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
    1110              : 
    1111          136 :     GstContext *context = gst_element_get_context(GST_ELEMENT(sink), "streams-info");
    1112          136 :     if (context)
    1113              :     {
    1114            3 :         GST_DEBUG_OBJECT(sink, "Getting number of streams from \"streams-info\" context");
    1115              : 
    1116            3 :         guint n_video{0}, n_audio{0}, n_text{0};
    1117              : 
    1118            3 :         const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
    1119            3 :         gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
    1120            3 :         gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
    1121            3 :         gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
    1122              : 
    1123            5 :         if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
    1124            2 :             n_text > std::numeric_limits<int32_t>::max())
    1125              :         {
    1126            1 :             GST_ERROR_OBJECT(sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio, n_text);
    1127            1 :             gst_context_unref(context);
    1128            1 :             return false;
    1129              :         }
    1130              : 
    1131            2 :         videoStreams = n_video;
    1132            2 :         audioStreams = n_audio;
    1133            2 :         subtitleStreams = n_text;
    1134              : 
    1135            2 :         gst_context_unref(context);
    1136              :     }
    1137          133 :     else if (rialto_mse_base_sink_get_n_streams_from_parent(parentObject, videoStreams, audioStreams, subtitleStreams))
    1138              :     {
    1139            2 :         GST_DEBUG_OBJECT(sink, "Got number of streams from playbin2 properties");
    1140              :     }
    1141              :     else
    1142              :     {
    1143              :         // The default value of streams is V:1, A:1, S:0
    1144              :         // Changing the default setting via properties is considered as DEPRECATED
    1145          131 :         subtitleStreams = 0;
    1146          131 :         std::lock_guard<std::mutex> lock(priv->m_sinkMutex);
    1147          131 :         if (priv->m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
    1148              :         {
    1149           26 :             videoStreams = priv->m_numOfStreams;
    1150           26 :             if (priv->m_isSinglePathStream)
    1151              :             {
    1152           25 :                 audioStreams = 0;
    1153           25 :                 subtitleStreams = 0;
    1154              :             }
    1155              :         }
    1156          105 :         else if (priv->m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
    1157              :         {
    1158           95 :             audioStreams = priv->m_numOfStreams;
    1159           95 :             if (priv->m_isSinglePathStream)
    1160              :             {
    1161           94 :                 videoStreams = 0;
    1162           94 :                 subtitleStreams = 0;
    1163              :             }
    1164              :         }
    1165           10 :         else if (priv->m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
    1166              :         {
    1167           10 :             subtitleStreams = priv->m_numOfStreams;
    1168           10 :             if (priv->m_isSinglePathStream)
    1169              :             {
    1170           10 :                 videoStreams = 0;
    1171           10 :                 audioStreams = 0;
    1172              :             }
    1173              :         }
    1174          131 :     }
    1175              : 
    1176          135 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = sink->priv->m_mediaPlayerManager.getMediaPlayerClient();
    1177          135 :     if (!client)
    1178              :     {
    1179            0 :         GST_ERROR_OBJECT(sink, "MediaPlayerClient is nullptr");
    1180            0 :         return false;
    1181              :     }
    1182              : 
    1183          135 :     client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
    1184              : 
    1185          135 :     return true;
    1186              : }
    1187              : 
    1188          139 : bool rialto_mse_base_sink_attach_to_media_client_and_set_streams_number(GstElement *element, const uint32_t maxVideoWidth,
    1189              :                                                                         const uint32_t maxVideoHeight)
    1190              : {
    1191          139 :     RialtoMSEBaseSink *sink = RIALTO_MSE_BASE_SINK(element);
    1192          139 :     RialtoMSEBaseSinkPrivate *priv = sink->priv;
    1193              : 
    1194          139 :     GstObject *parentObject = rialto_mse_base_get_oldest_gst_bin_parent(element);
    1195          139 :     if (!priv->m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
    1196              :     {
    1197            3 :         GST_ERROR_OBJECT(sink, "Cannot attach the MediaPlayerClient");
    1198            3 :         return false;
    1199              :     }
    1200              : 
    1201          136 :     gchar *parentObjectName = gst_object_get_name(parentObject);
    1202          136 :     GST_INFO_OBJECT(element, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
    1203          136 :     g_free(parentObjectName);
    1204              : 
    1205          136 :     return rialto_mse_base_sink_set_streams_number(sink, parentObject);
    1206              : }
        

Generated by: LCOV version 2.0-1