LCOV - code coverage report
Current view: top level - media/server/gstplayer/source - GstDispatcherThread.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 93.1 % 58 54
Test Date: 2026-02-18 13:48:45 Functions: 100.0 % 4 4

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "GstDispatcherThread.h"
      21              : #include "RialtoServerLogging.h"
      22              : 
      23              : namespace firebolt::rialto::server
      24              : {
      25            2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
      26              :     IGstDispatcherThreadClient &client, GstElement *pipeline,
      27              :     const std::shared_ptr<IFlushOnPrerollController> &flushOnPrerollController,
      28              :     const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
      29              : {
      30            2 :     return std::make_unique<GstDispatcherThread>(client, pipeline, flushOnPrerollController, gstWrapper);
      31              : }
      32              : 
      33            9 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
      34              :                                          const std::shared_ptr<IFlushOnPrerollController> &flushOnPrerollController,
      35            9 :                                          const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
      36            9 :     : m_client{client}, m_flushOnPrerollController{flushOnPrerollController}, m_gstWrapper{gstWrapper},
      37           18 :       m_isGstreamerDispatcherActive{true}
      38              : {
      39            9 :     RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
      40            9 :     m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
      41              : }
      42              : 
      43           18 : GstDispatcherThread::~GstDispatcherThread()
      44              : {
      45            9 :     RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
      46            9 :     m_isGstreamerDispatcherActive = false;
      47            9 :     if (m_gstBusDispatcherThread.joinable())
      48              :     {
      49            9 :         m_gstBusDispatcherThread.join();
      50              :     }
      51           18 : }
      52              : 
      53            9 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
      54              : {
      55            9 :     GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
      56            9 :     if (!bus)
      57              :     {
      58            2 :         RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
      59            2 :         return;
      60              :     }
      61              : 
      62           26 :     while (m_isGstreamerDispatcherActive)
      63              :     {
      64              :         GstMessage *message =
      65           12 :             m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
      66              :                                                  static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
      67              :                                                                              GST_MESSAGE_QOS | GST_MESSAGE_EOS |
      68              :                                                                              GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
      69              : 
      70           12 :         if (message)
      71              :         {
      72           11 :             bool shouldHandleMessage{true};
      73           11 :             if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
      74              :             {
      75           10 :                 switch (GST_MESSAGE_TYPE(message))
      76              :                 {
      77            4 :                 case GST_MESSAGE_STATE_CHANGED:
      78              :                 {
      79              :                     GstState oldState, newState, pending;
      80            4 :                     m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
      81            4 :                     switch (newState)
      82              :                     {
      83            1 :                     case GST_STATE_NULL:
      84              :                     {
      85            1 :                         m_isGstreamerDispatcherActive = false;
      86            1 :                         if (m_flushOnPrerollController)
      87              :                         {
      88            1 :                             m_flushOnPrerollController->reset();
      89              :                         }
      90            1 :                         break;
      91              :                     }
      92            2 :                     case GST_STATE_PAUSED:
      93              :                     {
      94            2 :                         if (m_flushOnPrerollController && pending != GST_STATE_PAUSED)
      95              :                         {
      96            1 :                             m_flushOnPrerollController->stateReached(newState);
      97              :                         }
      98            1 :                         else if (m_flushOnPrerollController && pending == GST_STATE_PAUSED)
      99              :                         {
     100            1 :                             m_flushOnPrerollController->setPrerolling();
     101              :                         }
     102            2 :                         break;
     103              :                     }
     104            1 :                     case GST_STATE_PLAYING:
     105              :                     {
     106            1 :                         if (m_flushOnPrerollController)
     107              :                         {
     108            1 :                             m_flushOnPrerollController->stateReached(newState);
     109              :                         }
     110            1 :                         break;
     111              :                     }
     112            0 :                     case GST_STATE_READY:
     113              :                     case GST_STATE_VOID_PENDING:
     114              :                     {
     115            0 :                         break;
     116              :                     }
     117              :                     }
     118            4 :                     break;
     119              :                 }
     120            6 :                 case GST_MESSAGE_ERROR:
     121              :                 {
     122            6 :                     m_isGstreamerDispatcherActive = false;
     123            6 :                     break;
     124              :                 }
     125            0 :                 default:
     126              :                 {
     127            0 :                     break;
     128              :                 }
     129              :                 }
     130              :             }
     131            1 :             else if (GST_MESSAGE_STATE_CHANGED == GST_MESSAGE_TYPE(message))
     132              :             {
     133              :                 // Skip handling GST_MESSAGE_STATE_CHANGED for non-pipeline objects.
     134              :                 // It signifficantly slows down rialto gst worker thread
     135            1 :                 shouldHandleMessage = false;
     136            1 :                 m_gstWrapper->gstMessageUnref(message);
     137              :             }
     138              : 
     139           11 :             if (shouldHandleMessage)
     140              :             {
     141           10 :                 m_client.handleBusMessage(message);
     142              :             }
     143              :         }
     144              :     }
     145              : 
     146            7 :     RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
     147            7 :     m_gstWrapper->gstObjectUnref(bus);
     148              : }
     149              : } // namespace firebolt::rialto::server
        

Generated by: LCOV version 2.0-1