LCOV - code coverage report
Current view: top level - source - GStreamerMSEMediaPlayerClient.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 95.4 % 610 582
Test Date: 2025-06-24 14:11:58 Functions: 98.5 % 130 128

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2022 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "GStreamerMSEMediaPlayerClient.h"
      20              : #include "Constants.h"
      21              : #include "GstreamerCatLog.h"
      22              : #include "RialtoGStreamerMSEBaseSink.h"
      23              : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
      24              : #include "RialtoGStreamerMSEVideoSink.h"
      25              : 
      26              : #include <algorithm>
      27              : #include <chrono>
      28              : #include <thread>
      29              : 
      30              : namespace
      31              : {
      32              : // The start time of segment might differ from the first sample which is injected.
      33              : // That difference should not be bigger than 1 video / audio frame.
      34              : // 1 second is probably erring on the side of caution, but should not have side effect.
      35              : const int64_t segmentStartMaximumDiff = 1000000000;
      36              : const int32_t UNKNOWN_STREAMS_NUMBER = -1;
      37              : 
      38            0 : const char *toString(const firebolt::rialto::PlaybackError &error)
      39              : {
      40            0 :     switch (error)
      41              :     {
      42            0 :     case firebolt::rialto::PlaybackError::DECRYPTION:
      43            0 :         return "DECRYPTION";
      44            0 :     case firebolt::rialto::PlaybackError::UNKNOWN:
      45            0 :         return "UNKNOWN";
      46              :     }
      47            0 :     return "UNKNOWN";
      48              : }
      49            0 : const char *toString(const firebolt::rialto::MediaSourceType &src)
      50              : {
      51            0 :     switch (src)
      52              :     {
      53            0 :     case firebolt::rialto::MediaSourceType::AUDIO:
      54            0 :         return "AUDIO";
      55            0 :     case firebolt::rialto::MediaSourceType::VIDEO:
      56            0 :         return "VIDEO";
      57            0 :     case firebolt::rialto::MediaSourceType::SUBTITLE:
      58            0 :         return "SUBTITLE";
      59            0 :     case firebolt::rialto::MediaSourceType::UNKNOWN:
      60            0 :         return "UNKNOWN";
      61              :     }
      62            0 :     return "UNKNOWN";
      63              : }
      64              : } // namespace
      65              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      66          259 : GStreamerMSEMediaPlayerClient::GStreamerMSEMediaPlayerClient(
      67              :     const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory,
      68              :     const std::shared_ptr<firebolt::rialto::client::MediaPlayerClientBackendInterface> &MediaPlayerClientBackend,
      69          259 :     const uint32_t maxVideoWidth, const uint32_t maxVideoHeight)
      70          259 :     : m_backendQueue{messageQueueFactory->createMessageQueue()}, m_messageQueueFactory{messageQueueFactory},
      71          259 :       m_clientBackend(MediaPlayerClientBackend), m_duration(0), m_audioStreams{UNKNOWN_STREAMS_NUMBER},
      72          259 :       m_videoStreams{UNKNOWN_STREAMS_NUMBER}, m_subtitleStreams{UNKNOWN_STREAMS_NUMBER},
      73          259 :       m_videoRectangle{0, 0, 1920, 1080}, m_streamingStopped(false),
      74          259 :       m_maxWidth(maxVideoWidth == 0 ? DEFAULT_MAX_VIDEO_WIDTH : maxVideoWidth),
      75          777 :       m_maxHeight(maxVideoHeight == 0 ? DEFAULT_MAX_VIDEO_HEIGHT : maxVideoHeight)
      76              : {
      77          259 :     m_backendQueue->start();
      78              : }
      79              : 
      80          259 : GStreamerMSEMediaPlayerClient::~GStreamerMSEMediaPlayerClient()
      81              : {
      82          259 :     stopStreaming();
      83              : }
      84              : 
      85          401 : void GStreamerMSEMediaPlayerClient::stopStreaming()
      86              : {
      87          401 :     if (!m_streamingStopped)
      88              :     {
      89          259 :         m_backendQueue->stop();
      90              : 
      91          317 :         for (auto &source : m_attachedSources)
      92              :         {
      93           58 :             source.second.m_bufferPuller->stop();
      94              :         }
      95              : 
      96          259 :         m_streamingStopped = true;
      97              :     }
      98          401 : }
      99              : 
     100              : // Deletes client backend -> this deletes mediapipeline object
     101          144 : void GStreamerMSEMediaPlayerClient::destroyClientBackend()
     102              : {
     103          144 :     m_clientBackend.reset();
     104              : }
     105              : 
     106            1 : void GStreamerMSEMediaPlayerClient::notifyDuration(int64_t duration)
     107              : {
     108            1 :     m_backendQueue->postMessage(std::make_shared<SetDurationMessage>(duration, m_duration));
     109              : }
     110              : 
     111            1 : void GStreamerMSEMediaPlayerClient::notifyPosition(int64_t position)
     112              : {
     113            1 :     m_backendQueue->postMessage(std::make_shared<SetPositionMessage>(position, m_attachedSources));
     114              : }
     115              : 
     116              : void GStreamerMSEMediaPlayerClient::notifyNativeSize(uint32_t width, uint32_t height, double aspect) {}
     117              : 
     118              : void GStreamerMSEMediaPlayerClient::notifyNetworkState(firebolt::rialto::NetworkState state) {}
     119              : 
     120           17 : void GStreamerMSEMediaPlayerClient::notifyPlaybackState(firebolt::rialto::PlaybackState state)
     121              : {
     122           17 :     m_backendQueue->postMessage(std::make_shared<PlaybackStateMessage>(state, this));
     123              : }
     124              : 
     125              : void GStreamerMSEMediaPlayerClient::notifyVideoData(bool hasData) {}
     126              : 
     127              : void GStreamerMSEMediaPlayerClient::notifyAudioData(bool hasData) {}
     128              : 
     129            7 : void GStreamerMSEMediaPlayerClient::notifyNeedMediaData(
     130              :     int32_t sourceId, size_t frameCount, uint32_t needDataRequestId,
     131              :     const std::shared_ptr<firebolt::rialto::MediaPlayerShmInfo> & /*shmInfo*/)
     132              : {
     133            7 :     m_backendQueue->postMessage(std::make_shared<NeedDataMessage>(sourceId, frameCount, needDataRequestId, this));
     134              : 
     135            7 :     return;
     136              : }
     137              : 
     138              : void GStreamerMSEMediaPlayerClient::notifyCancelNeedMediaData(int sourceId) {}
     139              : 
     140            4 : void GStreamerMSEMediaPlayerClient::notifyQos(int32_t sourceId, const firebolt::rialto::QosInfo &qosInfo)
     141              : {
     142            4 :     m_backendQueue->postMessage(std::make_shared<QosMessage>(sourceId, qosInfo, this));
     143              : }
     144              : 
     145            2 : void GStreamerMSEMediaPlayerClient::notifyBufferUnderflow(int32_t sourceId)
     146              : {
     147            2 :     m_backendQueue->postMessage(std::make_shared<BufferUnderflowMessage>(sourceId, this));
     148              : }
     149              : 
     150            2 : void GStreamerMSEMediaPlayerClient::notifyPlaybackError(int32_t sourceId, firebolt::rialto::PlaybackError error)
     151              : {
     152            2 :     m_backendQueue->postMessage(std::make_shared<PlaybackErrorMessage>(sourceId, error, this));
     153              : }
     154              : 
     155            3 : void GStreamerMSEMediaPlayerClient::notifySourceFlushed(int32_t sourceId)
     156              : {
     157            3 :     m_backendQueue->postMessage(std::make_shared<SourceFlushedMessage>(sourceId, this));
     158              : }
     159              : 
     160            6 : void GStreamerMSEMediaPlayerClient::getPositionDo(int64_t *position, int32_t sourceId)
     161              : {
     162            6 :     auto sourceIt = m_attachedSources.find(sourceId);
     163            6 :     if (sourceIt == m_attachedSources.end())
     164              :     {
     165            1 :         *position = -1;
     166            1 :         return;
     167              :     }
     168              : 
     169            5 :     if (m_clientBackend && m_clientBackend->getPosition(*position))
     170              :     {
     171            3 :         sourceIt->second.m_position = *position;
     172              :     }
     173              :     else
     174              :     {
     175            2 :         *position = sourceIt->second.m_position;
     176              :     }
     177              : }
     178              : 
     179            6 : int64_t GStreamerMSEMediaPlayerClient::getPosition(int32_t sourceId)
     180              : {
     181              :     int64_t position;
     182           12 :     m_backendQueue->callInEventLoop([&]() { getPositionDo(&position, sourceId); });
     183            6 :     return position;
     184              : }
     185              : 
     186            5 : bool GStreamerMSEMediaPlayerClient::setImmediateOutput(int32_t sourceId, bool immediateOutput)
     187              : {
     188            5 :     if (!m_clientBackend)
     189              :     {
     190            1 :         return false;
     191              :     }
     192              : 
     193            4 :     bool status{false};
     194            8 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setImmediateOutput(sourceId, immediateOutput); });
     195            4 :     return status;
     196              : }
     197              : 
     198            4 : bool GStreamerMSEMediaPlayerClient::getImmediateOutput(int32_t sourceId, bool &immediateOutput)
     199              : {
     200            4 :     if (!m_clientBackend)
     201              :     {
     202            1 :         return false;
     203              :     }
     204              : 
     205            3 :     bool status{false};
     206            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getImmediateOutput(sourceId, immediateOutput); });
     207            3 :     return status;
     208              : }
     209              : 
     210            1 : bool GStreamerMSEMediaPlayerClient::getStats(int32_t sourceId, uint64_t &renderedFrames, uint64_t &droppedFrames)
     211              : {
     212            1 :     if (!m_clientBackend)
     213              :     {
     214            0 :         return false;
     215              :     }
     216              : 
     217            1 :     bool status{false};
     218            1 :     m_backendQueue->callInEventLoop([&]()
     219            1 :                                     { status = m_clientBackend->getStats(sourceId, renderedFrames, droppedFrames); });
     220            1 :     return status;
     221              : }
     222              : 
     223          150 : bool GStreamerMSEMediaPlayerClient::createBackend()
     224              : {
     225          150 :     bool result = false;
     226          300 :     m_backendQueue->callInEventLoop(
     227          150 :         [&]()
     228              :         {
     229          150 :             if (!m_clientBackend)
     230              :             {
     231            1 :                 GST_ERROR("Client backend is NULL");
     232            1 :                 result = false;
     233            1 :                 return;
     234              :             }
     235          149 :             m_clientBackend->createMediaPlayerBackend(shared_from_this(), m_maxWidth, m_maxHeight);
     236              : 
     237          149 :             if (m_clientBackend->isMediaPlayerBackendCreated())
     238              :             {
     239          144 :                 std::string utf8url = "mse://1";
     240          144 :                 firebolt::rialto::MediaType mediaType = firebolt::rialto::MediaType::MSE;
     241          432 :                 if (!m_clientBackend->load(mediaType, "", utf8url))
     242              :                 {
     243            1 :                     GST_ERROR("Could not load RialtoClient");
     244            1 :                     return;
     245              :                 }
     246          143 :                 result = true;
     247          144 :             }
     248              :             else
     249              :             {
     250            5 :                 GST_ERROR("Media player backend could not be created");
     251              :             }
     252              :         });
     253              : 
     254          150 :     return result;
     255              : }
     256              : 
     257           19 : StateChangeResult GStreamerMSEMediaPlayerClient::play(int32_t sourceId)
     258              : {
     259           19 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     260           38 :     m_backendQueue->callInEventLoop(
     261           19 :         [&]()
     262              :         {
     263           19 :             auto sourceIt = m_attachedSources.find(sourceId);
     264           19 :             if (sourceIt == m_attachedSources.end())
     265              :             {
     266            1 :                 GST_ERROR("Cannot play - there's no attached source with id %d", sourceId);
     267            1 :                 result = StateChangeResult::NOT_ATTACHED;
     268            3 :                 return;
     269              :             }
     270              : 
     271           18 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PLAYING)
     272              :             {
     273            2 :                 GST_INFO("Server is already playing");
     274            2 :                 sourceIt->second.m_state = ClientState::PLAYING;
     275              : 
     276            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PLAYING}))
     277              :                 {
     278            1 :                     m_clientState = ClientState::PLAYING;
     279              :                 }
     280              : 
     281            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     282            2 :                 return;
     283              :             }
     284              : 
     285           16 :             sourceIt->second.m_state = ClientState::AWAITING_PLAYING;
     286              : 
     287           16 :             if (m_clientState == ClientState::PAUSED)
     288              :             {
     289              :                 // If one source is AWAITING_PLAYING, the other source can still be PLAYING.
     290              :                 // This happends when we are switching out audio.
     291           39 :                 if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PLAYING, ClientState::PLAYING}))
     292              :                 {
     293            9 :                     GST_INFO("Sending play command");
     294            9 :                     m_clientBackend->play();
     295            9 :                     m_clientState = ClientState::AWAITING_PLAYING;
     296              :                 }
     297              :                 else
     298              :                 {
     299            4 :                     GST_DEBUG("Not all sources are ready to play");
     300              :                 }
     301              :             }
     302              :             else
     303              :             {
     304            3 :                 GST_WARNING("Not in PAUSED state in %u state", static_cast<uint32_t>(m_clientState));
     305              :             }
     306              : 
     307           16 :             result = StateChangeResult::SUCCESS_ASYNC;
     308           16 :             rialto_mse_base_async_start(sourceIt->second.m_rialtoSink);
     309              :         });
     310              : 
     311           19 :     return result;
     312              : }
     313              : 
     314          291 : StateChangeResult GStreamerMSEMediaPlayerClient::pause(int32_t sourceId)
     315              : {
     316          291 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     317          582 :     m_backendQueue->callInEventLoop(
     318          291 :         [&]()
     319              :         {
     320          291 :             auto sourceIt = m_attachedSources.find(sourceId);
     321          291 :             if (sourceIt == m_attachedSources.end())
     322              :             {
     323          136 :                 GST_WARNING("Cannot pause - there's no attached source with id %d", sourceId);
     324              : 
     325          136 :                 result = StateChangeResult::NOT_ATTACHED;
     326          136 :                 return;
     327              :             }
     328              : 
     329          155 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PAUSED &&
     330            4 :                 m_clientState != ClientState::AWAITING_PLAYING && m_clientState != ClientState::AWAITING_PAUSED)
     331              :             {
     332              :                 // if the server is already paused and we are not in async, we don't need to send pause command
     333            2 :                 GST_INFO("Server is already paused");
     334            2 :                 sourceIt->second.m_state = ClientState::PAUSED;
     335              : 
     336            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PAUSED}))
     337              :                 {
     338            1 :                     m_clientState = ClientState::PAUSED;
     339              :                 }
     340              : 
     341            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     342              :             }
     343              :             else
     344              :             {
     345          153 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     346              : 
     347          153 :                 bool shouldPause = false;
     348          153 :                 if (m_clientState == ClientState::READY)
     349              :                 {
     350          426 :                     if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     351              :                     {
     352          134 :                         shouldPause = true;
     353              :                     }
     354              :                     else
     355              :                     {
     356            8 :                         GST_DEBUG("Not all attached sources are ready to pause");
     357              :                     }
     358              :                 }
     359           11 :                 else if (m_clientState == ClientState::AWAITING_PLAYING || m_clientState == ClientState::PLAYING)
     360              :                 {
     361            8 :                     shouldPause = true;
     362              :                 }
     363              :                 else
     364              :                 {
     365            3 :                     GST_DEBUG("Cannot pause in %u state", static_cast<uint32_t>(m_clientState));
     366              :                 }
     367              : 
     368          153 :                 if (shouldPause)
     369              :                 {
     370          142 :                     GST_INFO("Sending pause command in %u state", static_cast<uint32_t>(m_clientState));
     371          142 :                     m_clientBackend->pause();
     372          142 :                     m_clientState = ClientState::AWAITING_PAUSED;
     373              :                 }
     374              : 
     375          153 :                 result = StateChangeResult::SUCCESS_ASYNC;
     376          153 :                 rialto_mse_base_async_start(sourceIt->second.m_rialtoSink);
     377              :             }
     378              :         });
     379              : 
     380          291 :     return result;
     381              : }
     382              : 
     383          143 : void GStreamerMSEMediaPlayerClient::stop()
     384              : {
     385          286 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->stop(); });
     386          143 : }
     387              : 
     388            3 : void GStreamerMSEMediaPlayerClient::setPlaybackRate(double rate)
     389              : {
     390            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setPlaybackRate(rate); });
     391            3 : }
     392              : 
     393            6 : void GStreamerMSEMediaPlayerClient::flush(int32_t sourceId, bool resetTime)
     394              : {
     395           12 :     m_backendQueue->callInEventLoop(
     396            6 :         [&]()
     397              :         {
     398            6 :             bool async{true};
     399            6 :             auto sourceIt = m_attachedSources.find(sourceId);
     400            6 :             if (sourceIt == m_attachedSources.end())
     401              :             {
     402            2 :                 GST_ERROR("Cannot flush - there's no attached source with id %d", sourceId);
     403            3 :                 return;
     404              :             }
     405            4 :             if (!m_clientBackend->flush(sourceId, resetTime, async))
     406              :             {
     407            1 :                 GST_ERROR("Flush operation failed for source with id %d", sourceId);
     408            1 :                 return;
     409              :             }
     410            3 :             sourceIt->second.m_isFlushing = true;
     411            3 :             sourceIt->second.m_bufferPuller->stop();
     412              : 
     413            3 :             if (async)
     414              :             {
     415            3 :                 GST_ERROR("Flush request sent for async source %d. Sink will lose state now", sourceId);
     416            3 :                 rialto_mse_base_sink_lost_state(sourceIt->second.m_rialtoSink);
     417              : 
     418            3 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     419            3 :                 if (m_clientState == ClientState::PLAYING)
     420              :                 {
     421            0 :                     m_clientState = ClientState::AWAITING_PLAYING;
     422              :                 }
     423            3 :                 else if (m_clientState == ClientState::PAUSED)
     424              :                 {
     425            1 :                     m_clientState = ClientState::AWAITING_PAUSED;
     426              :                 }
     427              :             }
     428              :         });
     429            6 : }
     430              : 
     431            8 : void GStreamerMSEMediaPlayerClient::setSourcePosition(int32_t sourceId, int64_t position, bool resetTime,
     432              :                                                       double appliedRate, uint64_t stopPosition)
     433              : {
     434           16 :     m_backendQueue->callInEventLoop(
     435            8 :         [&]()
     436              :         {
     437            8 :             auto sourceIt = m_attachedSources.find(sourceId);
     438            8 :             if (sourceIt == m_attachedSources.end())
     439              :             {
     440            1 :                 GST_ERROR("Cannot Set Source Position - there's no attached source with id %d", sourceId);
     441            2 :                 return;
     442              :             }
     443            7 :             if (!m_clientBackend->setSourcePosition(sourceId, position, resetTime, appliedRate, stopPosition))
     444              :             {
     445            1 :                 GST_ERROR("Set Source Position operation failed for source with id %d", sourceId);
     446            1 :                 return;
     447              :             }
     448            6 :             sourceIt->second.m_position = position;
     449              :         });
     450            8 : }
     451              : 
     452            4 : void GStreamerMSEMediaPlayerClient::processAudioGap(int64_t position, uint32_t duration, int64_t discontinuityGap,
     453              :                                                     bool audioAac)
     454              : {
     455            8 :     m_backendQueue->callInEventLoop(
     456            4 :         [&]()
     457              :         {
     458            4 :             if (!m_clientBackend->processAudioGap(position, duration, discontinuityGap, audioAac))
     459              :             {
     460            1 :                 GST_ERROR("Process Audio Gap operation failed");
     461            1 :                 return;
     462              :             }
     463              :         });
     464            4 : }
     465              : 
     466          187 : bool GStreamerMSEMediaPlayerClient::attachSource(std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source,
     467              :                                                  RialtoMSEBaseSink *rialtoSink)
     468              : {
     469          187 :     if (source->getType() != firebolt::rialto::MediaSourceType::AUDIO &&
     470          199 :         source->getType() != firebolt::rialto::MediaSourceType::VIDEO &&
     471           12 :         source->getType() != firebolt::rialto::MediaSourceType::SUBTITLE)
     472              :     {
     473            1 :         GST_WARNING_OBJECT(rialtoSink, "Invalid source type %u", static_cast<uint32_t>(source->getType()));
     474            1 :         return false;
     475              :     }
     476              : 
     477          186 :     bool result = false;
     478          372 :     m_backendQueue->callInEventLoop(
     479          186 :         [&]()
     480              :         {
     481          186 :             result = m_clientBackend->attachSource(source);
     482              : 
     483          186 :             if (result)
     484              :             {
     485          185 :                 std::shared_ptr<BufferParser> bufferParser;
     486          185 :                 if (source->getType() == firebolt::rialto::MediaSourceType::AUDIO)
     487              :                 {
     488          135 :                     bufferParser = std::make_shared<AudioBufferParser>();
     489              :                 }
     490           50 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::VIDEO)
     491              :                 {
     492           39 :                     bufferParser = std::make_shared<VideoBufferParser>();
     493              :                 }
     494           11 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
     495              :                 {
     496           11 :                     bufferParser = std::make_shared<SubtitleBufferParser>();
     497              :                 }
     498              : 
     499              :                 std::shared_ptr<BufferPuller> bufferPuller =
     500          185 :                     std::make_shared<BufferPuller>(m_messageQueueFactory, GST_ELEMENT_CAST(rialtoSink), bufferParser);
     501              : 
     502          185 :                 if (m_attachedSources.find(source->getId()) == m_attachedSources.end())
     503              :                 {
     504          185 :                     m_attachedSources.emplace(source->getId(),
     505          370 :                                               AttachedSource(rialtoSink, bufferPuller, source->getType()));
     506              : 
     507          185 :                     rialtoSink->priv->m_sourceId = source->getId();
     508          185 :                     bufferPuller->start();
     509              :                 }
     510              :             }
     511              : 
     512          186 :             sendAllSourcesAttachedIfPossibleInternal();
     513          186 :         });
     514              : 
     515          186 :     return result;
     516              : }
     517              : 
     518            4 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossible()
     519              : {
     520            8 :     m_backendQueue->callInEventLoop([&]() { sendAllSourcesAttachedIfPossibleInternal(); });
     521            4 : }
     522              : 
     523          190 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossibleInternal()
     524              : {
     525          190 :     if (!m_wasAllSourcesAttachedSent && areAllStreamsAttached())
     526              :     {
     527              :         // RialtoServer doesn't support dynamic source attachment.
     528              :         // It means that when we notify that all sources were attached, we cannot add any more sources in the current session
     529          140 :         GST_INFO("All sources attached");
     530          140 :         m_clientBackend->allSourcesAttached();
     531          140 :         m_wasAllSourcesAttachedSent = true;
     532          140 :         m_clientState = ClientState::READY;
     533              : 
     534              :         // In playbin3 streams, confirmation about number of available sources comes after attaching the source,
     535              :         // so we need to check if all sources are ready to pause
     536          420 :         if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     537              :         {
     538            1 :             GST_INFO("Sending pause command, because all attached sources are ready to pause");
     539            1 :             m_clientBackend->pause();
     540            1 :             m_clientState = ClientState::AWAITING_PAUSED;
     541              :         }
     542              :     }
     543          190 : }
     544              : 
     545          137 : void GStreamerMSEMediaPlayerClient::removeSource(int32_t sourceId)
     546              : {
     547          274 :     m_backendQueue->callInEventLoop(
     548          137 :         [&]()
     549              :         {
     550          137 :             if (!m_clientBackend->removeSource(sourceId))
     551              :             {
     552            1 :                 GST_WARNING("Remove source %d failed", sourceId);
     553              :             }
     554          137 :             m_attachedSources.erase(sourceId);
     555          137 :         });
     556              : }
     557              : 
     558           52 : void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::PlaybackState state)
     559              : {
     560           52 :     GST_DEBUG("Received state change to state %u", static_cast<uint32_t>(state));
     561          104 :     m_backendQueue->callInEventLoop(
     562           52 :         [&]()
     563              :         {
     564           52 :             m_serverPlaybackState = state;
     565           52 :             switch (state)
     566              :             {
     567           46 :             case firebolt::rialto::PlaybackState::PAUSED:
     568              :             case firebolt::rialto::PlaybackState::PLAYING:
     569              :             {
     570           46 :                 if (state == firebolt::rialto::PlaybackState::PAUSED && m_clientState == ClientState::AWAITING_PAUSED)
     571              :                 {
     572           28 :                     m_clientState = ClientState::PAUSED;
     573              :                 }
     574           18 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     575           10 :                          m_clientState == ClientState::AWAITING_PLAYING)
     576              :                 {
     577            7 :                     m_clientState = ClientState::PLAYING;
     578              :                 }
     579           11 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     580            3 :                          m_clientState == ClientState::AWAITING_PAUSED)
     581              :                 {
     582            1 :                     GST_WARNING("Outdated Playback State change to PLAYING received. Discarding...");
     583            1 :                     break;
     584              :                 }
     585              : 
     586          100 :                 for (auto &source : m_attachedSources)
     587              :                 {
     588           55 :                     if (state == firebolt::rialto::PlaybackState::PAUSED &&
     589           43 :                         source.second.m_state == ClientState::AWAITING_PAUSED)
     590              :                     {
     591           40 :                         source.second.m_state = ClientState::PAUSED;
     592              :                     }
     593           15 :                     else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     594           12 :                              source.second.m_state == ClientState::AWAITING_PLAYING)
     595              :                     {
     596            9 :                         source.second.m_state = ClientState::PLAYING;
     597              :                     }
     598              : 
     599           55 :                     rialto_mse_base_handle_rialto_server_state_changed(source.second.m_rialtoSink, state);
     600              :                 }
     601              : 
     602           45 :                 break;
     603              :             }
     604            3 :             case firebolt::rialto::PlaybackState::END_OF_STREAM:
     605              :             {
     606            6 :                 for (const auto &source : m_attachedSources)
     607              :                 {
     608            3 :                     rialto_mse_base_handle_rialto_server_eos(source.second.m_rialtoSink);
     609              :                 }
     610              :             }
     611            3 :             break;
     612            1 :             case firebolt::rialto::PlaybackState::SEEK_DONE:
     613              :             {
     614            1 :                 GST_WARNING("firebolt::rialto::PlaybackState::SEEK_DONE notification not supported");
     615            1 :                 break;
     616              :             }
     617            1 :             case firebolt::rialto::PlaybackState::FAILURE:
     618              :             {
     619            2 :                 for (const auto &source : m_attachedSources)
     620              :                 {
     621            1 :                     rialto_mse_base_handle_rialto_server_error(source.second.m_rialtoSink,
     622              :                                                                firebolt::rialto::PlaybackError::UNKNOWN);
     623              :                 }
     624            2 :                 for (auto &source : m_attachedSources)
     625              :                 {
     626            1 :                     source.second.m_position = 0;
     627              :                 }
     628              : 
     629            1 :                 break;
     630              :             }
     631              :             break;
     632            1 :             default:
     633            1 :                 break;
     634              :             }
     635           52 :         });
     636              : }
     637              : 
     638            3 : void GStreamerMSEMediaPlayerClient::handleSourceFlushed(int32_t sourceId)
     639              : {
     640            6 :     m_backendQueue->callInEventLoop(
     641            3 :         [&]()
     642              :         {
     643            3 :             auto sourceIt = m_attachedSources.find(sourceId);
     644            3 :             if (sourceIt == m_attachedSources.end())
     645              :             {
     646            1 :                 GST_ERROR("Cannot finish flush - there's no attached source with id %d", sourceId);
     647            2 :                 return;
     648              :             }
     649            2 :             if (!sourceIt->second.m_isFlushing)
     650              :             {
     651            1 :                 GST_ERROR("Cannot finish flush - source with id %d is not flushing!", sourceId);
     652            1 :                 return;
     653              :             }
     654            1 :             sourceIt->second.m_isFlushing = false;
     655            1 :             sourceIt->second.m_bufferPuller->start();
     656            1 :             rialto_mse_base_handle_rialto_server_completed_flush(sourceIt->second.m_rialtoSink);
     657              :         });
     658            3 : }
     659              : 
     660            7 : void GStreamerMSEMediaPlayerClient::setVideoRectangle(const std::string &rectangleString)
     661              : {
     662           14 :     m_backendQueue->callInEventLoop(
     663            7 :         [&]()
     664              :         {
     665            7 :             if (!m_clientBackend || !m_clientBackend->isMediaPlayerBackendCreated())
     666              :             {
     667            1 :                 GST_WARNING("Missing RialtoClient backend - can't set video window now");
     668            3 :                 return;
     669              :             }
     670              : 
     671            6 :             if (rectangleString.empty())
     672              :             {
     673            1 :                 GST_WARNING("Empty video rectangle string");
     674            1 :                 return;
     675              :             }
     676              : 
     677            5 :             Rectangle rect = {0, 0, 0, 0};
     678            5 :             if (sscanf(rectangleString.c_str(), "%u,%u,%u,%u", &rect.x, &rect.y, &rect.width, &rect.height) != 4)
     679              :             {
     680            1 :                 GST_WARNING("Invalid video rectangle values");
     681            1 :                 return;
     682              :             }
     683              : 
     684            4 :             m_clientBackend->setVideoWindow(rect.x, rect.y, rect.width, rect.height);
     685            4 :             m_videoRectangle = rect;
     686              :         });
     687            7 : }
     688              : 
     689            4 : std::string GStreamerMSEMediaPlayerClient::getVideoRectangle()
     690              : {
     691              :     char rectangle[64];
     692            8 :     m_backendQueue->callInEventLoop(
     693            8 :         [&]()
     694              :         {
     695            4 :             sprintf(rectangle, "%u,%u,%u,%u", m_videoRectangle.x, m_videoRectangle.y, m_videoRectangle.width,
     696              :                     m_videoRectangle.height);
     697            4 :         });
     698              : 
     699            8 :     return std::string(rectangle);
     700              : }
     701              : 
     702            4 : bool GStreamerMSEMediaPlayerClient::renderFrame(RialtoMSEBaseSink *sink)
     703              : {
     704            4 :     bool result = false;
     705            8 :     m_backendQueue->callInEventLoop(
     706            4 :         [&]()
     707              :         {
     708            4 :             result = m_clientBackend->renderFrame();
     709            4 :             if (result)
     710              :             {
     711              :                 // RialtoServer's video sink should drop PAUSED state due to skipping prerolled buffer in PAUSED state
     712            3 :                 rialto_mse_base_sink_lost_state(sink);
     713              :             }
     714            4 :         });
     715            4 :     return result;
     716              : }
     717              : 
     718            6 : void GStreamerMSEMediaPlayerClient::setVolume(double targetVolume, uint32_t volumeDuration,
     719              :                                               firebolt::rialto::EaseType easeType)
     720              : {
     721           12 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setVolume(targetVolume, volumeDuration, easeType); });
     722            6 : }
     723              : 
     724            5 : bool GStreamerMSEMediaPlayerClient::getVolume(double &volume)
     725              : {
     726            5 :     bool status{false};
     727           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getVolume(volume); });
     728            5 :     return status;
     729              : }
     730              : 
     731            7 : void GStreamerMSEMediaPlayerClient::setMute(bool mute, int32_t sourceId)
     732              : {
     733           14 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setMute(mute, sourceId); });
     734            7 : }
     735              : 
     736            3 : bool GStreamerMSEMediaPlayerClient::getMute(int sourceId)
     737              : {
     738            3 :     bool mute{false};
     739            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getMute(mute, sourceId); });
     740              : 
     741            3 :     return mute;
     742              : }
     743              : 
     744            3 : void GStreamerMSEMediaPlayerClient::setTextTrackIdentifier(const std::string &textTrackIdentifier)
     745              : {
     746            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setTextTrackIdentifier(textTrackIdentifier); });
     747            3 : }
     748              : 
     749            2 : std::string GStreamerMSEMediaPlayerClient::getTextTrackIdentifier()
     750              : {
     751            2 :     std::string getTextTrackIdentifier;
     752            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getTextTrackIdentifier(getTextTrackIdentifier); });
     753            2 :     return getTextTrackIdentifier;
     754              : }
     755              : 
     756            6 : bool GStreamerMSEMediaPlayerClient::setLowLatency(bool lowLatency)
     757              : {
     758            6 :     if (!m_clientBackend)
     759              :     {
     760            1 :         return false;
     761              :     }
     762              : 
     763            5 :     bool status{false};
     764           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setLowLatency(lowLatency); });
     765            5 :     return status;
     766              : }
     767              : 
     768            6 : bool GStreamerMSEMediaPlayerClient::setSync(bool sync)
     769              : {
     770            6 :     if (!m_clientBackend)
     771              :     {
     772            1 :         return false;
     773              :     }
     774              : 
     775            5 :     bool status{false};
     776           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSync(sync); });
     777            5 :     return status;
     778              : }
     779              : 
     780            4 : bool GStreamerMSEMediaPlayerClient::getSync(bool &sync)
     781              : {
     782            4 :     if (!m_clientBackend)
     783              :     {
     784            1 :         return false;
     785              :     }
     786              : 
     787            3 :     bool status{false};
     788            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getSync(sync); });
     789            3 :     return status;
     790              : }
     791              : 
     792            6 : bool GStreamerMSEMediaPlayerClient::setSyncOff(bool syncOff)
     793              : {
     794            6 :     if (!m_clientBackend)
     795              :     {
     796            1 :         return false;
     797              :     }
     798              : 
     799            5 :     bool status{false};
     800           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSyncOff(syncOff); });
     801            5 :     return status;
     802              : }
     803              : 
     804           10 : bool GStreamerMSEMediaPlayerClient::setStreamSyncMode(int32_t sourceId, int32_t streamSyncMode)
     805              : {
     806           10 :     if (!m_clientBackend)
     807              :     {
     808            1 :         return false;
     809              :     }
     810              : 
     811            9 :     bool status{false};
     812           18 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setStreamSyncMode(sourceId, streamSyncMode); });
     813            9 :     return status;
     814              : }
     815              : 
     816            4 : bool GStreamerMSEMediaPlayerClient::getStreamSyncMode(int32_t &streamSyncMode)
     817              : {
     818            4 :     if (!m_clientBackend)
     819              :     {
     820            1 :         return false;
     821              :     }
     822              : 
     823            3 :     bool status{false};
     824            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getStreamSyncMode(streamSyncMode); });
     825            3 :     return status;
     826              : }
     827              : 
     828           39 : ClientState GStreamerMSEMediaPlayerClient::getClientState()
     829              : {
     830           39 :     ClientState state{ClientState::IDLE};
     831           39 :     m_backendQueue->callInEventLoop([&]() { state = m_clientState; });
     832           39 :     return state;
     833              : }
     834              : 
     835          156 : void GStreamerMSEMediaPlayerClient::handleStreamCollection(int32_t audioStreams, int32_t videoStreams,
     836              :                                                            int32_t subtitleStreams)
     837              : {
     838          312 :     m_backendQueue->callInEventLoop(
     839          156 :         [&]()
     840              :         {
     841          156 :             if (m_audioStreams == UNKNOWN_STREAMS_NUMBER)
     842          153 :                 m_audioStreams = audioStreams;
     843          156 :             if (m_videoStreams == UNKNOWN_STREAMS_NUMBER)
     844          152 :                 m_videoStreams = videoStreams;
     845          156 :             if (m_subtitleStreams == UNKNOWN_STREAMS_NUMBER)
     846          152 :                 m_subtitleStreams = subtitleStreams;
     847              : 
     848          156 :             GST_INFO("Updated number of streams. New streams' numbers; video=%d, audio=%d, text=%d", m_videoStreams,
     849              :                      m_audioStreams, m_subtitleStreams);
     850          156 :         });
     851              : }
     852              : 
     853            3 : void GStreamerMSEMediaPlayerClient::setBufferingLimit(uint32_t limitBufferingMs)
     854              : {
     855            3 :     if (!m_clientBackend)
     856              :     {
     857            0 :         return;
     858              :     }
     859            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setBufferingLimit(limitBufferingMs); });
     860              : }
     861              : 
     862            2 : uint32_t GStreamerMSEMediaPlayerClient::getBufferingLimit()
     863              : {
     864            2 :     if (!m_clientBackend)
     865              :     {
     866            0 :         return kDefaultBufferingLimit;
     867              :     }
     868              : 
     869            2 :     uint32_t result{kDefaultBufferingLimit};
     870            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getBufferingLimit(result); });
     871            2 :     return result;
     872              : }
     873              : 
     874            3 : void GStreamerMSEMediaPlayerClient::setUseBuffering(bool useBuffering)
     875              : {
     876            3 :     if (!m_clientBackend)
     877              :     {
     878            0 :         return;
     879              :     }
     880            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setUseBuffering(useBuffering); });
     881              : }
     882              : 
     883            2 : bool GStreamerMSEMediaPlayerClient::getUseBuffering()
     884              : {
     885            2 :     if (!m_clientBackend)
     886              :     {
     887            0 :         return kDefaultUseBuffering;
     888              :     }
     889              : 
     890            2 :     bool result{kDefaultUseBuffering};
     891            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getUseBuffering(result); });
     892            2 :     return result;
     893              : }
     894              : 
     895            3 : bool GStreamerMSEMediaPlayerClient::switchSource(const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source)
     896              : {
     897            3 :     bool result = false;
     898            6 :     m_backendQueue->callInEventLoop([&]() { result = m_clientBackend->switchSource(source); });
     899              : 
     900            3 :     return result;
     901              : }
     902              : 
     903          299 : bool GStreamerMSEMediaPlayerClient::checkIfAllAttachedSourcesInStates(const std::vector<ClientState> &states)
     904              : {
     905          598 :     return std::all_of(m_attachedSources.begin(), m_attachedSources.end(), [states](const auto &source)
     906          910 :                        { return std::find(states.begin(), states.end(), source.second.m_state) != states.end(); });
     907              : }
     908              : 
     909          189 : bool GStreamerMSEMediaPlayerClient::areAllStreamsAttached()
     910              : {
     911          189 :     int32_t attachedVideoSources = 0;
     912          189 :     int32_t attachedAudioSources = 0;
     913          189 :     int32_t attachedSubtitleSources = 0;
     914          388 :     for (auto &source : m_attachedSources)
     915              :     {
     916          199 :         if (source.second.getType() == firebolt::rialto::MediaSourceType::VIDEO)
     917              :         {
     918           39 :             attachedVideoSources++;
     919              :         }
     920          160 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::AUDIO)
     921              :         {
     922          149 :             attachedAudioSources++;
     923              :         }
     924           11 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
     925              :         {
     926           11 :             attachedSubtitleSources++;
     927              :         }
     928              :     }
     929              : 
     930          329 :     return attachedVideoSources == m_videoStreams && attachedAudioSources == m_audioStreams &&
     931          329 :            attachedSubtitleSources == m_subtitleStreams;
     932              : }
     933              : 
     934            7 : bool GStreamerMSEMediaPlayerClient::requestPullBuffer(int streamId, size_t frameCount, unsigned int needDataRequestId)
     935              : {
     936            7 :     bool result = false;
     937           14 :     m_backendQueue->callInEventLoop(
     938            7 :         [&]()
     939              :         {
     940            7 :             auto sourceIt = m_attachedSources.find(streamId);
     941            7 :             if (sourceIt == m_attachedSources.end())
     942              :             {
     943            1 :                 GST_ERROR("There's no attached source with id %d", streamId);
     944              : 
     945            1 :                 result = false;
     946            1 :                 return;
     947              :             }
     948            6 :             result = sourceIt->second.m_bufferPuller->requestPullBuffer(streamId, frameCount, needDataRequestId, this);
     949              :         });
     950              : 
     951            7 :     return result;
     952              : }
     953              : 
     954            4 : bool GStreamerMSEMediaPlayerClient::handleQos(int sourceId, firebolt::rialto::QosInfo qosInfo)
     955              : {
     956            4 :     bool result = false;
     957            8 :     m_backendQueue->callInEventLoop(
     958            4 :         [&]()
     959              :         {
     960            4 :             auto sourceIt = m_attachedSources.find(sourceId);
     961            4 :             if (sourceIt == m_attachedSources.end())
     962              :             {
     963            1 :                 result = false;
     964            1 :                 return;
     965              :             }
     966              : 
     967            3 :             rialto_mse_base_handle_rialto_server_sent_qos(sourceIt->second.m_rialtoSink, qosInfo.processed,
     968            3 :                                                           qosInfo.dropped);
     969              : 
     970            3 :             result = true;
     971              :         });
     972              : 
     973            4 :     return result;
     974              : }
     975              : 
     976            2 : bool GStreamerMSEMediaPlayerClient::handleBufferUnderflow(int sourceId)
     977              : {
     978            2 :     bool result = false;
     979            4 :     m_backendQueue->callInEventLoop(
     980            2 :         [&]()
     981              :         {
     982            2 :             auto sourceIt = m_attachedSources.find(sourceId);
     983            2 :             if (sourceIt == m_attachedSources.end())
     984              :             {
     985            1 :                 result = false;
     986            1 :                 return;
     987              :             }
     988              : 
     989            1 :             rialto_mse_base_handle_rialto_server_sent_buffer_underflow(sourceIt->second.m_rialtoSink);
     990              : 
     991            1 :             result = true;
     992              :         });
     993              : 
     994            2 :     return result;
     995              : }
     996              : 
     997            2 : bool GStreamerMSEMediaPlayerClient::handlePlaybackError(int sourceId, firebolt::rialto::PlaybackError error)
     998              : {
     999            2 :     bool result = false;
    1000            4 :     m_backendQueue->callInEventLoop(
    1001            2 :         [&]()
    1002              :         {
    1003            2 :             auto sourceIt = m_attachedSources.find(sourceId);
    1004            2 :             if (sourceIt == m_attachedSources.end())
    1005              :             {
    1006            1 :                 result = false;
    1007            1 :                 return;
    1008              :             }
    1009              : 
    1010              :             // Even though rialto has only reported a non-fatal error, still fail the pipeline from rialto-gstreamer
    1011            1 :             GST_ERROR("Received Playback error '%s', posting error on %s sink", toString(error),
    1012              :                       toString(sourceIt->second.getType()));
    1013            1 :             rialto_mse_base_handle_rialto_server_error(sourceIt->second.m_rialtoSink, error);
    1014              : 
    1015            1 :             result = true;
    1016              :         });
    1017              : 
    1018            2 :     return result;
    1019              : }
    1020              : 
    1021            3 : firebolt::rialto::AddSegmentStatus GStreamerMSEMediaPlayerClient::addSegment(
    1022              :     unsigned int needDataRequestId, const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> &mediaSegment)
    1023              : {
    1024              :     // rialto client's addSegment call is MT safe, so it's ok to call it from the Puller's thread
    1025            3 :     return m_clientBackend->addSegment(needDataRequestId, mediaSegment);
    1026              : }
    1027              : 
    1028          185 : BufferPuller::BufferPuller(const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory, GstElement *rialtoSink,
    1029          185 :                            const std::shared_ptr<BufferParser> &bufferParser)
    1030          185 :     : m_queue{messageQueueFactory->createMessageQueue()}, m_rialtoSink(rialtoSink), m_bufferParser(bufferParser)
    1031              : {
    1032              : }
    1033              : 
    1034          186 : void BufferPuller::start()
    1035              : {
    1036          186 :     m_queue->start();
    1037              : }
    1038              : 
    1039           61 : void BufferPuller::stop()
    1040              : {
    1041           61 :     m_queue->stop();
    1042              : }
    1043              : 
    1044            6 : bool BufferPuller::requestPullBuffer(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1045              :                                      GStreamerMSEMediaPlayerClient *player)
    1046              : {
    1047           18 :     return m_queue->postMessage(std::make_shared<PullBufferMessage>(sourceId, frameCount, needDataRequestId,
    1048           18 :                                                                     m_rialtoSink, m_bufferParser, *m_queue, player));
    1049              : }
    1050              : 
    1051            7 : HaveDataMessage::HaveDataMessage(firebolt::rialto::MediaSourceStatus status, int sourceId,
    1052            7 :                                  unsigned int needDataRequestId, GStreamerMSEMediaPlayerClient *player)
    1053            7 :     : m_status(status), m_sourceId(sourceId), m_needDataRequestId(needDataRequestId), m_player(player)
    1054              : {
    1055              : }
    1056              : 
    1057            7 : void HaveDataMessage::handle()
    1058              : {
    1059            7 :     if (m_player->m_attachedSources.find(m_sourceId) == m_player->m_attachedSources.end())
    1060              :     {
    1061            1 :         GST_WARNING("Source id %d is invalid", m_sourceId);
    1062            1 :         return;
    1063              :     }
    1064              : 
    1065            6 :     m_player->m_clientBackend->haveData(m_status, m_needDataRequestId);
    1066              : }
    1067              : 
    1068            6 : PullBufferMessage::PullBufferMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1069              :                                      GstElement *rialtoSink, const std::shared_ptr<BufferParser> &bufferParser,
    1070            6 :                                      IMessageQueue &pullerQueue, GStreamerMSEMediaPlayerClient *player)
    1071            6 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_rialtoSink(rialtoSink),
    1072            6 :       m_bufferParser(bufferParser), m_pullerQueue(pullerQueue), m_player(player)
    1073              : {
    1074              : }
    1075              : 
    1076            5 : void PullBufferMessage::handle()
    1077              : {
                            // Pulls up to m_frameCount samples from m_rialtoSink (rialto_mse_base_sink_get_front_sample),
                            // passes each parsed segment to m_player->addSegment() and then posts exactly one
                            // HaveDataMessage for m_needDataRequestId to the player's backend queue.
                            // Reported status: EOS if no sample was available and rialto_mse_base_sink_is_eos() returned
                            // true (even when some segments were already added), NO_AVAILABLE_SAMPLES if no segment was
                            // added, OK otherwise.
    1078            5 :     bool isEos = false;
    1079            5 :     unsigned int addedSegments = 0;
    1080              : 
    1081            7 :     for (unsigned int frame = 0; frame < m_frameCount; ++frame)
    1082              :     {
    1083            5 :         GstRefSample sample = rialto_mse_base_sink_get_front_sample(RIALTO_MSE_BASE_SINK(m_rialtoSink));
    1084            5 :         if (!sample)
    1085              :         {
    1086            2 :             if (rialto_mse_base_sink_is_eos(RIALTO_MSE_BASE_SINK(m_rialtoSink)))
    1087              :             {
    1088            1 :                 isEos = true;
    1089              :             }
    1090              :             else
    1091              :             {
    1092              :                 // it's not a critical issue. It might be caused by receiving too many need data requests.
    1093            1 :                 GST_INFO_OBJECT(m_rialtoSink, "Could not get a sample");
    1094              :             }
    1095            2 :             break; // no sample to process in either case, so stop pulling
    1096              :         }
    1097              : 
    1098              :         // we pass GstMapInfo's pointers on data buffers to RialtoClient
    1099              :         // so we need to hold it until RialtoClient copies them to shm
    1100            3 :         GstBuffer *buffer = sample.getBuffer();
    1101              :         GstMapInfo map;
    1102            3 :         if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
    1103              :         {
                                    // An unmappable sample is dropped (popped) and the loop moves on; the iteration
                                    // still counts towards m_frameCount because `frame` is incremented regardless.
    1104            1 :             GST_ERROR_OBJECT(m_rialtoSink, "Could not map buffer");
    1105            1 :             rialto_mse_base_sink_pop_sample(RIALTO_MSE_BASE_SINK(m_rialtoSink));
    1106            1 :             continue;
    1107              :         }
    1108              : 
    1109              :         std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> mseData =
    1110            2 :             m_bufferParser->parseBuffer(sample, buffer, map, m_sourceId);
    1111            2 :         if (!mseData)
    1112              :         {
                                    // Same policy as a map failure: unmap, drop the sample, try the next one.
                                    // This branch shows a hit count of 0, i.e. it was not executed in this coverage run.
    1113            0 :             GST_ERROR_OBJECT(m_rialtoSink, "No data returned from the parser");
    1114            0 :             gst_buffer_unmap(buffer, &map);
    1115            0 :             rialto_mse_base_sink_pop_sample(RIALTO_MSE_BASE_SINK(m_rialtoSink));
    1116            0 :             continue;
    1117              :         }
    1118              : 
                                // NOTE(review): only NO_SPACE is checked below; every other AddSegmentStatus value is
                                // treated as a successfully added segment — confirm that is intended.
    1119            2 :         firebolt::rialto::AddSegmentStatus addSegmentStatus = m_player->addSegment(m_needDataRequestId, mseData);
    1120            2 :         if (addSegmentStatus == firebolt::rialto::AddSegmentStatus::NO_SPACE)
    1121              :         {
                                    // The sample is deliberately not popped on this path — presumably so that it is
                                    // offered again on a later request (verify against the sink implementation).
    1122            1 :             gst_buffer_unmap(buffer, &map);
    1123            1 :             GST_INFO_OBJECT(m_rialtoSink, "There's no space to add sample");
    1124            1 :             break;
    1125              :         }
    1126              : 
                                // Segment accepted: release the mapping, remove the sample from the sink and count it.
    1127            1 :         gst_buffer_unmap(buffer, &map);
    1128            1 :         rialto_mse_base_sink_pop_sample(RIALTO_MSE_BASE_SINK(m_rialtoSink));
    1129            1 :         addedSegments++;
    1130            6 :     }
    1131              : 
                            // EOS takes precedence over the "nothing added" case.
    1132            5 :     firebolt::rialto::MediaSourceStatus status = firebolt::rialto::MediaSourceStatus::OK;
    1133            5 :     if (isEos)
    1134              :     {
    1135            1 :         status = firebolt::rialto::MediaSourceStatus::EOS;
    1136              :     }
    1137            4 :     else if (addedSegments == 0)
    1138              :     {
    1139            3 :         status = firebolt::rialto::MediaSourceStatus::NO_AVAILABLE_SAMPLES;
    1140              :     }
    1141              : 
                            // The result is always reported, whatever happened in the loop above.
    1142           10 :     m_player->m_backendQueue->postMessage(
    1143           10 :         std::make_shared<HaveDataMessage>(status, m_sourceId, m_needDataRequestId, m_player));
    1144            5 : }
    1145              : 
    1146            7 : NeedDataMessage::NeedDataMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1147            7 :                                  GStreamerMSEMediaPlayerClient *player)
    1148            7 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_player(player)
    1149              : {
                            // Intentionally empty: the request parameters and the player pointer are only stored;
                            // the request is acted on in handle().
    1150              : }
    1151              : 
    1152            7 : void NeedDataMessage::handle()
    1153              : {
                            // Asks the player to start pulling buffers for this request. Only the failure path posts a
                            // message from here: when requestPullBuffer() returns false, an error is logged and a
                            // HaveDataMessage with MediaSourceStatus::ERROR is posted to the player's backend queue.
    1154            7 :     if (!m_player->requestPullBuffer(m_sourceId, m_frameCount, m_needDataRequestId))
    1155              :     {
    1156            2 :         GST_ERROR("Failed to pull buffer for sourceId=%d and NeedDataRequestId %u", m_sourceId, m_needDataRequestId);
    1157            4 :         m_player->m_backendQueue->postMessage(
    1158            2 :             std::make_shared<HaveDataMessage>(firebolt::rialto::MediaSourceStatus::ERROR, m_sourceId,
    1159            2 :                                               m_needDataRequestId, m_player));
    1160              :     }
    1161            7 : }
    1162              : 
    1163           17 : PlaybackStateMessage::PlaybackStateMessage(firebolt::rialto::PlaybackState state, GStreamerMSEMediaPlayerClient *player)
    1164           17 :     : m_state(state), m_player(player)
    1165              : {
                            // Intentionally empty: stores the reported playback state and the player to notify.
    1166              : }
    1167              : 
    1168           17 : void PlaybackStateMessage::handle()
    1169              : {
                            // Forwards the stored state to the player; no result is checked here.
    1170           17 :     m_player->handlePlaybackStateChange(m_state);
    1171              : }
    1172              : 
    1173            4 : QosMessage::QosMessage(int sourceId, firebolt::rialto::QosInfo qosInfo, GStreamerMSEMediaPlayerClient *player)
    1174            4 :     : m_sourceId(sourceId), m_qosInfo(qosInfo), m_player(player)
    1175              : {
                            // Intentionally empty: qosInfo is taken by value and stored in m_qosInfo together with
                            // the source id and the player pointer.
    1176              : }
    1177              : 
    1178            4 : void QosMessage::handle()
    1179              : {
                            // Passes the stored QoS info to the player; a false return is only logged, not propagated.
    1180            4 :     if (!m_player->handleQos(m_sourceId, m_qosInfo))
    1181              :     {
    1182            1 :         GST_ERROR("Failed to handle qos for sourceId=%d", m_sourceId);
    1183              :     }
    1184            4 : }
    1185              : 
    1186            2 : BufferUnderflowMessage::BufferUnderflowMessage(int sourceId, GStreamerMSEMediaPlayerClient *player)
    1187            2 :     : m_sourceId(sourceId), m_player(player)
    1188              : {
                            // Intentionally empty: stores the id of the affected source and the player to notify.
    1189              : }
    1190              : 
    1191            2 : void BufferUnderflowMessage::handle()
    1192              : {
                            // Delegates to the player; a false return is only logged, not propagated.
    1193            2 :     if (!m_player->handleBufferUnderflow(m_sourceId))
    1194              :     {
    1195            1 :         GST_ERROR("Failed to handle buffer underflow for sourceId=%d", m_sourceId);
    1196              :     }
    1197            2 : }
    1198              : 
    1199            2 : PlaybackErrorMessage::PlaybackErrorMessage(int sourceId, firebolt::rialto::PlaybackError error,
    1200            2 :                                            GStreamerMSEMediaPlayerClient *player)
    1201            2 :     : m_sourceId(sourceId), m_error(error), m_player(player)
    1202              : {
                            // Intentionally empty: stores the source id, the reported error and the player to notify.
    1203              : }
    1204              : 
    1205            2 : void PlaybackErrorMessage::handle()
    1206              : {
                            // Delegates to the player; a false return is only logged (with the error rendered through
                            // toString()), not propagated.
    1207            2 :     if (!m_player->handlePlaybackError(m_sourceId, m_error))
    1208              :     {
    1209            1 :         GST_ERROR("Failed to handle playback error for sourceId=%d, error %s", m_sourceId, toString(m_error));
    1210              :     }
    1211            2 : }
    1212              : 
    1213            1 : SetPositionMessage::SetPositionMessage(int64_t newPosition, std::unordered_map<int32_t, AttachedSource> &attachedSources)
    1214            1 :     : m_newPosition(newPosition), m_attachedSources(attachedSources)
    1215              : {
                            // Intentionally empty: stores the new position and initialises m_attachedSources from the
                            // caller's map, which is received by non-const reference.
                            // NOTE(review): presumably m_attachedSources is a reference member, in which case the map
                            // must outlive this message — confirm against the class declaration.
    1216              : }
    1217              : 
    1218            1 : void SetPositionMessage::handle()
    1219              : {
                            // Applies the stored position to every entry of m_attachedSources.
    1220            2 :     for (auto &source : m_attachedSources)
    1221              :     {
    1222            1 :         source.second.setPosition(m_newPosition); // .second is the AttachedSource; .first is the int32_t key
    1223              :     }
    1224              : }
    1225              : 
    1226            1 : SetDurationMessage::SetDurationMessage(int64_t newDuration, int64_t &targetDuration)
    1227            1 :     : m_newDuration(newDuration), m_targetDuration(targetDuration)
    1228              : {
                            // Intentionally empty: stores the new duration and initialises m_targetDuration from the
                            // caller's variable, which is received by non-const reference.
                            // NOTE(review): presumably m_targetDuration is a reference member, so the referenced
                            // variable must still exist when handle() runs — confirm against the class declaration.
    1229              : }
    1230              : 
    1231            1 : void SetDurationMessage::handle()
    1232              : {
                            // Writes the stored duration into m_targetDuration.
    1233            1 :     m_targetDuration = m_newDuration;
    1234              : }
    1235              : 
    1236            3 : SourceFlushedMessage::SourceFlushedMessage(int32_t sourceId, GStreamerMSEMediaPlayerClient *player)
    1237            3 :     : m_sourceId{sourceId}, m_player{player}
    1238              : {
                            // Intentionally empty: stores the id of the flushed source and the player to notify.
    1239              : }
    1240              : 
    1241            3 : void SourceFlushedMessage::handle()
    1242              : {
                            // Forwards the source id to the player; no result is checked here.
    1243            3 :     m_player->handleSourceFlushed(m_sourceId);
    1244              : }
        

Generated by: LCOV version 2.0-1