LCOV - code coverage report
Current view: top level - media/server/gstplayer/source - GstDispatcherThread.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 95.6 % 45 43
Test Date: 2025-10-07 14:22:52 Functions: 100.0 % 4 4

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "GstDispatcherThread.h"
      21              : #include "RialtoServerLogging.h"
      22              : 
      23              : namespace firebolt::rialto::server
      24              : {
      25            2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
      26              :     IGstDispatcherThreadClient &client, GstElement *pipeline,
      27              :     const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
      28              : {
      29            2 :     return std::make_unique<GstDispatcherThread>(client, pipeline, gstWrapper);
      30              : }
      31              : 
      32            8 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
      33            8 :                                          const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
      34            8 :     : m_client{client}, m_gstWrapper{gstWrapper}, m_isGstreamerDispatcherActive{true}
      35              : {
      36            8 :     RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
      37            8 :     m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
      38              : }
      39              : 
      40           16 : GstDispatcherThread::~GstDispatcherThread()
      41              : {
      42            8 :     RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
      43            8 :     m_isGstreamerDispatcherActive = false;
      44            8 :     if (m_gstBusDispatcherThread.joinable())
      45              :     {
      46            8 :         m_gstBusDispatcherThread.join();
      47              :     }
      48           16 : }
      49              : 
      50            8 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
      51              : {
      52            8 :     GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
      53            8 :     if (!bus)
      54              :     {
      55            2 :         RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
      56            2 :         return;
      57              :     }
      58              : 
      59           21 :     while (m_isGstreamerDispatcherActive)
      60              :     {
      61              :         GstMessage *message =
      62            9 :             m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
      63              :                                                  static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
      64              :                                                                              GST_MESSAGE_QOS | GST_MESSAGE_EOS |
      65              :                                                                              GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
      66              : 
      67            9 :         if (message)
      68              :         {
      69            8 :             bool shouldHandleMessage{true};
      70            8 :             bool isPrioritised{false};
      71            8 :             if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
      72              :             {
      73            7 :                 switch (GST_MESSAGE_TYPE(message))
      74              :                 {
      75            2 :                 case GST_MESSAGE_STATE_CHANGED:
      76              :                 {
      77            2 :                     isPrioritised = true;
      78              :                     GstState oldState, newState, pending;
      79            2 :                     m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
      80            2 :                     switch (newState)
      81              :                     {
      82            1 :                     case GST_STATE_NULL:
      83              :                     {
      84            1 :                         m_isGstreamerDispatcherActive = false;
      85              :                     }
      86            2 :                     case GST_STATE_READY:
      87              :                     case GST_STATE_PAUSED:
      88              :                     case GST_STATE_PLAYING:
      89              :                     case GST_STATE_VOID_PENDING:
      90              :                     {
      91            2 :                         break;
      92              :                     }
      93              :                     }
      94            2 :                     break;
      95              :                 }
      96            5 :                 case GST_MESSAGE_ERROR:
      97              :                 {
      98            5 :                     m_isGstreamerDispatcherActive = false;
      99            5 :                     break;
     100              :                 }
     101            0 :                 default:
     102              :                 {
     103            0 :                     break;
     104              :                 }
     105              :                 }
     106              :             }
     107            1 :             else if (GST_MESSAGE_STATE_CHANGED == GST_MESSAGE_TYPE(message))
     108              :             {
     109              :                 // Skip handling GST_MESSAGE_STATE_CHANGED for non-pipeline objects.
     110              :                 // It signifficantly slows down rialto gst worker thread
     111            1 :                 shouldHandleMessage = false;
     112              :             }
     113              : 
     114            8 :             if (shouldHandleMessage)
     115              :             {
     116            7 :                 m_client.handleBusMessage(message, isPrioritised);
     117              :             }
     118              :         }
     119              :     }
     120              : 
     121            6 :     RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
     122            6 :     m_gstWrapper->gstObjectUnref(bus);
     123              : }
     124              : } // namespace firebolt::rialto::server
        

Generated by: LCOV version 2.0-1