LCOV - code coverage report
Current view: top level - source - PullModePlaybackDelegate.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 94.1 % 527 496
Test Date: 2025-08-11 12:19:57 Functions: 96.9 % 32 31

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2025 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "PullModePlaybackDelegate.h"
      20              : #include "ControlBackend.h"
      21              : #include "GstreamerCatLog.h"
      22              : #include "RialtoGStreamerMSEBaseSink.h"
      23              : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
      24              : 
      25              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      26              : 
      27              : namespace
      28              : {
      29          298 : GstObject *getOldestGstBinParent(GstElement *element)
      30              : {
      31          298 :     GstObject *parent = gst_object_get_parent(GST_OBJECT_CAST(element));
      32          298 :     GstObject *result = GST_OBJECT_CAST(element);
      33          298 :     if (parent)
      34              :     {
      35          149 :         if (GST_IS_BIN(parent))
      36              :         {
      37          149 :             result = getOldestGstBinParent(GST_ELEMENT_CAST(parent));
      38              :         }
      39          149 :         gst_object_unref(parent);
      40              :     }
      41              : 
      42          298 :     return result;
      43              : }
      44              : 
      45            6 : unsigned getGstPlayFlag(const char *nick)
      46              : {
      47            6 :     GFlagsClass *flagsClass = static_cast<GFlagsClass *>(g_type_class_ref(g_type_from_name("GstPlayFlags")));
      48            6 :     GFlagsValue *flag = g_flags_get_value_by_nick(flagsClass, nick);
      49            6 :     return flag ? flag->value : 0;
      50              : }
      51              : 
      52          143 : bool getNStreamsFromParent(GstObject *parentObject, gint &n_video, gint &n_audio, gint &n_text)
      53              : {
      54          143 :     if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-video") &&
      55          145 :         g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-audio") &&
      56            2 :         g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "n-text"))
      57              :     {
      58            2 :         g_object_get(parentObject, "n-video", &n_video, "n-audio", &n_audio, "n-text", &n_text, nullptr);
      59              : 
      60            2 :         if (g_object_class_find_property(G_OBJECT_GET_CLASS(parentObject), "flags"))
      61              :         {
      62            2 :             guint flags = 0;
      63            2 :             g_object_get(parentObject, "flags", &flags, nullptr);
      64            2 :             n_video = (flags & getGstPlayFlag("video")) ? n_video : 0;
      65            2 :             n_audio = (flags & getGstPlayFlag("audio")) ? n_audio : 0;
      66            2 :             n_text = (flags & getGstPlayFlag("text")) ? n_text : 0;
      67              :         }
      68              : 
      69            2 :         return true;
      70              :     }
      71              : 
      72          141 :     return false;
      73              : }
      74              : } // namespace
      75              : 
      76          263 : PullModePlaybackDelegate::PullModePlaybackDelegate(GstElement *sink) : m_sink{sink}
      77              : {
      78          263 :     RialtoMSEBaseSink *baseSink = RIALTO_MSE_BASE_SINK(sink);
      79          263 :     m_sinkPad = baseSink->priv->m_sinkPad;
      80          263 :     m_rialtoControlClient = std::make_unique<firebolt::rialto::client::ControlBackend>();
      81          263 :     gst_segment_init(&m_lastSegment, GST_FORMAT_TIME);
      82              : }
      83              : 
      84          263 : PullModePlaybackDelegate::~PullModePlaybackDelegate()
      85              : {
      86          263 :     if (m_caps)
      87          141 :         gst_caps_unref(m_caps);
      88          263 :     clearBuffersUnlocked();
      89              : }
      90              : 
      91          422 : void PullModePlaybackDelegate::clearBuffersUnlocked()
      92              : {
      93          422 :     m_isFlushOngoing = true;
      94          422 :     m_needDataCondVariable.notify_all();
      95          450 :     while (!m_samples.empty())
      96              :     {
      97           28 :         GstSample *sample = m_samples.front();
      98           28 :         m_samples.pop();
      99           28 :         gst_sample_unref(sample);
     100              :     }
     101          422 : }
     102              : 
     103          137 : void PullModePlaybackDelegate::setSourceId(int32_t sourceId)
     104              : {
     105          137 :     std::unique_lock lock{m_sinkMutex};
     106          137 :     m_sourceId = sourceId;
     107              : }
     108              : 
     109            3 : void PullModePlaybackDelegate::handleEos()
     110              : {
     111            3 :     GstState currentState = GST_STATE(m_sink);
     112            3 :     if ((currentState != GST_STATE_PAUSED) && (currentState != GST_STATE_PLAYING))
     113              :     {
     114            1 :         GST_ERROR_OBJECT(m_sink, "Sink cannot post a EOS message in state '%s', posting an error instead",
     115              :                          gst_element_state_get_name(currentState));
     116              : 
     117            1 :         const char *errMessage = "Rialto sinks received EOS in non-playing state";
     118            1 :         GError *gError{g_error_new_literal(GST_STREAM_ERROR, 0, errMessage)};
     119            1 :         gst_element_post_message(m_sink, gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, errMessage));
     120            1 :         g_error_free(gError);
     121              :     }
     122            2 :     else if (!m_isFlushOngoing)
     123              :     {
     124            1 :         gst_element_post_message(m_sink, gst_message_new_eos(GST_OBJECT_CAST(m_sink)));
     125              :     }
     126              :     else
     127              :     {
     128            1 :         GST_WARNING_OBJECT(m_sink, "Skip sending eos message - flush is ongoing...");
     129              :     }
     130            3 : }
     131              : 
     132            2 : void PullModePlaybackDelegate::handleFlushCompleted()
     133              : {
     134            2 :     GST_INFO_OBJECT(m_sink, "Flush completed");
     135            2 :     std::unique_lock<std::mutex> lock(m_flushMutex);
     136            2 :     m_flushCondVariable.notify_all();
     137              : }
     138              : 
     139           39 : void PullModePlaybackDelegate::handleStateChanged(firebolt::rialto::PlaybackState state)
     140              : {
     141           39 :     GstState current = GST_STATE(m_sink);
     142           39 :     GstState next = GST_STATE_NEXT(m_sink);
     143           39 :     GstState pending = GST_STATE_PENDING(m_sink);
     144           39 :     GstState postNext = next == pending ? GST_STATE_VOID_PENDING : pending;
     145              : 
     146           39 :     GST_DEBUG_OBJECT(m_sink,
     147              :                      "Received server's state change to %u. Sink's states are: current state: %s next state: %s "
     148              :                      "pending state: %s, last return state %s",
     149              :                      static_cast<uint32_t>(state), gst_element_state_get_name(current),
     150              :                      gst_element_state_get_name(next), gst_element_state_get_name(pending),
     151              :                      gst_element_state_change_return_get_name(GST_STATE_RETURN(m_sink)));
     152              : 
     153           39 :     if (m_isStateCommitNeeded)
     154              :     {
     155           39 :         if ((state == firebolt::rialto::PlaybackState::PAUSED && next == GST_STATE_PAUSED) ||
     156            7 :             (state == firebolt::rialto::PlaybackState::PLAYING && next == GST_STATE_PLAYING))
     157              :         {
     158           38 :             GST_STATE(m_sink) = next;
     159           38 :             GST_STATE_NEXT(m_sink) = postNext;
     160           38 :             GST_STATE_PENDING(m_sink) = GST_STATE_VOID_PENDING;
     161           38 :             GST_STATE_RETURN(m_sink) = GST_STATE_CHANGE_SUCCESS;
     162              : 
     163           38 :             GST_INFO_OBJECT(m_sink, "Async state transition to state %s done", gst_element_state_get_name(next));
     164              : 
     165           38 :             gst_element_post_message(m_sink,
     166           38 :                                      gst_message_new_state_changed(GST_OBJECT_CAST(m_sink), current, next, pending));
     167           38 :             postAsyncDone();
     168              :         }
     169              :         /* Immediately transition to PLAYING when prerolled and PLAY is requested */
     170            1 :         else if (state == firebolt::rialto::PlaybackState::PAUSED && current == GST_STATE_PAUSED &&
     171              :                  next == GST_STATE_PLAYING)
     172              :         {
     173            1 :             GST_INFO_OBJECT(m_sink, "Async state transition to PAUSED done. Transitioning to PLAYING");
     174            1 :             changeState(GST_STATE_CHANGE_PAUSED_TO_PLAYING);
     175              :         }
     176              :     }
     177           39 : }
     178              : 
     179          836 : GstStateChangeReturn PullModePlaybackDelegate::changeState(GstStateChange transition)
     180              : {
     181          836 :     GstState current_state = GST_STATE_TRANSITION_CURRENT(transition);
     182          836 :     GstState next_state = GST_STATE_TRANSITION_NEXT(transition);
     183          836 :     GST_INFO_OBJECT(m_sink, "State change: (%s) -> (%s)", gst_element_state_get_name(current_state),
     184              :                     gst_element_state_get_name(next_state));
     185              : 
     186          836 :     GstStateChangeReturn status = GST_STATE_CHANGE_SUCCESS;
     187          836 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     188              : 
     189          836 :     switch (transition)
     190              :     {
     191          263 :     case GST_STATE_CHANGE_NULL_TO_READY:
     192          263 :         if (!m_sinkPad)
     193              :         {
     194            0 :             GST_ERROR_OBJECT(m_sink, "Cannot start, because there's no sink pad");
     195            0 :             return GST_STATE_CHANGE_FAILURE;
     196              :         }
     197          263 :         if (!m_rialtoControlClient->waitForRunning())
     198              :         {
     199            0 :             GST_ERROR_OBJECT(m_sink, "Control: Rialto client cannot reach running state");
     200            0 :             return GST_STATE_CHANGE_FAILURE;
     201              :         }
     202          263 :         GST_INFO_OBJECT(m_sink, "Control: Rialto client reached running state");
     203          263 :         break;
     204          145 :     case GST_STATE_CHANGE_READY_TO_PAUSED:
     205              :     {
     206          145 :         if (!client)
     207              :         {
     208            0 :             GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
     209            0 :             return GST_STATE_CHANGE_FAILURE;
     210              :         }
     211              : 
     212          145 :         m_isFlushOngoing = false;
     213              : 
     214          145 :         StateChangeResult result = client->pause(m_sourceId);
     215          145 :         if (result == StateChangeResult::SUCCESS_ASYNC || result == StateChangeResult::NOT_ATTACHED)
     216              :         {
     217              :             // NOT_ATTACHED is not a problem here, because source will be attached later when GST_EVENT_CAPS is received
     218          145 :             if (result == StateChangeResult::NOT_ATTACHED)
     219              :             {
     220          145 :                 postAsyncStart();
     221              :             }
     222          145 :             status = GST_STATE_CHANGE_ASYNC;
     223              :         }
     224              : 
     225          145 :         break;
     226              :     }
     227            8 :     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     228              :     {
     229            8 :         if (!client)
     230              :         {
     231            0 :             GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
     232            0 :             return GST_STATE_CHANGE_FAILURE;
     233              :         }
     234              : 
     235            8 :         StateChangeResult result = client->play(m_sourceId);
     236            8 :         if (result == StateChangeResult::SUCCESS_ASYNC)
     237              :         {
     238            8 :             status = GST_STATE_CHANGE_ASYNC;
     239              :         }
     240            0 :         else if (result == StateChangeResult::NOT_ATTACHED)
     241              :         {
     242            0 :             GST_ERROR_OBJECT(m_sink, "Failed to change state to playing");
     243            0 :             return GST_STATE_CHANGE_FAILURE;
     244              :         }
     245              : 
     246            8 :         break;
     247              :     }
     248            7 :     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     249              :     {
     250            7 :         if (!client)
     251              :         {
     252            0 :             GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
     253            0 :             return GST_STATE_CHANGE_FAILURE;
     254              :         }
     255              : 
     256            7 :         StateChangeResult result = client->pause(m_sourceId);
     257            7 :         if (result == StateChangeResult::SUCCESS_ASYNC)
     258              :         {
     259            7 :             status = GST_STATE_CHANGE_ASYNC;
     260              :         }
     261            0 :         else if (result == StateChangeResult::NOT_ATTACHED)
     262              :         {
     263            0 :             GST_ERROR_OBJECT(m_sink, "Failed to change state to paused");
     264            0 :             return GST_STATE_CHANGE_FAILURE;
     265              :         }
     266              : 
     267            7 :         break;
     268              :     }
     269          145 :     case GST_STATE_CHANGE_PAUSED_TO_READY:
     270          145 :         if (!client)
     271              :         {
     272            0 :             GST_ERROR_OBJECT(m_sink, "Cannot get the media player client object");
     273            0 :             return GST_STATE_CHANGE_FAILURE;
     274              :         }
     275              : 
     276          145 :         if (m_isStateCommitNeeded)
     277              :         {
     278          122 :             GST_DEBUG_OBJECT(m_sink, "Sending async_done in PAUSED->READY transition");
     279          122 :             postAsyncDone();
     280              :         }
     281              : 
     282          145 :         client->removeSource(m_sourceId);
     283              :         {
     284          145 :             std::lock_guard<std::mutex> lock(m_sinkMutex);
     285          145 :             clearBuffersUnlocked();
     286          145 :             m_sourceAttached = false;
     287              :         }
     288          145 :         break;
     289          263 :     case GST_STATE_CHANGE_READY_TO_NULL:
     290              :         // Playback will be stopped once all sources are finished and ref count
     291              :         // of the media pipeline object reaches 0
     292          263 :         m_mediaPlayerManager.releaseMediaPlayerClient();
     293          263 :         m_rialtoControlClient->removeControlBackend();
     294          263 :         break;
     295            5 :     default:
     296            5 :         break;
     297              :     }
     298              : 
     299          836 :     return status;
     300              : }
     301              : 
     302            2 : void PullModePlaybackDelegate::handleError(const char *message, gint code)
     303              : {
     304            2 :     GError *gError{g_error_new_literal(GST_STREAM_ERROR, code, message)};
     305            2 :     gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_error(GST_OBJECT_CAST(m_sink), gError, message));
     306            2 :     g_error_free(gError);
     307              : }
     308              : 
     309          297 : void PullModePlaybackDelegate::postAsyncStart()
     310              : {
     311          297 :     m_isStateCommitNeeded = true;
     312          297 :     gst_element_post_message(GST_ELEMENT_CAST(m_sink), gst_message_new_async_start(GST_OBJECT(m_sink)));
     313              : }
     314              : 
     315          160 : void PullModePlaybackDelegate::postAsyncDone()
     316              : {
     317          160 :     m_isStateCommitNeeded = false;
     318          160 :     gst_element_post_message(m_sink, gst_message_new_async_done(GST_OBJECT_CAST(m_sink), GST_CLOCK_TIME_NONE));
     319              : }
     320              : 
     321          309 : void PullModePlaybackDelegate::setProperty(const Property &type, const GValue *value)
     322              : {
     323          309 :     switch (type)
     324              :     {
     325          154 :     case Property::IsSinglePathStream:
     326              :     {
     327          154 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     328          154 :         m_isSinglePathStream = g_value_get_boolean(value) != FALSE;
     329          154 :         break;
     330              :     }
     331          154 :     case Property::NumberOfStreams:
     332              :     {
     333          154 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     334          154 :         m_numOfStreams = g_value_get_int(value);
     335          154 :         break;
     336              :     }
     337            1 :     case Property::HasDrm:
     338              :     {
     339            1 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     340            1 :         m_hasDrm = g_value_get_boolean(value) != FALSE;
     341            1 :         break;
     342              :     }
     343            0 :     default:
     344              :     {
     345            0 :         break;
     346              :     }
     347              :     }
     348          309 : }
     349              : 
     350            5 : void PullModePlaybackDelegate::getProperty(const Property &type, GValue *value)
     351              : {
     352            5 :     switch (type)
     353              :     {
     354            1 :     case Property::IsSinglePathStream:
     355              :     {
     356            1 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     357            1 :         g_value_set_boolean(value, m_isSinglePathStream ? TRUE : FALSE);
     358            1 :         break;
     359              :     }
     360            1 :     case Property::NumberOfStreams:
     361              :     {
     362            1 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     363            1 :         g_value_set_int(value, m_numOfStreams);
     364            1 :         break;
     365              :     }
     366            1 :     case Property::HasDrm:
     367              :     {
     368            1 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     369            1 :         g_value_set_boolean(value, m_hasDrm ? TRUE : FALSE);
     370            1 :         break;
     371              :     }
     372            2 :     case Property::Stats:
     373              :     {
     374            2 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     375            2 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     376            2 :         if (!client)
     377              :         {
     378            1 :             GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
     379            1 :             break;
     380              :         }
     381              : 
     382            1 :         guint64 totalVideoFrames{0};
     383            1 :         guint64 droppedVideoFrames{0};
     384            1 :         if (client->getStats(m_sourceId, totalVideoFrames, droppedVideoFrames))
     385              :         {
     386            1 :             GstStructure *stats{gst_structure_new("stats", "rendered", G_TYPE_UINT64, totalVideoFrames, "dropped",
     387              :                                                   G_TYPE_UINT64, droppedVideoFrames, nullptr)};
     388            1 :             g_value_set_pointer(value, stats);
     389              :         }
     390              :         else
     391              :         {
     392            0 :             GST_ERROR_OBJECT(m_sink, "No stats returned from client");
     393              :         }
     394            3 :     }
     395              :     default:
     396              :     {
     397            1 :         break;
     398              :     }
     399              :     }
     400            5 : }
     401              : 
     402           21 : std::optional<gboolean> PullModePlaybackDelegate::handleQuery(GstQuery *query) const
     403              : {
     404           21 :     GST_DEBUG_OBJECT(m_sink, "handling query '%s'", GST_QUERY_TYPE_NAME(query));
     405           21 :     switch (GST_QUERY_TYPE(query))
     406              :     {
     407            1 :     case GST_QUERY_SEEKING:
     408              :     {
     409              :         GstFormat fmt;
     410            1 :         gst_query_parse_seeking(query, &fmt, NULL, NULL, NULL);
     411            1 :         gst_query_set_seeking(query, fmt, FALSE, 0, -1);
     412            1 :         return TRUE;
     413              :     }
     414            5 :     case GST_QUERY_POSITION:
     415              :     {
     416            5 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     417            5 :         if (!client)
     418              :         {
     419            1 :             return FALSE;
     420              :         }
     421              : 
     422              :         GstFormat fmt;
     423            4 :         gst_query_parse_position(query, &fmt, NULL);
     424            4 :         switch (fmt)
     425              :         {
     426            3 :         case GST_FORMAT_TIME:
     427              :         {
     428            3 :             gint64 position = client->getPosition(m_sourceId);
     429            3 :             GST_DEBUG_OBJECT(m_sink, "Queried position is %" GST_TIME_FORMAT, GST_TIME_ARGS(position));
     430            3 :             if (position < 0)
     431              :             {
     432            2 :                 return FALSE;
     433              :             }
     434              : 
     435            1 :             gst_query_set_position(query, fmt, position);
     436            1 :             break;
     437              :         }
     438            1 :         default:
     439            1 :             break;
     440              :         }
     441            2 :         return TRUE;
     442            5 :     }
     443            2 :     case GST_QUERY_SEGMENT:
     444              :     {
     445            2 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     446            2 :         GstFormat format{m_lastSegment.format};
     447            2 :         gint64 start{static_cast<gint64>(gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.start))};
     448            2 :         gint64 stop{0};
     449            2 :         if (m_lastSegment.stop == GST_CLOCK_TIME_NONE)
     450              :         {
     451            2 :             stop = m_lastSegment.duration;
     452              :         }
     453              :         else
     454              :         {
     455            0 :             stop = gst_segment_to_stream_time(&m_lastSegment, format, m_lastSegment.stop);
     456              :         }
     457            2 :         gst_query_set_segment(query, m_lastSegment.rate, format, start, stop);
     458            2 :         return TRUE;
     459              :     }
     460           13 :     default:
     461           13 :         break;
     462              :     }
     463           13 :     return std::nullopt;
     464              : }
     465              : 
     466           28 : gboolean PullModePlaybackDelegate::handleSendEvent(GstEvent *event)
     467              : {
     468           28 :     GST_DEBUG_OBJECT(m_sink, "handling event '%s'", GST_EVENT_TYPE_NAME(event));
     469           28 :     bool shouldForwardUpstream = GST_EVENT_IS_UPSTREAM(event);
     470              : 
     471           28 :     switch (GST_EVENT_TYPE(event))
     472              :     {
     473           13 :     case GST_EVENT_SEEK:
     474              :     {
     475           13 :         gdouble rate{1.0};
     476           13 :         GstFormat seekFormat{GST_FORMAT_UNDEFINED};
     477           13 :         GstSeekFlags flags{GST_SEEK_FLAG_NONE};
     478           13 :         GstSeekType startType{GST_SEEK_TYPE_NONE}, stopType{GST_SEEK_TYPE_NONE};
     479           13 :         gint64 start{0}, stop{0};
     480           13 :         if (event)
     481              :         {
     482           13 :             gst_event_parse_seek(event, &rate, &seekFormat, &flags, &startType, &start, &stopType, &stop);
     483              : 
     484           13 :             if (flags & GST_SEEK_FLAG_FLUSH)
     485              :             {
     486            9 :                 if (seekFormat == GST_FORMAT_TIME && startType == GST_SEEK_TYPE_END)
     487              :                 {
     488            1 :                     GST_ERROR_OBJECT(m_sink, "GST_SEEK_TYPE_END seek is not supported");
     489            1 :                     gst_event_unref(event);
     490            5 :                     return FALSE;
     491              :                 }
     492              :                 // Update last segment
     493            8 :                 if (seekFormat == GST_FORMAT_TIME)
     494              :                 {
     495            7 :                     gboolean update{FALSE};
     496            7 :                     std::lock_guard<std::mutex> lock(m_sinkMutex);
     497            7 :                     gst_segment_do_seek(&m_lastSegment, rate, seekFormat, flags, startType, start, stopType, stop,
     498              :                                         &update);
     499              :                 }
     500              :             }
     501              : #if GST_CHECK_VERSION(1, 18, 0)
     502            4 :             else if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE)
     503              :             {
     504            2 :                 gdouble rateMultiplier = rate / m_lastSegment.rate;
     505            2 :                 GstEvent *rateChangeEvent = gst_event_new_instant_rate_change(rateMultiplier, (GstSegmentFlags)flags);
     506            2 :                 gst_event_set_seqnum(rateChangeEvent, gst_event_get_seqnum(event));
     507            2 :                 gst_event_unref(event);
     508            2 :                 if (gst_pad_send_event(m_sinkPad, rateChangeEvent) != TRUE)
     509              :                 {
     510            1 :                     GST_ERROR_OBJECT(m_sink, "Sending instant rate change failed.");
     511            1 :                     return FALSE;
     512              :                 }
     513            1 :                 return TRUE;
     514              :             }
     515              : #endif
     516              :             else
     517              :             {
     518            2 :                 GST_WARNING_OBJECT(m_sink, "Seek with flags 0x%X is not supported", flags);
     519            2 :                 gst_event_unref(event);
     520            2 :                 return FALSE;
     521              :             }
     522              :         }
     523            8 :         break;
     524              :     }
     525              : #if GST_CHECK_VERSION(1, 18, 0)
     526            2 :     case GST_EVENT_INSTANT_RATE_SYNC_TIME:
     527              :     {
     528            2 :         double rate{0.0};
     529            2 :         GstClockTime runningTime{GST_CLOCK_TIME_NONE}, upstreamRunningTime{GST_CLOCK_TIME_NONE};
     530            2 :         guint32 seqnum = gst_event_get_seqnum(event);
     531            2 :         gst_event_parse_instant_rate_sync_time(event, &rate, &runningTime, &upstreamRunningTime);
     532              : 
     533            2 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     534            2 :         if ((client) && (m_mediaPlayerManager.hasControl()))
     535              :         {
     536            2 :             GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", rate);
     537            2 :             m_currentInstantRateChangeSeqnum = seqnum;
     538            2 :             client->setPlaybackRate(rate);
     539              :         }
     540            2 :         break;
     541              :     }
     542              : #endif
     543           13 :     default:
     544           13 :         break;
     545              :     }
     546              : 
     547           23 :     if (shouldForwardUpstream)
     548              :     {
     549           23 :         bool result = gst_pad_push_event(m_sinkPad, event);
     550           23 :         if (!result)
     551              :         {
     552           10 :             GST_DEBUG_OBJECT(m_sink, "forwarding upstream event '%s' failed", GST_EVENT_TYPE_NAME(event));
     553              :         }
     554              : 
     555           23 :         return result;
     556              :     }
     557              : 
     558            0 :     gst_event_unref(event);
     559            0 :     return TRUE;
     560              : }
     561              : 
     562          194 : gboolean PullModePlaybackDelegate::handleEvent(GstPad *pad, GstObject *parent, GstEvent *event)
     563              : {
     564          194 :     GST_DEBUG_OBJECT(m_sink, "handling event %" GST_PTR_FORMAT, event);
     565          194 :     switch (GST_EVENT_TYPE(event))
     566              :     {
     567            7 :     case GST_EVENT_SEGMENT:
     568              :     {
     569            7 :         copySegment(event);
     570            7 :         setSegment();
     571            7 :         break;
     572              :     }
     573            3 :     case GST_EVENT_EOS:
     574              :     {
     575            3 :         std::lock_guard<std::mutex> lock(m_sinkMutex);
     576            3 :         m_isEos = true;
     577            3 :         break;
     578              :     }
     579          145 :     case GST_EVENT_CAPS:
     580              :     {
     581              :         GstCaps *caps;
     582          145 :         gst_event_parse_caps(event, &caps);
     583              :         {
     584          145 :             std::lock_guard<std::mutex> lock(m_sinkMutex);
     585          145 :             if (m_caps)
     586              :             {
     587            4 :                 if (!gst_caps_is_equal(caps, m_caps))
     588              :                 {
     589            1 :                     gst_caps_unref(m_caps);
     590            1 :                     m_caps = gst_caps_copy(caps);
     591              :                 }
     592              :             }
     593              :             else
     594              :             {
     595          141 :                 m_caps = gst_caps_copy(caps);
     596              :             }
     597          145 :         }
     598          145 :         break;
     599              :     }
     600            1 :     case GST_EVENT_SINK_MESSAGE:
     601              :     {
     602            1 :         GstMessage *message = nullptr;
     603            1 :         gst_event_parse_sink_message(event, &message);
     604              : 
     605            1 :         if (message)
     606              :         {
     607            1 :             gst_element_post_message(m_sink, message);
     608              :         }
     609              : 
     610            1 :         break;
     611              :     }
     612           10 :     case GST_EVENT_CUSTOM_DOWNSTREAM:
     613              :     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
     614              :     {
     615           10 :         if (gst_event_has_name(event, "custom-instant-rate-change"))
     616              :         {
     617            2 :             GST_DEBUG_OBJECT(m_sink, "Change rate event received");
     618            2 :             changePlaybackRate(event);
     619              :         }
     620           10 :         break;
     621              :     }
     622           14 :     case GST_EVENT_FLUSH_START:
     623              :     {
     624           14 :         startFlushing();
     625           14 :         break;
     626              :     }
     627            4 :     case GST_EVENT_FLUSH_STOP:
     628              :     {
     629            4 :         gboolean resetTime{FALSE};
     630            4 :         gst_event_parse_flush_stop(event, &resetTime);
     631              : 
     632            4 :         stopFlushing(resetTime);
     633            4 :         break;
     634              :     }
     635            3 :     case GST_EVENT_STREAM_COLLECTION:
     636              :     {
     637            3 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     638            3 :         if (!client)
     639              :         {
     640            1 :             gst_event_unref(event);
     641            1 :             return FALSE;
     642              :         }
     643            2 :         int32_t videoStreams{0}, audioStreams{0}, textStreams{0};
     644            2 :         GstStreamCollection *streamCollection{nullptr};
     645            2 :         gst_event_parse_stream_collection(event, &streamCollection);
     646            2 :         guint streamsSize = gst_stream_collection_get_size(streamCollection);
     647            6 :         for (guint i = 0; i < streamsSize; ++i)
     648              :         {
     649            4 :             auto *stream = gst_stream_collection_get_stream(streamCollection, i);
     650            4 :             auto type = gst_stream_get_stream_type(stream);
     651            4 :             if (type & GST_STREAM_TYPE_AUDIO)
     652              :             {
     653            2 :                 ++audioStreams;
     654              :             }
     655            2 :             else if (type & GST_STREAM_TYPE_VIDEO)
     656              :             {
     657            1 :                 ++videoStreams;
     658              :             }
     659            1 :             else if (type & GST_STREAM_TYPE_TEXT)
     660              :             {
     661            1 :                 ++textStreams;
     662              :             }
     663              :         }
     664            2 :         gst_object_unref(streamCollection);
     665            2 :         client->handleStreamCollection(audioStreams, videoStreams, textStreams);
     666            2 :         client->sendAllSourcesAttachedIfPossible();
     667            2 :         break;
     668            3 :     }
     669              : #if GST_CHECK_VERSION(1, 18, 0)
     670            4 :     case GST_EVENT_INSTANT_RATE_CHANGE:
     671              :     {
     672            4 :         guint32 seqnum = gst_event_get_seqnum(event);
     673            7 :         if (m_lastInstantRateChangeSeqnum == seqnum || m_currentInstantRateChangeSeqnum.load() == seqnum)
     674              :         {
     675              :             /* Ignore if we already received the instant-rate-sync-time event from the pipeline */
     676            2 :             GST_DEBUG_OBJECT(m_sink, "Instant rate change event with seqnum %u already handled. Ignoring...", seqnum);
     677            2 :             break;
     678              :         }
     679              : 
     680            2 :         m_lastInstantRateChangeSeqnum = seqnum;
     681            2 :         gdouble rate{0.0};
     682            2 :         GstSegmentFlags flags{GST_SEGMENT_FLAG_NONE};
     683            2 :         gst_event_parse_instant_rate_change(event, &rate, &flags);
     684            2 :         GstMessage *msg = gst_message_new_instant_rate_request(GST_OBJECT_CAST(m_sink), rate);
     685            2 :         gst_message_set_seqnum(msg, seqnum);
     686            2 :         gst_element_post_message(m_sink, msg);
     687            2 :         break;
     688              :     }
     689              : #endif
     690            3 :     default:
     691            3 :         break;
     692              :     }
     693              : 
     694          193 :     gst_event_unref(event);
     695              : 
     696          193 :     return TRUE;
     697              : }
     698              : 
     699            7 : void PullModePlaybackDelegate::copySegment(GstEvent *event)
     700              : {
     701            7 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     702            7 :     gst_event_copy_segment(event, &m_lastSegment);
     703              : }
     704              : 
     705            7 : void PullModePlaybackDelegate::setSegment()
     706              : {
     707            7 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     708            7 :     if (!client)
     709              :     {
     710            1 :         GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
     711            1 :         return;
     712              :     }
     713            6 :     const bool kResetTime{m_lastSegment.flags == GST_SEGMENT_FLAG_RESET};
     714            6 :     int64_t position = static_cast<int64_t>(m_lastSegment.start);
     715              :     {
     716            6 :         std::unique_lock lock{m_sinkMutex};
     717            6 :         m_initialPositionSet = true;
     718            6 :         if (m_queuedOffset)
     719              :         {
     720            1 :             position = m_queuedOffset.value();
     721            1 :             m_queuedOffset.reset();
     722              :         }
     723            6 :     }
     724            6 :     client->setSourcePosition(m_sourceId, position, kResetTime, m_lastSegment.applied_rate, m_lastSegment.stop);
     725            7 : }
     726              : 
     727            2 : void PullModePlaybackDelegate::changePlaybackRate(GstEvent *event)
     728              : {
     729            2 :     const GstStructure *structure{gst_event_get_structure(event)};
     730            2 :     gdouble playbackRate{1.0};
     731            2 :     if (gst_structure_get_double(structure, "rate", &playbackRate) == TRUE)
     732              :     {
     733            2 :         std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     734            2 :         if (client && m_mediaPlayerManager.hasControl())
     735              :         {
     736            1 :             GST_DEBUG_OBJECT(m_sink, "Instant playback rate change: %.2f", playbackRate);
     737            1 :             client->setPlaybackRate(playbackRate);
     738              :         }
     739            2 :     }
     740              : }
     741              : 
     742           14 : void PullModePlaybackDelegate::startFlushing()
     743              : {
     744           14 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     745           14 :     if (!m_isFlushOngoing)
     746              :     {
     747           14 :         GST_INFO_OBJECT(m_sink, "Starting flushing");
     748           14 :         if (m_isEos)
     749              :         {
     750            2 :             GST_DEBUG_OBJECT(m_sink, "Flush will clear EOS state.");
     751            2 :             m_isEos = false;
     752              :         }
     753           14 :         m_isFlushOngoing = true;
     754              :         // We expect to receive a new gst segment after flush
     755           14 :         m_initialPositionSet = false;
     756           14 :         clearBuffersUnlocked();
     757              :     }
     758              : }
     759              : 
     760            4 : void PullModePlaybackDelegate::stopFlushing(bool resetTime)
     761              : {
     762            4 :     GST_INFO_OBJECT(m_sink, "Stopping flushing");
     763            4 :     flushServer(resetTime);
     764            4 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     765            4 :     m_isFlushOngoing = false;
     766              : 
     767            4 :     if (resetTime)
     768              :     {
     769            4 :         GST_DEBUG_OBJECT(m_sink, "sending reset_time message");
     770            4 :         gst_element_post_message(m_sink, gst_message_new_reset_time(GST_OBJECT_CAST(m_sink), 0));
     771              :     }
     772              : }
     773              : 
     774            4 : void PullModePlaybackDelegate::flushServer(bool resetTime)
     775              : {
     776            4 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     777            4 :     if (!client)
     778              :     {
     779            1 :         GST_ERROR_OBJECT(m_sink, "Could not get the media player client");
     780            1 :         return;
     781              :     }
     782              : 
     783            3 :     std::unique_lock<std::mutex> lock(m_flushMutex);
     784            3 :     GST_INFO_OBJECT(m_sink, "Flushing sink with sourceId %d", m_sourceId.load());
     785            3 :     client->flush(m_sourceId, resetTime);
     786            3 :     if (m_sourceAttached)
     787              :     {
     788            2 :         m_flushCondVariable.wait(lock);
     789              :     }
     790              :     else
     791              :     {
     792            1 :         GST_DEBUG_OBJECT(m_sink, "Skip waiting for flush finish - source not attached yet.");
     793              :     }
     794            4 : }
     795              : 
     796           32 : GstFlowReturn PullModePlaybackDelegate::handleBuffer(GstBuffer *buffer)
     797              : {
     798           32 :     constexpr size_t kMaxInternalBuffersQueueSize = 24;
     799           32 :     GST_LOG_OBJECT(m_sink, "Handling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
     800              :                    GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
     801              : 
     802           32 :     std::unique_lock<std::mutex> lock(m_sinkMutex);
     803              : 
     804           32 :     if (m_samples.size() >= kMaxInternalBuffersQueueSize)
     805              :     {
     806            1 :         GST_DEBUG_OBJECT(m_sink, "Waiting for more space in buffers queue\n");
     807            1 :         m_needDataCondVariable.wait(lock);
     808              :     }
     809              : 
     810           32 :     if (m_isFlushOngoing)
     811              :     {
     812            3 :         GST_DEBUG_OBJECT(m_sink, "Discarding buffer which was received during flushing");
     813            3 :         gst_buffer_unref(buffer);
     814            3 :         return GST_FLOW_FLUSHING;
     815              :     }
     816              : 
     817           29 :     GstSample *sample = gst_sample_new(buffer, m_caps, &m_lastSegment, nullptr);
     818           29 :     if (sample)
     819           29 :         m_samples.push(sample);
     820              :     else
     821            0 :         GST_ERROR_OBJECT(m_sink, "Failed to create a sample");
     822              : 
     823           29 :     gst_buffer_unref(buffer);
     824              : 
     825           29 :     return GST_FLOW_OK;
     826           32 : }
     827              : 
     828            1 : GstRefSample PullModePlaybackDelegate::getFrontSample() const
     829              : {
     830            1 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     831            1 :     if (!m_samples.empty())
     832              :     {
     833            1 :         GstSample *sample = m_samples.front();
     834            1 :         GstBuffer *buffer = gst_sample_get_buffer(sample);
     835            1 :         GST_LOG_OBJECT(m_sink, "Pulling buffer %p with PTS %" GST_TIME_FORMAT, buffer,
     836              :                        GST_TIME_ARGS(GST_BUFFER_PTS(buffer)));
     837              : 
     838            1 :         return GstRefSample{sample};
     839              :     }
     840              : 
     841            0 :     return GstRefSample{};
     842            1 : }
     843              : 
     844            1 : void PullModePlaybackDelegate::popSample()
     845              : {
     846            1 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     847            1 :     if (!m_samples.empty())
     848              :     {
     849            1 :         gst_sample_unref(m_samples.front());
     850            1 :         m_samples.pop();
     851              :     }
     852            1 :     m_needDataCondVariable.notify_all();
     853              : }
     854              : 
     855            0 : bool PullModePlaybackDelegate::isEos() const
     856              : {
     857            0 :     std::lock_guard<std::mutex> lock(m_sinkMutex);
     858            0 :     return m_samples.empty() && m_isEos;
     859              : }
     860              : 
     861            4 : void PullModePlaybackDelegate::lostState()
     862              : {
     863            4 :     m_isStateCommitNeeded = true;
     864            4 :     gst_element_lost_state(m_sink);
     865              : }
     866              : 
     867          149 : bool PullModePlaybackDelegate::attachToMediaClientAndSetStreamsNumber(const uint32_t maxVideoWidth,
     868              :                                                                       const uint32_t maxVideoHeight)
     869              : {
     870          149 :     GstObject *parentObject = getOldestGstBinParent(m_sink);
     871          149 :     if (!m_mediaPlayerManager.attachMediaPlayerClient(parentObject, maxVideoWidth, maxVideoHeight))
     872              :     {
     873            3 :         GST_ERROR_OBJECT(m_sink, "Cannot attach the MediaPlayerClient");
     874            3 :         return false;
     875              :     }
     876              : 
     877          146 :     gchar *parentObjectName = gst_object_get_name(parentObject);
     878          146 :     GST_INFO_OBJECT(m_sink, "Attached media player client with parent %s(%p)", parentObjectName, parentObject);
     879          146 :     g_free(parentObjectName);
     880              : 
     881          146 :     return setStreamsNumber(parentObject);
     882              : }
     883              : 
     884          146 : bool PullModePlaybackDelegate::setStreamsNumber(GstObject *parentObject)
     885              : {
     886          146 :     int32_t videoStreams{-1}, audioStreams{-1}, subtitleStreams{-1};
     887              : 
     888          146 :     GstContext *context = gst_element_get_context(m_sink, "streams-info");
     889          146 :     if (context)
     890              :     {
     891            3 :         GST_DEBUG_OBJECT(m_sink, "Getting number of streams from \"streams-info\" context");
     892              : 
     893            3 :         guint n_video{0}, n_audio{0}, n_text{0};
     894              : 
     895            3 :         const GstStructure *streamsInfoStructure = gst_context_get_structure(context);
     896            3 :         gst_structure_get_uint(streamsInfoStructure, "video-streams", &n_video);
     897            3 :         gst_structure_get_uint(streamsInfoStructure, "audio-streams", &n_audio);
     898            3 :         gst_structure_get_uint(streamsInfoStructure, "text-streams", &n_text);
     899              : 
     900            5 :         if (n_video > std::numeric_limits<int32_t>::max() || n_audio > std::numeric_limits<int32_t>::max() ||
     901            2 :             n_text > std::numeric_limits<int32_t>::max())
     902              :         {
     903            1 :             GST_ERROR_OBJECT(m_sink, "Number of streams is too big, video=%u, audio=%u, text=%u", n_video, n_audio,
     904              :                              n_text);
     905            1 :             gst_context_unref(context);
     906            1 :             return false;
     907              :         }
     908              : 
     909            2 :         videoStreams = n_video;
     910            2 :         audioStreams = n_audio;
     911            2 :         subtitleStreams = n_text;
     912              : 
     913            2 :         gst_context_unref(context);
     914              :     }
     915          143 :     else if (getNStreamsFromParent(parentObject, videoStreams, audioStreams, subtitleStreams))
     916              :     {
     917            2 :         GST_DEBUG_OBJECT(m_sink, "Got number of streams from playbin2 properties");
     918              :     }
     919              :     else
     920              :     {
     921              :         // The default value of streams is V:1, A:1, S:0
     922              :         // Changing the default setting via properties is considered as DEPRECATED
     923          141 :         subtitleStreams = 0;
     924          141 :         std::lock_guard<std::mutex> lock{m_sinkMutex};
     925          141 :         if (m_mediaSourceType == firebolt::rialto::MediaSourceType::VIDEO)
     926              :         {
     927           27 :             videoStreams = m_numOfStreams;
     928           27 :             if (m_isSinglePathStream)
     929              :             {
     930           26 :                 audioStreams = 0;
     931           26 :                 subtitleStreams = 0;
     932              :             }
     933              :         }
     934          114 :         else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::AUDIO)
     935              :         {
     936          103 :             audioStreams = m_numOfStreams;
     937          103 :             if (m_isSinglePathStream)
     938              :             {
     939          102 :                 videoStreams = 0;
     940          102 :                 subtitleStreams = 0;
     941              :             }
     942              :         }
     943           11 :         else if (m_mediaSourceType == firebolt::rialto::MediaSourceType::SUBTITLE)
     944              :         {
     945           11 :             subtitleStreams = m_numOfStreams;
     946           11 :             if (m_isSinglePathStream)
     947              :             {
     948           11 :                 videoStreams = 0;
     949           11 :                 audioStreams = 0;
     950              :             }
     951              :         }
     952          141 :     }
     953              : 
     954          145 :     std::shared_ptr<GStreamerMSEMediaPlayerClient> client = m_mediaPlayerManager.getMediaPlayerClient();
     955          145 :     if (!client)
     956              :     {
     957            0 :         GST_ERROR_OBJECT(m_sink, "MediaPlayerClient is nullptr");
     958            0 :         return false;
     959              :     }
     960              : 
     961          145 :     client->handleStreamCollection(audioStreams, videoStreams, subtitleStreams);
     962              : 
     963          145 :     return true;
     964              : }
        

Generated by: LCOV version 2.0-1