LCOV - code coverage report
Current view: top level - media/server/gstplayer/source/tasks/generic - HandleBusMessage.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 94.2 % 138 130
Test Date: 2026-05-14 05:59:51 Functions: 100.0 % 4 4

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "tasks/generic/HandleBusMessage.h"
      21              : #include "GenericPlayerContext.h"
      22              : #include "IGstGenericPlayerClient.h"
      23              : #include "IGstWrapper.h"
      24              : #include "RialtoServerLogging.h"
      25              : 
      26              : namespace firebolt::rialto::server::tasks::generic
      27              : {
      28           36 : HandleBusMessage::HandleBusMessage(GenericPlayerContext &context, IGstGenericPlayerPrivate &player,
      29              :                                    IGstGenericPlayerClient *client,
      30              :                                    const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper,
      31              :                                    const std::shared_ptr<firebolt::rialto::wrappers::IGlibWrapper> &glibWrapper,
      32           36 :                                    GstMessage *message, const IFlushWatcher &flushWatcher)
      33           36 :     : m_context{context}, m_player{player}, m_gstPlayerClient{client}, m_gstWrapper{gstWrapper},
      34           36 :       m_glibWrapper{glibWrapper}, m_message{message}, m_flushWatcher{flushWatcher},
      35           36 :       m_isFlushOngoingDuringCreation{flushWatcher.isFlushOngoing()},
      36           72 :       m_isAsyncFlushOngoingDuringCreation{flushWatcher.isAsyncFlushOngoing()}
      37              : {
      38           36 :     RIALTO_SERVER_LOG_DEBUG("Constructing HandleBusMessage");
      39              : }
      40              : 
      41           37 : HandleBusMessage::~HandleBusMessage()
      42              : {
      43           36 :     RIALTO_SERVER_LOG_DEBUG("HandleBusMessage finished");
      44           37 : }
      45              : 
      46           35 : void HandleBusMessage::execute() const
      47              : {
      48           35 :     RIALTO_SERVER_LOG_DEBUG("Executing HandleBusMessage");
      49           35 :     switch (GST_MESSAGE_TYPE(m_message))
      50              :     {
      51           14 :     case GST_MESSAGE_STATE_CHANGED:
      52              :     {
      53           14 :         if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
      54              :         {
      55              :             GstState oldState, newState, pending;
      56           12 :             m_gstWrapper->gstMessageParseStateChanged(m_message, &oldState, &newState, &pending);
      57           12 :             const char *oldStateName = m_gstWrapper->gstElementStateGetName(oldState);
      58           12 :             const char *newStateName = m_gstWrapper->gstElementStateGetName(newState);
      59           12 :             const char *pendingStateName = m_gstWrapper->gstElementStateGetName(pending);
      60           12 :             RIALTO_SERVER_LOG_MIL("State changed (old: %s, new: %s, pending: %s)", oldStateName, newStateName,
      61              :                                   pendingStateName);
      62           48 :             auto recordId = m_context.gstProfiler->createRecord("Pipeline State Changed", newStateName);
      63           12 :             if (recordId)
      64            0 :                 m_context.gstProfiler->logRecord(recordId.value());
      65           12 :             if (newState == GST_STATE_PLAYING)
      66            5 :                 m_context.gstProfiler->logPipelineSummary();
      67              : 
      68           36 :             std::string filename = std::string(oldStateName) + "-" + std::string(newStateName);
      69           12 :             m_gstWrapper->gstDebugBinToDotFileWithTs(GST_BIN(m_context.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
      70              :                                                      filename.c_str());
      71           12 :             if (!m_gstPlayerClient)
      72              :             {
      73            1 :                 break;
      74              :             }
      75           11 :             switch (newState)
      76              :             {
      77            1 :             case GST_STATE_NULL:
      78              :             {
      79            1 :                 m_gstPlayerClient->notifyPlaybackState(PlaybackState::STOPPED);
      80            1 :                 break;
      81              :             }
      82            5 :             case GST_STATE_PAUSED:
      83              :             {
      84            5 :                 m_player.startNotifyPlaybackInfoTimer();
      85            5 :                 m_player.stopPositionReportingAndCheckAudioUnderflowTimer();
      86            5 :                 if (pending != GST_STATE_PAUSED)
      87              :                 {
      88              :                     // If async flush was requested before HandleBusMessage task creation (but it was not executed yet)
      89              :                     // or if async flush was created after HandleBusMessage task creation (but before its execution)
      90              :                     // we can't report playback state, because async flush causes state loss - reported state is probably invalid.
      91            4 :                     if (m_isAsyncFlushOngoingDuringCreation || m_flushWatcher.isAsyncFlushOngoing())
      92              :                     {
      93            2 :                         RIALTO_SERVER_LOG_WARN("Skip PAUSED notification - flush is ongoing");
      94            2 :                         break;
      95              :                     }
      96              :                     // newState==GST_STATE_PAUSED, pending==GST_STATE_PAUSED state transition is received as a result of
      97              :                     // waiting for preroll after seek.
      98              :                     // Subsequent newState==GST_STATE_PAUSED, pending!=GST_STATE_PAUSED transition will
      99              :                     // indicate that the pipeline is prerolled and it reached GST_STATE_PAUSED state after seek.
     100            2 :                     m_gstPlayerClient->notifyPlaybackState(PlaybackState::PAUSED);
     101              :                 }
     102              : 
     103            3 :                 if (m_player.hasSourceType(MediaSourceType::SUBTITLE))
     104              :                 {
     105            0 :                     m_player.stopSubtitleClockResyncTimer();
     106              :                 }
     107            3 :                 break;
     108              :             }
     109            5 :             case GST_STATE_PLAYING:
     110              :             {
     111              :                 // If async flush was requested before HandleBusMessage task creation (but it was not executed yet)
     112              :                 // or if async flush was created after HandleBusMessage task creation (but before its execution)
     113              :                 // we can't report playback state, because async flush causes state loss - reported state is probably invalid.
     114            5 :                 if (m_isAsyncFlushOngoingDuringCreation || m_flushWatcher.isAsyncFlushOngoing())
     115              :                 {
     116            2 :                     RIALTO_SERVER_LOG_WARN("Skip PLAYING notification - flush is ongoing");
     117            2 :                     break;
     118              :                 }
     119            3 :                 if (m_context.pendingPlaybackRate != kNoPendingPlaybackRate)
     120              :                 {
     121            1 :                     m_player.setPendingPlaybackRate();
     122              :                 }
     123            3 :                 m_player.startPositionReportingAndCheckAudioUnderflowTimer();
     124            3 :                 if (m_player.hasSourceType(MediaSourceType::SUBTITLE))
     125              :                 {
     126            0 :                     m_player.startSubtitleClockResyncTimer();
     127              :                 }
     128              : 
     129            3 :                 m_context.isPlaying = true;
     130            3 :                 m_gstPlayerClient->notifyPlaybackState(PlaybackState::PLAYING);
     131            3 :                 break;
     132              :             }
     133            0 :             case GST_STATE_VOID_PENDING:
     134              :             {
     135            0 :                 break;
     136              :             }
     137            0 :             case GST_STATE_READY:
     138              :             {
     139            0 :                 m_player.stopNotifyPlaybackInfoTimer();
     140            0 :                 break;
     141              :             }
     142              :             }
     143           12 :         }
     144           13 :         break;
     145              :     }
     146            6 :     case GST_MESSAGE_EOS:
     147              :     {
     148              :         // If flush was requested before HandleBusMessage task creation (but it was not executed yet)
     149              :         // or if flush was created after HandleBusMessage task creation (but before its execution)
     150              :         // we can't report EOS, because flush clears EOS.
     151            6 :         if (m_isFlushOngoingDuringCreation || m_flushWatcher.isFlushOngoing())
     152              :         {
     153            2 :             RIALTO_SERVER_LOG_WARN("Skip EOS notification - flush is ongoing");
     154            2 :             break;
     155              :         }
     156            4 :         if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
     157              :         {
     158            2 :             RIALTO_SERVER_LOG_MIL("End of stream reached.");
     159            2 :             if (!m_context.eosNotified && m_gstPlayerClient)
     160              :             {
     161            1 :                 m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
     162            1 :                 m_context.eosNotified = true;
     163              :             }
     164              :         }
     165            4 :         break;
     166              :     }
     167            4 :     case GST_MESSAGE_QOS:
     168              :     {
     169              :         GstFormat format;
     170            4 :         gboolean isLive = FALSE;
     171            4 :         guint64 runningTime = 0;
     172            4 :         guint64 streamTime = 0;
     173            4 :         guint64 timestamp = 0;
     174            4 :         guint64 duration = 0;
     175            4 :         guint64 dropped = 0;
     176            4 :         guint64 processed = 0;
     177              : 
     178            4 :         m_gstWrapper->gstMessageParseQos(m_message, &isLive, &runningTime, &streamTime, &timestamp, &duration);
     179            4 :         m_gstWrapper->gstMessageParseQosStats(m_message, &format, &processed, &dropped);
     180              : 
     181            4 :         if (GST_FORMAT_BUFFERS == format || GST_FORMAT_DEFAULT == format)
     182              :         {
     183            3 :             RIALTO_SERVER_LOG_INFO("QOS message: runningTime  %" G_GUINT64_FORMAT ", streamTime %" G_GUINT64_FORMAT
     184              :                                    ", timestamp %" G_GUINT64_FORMAT ", duration %" G_GUINT64_FORMAT
     185              :                                    ", format %u, processed %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
     186              :                                    runningTime, streamTime, timestamp, duration, format, processed, dropped);
     187              : 
     188            3 :             if (m_gstPlayerClient)
     189              :             {
     190            3 :                 firebolt::rialto::QosInfo qosInfo = {processed, dropped};
     191              :                 const gchar *klass;
     192            3 :                 klass = m_gstWrapper->gstElementClassGetMetadata(GST_ELEMENT_GET_CLASS(GST_MESSAGE_SRC(m_message)),
     193              :                                                                  GST_ELEMENT_METADATA_KLASS);
     194              : 
     195            3 :                 if (g_strrstr(klass, "Video"))
     196              :                 {
     197            1 :                     m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::VIDEO, qosInfo);
     198              :                 }
     199            2 :                 else if (g_strrstr(klass, "Audio"))
     200              :                 {
     201            1 :                     m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::AUDIO, qosInfo);
     202              :                 }
     203              :                 else
     204              :                 {
     205            1 :                     RIALTO_SERVER_LOG_WARN("Unknown source type for class '%s', ignoring QOS Message", klass);
     206              :                 }
     207              :             }
     208            3 :         }
     209              :         else
     210              :         {
     211            1 :             RIALTO_SERVER_LOG_WARN("Received a QOS_MESSAGE with unhandled format %s",
     212              :                                    m_gstWrapper->gstFormatGetName(format));
     213              :         }
     214            4 :         break;
     215              :     }
     216            6 :     case GST_MESSAGE_ERROR:
     217              :     {
     218            6 :         GError *err = nullptr;
     219            6 :         gchar *debug = nullptr;
     220            6 :         m_gstWrapper->gstMessageParseError(m_message, &err, &debug);
     221              : 
     222            6 :         if ((err->domain == GST_STREAM_ERROR) && (allSourcesEos()))
     223              :         {
     224            2 :             RIALTO_SERVER_LOG_WARN("Got stream error from %s. But all streams are ended, so reporting EOS. Error code "
     225              :                                    "%d: %s "
     226              :                                    "(%s).",
     227              :                                    GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
     228            2 :             if (!m_context.eosNotified && m_gstPlayerClient)
     229              :             {
     230            1 :                 m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
     231            1 :                 m_context.eosNotified = true;
     232              :             }
     233              :         }
     234              :         else
     235              :         {
     236            4 :             RIALTO_SERVER_LOG_ERROR("Error from %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
     237              :                                     err->code, err->message, debug);
     238            4 :             m_gstPlayerClient->notifyPlaybackState(PlaybackState::FAILURE);
     239              :         }
     240              : 
     241            6 :         m_glibWrapper->gFree(debug);
     242            6 :         m_glibWrapper->gErrorFree(err);
     243            6 :         break;
     244              :     }
     245            4 :     case GST_MESSAGE_WARNING:
     246              :     {
     247            4 :         PlaybackError rialtoError = PlaybackError::UNKNOWN;
     248            4 :         GError *err = nullptr;
     249            4 :         gchar *debug = nullptr;
     250            4 :         m_gstWrapper->gstMessageParseWarning(m_message, &err, &debug);
     251              : 
     252            4 :         if ((err->domain == GST_STREAM_ERROR) && (err->code == GST_STREAM_ERROR_DECRYPT))
     253              :         {
     254            3 :             RIALTO_SERVER_LOG_WARN("Decrypt error %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
     255              :                                    err->code, err->message, debug);
     256            3 :             rialtoError = PlaybackError::DECRYPTION;
     257              :         }
     258              :         else
     259              :         {
     260            1 :             RIALTO_SERVER_LOG_WARN("Unknown warning, ignoring %s - %d: %s (%s)",
     261              :                                    GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
     262              :         }
     263              : 
     264            4 :         if ((PlaybackError::UNKNOWN != rialtoError) && (m_gstPlayerClient))
     265              :         {
     266            3 :             const gchar *kName = GST_ELEMENT_NAME(GST_ELEMENT(GST_MESSAGE_SRC(m_message)));
     267            3 :             if (g_strrstr(kName, "video"))
     268              :             {
     269            1 :                 m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::VIDEO,
     270              :                                                        PlaybackError::DECRYPTION);
     271              :             }
     272            2 :             else if (g_strrstr(kName, "audio"))
     273              :             {
     274            1 :                 m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::AUDIO,
     275              :                                                        PlaybackError::DECRYPTION);
     276              :             }
     277              :             else
     278              :             {
     279            1 :                 RIALTO_SERVER_LOG_WARN("Unknown source type for element '%s', not propagating error", kName);
     280              :             }
     281              :         }
     282              : 
     283            4 :         m_glibWrapper->gFree(debug);
     284            4 :         m_glibWrapper->gErrorFree(err);
     285            4 :         break;
     286              :     }
     287            1 :     default:
     288            1 :         break;
     289              :     }
     290              : 
     291           35 :     m_gstWrapper->gstMessageUnref(m_message);
     292              : }
     293              : 
     294            4 : bool HandleBusMessage::allSourcesEos() const
     295              : {
     296            8 :     for (const auto &streamInfo : m_context.streamInfo)
     297              :     {
     298            6 :         const auto eosInfoIt = m_context.endOfStreamInfo.find(streamInfo.first);
     299            6 :         if (eosInfoIt == m_context.endOfStreamInfo.end() || eosInfoIt->second != EosState::SET)
     300              :         {
     301            2 :             return false;
     302              :         }
     303              :     }
     304            2 :     return true;
     305              : }
     306              : } // namespace firebolt::rialto::server::tasks::generic
        

Generated by: LCOV version 2.0-1