LCOV - code coverage report
Current view: top level - source - GStreamerMSEMediaPlayerClient.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 94.3 % 630 594
Test Date: 2025-10-17 10:59:19 Functions: 98.5 % 132 130

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2022 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "GStreamerMSEMediaPlayerClient.h"
      20              : #include "Constants.h"
      21              : #include "GstreamerCatLog.h"
      22              : #include "RialtoGStreamerMSEBaseSink.h"
      23              : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
      24              : #include "RialtoGStreamerMSEVideoSink.h"
      25              : 
      26              : #include <algorithm>
      27              : #include <chrono>
      28              : #include <thread>
      29              : 
      30              : namespace
      31              : {
      32              : // The start time of segment might differ from the first sample which is injected.
      33              : // That difference should not be bigger than 1 video / audio frame.
      34              : // 1 second is probably erring on the side of caution, but should not have side effect.
      35              : const int64_t segmentStartMaximumDiff = 1000000000;
      36              : const int32_t UNKNOWN_STREAMS_NUMBER = -1;
      37              : 
      38            0 : const char *toString(const firebolt::rialto::PlaybackError &error)
      39              : {
      40            0 :     switch (error)
      41              :     {
      42            0 :     case firebolt::rialto::PlaybackError::DECRYPTION:
      43            0 :         return "DECRYPTION";
      44            0 :     case firebolt::rialto::PlaybackError::UNKNOWN:
      45            0 :         return "UNKNOWN";
      46              :     }
      47            0 :     return "UNKNOWN";
      48              : }
      49            0 : const char *toString(const firebolt::rialto::MediaSourceType &src)
      50              : {
      51            0 :     switch (src)
      52              :     {
      53            0 :     case firebolt::rialto::MediaSourceType::AUDIO:
      54            0 :         return "AUDIO";
      55            0 :     case firebolt::rialto::MediaSourceType::VIDEO:
      56            0 :         return "VIDEO";
      57            0 :     case firebolt::rialto::MediaSourceType::SUBTITLE:
      58            0 :         return "SUBTITLE";
      59            0 :     case firebolt::rialto::MediaSourceType::UNKNOWN:
      60            0 :         return "UNKNOWN";
      61              :     }
      62            0 :     return "UNKNOWN";
      63              : }
      64              : } // namespace
      65              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      66          271 : GStreamerMSEMediaPlayerClient::GStreamerMSEMediaPlayerClient(
      67              :     const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory,
      68              :     const std::shared_ptr<firebolt::rialto::client::MediaPlayerClientBackendInterface> &MediaPlayerClientBackend,
      69          271 :     const uint32_t maxVideoWidth, const uint32_t maxVideoHeight)
      70          271 :     : m_backendQueue{messageQueueFactory->createMessageQueue()}, m_messageQueueFactory{messageQueueFactory},
      71          271 :       m_clientBackend(MediaPlayerClientBackend), m_duration(0), m_audioStreams{UNKNOWN_STREAMS_NUMBER},
      72          271 :       m_videoStreams{UNKNOWN_STREAMS_NUMBER}, m_subtitleStreams{UNKNOWN_STREAMS_NUMBER},
      73          271 :       m_videoRectangle{0, 0, 1920, 1080}, m_streamingStopped(false),
      74          271 :       m_maxWidth(maxVideoWidth == 0 ? DEFAULT_MAX_VIDEO_WIDTH : maxVideoWidth),
      75          813 :       m_maxHeight(maxVideoHeight == 0 ? DEFAULT_MAX_VIDEO_HEIGHT : maxVideoHeight)
      76              : {
      77          271 :     m_backendQueue->start();
      78              : }
      79              : 
      80          271 : GStreamerMSEMediaPlayerClient::~GStreamerMSEMediaPlayerClient()
      81              : {
      82          271 :     stopStreaming();
      83              : }
      84              : 
      85          423 : void GStreamerMSEMediaPlayerClient::stopStreaming()
      86              : {
      87          423 :     if (!m_streamingStopped)
      88              :     {
      89          271 :         m_backendQueue->stop();
      90              : 
      91          332 :         for (auto &source : m_attachedSources)
      92              :         {
      93           61 :             source.second.m_bufferPuller->stop();
      94              :         }
      95              : 
      96          271 :         m_streamingStopped = true;
      97              :     }
      98          423 : }
      99              : 
     100              : // Deletes client backend -> this deletes mediapipeline object
     101          154 : void GStreamerMSEMediaPlayerClient::destroyClientBackend()
     102              : {
     103          154 :     m_clientBackend.reset();
     104              : }
     105              : 
     106            1 : void GStreamerMSEMediaPlayerClient::notifyDuration(int64_t duration)
     107              : {
     108            1 :     m_backendQueue->postMessage(std::make_shared<SetDurationMessage>(duration, m_duration));
     109              : }
     110              : 
     111            1 : void GStreamerMSEMediaPlayerClient::notifyPosition(int64_t position)
     112              : {
     113            1 :     m_backendQueue->postMessage(std::make_shared<SetPositionMessage>(position, m_attachedSources));
     114              : }
     115              : 
     116              : void GStreamerMSEMediaPlayerClient::notifyNativeSize(uint32_t width, uint32_t height, double aspect) {}
     117              : 
     118              : void GStreamerMSEMediaPlayerClient::notifyNetworkState(firebolt::rialto::NetworkState state) {}
     119              : 
     120           55 : void GStreamerMSEMediaPlayerClient::notifyPlaybackState(firebolt::rialto::PlaybackState state)
     121              : {
     122           55 :     m_backendQueue->postMessage(std::make_shared<PlaybackStateMessage>(state, this));
     123              : }
     124              : 
     125              : void GStreamerMSEMediaPlayerClient::notifyVideoData(bool hasData) {}
     126              : 
     127              : void GStreamerMSEMediaPlayerClient::notifyAudioData(bool hasData) {}
     128              : 
     129            9 : void GStreamerMSEMediaPlayerClient::notifyNeedMediaData(
     130              :     int32_t sourceId, size_t frameCount, uint32_t needDataRequestId,
     131              :     const std::shared_ptr<firebolt::rialto::MediaPlayerShmInfo> & /*shmInfo*/)
     132              : {
     133            9 :     m_backendQueue->postMessage(std::make_shared<NeedDataMessage>(sourceId, frameCount, needDataRequestId, this));
     134              : 
     135            9 :     return;
     136              : }
     137              : 
     138              : void GStreamerMSEMediaPlayerClient::notifyCancelNeedMediaData(int sourceId) {}
     139              : 
     140            5 : void GStreamerMSEMediaPlayerClient::notifyQos(int32_t sourceId, const firebolt::rialto::QosInfo &qosInfo)
     141              : {
     142            5 :     m_backendQueue->postMessage(std::make_shared<QosMessage>(sourceId, qosInfo, this));
     143              : }
     144              : 
     145            2 : void GStreamerMSEMediaPlayerClient::notifyBufferUnderflow(int32_t sourceId)
     146              : {
     147            2 :     m_backendQueue->postMessage(std::make_shared<BufferUnderflowMessage>(sourceId, this));
     148              : }
     149              : 
     150            5 : void GStreamerMSEMediaPlayerClient::notifyPlaybackError(int32_t sourceId, firebolt::rialto::PlaybackError error)
     151              : {
     152            5 :     m_backendQueue->postMessage(std::make_shared<PlaybackErrorMessage>(sourceId, error, this));
     153              : }
     154              : 
     155            5 : void GStreamerMSEMediaPlayerClient::notifySourceFlushed(int32_t sourceId)
     156              : {
     157            5 :     m_backendQueue->postMessage(std::make_shared<SourceFlushedMessage>(sourceId, this));
     158              : }
     159              : 
     160            6 : void GStreamerMSEMediaPlayerClient::getPositionDo(int64_t *position, int32_t sourceId)
     161              : {
     162            6 :     auto sourceIt = m_attachedSources.find(sourceId);
     163            6 :     if (sourceIt == m_attachedSources.end())
     164              :     {
     165            1 :         *position = -1;
     166            1 :         return;
     167              :     }
     168              : 
     169            5 :     if (m_clientBackend && m_clientBackend->getPosition(*position))
     170              :     {
     171            3 :         sourceIt->second.m_position = *position;
     172              :     }
     173              :     else
     174              :     {
     175            2 :         *position = sourceIt->second.m_position;
     176              :     }
     177              : }
     178              : 
     179            6 : int64_t GStreamerMSEMediaPlayerClient::getPosition(int32_t sourceId)
     180              : {
     181              :     int64_t position;
     182           12 :     m_backendQueue->callInEventLoop([&]() { getPositionDo(&position, sourceId); });
     183            6 :     return position;
     184              : }
     185              : 
     186            5 : bool GStreamerMSEMediaPlayerClient::setImmediateOutput(int32_t sourceId, bool immediateOutput)
     187              : {
     188            5 :     if (!m_clientBackend)
     189              :     {
     190            1 :         return false;
     191              :     }
     192              : 
     193            4 :     bool status{false};
     194            8 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setImmediateOutput(sourceId, immediateOutput); });
     195            4 :     return status;
     196              : }
     197              : 
     198            4 : bool GStreamerMSEMediaPlayerClient::getImmediateOutput(int32_t sourceId, bool &immediateOutput)
     199              : {
     200            4 :     if (!m_clientBackend)
     201              :     {
     202            1 :         return false;
     203              :     }
     204              : 
     205            3 :     bool status{false};
     206            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getImmediateOutput(sourceId, immediateOutput); });
     207            3 :     return status;
     208              : }
     209              : 
     210            1 : bool GStreamerMSEMediaPlayerClient::getStats(int32_t sourceId, uint64_t &renderedFrames, uint64_t &droppedFrames)
     211              : {
     212            1 :     if (!m_clientBackend)
     213              :     {
     214            0 :         return false;
     215              :     }
     216              : 
     217            1 :     bool status{false};
     218            1 :     m_backendQueue->callInEventLoop([&]()
     219            1 :                                     { status = m_clientBackend->getStats(sourceId, renderedFrames, droppedFrames); });
     220            1 :     return status;
     221              : }
     222              : 
     223          160 : bool GStreamerMSEMediaPlayerClient::createBackend()
     224              : {
     225          160 :     bool result = false;
     226          320 :     m_backendQueue->callInEventLoop(
     227          160 :         [&]()
     228              :         {
     229          160 :             if (!m_clientBackend)
     230              :             {
     231            1 :                 GST_ERROR("Client backend is NULL");
     232            1 :                 result = false;
     233            1 :                 return;
     234              :             }
     235          159 :             m_clientBackend->createMediaPlayerBackend(shared_from_this(), m_maxWidth, m_maxHeight);
     236              : 
     237          159 :             if (m_clientBackend->isMediaPlayerBackendCreated())
     238              :             {
     239          154 :                 std::string utf8url = "mse://1";
     240          154 :                 firebolt::rialto::MediaType mediaType = firebolt::rialto::MediaType::MSE;
     241          462 :                 if (!m_clientBackend->load(mediaType, "", utf8url))
     242              :                 {
     243            1 :                     GST_ERROR("Could not load RialtoClient");
     244            1 :                     return;
     245              :                 }
     246          153 :                 result = true;
     247          154 :             }
     248              :             else
     249              :             {
     250            5 :                 GST_ERROR("Media player backend could not be created");
     251              :             }
     252              :         });
     253              : 
     254          160 :     return result;
     255              : }
     256              : 
     257           21 : StateChangeResult GStreamerMSEMediaPlayerClient::play(int32_t sourceId)
     258              : {
     259           21 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     260           42 :     m_backendQueue->callInEventLoop(
     261           21 :         [&]()
     262              :         {
     263           21 :             auto sourceIt = m_attachedSources.find(sourceId);
     264           21 :             if (sourceIt == m_attachedSources.end())
     265              :             {
     266            1 :                 GST_ERROR("Cannot play - there's no attached source with id %d", sourceId);
     267            1 :                 result = StateChangeResult::NOT_ATTACHED;
     268            3 :                 return;
     269              :             }
     270              : 
     271           20 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PLAYING)
     272              :             {
     273            2 :                 GST_INFO("Server is already playing");
     274            2 :                 sourceIt->second.m_state = ClientState::PLAYING;
     275              : 
     276            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PLAYING}))
     277              :                 {
     278            1 :                     m_clientState = ClientState::PLAYING;
     279              :                 }
     280              : 
     281            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     282            2 :                 return;
     283              :             }
     284              : 
     285           18 :             sourceIt->second.m_state = ClientState::AWAITING_PLAYING;
     286              : 
     287           18 :             if (m_clientState == ClientState::PAUSED)
     288              :             {
     289              :                 // If one source is AWAITING_PLAYING, the other source can still be PLAYING.
     290              :                 // This happends when we are switching out audio.
     291           42 :                 if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PLAYING, ClientState::PLAYING}))
     292              :                 {
     293           10 :                     GST_INFO("Sending play command");
     294           10 :                     m_clientBackend->play();
     295           10 :                     m_clientState = ClientState::AWAITING_PLAYING;
     296              :                 }
     297              :                 else
     298              :                 {
     299            4 :                     GST_DEBUG("Not all sources are ready to play");
     300              :                 }
     301              :             }
     302              :             else
     303              :             {
     304            4 :                 GST_WARNING("Not in PAUSED state in %u state", static_cast<uint32_t>(m_clientState));
     305              :             }
     306              : 
     307           18 :             result = StateChangeResult::SUCCESS_ASYNC;
     308           18 :             sourceIt->second.m_delegate->postAsyncStart();
     309              :         });
     310              : 
     311           21 :     return result;
     312              : }
     313              : 
     314          312 : StateChangeResult GStreamerMSEMediaPlayerClient::pause(int32_t sourceId)
     315              : {
     316          312 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     317          624 :     m_backendQueue->callInEventLoop(
     318          312 :         [&]()
     319              :         {
     320          312 :             auto sourceIt = m_attachedSources.find(sourceId);
     321          312 :             if (sourceIt == m_attachedSources.end())
     322              :             {
     323          146 :                 GST_WARNING("Cannot pause - there's no attached source with id %d", sourceId);
     324              : 
     325          146 :                 result = StateChangeResult::NOT_ATTACHED;
     326          146 :                 return;
     327              :             }
     328              : 
     329          166 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PAUSED &&
     330            4 :                 m_clientState != ClientState::AWAITING_PLAYING && m_clientState != ClientState::AWAITING_PAUSED)
     331              :             {
     332              :                 // if the server is already paused and we are not in async, we don't need to send pause command
     333            2 :                 GST_INFO("Server is already paused");
     334            2 :                 sourceIt->second.m_state = ClientState::PAUSED;
     335              : 
     336            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PAUSED}))
     337              :                 {
     338            1 :                     m_clientState = ClientState::PAUSED;
     339              :                 }
     340              : 
     341            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     342              :             }
     343              :             else
     344              :             {
     345          164 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     346              : 
     347          164 :                 bool shouldPause = false;
     348          164 :                 if (m_clientState == ClientState::READY)
     349              :                 {
     350          456 :                     if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     351              :                     {
     352          144 :                         shouldPause = true;
     353              :                     }
     354              :                     else
     355              :                     {
     356            8 :                         GST_DEBUG("Not all attached sources are ready to pause");
     357              :                     }
     358              :                 }
     359           12 :                 else if (m_clientState == ClientState::AWAITING_PLAYING || m_clientState == ClientState::PLAYING)
     360              :                 {
     361            9 :                     shouldPause = true;
     362              :                 }
     363              :                 else
     364              :                 {
     365            3 :                     GST_DEBUG("Cannot pause in %u state", static_cast<uint32_t>(m_clientState));
     366              :                 }
     367              : 
     368          164 :                 if (shouldPause)
     369              :                 {
     370          153 :                     GST_INFO("Sending pause command in %u state", static_cast<uint32_t>(m_clientState));
     371          153 :                     m_clientBackend->pause();
     372          153 :                     m_clientState = ClientState::AWAITING_PAUSED;
     373              :                 }
     374              : 
     375          164 :                 result = StateChangeResult::SUCCESS_ASYNC;
     376          164 :                 sourceIt->second.m_delegate->postAsyncStart();
     377              :             }
     378              :         });
     379              : 
     380          312 :     return result;
     381              : }
     382              : 
     383          153 : void GStreamerMSEMediaPlayerClient::stop()
     384              : {
     385          306 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->stop(); });
     386          153 : }
     387              : 
     388            4 : void GStreamerMSEMediaPlayerClient::setPlaybackRate(double rate)
     389              : {
     390            8 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setPlaybackRate(rate); });
     391            4 : }
     392              : 
     393            7 : void GStreamerMSEMediaPlayerClient::flush(int32_t sourceId, bool resetTime)
     394              : {
     395           14 :     m_backendQueue->callInEventLoop(
     396            7 :         [&]()
     397              :         {
     398            7 :             bool async{true};
     399            7 :             auto sourceIt = m_attachedSources.find(sourceId);
     400            7 :             if (sourceIt == m_attachedSources.end())
     401              :             {
     402            2 :                 GST_ERROR("Cannot flush - there's no attached source with id %d", sourceId);
     403            3 :                 return;
     404              :             }
     405            5 :             if (!m_clientBackend->flush(sourceId, resetTime, async))
     406              :             {
     407            1 :                 GST_ERROR("Flush operation failed for source with id %d", sourceId);
     408            1 :                 return;
     409              :             }
     410            4 :             sourceIt->second.m_isFlushing = true;
     411            4 :             sourceIt->second.m_bufferPuller->stop();
     412              : 
     413            4 :             if (async)
     414              :             {
     415            4 :                 GST_INFO("Flush request sent for async source %d. Sink will lose state now", sourceId);
     416            4 :                 sourceIt->second.m_delegate->lostState();
     417              : 
     418            4 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     419            4 :                 if (m_clientState == ClientState::PLAYING)
     420              :                 {
     421            0 :                     m_clientState = ClientState::AWAITING_PLAYING;
     422              :                 }
     423            4 :                 else if (m_clientState == ClientState::PAUSED)
     424              :                 {
     425            1 :                     m_clientState = ClientState::AWAITING_PAUSED;
     426              :                 }
     427              :             }
     428              :         });
     429            7 : }
     430              : 
     431            9 : void GStreamerMSEMediaPlayerClient::setSourcePosition(int32_t sourceId, int64_t position, bool resetTime,
     432              :                                                       double appliedRate, uint64_t stopPosition)
     433              : {
     434           18 :     m_backendQueue->callInEventLoop(
     435            9 :         [&]()
     436              :         {
     437            9 :             auto sourceIt = m_attachedSources.find(sourceId);
     438            9 :             if (sourceIt == m_attachedSources.end())
     439              :             {
     440            1 :                 GST_ERROR("Cannot Set Source Position - there's no attached source with id %d", sourceId);
     441            2 :                 return;
     442              :             }
     443            8 :             if (!m_clientBackend->setSourcePosition(sourceId, position, resetTime, appliedRate, stopPosition))
     444              :             {
     445            1 :                 GST_ERROR("Set Source Position operation failed for source with id %d", sourceId);
     446            1 :                 return;
     447              :             }
     448            7 :             sourceIt->second.m_position = position;
     449              :         });
     450            9 : }
     451              : 
     452            1 : void GStreamerMSEMediaPlayerClient::setSubtitleOffset(int32_t sourceId, int64_t position)
     453              : {
     454            2 :     m_backendQueue->callInEventLoop(
     455            1 :         [&]()
     456              :         {
     457            1 :             auto sourceIt = m_attachedSources.find(sourceId);
     458            1 :             if (sourceIt == m_attachedSources.end())
     459              :             {
     460            0 :                 GST_ERROR("Cannot Set Subtitle Offset - there's no attached source with id %d", sourceId);
     461            0 :                 return;
     462              :             }
     463            1 :             if (!m_clientBackend->setSubtitleOffset(sourceId, position))
     464              :             {
     465            0 :                 GST_ERROR("Set Subtitle Offset operation failed for source with id %d", sourceId);
     466            0 :                 return;
     467              :             }
     468              :         });
     469            1 : }
     470              : 
     471            4 : void GStreamerMSEMediaPlayerClient::processAudioGap(int64_t position, uint32_t duration, int64_t discontinuityGap,
     472              :                                                     bool audioAac)
     473              : {
     474            8 :     m_backendQueue->callInEventLoop(
     475            4 :         [&]()
     476              :         {
     477            4 :             if (!m_clientBackend->processAudioGap(position, duration, discontinuityGap, audioAac))
     478              :             {
     479            1 :                 GST_ERROR("Process Audio Gap operation failed");
     480            1 :                 return;
     481              :             }
     482              :         });
     483            4 : }
     484              : 
     485          200 : bool GStreamerMSEMediaPlayerClient::attachSource(std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source,
     486              :                                                  RialtoMSEBaseSink *rialtoSink,
     487              :                                                  const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
     488              : {
     489          200 :     if (source->getType() != firebolt::rialto::MediaSourceType::AUDIO &&
     490          212 :         source->getType() != firebolt::rialto::MediaSourceType::VIDEO &&
     491           12 :         source->getType() != firebolt::rialto::MediaSourceType::SUBTITLE)
     492              :     {
     493            1 :         GST_WARNING_OBJECT(rialtoSink, "Invalid source type %u", static_cast<uint32_t>(source->getType()));
     494            1 :         return false;
     495              :     }
     496              : 
     497          199 :     bool result = false;
     498          398 :     m_backendQueue->callInEventLoop(
     499          199 :         [&]()
     500              :         {
     501          199 :             result = m_clientBackend->attachSource(source);
     502              : 
     503          199 :             if (result)
     504              :             {
     505          198 :                 std::shared_ptr<BufferParser> bufferParser;
     506          198 :                 if (source->getType() == firebolt::rialto::MediaSourceType::AUDIO)
     507              :                 {
     508          147 :                     bufferParser = std::make_shared<AudioBufferParser>();
     509              :                 }
     510           51 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::VIDEO)
     511              :                 {
     512           40 :                     bufferParser = std::make_shared<VideoBufferParser>();
     513              :                 }
     514           11 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
     515              :                 {
     516           11 :                     bufferParser = std::make_shared<SubtitleBufferParser>();
     517              :                 }
     518              : 
     519          198 :                 std::shared_ptr<BufferPuller> bufferPuller = std::make_shared<BufferPuller>(m_messageQueueFactory,
     520            0 :                                                                                             GST_ELEMENT_CAST(rialtoSink),
     521          198 :                                                                                             bufferParser, delegate);
     522              : 
     523          198 :                 if (m_attachedSources.find(source->getId()) == m_attachedSources.end())
     524              :                 {
     525          198 :                     m_attachedSources.emplace(source->getId(),
     526          396 :                                               AttachedSource(rialtoSink, bufferPuller, delegate, source->getType()));
     527              : 
     528          198 :                     delegate->setSourceId(source->getId());
     529          198 :                     bufferPuller->start();
     530              :                 }
     531              :             }
     532              : 
     533          199 :             sendAllSourcesAttachedIfPossibleInternal();
     534          199 :         });
     535              : 
     536          199 :     return result;
     537              : }
     538              : 
     539            4 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossible()
     540              : {
     541            8 :     m_backendQueue->callInEventLoop([&]() { sendAllSourcesAttachedIfPossibleInternal(); });
     542            4 : }
     543              : 
     544          203 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossibleInternal()
     545              : {
     546          203 :     if (!m_wasAllSourcesAttachedSent && areAllStreamsAttached())
     547              :     {
     548              :         // RialtoServer doesn't support dynamic source attachment.
     549              :         // It means that when we notify that all sources were attached, we cannot add any more sources in the current session
     550          150 :         GST_INFO("All sources attached");
     551          150 :         m_clientBackend->allSourcesAttached();
     552          150 :         m_wasAllSourcesAttachedSent = true;
     553          150 :         m_clientState = ClientState::READY;
     554              : 
     555              :         // In playbin3 streams, confirmation about number of available sources comes after attaching the source,
     556              :         // so we need to check if all sources are ready to pause
     557          450 :         if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     558              :         {
     559            1 :             GST_INFO("Sending pause command, because all attached sources are ready to pause");
     560            1 :             m_clientBackend->pause();
     561            1 :             m_clientState = ClientState::AWAITING_PAUSED;
     562              :         }
     563              :     }
     564          203 : }
     565              : 
     566          147 : void GStreamerMSEMediaPlayerClient::removeSource(int32_t sourceId)
     567              : {
     568          294 :     m_backendQueue->callInEventLoop(
     569          147 :         [&]()
     570              :         {
     571          147 :             if (!m_clientBackend->removeSource(sourceId))
     572              :             {
     573            1 :                 GST_WARNING("Remove source %d failed", sourceId);
     574              :             }
     575          147 :             m_attachedSources.erase(sourceId);
     576          147 :         });
     577              : }
     578              : 
     579           55 : void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::PlaybackState state)
     580              : {
     581           55 :     GST_DEBUG("Received state change to state %u", static_cast<uint32_t>(state));
     582          110 :     m_backendQueue->callInEventLoop(
     583           55 :         [&]()
     584              :         {
     585           55 :             m_serverPlaybackState = state;
     586           55 :             switch (state)
     587              :             {
     588           48 :             case firebolt::rialto::PlaybackState::PAUSED:
     589              :             case firebolt::rialto::PlaybackState::PLAYING:
     590              :             {
     591           48 :                 if (state == firebolt::rialto::PlaybackState::PAUSED && m_clientState == ClientState::AWAITING_PAUSED)
     592              :                 {
     593           35 :                     m_clientState = ClientState::PAUSED;
     594              :                 }
     595           13 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     596           11 :                          m_clientState == ClientState::AWAITING_PLAYING)
     597              :                 {
     598            8 :                     m_clientState = ClientState::PLAYING;
     599              :                 }
     600            5 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     601            3 :                          m_clientState == ClientState::AWAITING_PAUSED)
     602              :                 {
     603            1 :                     GST_WARNING("Outdated Playback State change to PLAYING received. Discarding...");
     604            1 :                     break;
     605              :                 }
     606              : 
     607          104 :                 for (auto &source : m_attachedSources)
     608              :                 {
     609           57 :                     if (state == firebolt::rialto::PlaybackState::PAUSED &&
     610           44 :                         source.second.m_state == ClientState::AWAITING_PAUSED)
     611              :                     {
     612           41 :                         source.second.m_state = ClientState::PAUSED;
     613              :                     }
     614           16 :                     else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     615           13 :                              source.second.m_state == ClientState::AWAITING_PLAYING)
     616              :                     {
     617           10 :                         source.second.m_state = ClientState::PLAYING;
     618              :                     }
     619           57 :                     source.second.m_delegate->handleStateChanged(state);
     620              :                 }
     621              : 
     622           47 :                 break;
     623              :             }
     624            4 :             case firebolt::rialto::PlaybackState::END_OF_STREAM:
     625              :             {
     626            8 :                 for (const auto &source : m_attachedSources)
     627              :                 {
     628            4 :                     source.second.m_delegate->handleEos();
     629              :                 }
     630              :             }
     631            4 :             break;
     632            1 :             case firebolt::rialto::PlaybackState::SEEK_DONE:
     633              :             {
     634            1 :                 GST_WARNING("firebolt::rialto::PlaybackState::SEEK_DONE notification not supported");
     635            1 :                 break;
     636              :             }
     637            1 :             case firebolt::rialto::PlaybackState::FAILURE:
     638              :             {
     639            2 :                 for (const auto &source : m_attachedSources)
     640              :                 {
     641            3 :                     source.second.m_delegate->handleError("Rialto server playback failed");
     642              :                 }
     643            2 :                 for (auto &source : m_attachedSources)
     644              :                 {
     645            1 :                     source.second.m_position = 0;
     646              :                 }
     647              : 
     648            1 :                 break;
     649              :             }
     650              :             break;
     651            1 :             default:
     652            1 :                 break;
     653              :             }
     654           55 :         });
     655              : }
     656              : 
     657            5 : void GStreamerMSEMediaPlayerClient::handleSourceFlushed(int32_t sourceId)
     658              : {
     659           10 :     m_backendQueue->callInEventLoop(
     660            5 :         [&]()
     661              :         {
     662            5 :             auto sourceIt = m_attachedSources.find(sourceId);
     663            5 :             if (sourceIt == m_attachedSources.end())
     664              :             {
     665            1 :                 GST_ERROR("Cannot finish flush - there's no attached source with id %d", sourceId);
     666            2 :                 return;
     667              :             }
     668            4 :             if (!sourceIt->second.m_isFlushing)
     669              :             {
     670            1 :                 GST_ERROR("Cannot finish flush - source with id %d is not flushing!", sourceId);
     671            1 :                 return;
     672              :             }
     673            3 :             sourceIt->second.m_isFlushing = false;
     674            3 :             sourceIt->second.m_bufferPuller->start();
     675            3 :             sourceIt->second.m_delegate->handleFlushCompleted();
     676              :         });
     677            5 : }
     678              : 
     679            7 : void GStreamerMSEMediaPlayerClient::setVideoRectangle(const std::string &rectangleString)
     680              : {
     681           14 :     m_backendQueue->callInEventLoop(
     682            7 :         [&]()
     683              :         {
     684            7 :             if (!m_clientBackend || !m_clientBackend->isMediaPlayerBackendCreated())
     685              :             {
     686            1 :                 GST_WARNING("Missing RialtoClient backend - can't set video window now");
     687            3 :                 return;
     688              :             }
     689              : 
     690            6 :             if (rectangleString.empty())
     691              :             {
     692            1 :                 GST_WARNING("Empty video rectangle string");
     693            1 :                 return;
     694              :             }
     695              : 
     696            5 :             Rectangle rect = {0, 0, 0, 0};
     697            5 :             if (sscanf(rectangleString.c_str(), "%u,%u,%u,%u", &rect.x, &rect.y, &rect.width, &rect.height) != 4)
     698              :             {
     699            1 :                 GST_WARNING("Invalid video rectangle values");
     700            1 :                 return;
     701              :             }
     702              : 
     703            4 :             m_clientBackend->setVideoWindow(rect.x, rect.y, rect.width, rect.height);
     704            4 :             m_videoRectangle = rect;
     705              :         });
     706            7 : }
     707              : 
     708            4 : std::string GStreamerMSEMediaPlayerClient::getVideoRectangle()
     709              : {
     710              :     char rectangle[64];
     711            8 :     m_backendQueue->callInEventLoop(
     712            8 :         [&]()
     713              :         {
     714            4 :             sprintf(rectangle, "%u,%u,%u,%u", m_videoRectangle.x, m_videoRectangle.y, m_videoRectangle.width,
     715              :                     m_videoRectangle.height);
     716            4 :         });
     717              : 
     718            8 :     return std::string(rectangle);
     719              : }
     720              : 
     721            4 : bool GStreamerMSEMediaPlayerClient::renderFrame(int32_t sourceId)
     722              : {
     723            4 :     bool result = false;
     724            8 :     m_backendQueue->callInEventLoop(
     725            4 :         [&]()
     726              :         {
     727            4 :             result = m_clientBackend->renderFrame();
     728            4 :             if (result)
     729              :             {
     730              :                 // RialtoServer's video sink should drop PAUSED state due to skipping prerolled buffer in PAUSED state
     731            3 :                 auto sourceIt = m_attachedSources.find(sourceId);
     732            3 :                 if (sourceIt != m_attachedSources.end())
     733              :                 {
     734            3 :                     sourceIt->second.m_delegate->lostState();
     735              :                 }
     736              :             }
     737            4 :         });
     738            4 :     return result;
     739              : }
     740              : 
     741            6 : void GStreamerMSEMediaPlayerClient::setVolume(double targetVolume, uint32_t volumeDuration,
     742              :                                               firebolt::rialto::EaseType easeType)
     743              : {
     744           12 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setVolume(targetVolume, volumeDuration, easeType); });
     745            6 : }
     746              : 
     747            5 : bool GStreamerMSEMediaPlayerClient::getVolume(double &volume)
     748              : {
     749            5 :     bool status{false};
     750           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getVolume(volume); });
     751            5 :     return status;
     752              : }
     753              : 
     754            7 : void GStreamerMSEMediaPlayerClient::setMute(bool mute, int32_t sourceId)
     755              : {
     756           14 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setMute(mute, sourceId); });
     757            7 : }
     758              : 
     759            3 : bool GStreamerMSEMediaPlayerClient::getMute(int sourceId)
     760              : {
     761            3 :     bool mute{false};
     762            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getMute(mute, sourceId); });
     763              : 
     764            3 :     return mute;
     765              : }
     766              : 
     767            3 : void GStreamerMSEMediaPlayerClient::setTextTrackIdentifier(const std::string &textTrackIdentifier)
     768              : {
     769            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setTextTrackIdentifier(textTrackIdentifier); });
     770            3 : }
     771              : 
     772            2 : std::string GStreamerMSEMediaPlayerClient::getTextTrackIdentifier()
     773              : {
     774            2 :     std::string getTextTrackIdentifier;
     775            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getTextTrackIdentifier(getTextTrackIdentifier); });
     776            2 :     return getTextTrackIdentifier;
     777              : }
     778              : 
     779            6 : bool GStreamerMSEMediaPlayerClient::setLowLatency(bool lowLatency)
     780              : {
     781            6 :     if (!m_clientBackend)
     782              :     {
     783            1 :         return false;
     784              :     }
     785              : 
     786            5 :     bool status{false};
     787           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setLowLatency(lowLatency); });
     788            5 :     return status;
     789              : }
     790              : 
     791            6 : bool GStreamerMSEMediaPlayerClient::setSync(bool sync)
     792              : {
     793            6 :     if (!m_clientBackend)
     794              :     {
     795            1 :         return false;
     796              :     }
     797              : 
     798            5 :     bool status{false};
     799           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSync(sync); });
     800            5 :     return status;
     801              : }
     802              : 
     803            4 : bool GStreamerMSEMediaPlayerClient::getSync(bool &sync)
     804              : {
     805            4 :     if (!m_clientBackend)
     806              :     {
     807            1 :         return false;
     808              :     }
     809              : 
     810            3 :     bool status{false};
     811            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getSync(sync); });
     812            3 :     return status;
     813              : }
     814              : 
     815            6 : bool GStreamerMSEMediaPlayerClient::setSyncOff(bool syncOff)
     816              : {
     817            6 :     if (!m_clientBackend)
     818              :     {
     819            1 :         return false;
     820              :     }
     821              : 
     822            5 :     bool status{false};
     823           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSyncOff(syncOff); });
     824            5 :     return status;
     825              : }
     826              : 
     827           10 : bool GStreamerMSEMediaPlayerClient::setStreamSyncMode(int32_t sourceId, int32_t streamSyncMode)
     828              : {
     829           10 :     if (!m_clientBackend)
     830              :     {
     831            1 :         return false;
     832              :     }
     833              : 
     834            9 :     bool status{false};
     835           18 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setStreamSyncMode(sourceId, streamSyncMode); });
     836            9 :     return status;
     837              : }
     838              : 
     839            4 : bool GStreamerMSEMediaPlayerClient::getStreamSyncMode(int32_t &streamSyncMode)
     840              : {
     841            4 :     if (!m_clientBackend)
     842              :     {
     843            1 :         return false;
     844              :     }
     845              : 
     846            3 :     bool status{false};
     847            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getStreamSyncMode(streamSyncMode); });
     848            3 :     return status;
     849              : }
     850              : 
     851           39 : ClientState GStreamerMSEMediaPlayerClient::getClientState()
     852              : {
     853           39 :     ClientState state{ClientState::IDLE};
     854           39 :     m_backendQueue->callInEventLoop([&]() { state = m_clientState; });
     855           39 :     return state;
     856              : }
     857              : 
     858          166 : void GStreamerMSEMediaPlayerClient::handleStreamCollection(int32_t audioStreams, int32_t videoStreams,
     859              :                                                            int32_t subtitleStreams)
     860              : {
     861          332 :     m_backendQueue->callInEventLoop(
     862          166 :         [&]()
     863              :         {
     864          166 :             if (m_audioStreams == UNKNOWN_STREAMS_NUMBER)
     865          163 :                 m_audioStreams = audioStreams;
     866          166 :             if (m_videoStreams == UNKNOWN_STREAMS_NUMBER)
     867          162 :                 m_videoStreams = videoStreams;
     868          166 :             if (m_subtitleStreams == UNKNOWN_STREAMS_NUMBER)
     869          162 :                 m_subtitleStreams = subtitleStreams;
     870              : 
     871          166 :             GST_INFO("Updated number of streams. New streams' numbers; video=%d, audio=%d, text=%d", m_videoStreams,
     872              :                      m_audioStreams, m_subtitleStreams);
     873          166 :         });
     874              : }
     875              : 
     876            3 : void GStreamerMSEMediaPlayerClient::setBufferingLimit(uint32_t limitBufferingMs)
     877              : {
     878            3 :     if (!m_clientBackend)
     879              :     {
     880            0 :         return;
     881              :     }
     882            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setBufferingLimit(limitBufferingMs); });
     883              : }
     884              : 
     885            2 : uint32_t GStreamerMSEMediaPlayerClient::getBufferingLimit()
     886              : {
     887            2 :     if (!m_clientBackend)
     888              :     {
     889            0 :         return kDefaultBufferingLimit;
     890              :     }
     891              : 
     892            2 :     uint32_t result{kDefaultBufferingLimit};
     893            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getBufferingLimit(result); });
     894            2 :     return result;
     895              : }
     896              : 
     897            3 : void GStreamerMSEMediaPlayerClient::setUseBuffering(bool useBuffering)
     898              : {
     899            3 :     if (!m_clientBackend)
     900              :     {
     901            0 :         return;
     902              :     }
     903            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setUseBuffering(useBuffering); });
     904              : }
     905              : 
     906            2 : bool GStreamerMSEMediaPlayerClient::getUseBuffering()
     907              : {
     908            2 :     if (!m_clientBackend)
     909              :     {
     910            0 :         return kDefaultUseBuffering;
     911              :     }
     912              : 
     913            2 :     bool result{kDefaultUseBuffering};
     914            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getUseBuffering(result); });
     915            2 :     return result;
     916              : }
     917              : 
     918            3 : bool GStreamerMSEMediaPlayerClient::switchSource(const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source)
     919              : {
     920            3 :     bool result = false;
     921            6 :     m_backendQueue->callInEventLoop([&]() { result = m_clientBackend->switchSource(source); });
     922              : 
     923            3 :     return result;
     924              : }
     925              : 
     926          320 : bool GStreamerMSEMediaPlayerClient::checkIfAllAttachedSourcesInStates(const std::vector<ClientState> &states)
     927              : {
     928          640 :     return std::all_of(m_attachedSources.begin(), m_attachedSources.end(), [states](const auto &source)
     929          973 :                        { return std::find(states.begin(), states.end(), source.second.m_state) != states.end(); });
     930              : }
     931              : 
     932          202 : bool GStreamerMSEMediaPlayerClient::areAllStreamsAttached()
     933              : {
     934          202 :     int32_t attachedVideoSources = 0;
     935          202 :     int32_t attachedAudioSources = 0;
     936          202 :     int32_t attachedSubtitleSources = 0;
     937          414 :     for (auto &source : m_attachedSources)
     938              :     {
     939          212 :         if (source.second.getType() == firebolt::rialto::MediaSourceType::VIDEO)
     940              :         {
     941           40 :             attachedVideoSources++;
     942              :         }
     943          172 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::AUDIO)
     944              :         {
     945          161 :             attachedAudioSources++;
     946              :         }
     947           11 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
     948              :         {
     949           11 :             attachedSubtitleSources++;
     950              :         }
     951              :     }
     952              : 
     953          352 :     return attachedVideoSources == m_videoStreams && attachedAudioSources == m_audioStreams &&
     954          352 :            attachedSubtitleSources == m_subtitleStreams;
     955              : }
     956              : 
     957            9 : bool GStreamerMSEMediaPlayerClient::requestPullBuffer(int streamId, size_t frameCount, unsigned int needDataRequestId)
     958              : {
     959            9 :     bool result = false;
     960           18 :     m_backendQueue->callInEventLoop(
     961            9 :         [&]()
     962              :         {
     963            9 :             auto sourceIt = m_attachedSources.find(streamId);
     964            9 :             if (sourceIt == m_attachedSources.end())
     965              :             {
     966            1 :                 GST_ERROR("There's no attached source with id %d", streamId);
     967              : 
     968            1 :                 result = false;
     969            1 :                 return;
     970              :             }
     971            8 :             result = sourceIt->second.m_bufferPuller->requestPullBuffer(streamId, frameCount, needDataRequestId, this);
     972              :         });
     973              : 
     974            9 :     return result;
     975              : }
     976              : 
     977            5 : bool GStreamerMSEMediaPlayerClient::handleQos(int sourceId, firebolt::rialto::QosInfo qosInfo)
     978              : {
     979            5 :     bool result = false;
     980           10 :     m_backendQueue->callInEventLoop(
     981            5 :         [&]()
     982              :         {
     983            5 :             auto sourceIt = m_attachedSources.find(sourceId);
     984            5 :             if (sourceIt == m_attachedSources.end())
     985              :             {
     986            1 :                 result = false;
     987            1 :                 return;
     988              :             }
     989            4 :             sourceIt->second.m_delegate->handleQos(qosInfo.processed, qosInfo.dropped);
     990            4 :             result = true;
     991              :         });
     992              : 
     993            5 :     return result;
     994              : }
     995              : 
     996            2 : bool GStreamerMSEMediaPlayerClient::handleBufferUnderflow(int sourceId)
     997              : {
     998            2 :     bool result = false;
     999            4 :     m_backendQueue->callInEventLoop(
    1000            2 :         [&]()
    1001              :         {
    1002            2 :             auto sourceIt = m_attachedSources.find(sourceId);
    1003            2 :             if (sourceIt == m_attachedSources.end())
    1004              :             {
    1005            1 :                 result = false;
    1006            1 :                 return;
    1007              :             }
    1008              : 
    1009            1 :             rialto_mse_base_handle_rialto_server_sent_buffer_underflow(sourceIt->second.m_rialtoSink);
    1010              : 
    1011            1 :             result = true;
    1012              :         });
    1013              : 
    1014            2 :     return result;
    1015              : }
    1016              : 
    1017            5 : bool GStreamerMSEMediaPlayerClient::handlePlaybackError(int sourceId, firebolt::rialto::PlaybackError error)
    1018              : {
    1019            5 :     bool result = false;
    1020           10 :     m_backendQueue->callInEventLoop(
    1021            5 :         [&]()
    1022              :         {
    1023            5 :             auto sourceIt = m_attachedSources.find(sourceId);
    1024            5 :             if (sourceIt == m_attachedSources.end())
    1025              :             {
    1026            1 :                 result = false;
    1027            1 :                 return;
    1028              :             }
    1029              : 
    1030              :             // Even though rialto has only reported a non-fatal error, still fail the pipeline from rialto-gstreamer
    1031            4 :             GST_ERROR("Received Playback error '%s', posting error on %s sink", toString(error),
    1032              :                       toString(sourceIt->second.getType()));
    1033            4 :             if (firebolt::rialto::PlaybackError::DECRYPTION == error)
    1034              :             {
    1035            6 :                 sourceIt->second.m_delegate->handleError("Rialto dropped a frame that failed to decrypt",
    1036              :                                                          GST_STREAM_ERROR_DECRYPT);
    1037              :             }
    1038              :             else
    1039              :             {
    1040            6 :                 sourceIt->second.m_delegate->handleError("Rialto server playback failed");
    1041              :             }
    1042              : 
    1043            4 :             result = true;
    1044              :         });
    1045              : 
    1046            5 :     return result;
    1047              : }
    1048              : 
    1049            4 : firebolt::rialto::AddSegmentStatus GStreamerMSEMediaPlayerClient::addSegment(
    1050              :     unsigned int needDataRequestId, const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> &mediaSegment)
    1051              : {
    1052              :     // rialto client's addSegment call is MT safe, so it's ok to call it from the Puller's thread
    1053            4 :     return m_clientBackend->addSegment(needDataRequestId, mediaSegment);
    1054              : }
    1055              : 
    1056          198 : BufferPuller::BufferPuller(const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory, GstElement *rialtoSink,
    1057              :                            const std::shared_ptr<BufferParser> &bufferParser,
    1058          198 :                            const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
    1059          198 :     : m_queue{messageQueueFactory->createMessageQueue()}, m_rialtoSink(rialtoSink), m_bufferParser(bufferParser),
    1060          198 :       m_delegate{delegate}
    1061              : {
    1062              : }
    1063              : 
    1064          201 : void BufferPuller::start()
    1065              : {
    1066          201 :     m_queue->start();
    1067              : }
    1068              : 
    1069           65 : void BufferPuller::stop()
    1070              : {
    1071           65 :     m_queue->stop();
    1072              : }
    1073              : 
    1074            8 : bool BufferPuller::requestPullBuffer(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1075              :                                      GStreamerMSEMediaPlayerClient *player)
    1076              : {
    1077           24 :     return m_queue->postMessage(std::make_shared<PullBufferMessage>(sourceId, frameCount, needDataRequestId, m_rialtoSink,
    1078           24 :                                                                     m_bufferParser, *m_queue, player, m_delegate));
    1079              : }
    1080              : 
    1081            9 : HaveDataMessage::HaveDataMessage(firebolt::rialto::MediaSourceStatus status, int sourceId,
    1082            9 :                                  unsigned int needDataRequestId, GStreamerMSEMediaPlayerClient *player)
    1083            9 :     : m_status(status), m_sourceId(sourceId), m_needDataRequestId(needDataRequestId), m_player(player)
    1084              : {
    1085              : }
    1086              : 
    1087            9 : void HaveDataMessage::handle()
    1088              : {
    1089            9 :     if (m_player->m_attachedSources.find(m_sourceId) == m_player->m_attachedSources.end())
    1090              :     {
    1091            1 :         GST_WARNING("Source id %d is invalid", m_sourceId);
    1092            1 :         return;
    1093              :     }
    1094              : 
    1095            8 :     m_player->m_clientBackend->haveData(m_status, m_needDataRequestId);
    1096              : }
    1097              : 
    1098            8 : PullBufferMessage::PullBufferMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1099              :                                      GstElement *rialtoSink, const std::shared_ptr<BufferParser> &bufferParser,
    1100              :                                      IMessageQueue &pullerQueue, GStreamerMSEMediaPlayerClient *player,
    1101            8 :                                      const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
    1102            8 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_rialtoSink(rialtoSink),
    1103            8 :       m_bufferParser(bufferParser), m_pullerQueue(pullerQueue), m_player(player), m_delegate{delegate}
    1104              : {
    1105              : }
    1106              : 
    1107            7 : void PullBufferMessage::handle()
    1108              : {
    1109            7 :     bool isEos = false;
    1110            7 :     unsigned int addedSegments = 0;
    1111              : 
    1112            9 :     for (unsigned int frame = 0; frame < m_frameCount; ++frame)
    1113              :     {
    1114            7 :         if (!m_delegate->isReadyToSendData())
    1115              :         {
    1116            1 :             GST_INFO_OBJECT(m_rialtoSink, "Not ready to send data - segment or eos not received yet");
    1117            5 :             break;
    1118              :         }
    1119            6 :         GstRefSample sample = m_delegate->getFrontSample();
    1120            6 :         if (!sample)
    1121              :         {
    1122            3 :             if (m_delegate->isEos())
    1123              :             {
    1124            1 :                 isEos = true;
    1125              :             }
    1126              :             else
    1127              :             {
    1128              :                 // it's not a critical issue. It might be caused by receiving too many need data requests.
    1129            2 :                 GST_INFO_OBJECT(m_rialtoSink, "Could not get a sample");
    1130              :             }
    1131            3 :             break;
    1132              :         }
    1133              : 
    1134              :         // we pass GstMapInfo's pointers on data buffers to RialtoClient
    1135              :         // so we need to hold it until RialtoClient copies them to shm
    1136            3 :         GstBuffer *buffer = sample.getBuffer();
    1137              :         GstMapInfo map;
    1138            3 :         if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
    1139              :         {
    1140            0 :             GST_ERROR_OBJECT(m_rialtoSink, "Could not map buffer");
    1141            0 :             m_delegate->popSample();
    1142            0 :             continue;
    1143              :         }
    1144              : 
    1145              :         std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> mseData =
    1146            3 :             m_bufferParser->parseBuffer(sample, buffer, map, m_sourceId);
    1147            3 :         if (!mseData)
    1148              :         {
    1149            0 :             GST_ERROR_OBJECT(m_rialtoSink, "No data returned from the parser");
    1150            0 :             gst_buffer_unmap(buffer, &map);
    1151            0 :             m_delegate->popSample();
    1152            0 :             continue;
    1153              :         }
    1154              : 
    1155            3 :         firebolt::rialto::AddSegmentStatus addSegmentStatus = m_player->addSegment(m_needDataRequestId, mseData);
    1156            3 :         if (addSegmentStatus == firebolt::rialto::AddSegmentStatus::NO_SPACE)
    1157              :         {
    1158            1 :             gst_buffer_unmap(buffer, &map);
    1159            1 :             GST_INFO_OBJECT(m_rialtoSink, "There's no space to add sample");
    1160            1 :             break;
    1161              :         }
    1162              : 
    1163            2 :         gst_buffer_unmap(buffer, &map);
    1164            2 :         m_delegate->popSample();
    1165            2 :         addedSegments++;
    1166            7 :     }
    1167              : 
    1168            7 :     firebolt::rialto::MediaSourceStatus status = firebolt::rialto::MediaSourceStatus::OK;
    1169            7 :     if (isEos)
    1170              :     {
    1171            1 :         status = firebolt::rialto::MediaSourceStatus::EOS;
    1172              :     }
    1173            6 :     else if (addedSegments == 0)
    1174              :     {
    1175            4 :         status = firebolt::rialto::MediaSourceStatus::NO_AVAILABLE_SAMPLES;
    1176              :     }
    1177              : 
    1178           14 :     m_player->m_backendQueue->postMessage(
    1179           14 :         std::make_shared<HaveDataMessage>(status, m_sourceId, m_needDataRequestId, m_player));
    1180            7 : }
    1181              : 
    1182            9 : NeedDataMessage::NeedDataMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1183            9 :                                  GStreamerMSEMediaPlayerClient *player)
    1184            9 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_player(player)
    1185              : {
    1186              : }
    1187              : 
    1188            9 : void NeedDataMessage::handle()
    1189              : {
    1190            9 :     if (!m_player->requestPullBuffer(m_sourceId, m_frameCount, m_needDataRequestId))
    1191              :     {
    1192            2 :         GST_ERROR("Failed to pull buffer for sourceId=%d and NeedDataRequestId %u", m_sourceId, m_needDataRequestId);
    1193            4 :         m_player->m_backendQueue->postMessage(
    1194            2 :             std::make_shared<HaveDataMessage>(firebolt::rialto::MediaSourceStatus::ERROR, m_sourceId,
    1195            2 :                                               m_needDataRequestId, m_player));
    1196              :     }
    1197            9 : }
    1198              : 
    1199           55 : PlaybackStateMessage::PlaybackStateMessage(firebolt::rialto::PlaybackState state, GStreamerMSEMediaPlayerClient *player)
    1200           55 :     : m_state(state), m_player(player)
    1201              : {
    1202              : }
    1203              : 
    1204           55 : void PlaybackStateMessage::handle()
    1205              : {
    1206           55 :     m_player->handlePlaybackStateChange(m_state);
    1207              : }
    1208              : 
    1209            5 : QosMessage::QosMessage(int sourceId, firebolt::rialto::QosInfo qosInfo, GStreamerMSEMediaPlayerClient *player)
    1210            5 :     : m_sourceId(sourceId), m_qosInfo(qosInfo), m_player(player)
    1211              : {
    1212              : }
    1213              : 
    1214            5 : void QosMessage::handle()
    1215              : {
    1216            5 :     if (!m_player->handleQos(m_sourceId, m_qosInfo))
    1217              :     {
    1218            1 :         GST_ERROR("Failed to handle qos for sourceId=%d", m_sourceId);
    1219              :     }
    1220            5 : }
    1221              : 
    1222            2 : BufferUnderflowMessage::BufferUnderflowMessage(int sourceId, GStreamerMSEMediaPlayerClient *player)
    1223            2 :     : m_sourceId(sourceId), m_player(player)
    1224              : {
    1225              : }
    1226              : 
    1227            2 : void BufferUnderflowMessage::handle()
    1228              : {
    1229            2 :     if (!m_player->handleBufferUnderflow(m_sourceId))
    1230              :     {
    1231            1 :         GST_ERROR("Failed to handle buffer underflow for sourceId=%d", m_sourceId);
    1232              :     }
    1233            2 : }
    1234              : 
    1235            5 : PlaybackErrorMessage::PlaybackErrorMessage(int sourceId, firebolt::rialto::PlaybackError error,
    1236            5 :                                            GStreamerMSEMediaPlayerClient *player)
    1237            5 :     : m_sourceId(sourceId), m_error(error), m_player(player)
    1238              : {
    1239              : }
    1240              : 
    1241            5 : void PlaybackErrorMessage::handle()
    1242              : {
    1243            5 :     if (!m_player->handlePlaybackError(m_sourceId, m_error))
    1244              :     {
    1245            1 :         GST_ERROR("Failed to handle playback error for sourceId=%d, error %s", m_sourceId, toString(m_error));
    1246              :     }
    1247            5 : }
    1248              : 
    1249            1 : SetPositionMessage::SetPositionMessage(int64_t newPosition, std::unordered_map<int32_t, AttachedSource> &attachedSources)
    1250            1 :     : m_newPosition(newPosition), m_attachedSources(attachedSources)
    1251              : {
    1252              : }
    1253              : 
    1254            1 : void SetPositionMessage::handle()
    1255              : {
    1256            2 :     for (auto &source : m_attachedSources)
    1257              :     {
    1258            1 :         source.second.setPosition(m_newPosition);
    1259              :     }
    1260              : }
    1261              : 
    1262            1 : SetDurationMessage::SetDurationMessage(int64_t newDuration, int64_t &targetDuration)
    1263            1 :     : m_newDuration(newDuration), m_targetDuration(targetDuration)
    1264              : {
    1265              : }
    1266              : 
    1267            1 : void SetDurationMessage::handle()
    1268              : {
    1269            1 :     m_targetDuration = m_newDuration;
    1270              : }
    1271              : 
    1272            5 : SourceFlushedMessage::SourceFlushedMessage(int32_t sourceId, GStreamerMSEMediaPlayerClient *player)
    1273            5 :     : m_sourceId{sourceId}, m_player{player}
    1274              : {
    1275              : }
    1276              : 
    1277            5 : void SourceFlushedMessage::handle()
    1278              : {
    1279            5 :     m_player->handleSourceFlushed(m_sourceId);
    1280              : }
        

Generated by: LCOV version 2.0-1