LCOV - code coverage report
Current view: top level - source - GStreamerMSEMediaPlayerClient.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 94.9 % 693 658
Test Date: 2026-06-10 12:08:30 Functions: 99.3 % 140 139

            Line data    Source code
       1              : /*
       2              :  * Copyright (C) 2022 Sky UK
       3              :  *
       4              :  * This library is free software; you can redistribute it and/or
       5              :  * modify it under the terms of the GNU Lesser General Public
       6              :  * License as published by the Free Software Foundation;
       7              :  * version 2.1 of the License.
       8              :  *
       9              :  * This library is distributed in the hope that it will be useful,
      10              :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11              :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12              :  * Lesser General Public License for more details.
      13              :  *
      14              :  * You should have received a copy of the GNU Lesser General Public
      15              :  * License along with this library; if not, write to the Free Software
      16              :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
      17              :  */
      18              : 
      19              : #include "GStreamerMSEMediaPlayerClient.h"
      20              : #include "Constants.h"
      21              : #include "GstreamerCatLog.h"
      22              : #include "RialtoGStreamerMSEBaseSink.h"
      23              : #include "RialtoGStreamerMSEBaseSinkPrivate.h"
      24              : #include "RialtoGStreamerMSEVideoSink.h"
      25              : 
      26              : #include <algorithm>
      27              : #include <chrono>
      28              : #include <thread>
      29              : 
      30              : namespace
      31              : {
      32              : // The start time of segment might differ from the first sample which is injected.
      33              : // That difference should not be bigger than 1 video / audio frame.
      34              : // 1 second is probably erring on the side of caution, but should not have side effect.
      35              : const int64_t segmentStartMaximumDiff = 1000000000;
      36              : const int32_t UNKNOWN_STREAMS_NUMBER = -1;
      37              : 
      38            1 : const char *toString(const firebolt::rialto::PlaybackError &error)
      39              : {
      40            1 :     switch (error)
      41              :     {
      42            0 :     case firebolt::rialto::PlaybackError::DECRYPTION:
      43            0 :         return "DECRYPTION";
      44            1 :     case firebolt::rialto::PlaybackError::OUTPUT_PROTECTION:
      45            1 :         return "OUTPUT_PROTECTION";
      46            0 :     case firebolt::rialto::PlaybackError::UNKNOWN:
      47            0 :         return "UNKNOWN";
      48              :     }
      49            0 :     return "UNKNOWN";
      50              : }
      51            0 : const char *toString(const firebolt::rialto::MediaSourceType &src)
      52              : {
      53            0 :     switch (src)
      54              :     {
      55            0 :     case firebolt::rialto::MediaSourceType::AUDIO:
      56            0 :         return "AUDIO";
      57            0 :     case firebolt::rialto::MediaSourceType::VIDEO:
      58            0 :         return "VIDEO";
      59            0 :     case firebolt::rialto::MediaSourceType::SUBTITLE:
      60            0 :         return "SUBTITLE";
      61            0 :     case firebolt::rialto::MediaSourceType::UNKNOWN:
      62            0 :         return "UNKNOWN";
      63              :     }
      64            0 :     return "UNKNOWN";
      65              : }
      66              : } // namespace
      67              : #define GST_CAT_DEFAULT rialtoGStreamerCat
      68          288 : GStreamerMSEMediaPlayerClient::GStreamerMSEMediaPlayerClient(
      69              :     const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory,
      70              :     const std::shared_ptr<firebolt::rialto::client::MediaPlayerClientBackendInterface> &MediaPlayerClientBackend,
      71          288 :     const uint32_t maxVideoWidth, const uint32_t maxVideoHeight, bool isLive)
      72          288 :     : m_backendQueue{messageQueueFactory->createMessageQueue()}, m_messageQueueFactory{messageQueueFactory},
      73          288 :       m_clientBackend(MediaPlayerClientBackend), m_position(0), m_duration(0), m_audioStreams{UNKNOWN_STREAMS_NUMBER},
      74          288 :       m_videoStreams{UNKNOWN_STREAMS_NUMBER}, m_subtitleStreams{UNKNOWN_STREAMS_NUMBER},
      75          288 :       m_videoRectangle{0, 0, 1920, 1080}, m_streamingStopped(false),
      76          288 :       m_maxWidth(maxVideoWidth == 0 ? DEFAULT_MAX_VIDEO_WIDTH : maxVideoWidth),
      77          864 :       m_maxHeight(maxVideoHeight == 0 ? DEFAULT_MAX_VIDEO_HEIGHT : maxVideoHeight), m_isLive{isLive}
      78              : {
      79          288 :     m_backendQueue->start();
      80              : }
      81              : 
      82          288 : GStreamerMSEMediaPlayerClient::~GStreamerMSEMediaPlayerClient()
      83              : {
      84          288 :     stopStreaming();
      85              : }
      86              : 
      87          447 : void GStreamerMSEMediaPlayerClient::stopStreaming()
      88              : {
      89          447 :     if (!m_streamingStopped)
      90              :     {
      91          288 :         m_backendQueue->stop();
      92              : 
      93          356 :         for (auto &source : m_attachedSources)
      94              :         {
      95           68 :             source.second.m_bufferPuller->stop();
      96              :         }
      97              : 
      98          288 :         m_streamingStopped = true;
      99              :     }
     100          447 : }
     101              : 
     102              : // Deletes client backend -> this deletes mediapipeline object
     103          164 : void GStreamerMSEMediaPlayerClient::destroyClientBackend()
     104              : {
     105          164 :     m_clientBackend.reset();
     106              : }
     107              : 
     108            1 : void GStreamerMSEMediaPlayerClient::notifyDuration(int64_t duration)
     109              : {
     110            1 :     m_backendQueue->postMessage(std::make_shared<SetDurationMessage>(duration, m_duration));
     111              : }
     112              : 
     113            1 : void GStreamerMSEMediaPlayerClient::notifyPosition(int64_t position)
     114              : {
     115            1 :     m_backendQueue->postMessage(std::make_shared<SetPositionMessage>(position, m_attachedSources));
     116              : }
     117              : 
     118              : void GStreamerMSEMediaPlayerClient::notifyNativeSize(uint32_t width, uint32_t height, double aspect) {}
     119              : 
     120              : void GStreamerMSEMediaPlayerClient::notifyNetworkState(firebolt::rialto::NetworkState state) {}
     121              : 
     122           60 : void GStreamerMSEMediaPlayerClient::notifyPlaybackState(firebolt::rialto::PlaybackState state)
     123              : {
     124           60 :     m_backendQueue->postMessage(std::make_shared<PlaybackStateMessage>(state, this));
     125              : }
     126              : 
     127              : void GStreamerMSEMediaPlayerClient::notifyVideoData(bool hasData) {}
     128              : 
     129              : void GStreamerMSEMediaPlayerClient::notifyAudioData(bool hasData) {}
     130              : 
     131           10 : void GStreamerMSEMediaPlayerClient::notifyNeedMediaData(
     132              :     int32_t sourceId, size_t frameCount, uint32_t needDataRequestId,
     133              :     const std::shared_ptr<firebolt::rialto::MediaPlayerShmInfo> & /*shmInfo*/)
     134              : {
     135           10 :     m_backendQueue->postMessage(std::make_shared<NeedDataMessage>(sourceId, frameCount, needDataRequestId, this));
     136              : 
     137           10 :     return;
     138              : }
     139              : 
     140              : void GStreamerMSEMediaPlayerClient::notifyCancelNeedMediaData(int sourceId) {}
     141              : 
     142            5 : void GStreamerMSEMediaPlayerClient::notifyQos(int32_t sourceId, const firebolt::rialto::QosInfo &qosInfo)
     143              : {
     144            5 :     m_backendQueue->postMessage(std::make_shared<QosMessage>(sourceId, qosInfo, this));
     145              : }
     146              : 
     147            2 : void GStreamerMSEMediaPlayerClient::notifyBufferUnderflow(int32_t sourceId)
     148              : {
     149            2 :     m_backendQueue->postMessage(std::make_shared<BufferUnderflowMessage>(sourceId, this));
     150              : }
     151              : 
     152            2 : void GStreamerMSEMediaPlayerClient::notifyFirstFrameReceived(int32_t sourceId)
     153              : {
     154            2 :     m_backendQueue->postMessage(std::make_shared<FirstFrameReceivedMessage>(sourceId, this));
     155              : }
     156              : 
     157            7 : void GStreamerMSEMediaPlayerClient::notifyPlaybackError(int32_t sourceId, firebolt::rialto::PlaybackError error)
     158              : {
     159            7 :     m_backendQueue->postMessage(std::make_shared<PlaybackErrorMessage>(sourceId, error, this));
     160              : }
     161              : 
     162            9 : void GStreamerMSEMediaPlayerClient::notifySourceFlushed(int32_t sourceId)
     163              : {
     164            9 :     m_backendQueue->postMessage(std::make_shared<SourceFlushedMessage>(sourceId, this));
     165              : }
     166              : 
     167            8 : void GStreamerMSEMediaPlayerClient::notifyPlaybackInfo(const firebolt::rialto::PlaybackInfo &playbackInfo)
     168              : {
     169            8 :     if (m_flushAndDataSynchronizer.isAnySourceFlushing())
     170              :     {
     171            1 :         GST_WARNING("Not updating playback info, because flush is ongoing");
     172            1 :         return;
     173              :     }
     174            7 :     std::unique_lock lock{m_playbackInfoMutex};
     175            7 :     m_playbackInfo = playbackInfo;
     176              : }
     177              : 
     178            8 : int64_t GStreamerMSEMediaPlayerClient::getPosition(int32_t sourceId)
     179              : {
     180            8 :     std::unique_lock lock{m_playbackInfoMutex};
     181            8 :     return m_playbackInfo.currentPosition;
     182              : }
     183              : 
     184            4 : bool GStreamerMSEMediaPlayerClient::getDuration(int64_t &duration)
     185              : {
     186            4 :     if (!m_clientBackend)
     187              :     {
     188            1 :         return false;
     189              :     }
     190              : 
     191            3 :     bool status{false};
     192            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getDuration(duration); });
     193            3 :     return status;
     194              : }
     195              : 
     196            5 : bool GStreamerMSEMediaPlayerClient::setImmediateOutput(int32_t sourceId, bool immediateOutput)
     197              : {
     198            5 :     if (!m_clientBackend)
     199              :     {
     200            1 :         return false;
     201              :     }
     202              : 
     203            4 :     bool status{false};
     204            8 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setImmediateOutput(sourceId, immediateOutput); });
     205            4 :     return status;
     206              : }
     207              : 
     208            4 : bool GStreamerMSEMediaPlayerClient::getImmediateOutput(int32_t sourceId, bool &immediateOutput)
     209              : {
     210            4 :     if (!m_clientBackend)
     211              :     {
     212            1 :         return false;
     213              :     }
     214              : 
     215            3 :     bool status{false};
     216            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getImmediateOutput(sourceId, immediateOutput); });
     217            3 :     return status;
     218              : }
     219              : 
     220            1 : bool GStreamerMSEMediaPlayerClient::getStats(int32_t sourceId, uint64_t &renderedFrames, uint64_t &droppedFrames)
     221              : {
     222            1 :     if (!m_clientBackend)
     223              :     {
     224            0 :         return false;
     225              :     }
     226              : 
     227            1 :     bool status{false};
     228            1 :     m_backendQueue->callInEventLoop([&]()
     229            1 :                                     { status = m_clientBackend->getStats(sourceId, renderedFrames, droppedFrames); });
     230            1 :     return status;
     231              : }
     232              : 
     233          167 : bool GStreamerMSEMediaPlayerClient::createBackend()
     234              : {
     235          167 :     bool result = false;
     236          334 :     m_backendQueue->callInEventLoop(
     237          167 :         [&]()
     238              :         {
     239          167 :             if (!m_clientBackend)
     240              :             {
     241            1 :                 GST_ERROR("Client backend is NULL");
     242            1 :                 result = false;
     243            1 :                 return;
     244              :             }
     245          166 :             m_clientBackend->createMediaPlayerBackend(shared_from_this(), m_maxWidth, m_maxHeight);
     246              : 
     247          166 :             if (m_clientBackend->isMediaPlayerBackendCreated())
     248              :             {
     249          161 :                 std::string utf8url = "mse://1";
     250          161 :                 firebolt::rialto::MediaType mediaType = firebolt::rialto::MediaType::MSE;
     251          483 :                 if (!m_clientBackend->load(mediaType, "", utf8url, m_isLive))
     252              :                 {
     253            1 :                     GST_ERROR("Could not load RialtoClient");
     254            1 :                     return;
     255              :                 }
     256          160 :                 result = true;
     257          161 :             }
     258              :             else
     259              :             {
     260            5 :                 GST_ERROR("Media player backend could not be created");
     261              :             }
     262              :         });
     263              : 
     264          167 :     return result;
     265              : }
     266              : 
     267           23 : StateChangeResult GStreamerMSEMediaPlayerClient::play(int32_t sourceId)
     268              : {
     269           23 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     270           46 :     m_backendQueue->callInEventLoop(
     271           23 :         [&]()
     272              :         {
     273           23 :             auto sourceIt = m_attachedSources.find(sourceId);
     274           23 :             if (sourceIt == m_attachedSources.end())
     275              :             {
     276            1 :                 GST_ERROR("Cannot play - there's no attached source with id %d", sourceId);
     277            1 :                 result = StateChangeResult::NOT_ATTACHED;
     278            4 :                 return;
     279              :             }
     280              : 
     281           22 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PLAYING ||
     282           20 :                 (m_serverPlaybackState == firebolt::rialto::PlaybackState::END_OF_STREAM && wasPlayingBeforeEos))
     283              :             {
     284            2 :                 GST_INFO("Server is already playing");
     285            2 :                 sourceIt->second.m_state = ClientState::PLAYING;
     286              : 
     287            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PLAYING}))
     288              :                 {
     289            1 :                     m_clientState = ClientState::PLAYING;
     290              :                 }
     291              : 
     292            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     293            2 :                 return;
     294              :             }
     295              : 
     296           20 :             sourceIt->second.m_state = ClientState::AWAITING_PLAYING;
     297              : 
     298           20 :             if (m_clientState == ClientState::PAUSED)
     299              :             {
     300              :                 // If one source is AWAITING_PLAYING, the other source can still be PLAYING.
     301              :                 // This happends when we are switching out audio.
     302           48 :                 if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PLAYING, ClientState::PLAYING}))
     303              :                 {
     304           11 :                     GST_INFO("Sending play command");
     305           11 :                     bool async{true};
     306           11 :                     m_clientBackend->play(async);
     307           11 :                     m_clientState = ClientState::AWAITING_PLAYING;
     308           11 :                     if (!async)
     309              :                     {
     310              :                         // Synchronous playing state change. Finish procedure for other sources and return SUCCESS_SYNC
     311            1 :                         result = StateChangeResult::SUCCESS_SYNC;
     312            2 :                         m_backendQueue->postMessage(
     313            2 :                             std::make_shared<PlaybackStateMessage>(firebolt::rialto::PlaybackState::PLAYING, this));
     314            1 :                         return;
     315              :                     }
     316              :                 }
     317              :                 else
     318              :                 {
     319            5 :                     GST_DEBUG("Not all sources are ready to play");
     320              :                 }
     321              :             }
     322              :             else
     323              :             {
     324            4 :                 GST_WARNING("Not in PAUSED state in client state %u state; server playback state: %u",
     325              :                             static_cast<uint32_t>(m_clientState), static_cast<uint32_t>(m_serverPlaybackState));
     326              :             }
     327              : 
     328           19 :             result = StateChangeResult::SUCCESS_ASYNC;
     329           19 :             sourceIt->second.m_delegate->postAsyncStart();
     330              :         });
     331              : 
     332           23 :     return result;
     333              : }
     334              : 
     335          327 : StateChangeResult GStreamerMSEMediaPlayerClient::pause(int32_t sourceId)
     336              : {
     337          327 :     StateChangeResult result = StateChangeResult::NOT_ATTACHED;
     338          654 :     m_backendQueue->callInEventLoop(
     339          327 :         [&]()
     340              :         {
     341          327 :             auto sourceIt = m_attachedSources.find(sourceId);
     342          327 :             if (sourceIt == m_attachedSources.end())
     343              :             {
     344          153 :                 GST_WARNING("Cannot pause - there's no attached source with id %d", sourceId);
     345              : 
     346          153 :                 result = StateChangeResult::NOT_ATTACHED;
     347          153 :                 return;
     348              :             }
     349              : 
     350          174 :             if (m_serverPlaybackState == firebolt::rialto::PlaybackState::PAUSED &&
     351            4 :                 m_clientState != ClientState::AWAITING_PLAYING && m_clientState != ClientState::AWAITING_PAUSED)
     352              :             {
     353              :                 // if the server is already paused and we are not in async, we don't need to send pause command
     354            2 :                 GST_INFO("Server is already paused");
     355            2 :                 sourceIt->second.m_state = ClientState::PAUSED;
     356              : 
     357            6 :                 if (checkIfAllAttachedSourcesInStates({ClientState::PAUSED}))
     358              :                 {
     359            1 :                     m_clientState = ClientState::PAUSED;
     360              :                 }
     361              : 
     362            2 :                 result = StateChangeResult::SUCCESS_SYNC;
     363              :             }
     364              :             else
     365              :             {
     366          172 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     367              : 
     368          172 :                 bool shouldPause = false;
     369          172 :                 if (m_clientState == ClientState::READY)
     370              :                 {
     371          480 :                     if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     372              :                     {
     373          151 :                         shouldPause = true;
     374              :                     }
     375              :                     else
     376              :                     {
     377            9 :                         GST_DEBUG("Not all attached sources are ready to pause");
     378              :                     }
     379              :                 }
     380           12 :                 else if (m_clientState == ClientState::AWAITING_PLAYING || m_clientState == ClientState::PLAYING)
     381              :                 {
     382            9 :                     shouldPause = true;
     383              :                 }
     384              :                 else
     385              :                 {
     386            3 :                     GST_DEBUG("Cannot pause in %u state", static_cast<uint32_t>(m_clientState));
     387              :                 }
     388              : 
     389          172 :                 if (shouldPause)
     390              :                 {
     391          160 :                     GST_INFO("Sending pause command in %u state", static_cast<uint32_t>(m_clientState));
     392          160 :                     m_clientBackend->pause();
     393          160 :                     m_clientState = ClientState::AWAITING_PAUSED;
     394              :                 }
     395              : 
     396          172 :                 result = StateChangeResult::SUCCESS_ASYNC;
     397          172 :                 sourceIt->second.m_delegate->postAsyncStart();
     398              :             }
     399              :         });
     400              : 
     401          327 :     return result;
     402              : }
     403              : 
     404          160 : void GStreamerMSEMediaPlayerClient::stop()
     405              : {
     406          320 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->stop(); });
     407          160 : }
     408              : 
     409            4 : void GStreamerMSEMediaPlayerClient::setPlaybackRate(double rate)
     410              : {
     411            8 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setPlaybackRate(rate); });
     412            4 : }
     413              : 
     414           12 : void GStreamerMSEMediaPlayerClient::flush(int32_t sourceId, bool resetTime)
     415              : {
     416           12 :     m_flushAndDataSynchronizer.notifyFlushStarted(sourceId);
     417           24 :     m_backendQueue->callInEventLoop(
     418           12 :         [&]()
     419              :         {
     420           12 :             wasPlayingBeforeEos = false;
     421           12 :             bool async{true};
     422           12 :             auto sourceIt = m_attachedSources.find(sourceId);
     423           12 :             if (sourceIt == m_attachedSources.end())
     424              :             {
     425            2 :                 GST_ERROR("Cannot flush - there's no attached source with id %d", sourceId);
     426            3 :                 return;
     427              :             }
     428           10 :             if (!m_clientBackend->flush(sourceId, resetTime, async))
     429              :             {
     430            1 :                 GST_ERROR("Flush operation failed for source with id %d", sourceId);
     431            1 :                 return;
     432              :             }
     433            9 :             sourceIt->second.m_isFlushing = true;
     434              : 
     435            9 :             if (async)
     436              :             {
     437            9 :                 GST_INFO("Flush request sent for async source %d. Sink will lose state now", sourceId);
     438            9 :                 sourceIt->second.m_delegate->lostState();
     439              : 
     440            9 :                 sourceIt->second.m_state = ClientState::AWAITING_PAUSED;
     441            9 :                 if (m_clientState == ClientState::PLAYING)
     442              :                 {
     443            0 :                     m_clientState = ClientState::AWAITING_PLAYING;
     444              :                 }
     445            9 :                 else if (m_clientState == ClientState::PAUSED)
     446              :                 {
     447            4 :                     m_clientState = ClientState::AWAITING_PAUSED;
     448              :                 }
     449              :             }
     450              :         });
     451           12 : }
     452              : 
     453            9 : void GStreamerMSEMediaPlayerClient::setSourcePosition(int32_t sourceId, int64_t position, bool resetTime,
     454              :                                                       double appliedRate, uint64_t stopPosition)
     455              : {
     456           18 :     m_backendQueue->callInEventLoop(
     457            9 :         [&]()
     458              :         {
     459            9 :             auto sourceIt = m_attachedSources.find(sourceId);
     460            9 :             if (sourceIt == m_attachedSources.end())
     461              :             {
     462            1 :                 GST_ERROR("Cannot Set Source Position - there's no attached source with id %d", sourceId);
     463            2 :                 return;
     464              :             }
     465            8 :             if (!m_clientBackend->setSourcePosition(sourceId, position, resetTime, appliedRate, stopPosition))
     466              :             {
     467            1 :                 GST_ERROR("Set Source Position operation failed for source with id %d", sourceId);
     468            1 :                 return;
     469              :             }
     470            7 :             sourceIt->second.m_position = position;
     471              :         });
     472            9 : }
     473              : 
     474            1 : void GStreamerMSEMediaPlayerClient::setSubtitleOffset(int32_t sourceId, int64_t position)
     475              : {
     476            2 :     m_backendQueue->callInEventLoop(
     477            1 :         [&]()
     478              :         {
     479            1 :             auto sourceIt = m_attachedSources.find(sourceId);
     480            1 :             if (sourceIt == m_attachedSources.end())
     481              :             {
     482            0 :                 GST_ERROR("Cannot Set Subtitle Offset - there's no attached source with id %d", sourceId);
     483            0 :                 return;
     484              :             }
     485            1 :             if (!m_clientBackend->setSubtitleOffset(sourceId, position))
     486              :             {
     487            0 :                 GST_ERROR("Set Subtitle Offset operation failed for source with id %d", sourceId);
     488            0 :                 return;
     489              :             }
     490              :         });
     491            1 : }
     492              : 
     493            4 : void GStreamerMSEMediaPlayerClient::processAudioGap(int64_t position, uint32_t duration, int64_t discontinuityGap,
     494              :                                                     bool audioAac)
     495              : {
     496            8 :     m_backendQueue->callInEventLoop(
     497            4 :         [&]()
     498              :         {
     499            4 :             if (!m_clientBackend->processAudioGap(position, duration, discontinuityGap, audioAac))
     500              :             {
     501            1 :                 GST_ERROR("Process Audio Gap operation failed");
     502            1 :                 return;
     503              :             }
     504              :         });
     505            4 : }
     506              : 
     507          213 : bool GStreamerMSEMediaPlayerClient::attachSource(std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source,
     508              :                                                  RialtoMSEBaseSink *rialtoSink,
     509              :                                                  const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
     510              : {
     511          213 :     if (source->getType() != firebolt::rialto::MediaSourceType::AUDIO &&
     512          225 :         source->getType() != firebolt::rialto::MediaSourceType::VIDEO &&
     513           12 :         source->getType() != firebolt::rialto::MediaSourceType::SUBTITLE)
     514              :     {
     515            1 :         GST_WARNING_OBJECT(rialtoSink, "Invalid source type %u", static_cast<uint32_t>(source->getType()));
     516            1 :         return false;
     517              :     }
     518              : 
     519          212 :     bool result = false;
     520          424 :     m_backendQueue->callInEventLoop(
     521          212 :         [&]()
     522              :         {
     523          212 :             result = m_clientBackend->attachSource(source);
     524              : 
     525          212 :             if (result)
     526              :             {
     527          211 :                 std::shared_ptr<BufferParser> bufferParser;
     528          211 :                 if (source->getType() == firebolt::rialto::MediaSourceType::AUDIO)
     529              :                 {
     530          157 :                     bufferParser = std::make_shared<AudioBufferParser>();
     531              :                 }
     532           54 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::VIDEO)
     533              :                 {
     534           43 :                     bufferParser = std::make_shared<VideoBufferParser>();
     535              :                 }
     536           11 :                 else if (source->getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
     537              :                 {
     538           11 :                     bufferParser = std::make_shared<SubtitleBufferParser>();
     539              :                 }
     540              : 
     541          211 :                 std::shared_ptr<BufferPuller> bufferPuller = std::make_shared<BufferPuller>(m_messageQueueFactory,
     542            0 :                                                                                             GST_ELEMENT_CAST(rialtoSink),
     543          211 :                                                                                             bufferParser, delegate);
     544              : 
     545          211 :                 if (m_attachedSources.find(source->getId()) == m_attachedSources.end())
     546              :                 {
     547          211 :                     m_attachedSources.emplace(source->getId(),
     548          422 :                                               AttachedSource(rialtoSink, bufferPuller, delegate, source->getType()));
     549          211 :                     delegate->setSourceId(source->getId());
     550          211 :                     m_flushAndDataSynchronizer.addSource(source->getId());
     551          211 :                     bufferPuller->start();
     552              :                 }
     553              :             }
     554              : 
     555          212 :             sendAllSourcesAttachedIfPossibleInternal();
     556          212 :         });
     557              : 
     558          212 :     return result;
     559              : }
     560              : 
     561            4 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossible()
     562              : {
     563            8 :     m_backendQueue->callInEventLoop([&]() { sendAllSourcesAttachedIfPossibleInternal(); });
     564            4 : }
     565              : 
     566          216 : void GStreamerMSEMediaPlayerClient::sendAllSourcesAttachedIfPossibleInternal()
     567              : {
     568          216 :     if (!m_wasAllSourcesAttachedSent && areAllStreamsAttached())
     569              :     {
     570              :         // RialtoServer doesn't support dynamic source attachment.
     571              :         // It means that when we notify that all sources were attached, we cannot add any more sources in the current session
     572          157 :         GST_INFO("All sources attached");
     573          157 :         m_clientBackend->allSourcesAttached();
     574          157 :         m_wasAllSourcesAttachedSent = true;
     575          157 :         m_clientState = ClientState::READY;
     576              : 
     577              :         // In playbin3 streams, confirmation about number of available sources comes after attaching the source,
     578              :         // so we need to check if all sources are ready to pause
     579          471 :         if (checkIfAllAttachedSourcesInStates({ClientState::AWAITING_PAUSED}))
     580              :         {
     581            1 :             GST_INFO("Sending pause command, because all attached sources are ready to pause");
     582            1 :             m_clientBackend->pause();
     583            1 :             m_clientState = ClientState::AWAITING_PAUSED;
     584              :         }
     585              :     }
     586          216 : }
     587              : 
     588          154 : void GStreamerMSEMediaPlayerClient::removeSource(int32_t sourceId)
     589              : {
     590          308 :     m_backendQueue->callInEventLoop(
     591          154 :         [&]()
     592              :         {
     593          154 :             if (!m_clientBackend->removeSource(sourceId))
     594              :             {
     595            1 :                 GST_WARNING("Remove source %d failed", sourceId);
     596              :             }
     597          154 :             m_attachedSources.erase(sourceId);
     598          154 :             m_flushAndDataSynchronizer.removeSource(sourceId);
     599          154 :         });
     600              : }
     601              : 
     602           61 : void GStreamerMSEMediaPlayerClient::handlePlaybackStateChange(firebolt::rialto::PlaybackState state)
     603              : {
     604           61 :     GST_DEBUG("Received state change to state %u", static_cast<uint32_t>(state));
     605          122 :     m_backendQueue->callInEventLoop(
     606           61 :         [&]()
     607              :         {
     608           61 :             const auto kPreviousState{m_serverPlaybackState};
     609           61 :             m_serverPlaybackState = state;
     610           61 :             switch (state)
     611              :             {
     612           54 :             case firebolt::rialto::PlaybackState::PAUSED:
     613              :             case firebolt::rialto::PlaybackState::PLAYING:
     614              :             {
     615           54 :                 wasPlayingBeforeEos = false;
     616           54 :                 if (state == firebolt::rialto::PlaybackState::PAUSED && m_clientState == ClientState::AWAITING_PAUSED)
     617              :                 {
     618           39 :                     m_clientState = ClientState::PAUSED;
     619              :                 }
     620           15 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     621           13 :                          m_clientState == ClientState::AWAITING_PLAYING)
     622              :                 {
     623            9 :                     m_clientState = ClientState::PLAYING;
     624              :                 }
     625            6 :                 else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     626            4 :                          m_clientState == ClientState::AWAITING_PAUSED)
     627              :                 {
     628            1 :                     GST_WARNING("Outdated Playback State change to PLAYING received. Discarding...");
     629            1 :                     break;
     630              :                 }
     631              : 
     632          119 :                 for (auto &source : m_attachedSources)
     633              :                 {
     634           66 :                     if (state == firebolt::rialto::PlaybackState::PAUSED &&
     635           49 :                         source.second.m_state == ClientState::AWAITING_PAUSED)
     636              :                     {
     637           46 :                         source.second.m_state = ClientState::PAUSED;
     638              :                     }
     639           20 :                     else if (state == firebolt::rialto::PlaybackState::PLAYING &&
     640           17 :                              source.second.m_state == ClientState::AWAITING_PLAYING)
     641              :                     {
     642           12 :                         source.second.m_state = ClientState::PLAYING;
     643              :                     }
     644           66 :                     source.second.m_delegate->handleStateChanged(state);
     645              :                 }
     646              : 
     647           53 :                 break;
     648              :             }
     649            4 :             case firebolt::rialto::PlaybackState::END_OF_STREAM:
     650              :             {
     651            4 :                 if (!wasPlayingBeforeEos && firebolt::rialto::PlaybackState::PLAYING == kPreviousState)
     652              :                 {
     653            2 :                     wasPlayingBeforeEos = true;
     654              :                 }
     655            8 :                 for (const auto &source : m_attachedSources)
     656              :                 {
     657            4 :                     source.second.m_delegate->handleEos();
     658              :                 }
     659              :             }
     660            4 :             break;
     661            1 :             case firebolt::rialto::PlaybackState::SEEK_DONE:
     662              :             {
     663            1 :                 GST_WARNING("firebolt::rialto::PlaybackState::SEEK_DONE notification not supported");
     664            1 :                 break;
     665              :             }
     666            1 :             case firebolt::rialto::PlaybackState::FAILURE:
     667              :             {
     668            1 :                 wasPlayingBeforeEos = false;
     669            2 :                 for (const auto &source : m_attachedSources)
     670              :                 {
     671            3 :                     source.second.m_delegate->handleError("Rialto server playback failed");
     672              :                 }
     673            2 :                 for (auto &source : m_attachedSources)
     674              :                 {
     675            1 :                     source.second.m_position = 0;
     676              :                 }
     677              :                 {
     678            1 :                     std::unique_lock lock{m_playbackInfoMutex};
     679            1 :                     m_playbackInfo.currentPosition = 0;
     680              :                 }
     681              : 
     682            1 :                 break;
     683              :             }
     684              :             break;
     685            1 :             default:
     686            1 :                 break;
     687              :             }
     688           61 :         });
     689              : }
     690              : 
     691            9 : void GStreamerMSEMediaPlayerClient::handleSourceFlushed(int32_t sourceId)
     692              : {
     693           18 :     m_backendQueue->callInEventLoop(
     694            9 :         [&]()
     695              :         {
     696            9 :             auto sourceIt = m_attachedSources.find(sourceId);
     697            9 :             if (sourceIt == m_attachedSources.end())
     698              :             {
     699            1 :                 GST_ERROR("Cannot finish flush - there's no attached source with id %d", sourceId);
     700            2 :                 return;
     701              :             }
     702            8 :             if (!sourceIt->second.m_isFlushing)
     703              :             {
     704            1 :                 GST_ERROR("Cannot finish flush - source with id %d is not flushing!", sourceId);
     705            1 :                 return;
     706              :             }
     707            7 :             sourceIt->second.m_isFlushing = false;
     708            7 :             sourceIt->second.m_delegate->handleFlushCompleted();
     709            7 :             m_flushAndDataSynchronizer.notifyFlushCompleted(sourceId);
     710              :         });
     711            9 : }
     712              : 
     713            7 : void GStreamerMSEMediaPlayerClient::setVideoRectangle(const std::string &rectangleString)
     714              : {
     715           14 :     m_backendQueue->callInEventLoop(
     716            7 :         [&]()
     717              :         {
     718            7 :             if (!m_clientBackend || !m_clientBackend->isMediaPlayerBackendCreated())
     719              :             {
     720            1 :                 GST_WARNING("Missing RialtoClient backend - can't set video window now");
     721            3 :                 return;
     722              :             }
     723              : 
     724            6 :             if (rectangleString.empty())
     725              :             {
     726            1 :                 GST_WARNING("Empty video rectangle string");
     727            1 :                 return;
     728              :             }
     729              : 
     730            5 :             Rectangle rect = {0, 0, 0, 0};
     731            5 :             if (sscanf(rectangleString.c_str(), "%u,%u,%u,%u", &rect.x, &rect.y, &rect.width, &rect.height) != 4)
     732              :             {
     733            1 :                 GST_WARNING("Invalid video rectangle values");
     734            1 :                 return;
     735              :             }
     736              : 
     737            4 :             m_clientBackend->setVideoWindow(rect.x, rect.y, rect.width, rect.height);
     738            4 :             m_videoRectangle = rect;
     739              :         });
     740            7 : }
     741              : 
     742            4 : std::string GStreamerMSEMediaPlayerClient::getVideoRectangle()
     743              : {
     744            4 :     char rectangle[64] = {0};
     745            8 :     m_backendQueue->callInEventLoop(
     746            8 :         [&]()
     747              :         {
     748            4 :             sprintf(rectangle, "%u,%u,%u,%u", m_videoRectangle.x, m_videoRectangle.y, m_videoRectangle.width,
     749              :                     m_videoRectangle.height);
     750            4 :         });
     751              : 
     752            8 :     return std::string(rectangle);
     753              : }
     754              : 
     755            4 : bool GStreamerMSEMediaPlayerClient::renderFrame(int32_t sourceId)
     756              : {
     757            4 :     bool result = false;
     758            8 :     m_backendQueue->callInEventLoop(
     759            4 :         [&]()
     760              :         {
     761            4 :             result = m_clientBackend->renderFrame();
     762            4 :             if (result)
     763              :             {
     764              :                 // RialtoServer's video sink should drop PAUSED state due to skipping prerolled buffer in PAUSED state
     765            3 :                 auto sourceIt = m_attachedSources.find(sourceId);
     766            3 :                 if (sourceIt != m_attachedSources.end())
     767              :                 {
     768            3 :                     sourceIt->second.m_delegate->lostState();
     769              :                 }
     770              :             }
     771            4 :         });
     772            4 :     return result;
     773              : }
     774              : 
     775            6 : void GStreamerMSEMediaPlayerClient::setVolume(double targetVolume, uint32_t volumeDuration,
     776              :                                               firebolt::rialto::EaseType easeType)
     777              : {
     778           12 :     m_backendQueue->callInEventLoop(
     779            6 :         [&]()
     780              :         {
     781            6 :             m_clientBackend->setVolume(targetVolume, volumeDuration, easeType);
     782            6 :             std::unique_lock lock{m_playbackInfoMutex};
     783            6 :             m_playbackInfo.volume = targetVolume;
     784            6 :         });
     785              : }
     786              : 
     787            2 : bool GStreamerMSEMediaPlayerClient::getVolume(double &volume)
     788              : {
     789            2 :     bool success{false};
     790            4 :     m_backendQueue->callInEventLoop([&]() { success = m_clientBackend->getVolume(volume); });
     791            2 :     return success;
     792              : }
     793              : 
     794            2 : bool GStreamerMSEMediaPlayerClient::getCachedVolume(double &volume)
     795              : {
     796            2 :     std::unique_lock lock{m_playbackInfoMutex};
     797            2 :     volume = m_playbackInfo.volume;
     798            2 :     return true;
     799              : }
     800              : 
     801            7 : void GStreamerMSEMediaPlayerClient::setMute(bool mute, int32_t sourceId)
     802              : {
     803           14 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setMute(mute, sourceId); });
     804            7 : }
     805              : 
     806            3 : bool GStreamerMSEMediaPlayerClient::getMute(int sourceId)
     807              : {
     808            3 :     bool mute{false};
     809            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getMute(mute, sourceId); });
     810              : 
     811            3 :     return mute;
     812              : }
     813              : 
     814            3 : void GStreamerMSEMediaPlayerClient::setTextTrackIdentifier(const std::string &textTrackIdentifier)
     815              : {
     816            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setTextTrackIdentifier(textTrackIdentifier); });
     817            3 : }
     818              : 
     819            2 : std::string GStreamerMSEMediaPlayerClient::getTextTrackIdentifier()
     820              : {
     821            2 :     std::string getTextTrackIdentifier;
     822            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getTextTrackIdentifier(getTextTrackIdentifier); });
     823            2 :     return getTextTrackIdentifier;
     824              : }
     825              : 
     826            6 : bool GStreamerMSEMediaPlayerClient::setLowLatency(bool lowLatency)
     827              : {
     828            6 :     if (!m_clientBackend)
     829              :     {
     830            1 :         return false;
     831              :     }
     832              : 
     833            5 :     bool status{false};
     834           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setLowLatency(lowLatency); });
     835            5 :     return status;
     836              : }
     837              : 
     838            6 : bool GStreamerMSEMediaPlayerClient::setSync(bool sync)
     839              : {
     840            6 :     if (!m_clientBackend)
     841              :     {
     842            1 :         return false;
     843              :     }
     844              : 
     845            5 :     bool status{false};
     846           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSync(sync); });
     847            5 :     return status;
     848              : }
     849              : 
     850            4 : bool GStreamerMSEMediaPlayerClient::getSync(bool &sync)
     851              : {
     852            4 :     if (!m_clientBackend)
     853              :     {
     854            1 :         return false;
     855              :     }
     856              : 
     857            3 :     bool status{false};
     858            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getSync(sync); });
     859            3 :     return status;
     860              : }
     861              : 
     862            6 : bool GStreamerMSEMediaPlayerClient::setSyncOff(bool syncOff)
     863              : {
     864            6 :     if (!m_clientBackend)
     865              :     {
     866            1 :         return false;
     867              :     }
     868              : 
     869            5 :     bool status{false};
     870           10 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setSyncOff(syncOff); });
     871            5 :     return status;
     872              : }
     873              : 
     874           10 : bool GStreamerMSEMediaPlayerClient::setStreamSyncMode(int32_t sourceId, int32_t streamSyncMode)
     875              : {
     876           10 :     if (!m_clientBackend)
     877              :     {
     878            1 :         return false;
     879              :     }
     880              : 
     881            9 :     bool status{false};
     882           18 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->setStreamSyncMode(sourceId, streamSyncMode); });
     883            9 :     return status;
     884              : }
     885              : 
     886            4 : bool GStreamerMSEMediaPlayerClient::getStreamSyncMode(int32_t &streamSyncMode)
     887              : {
     888            4 :     if (!m_clientBackend)
     889              :     {
     890            1 :         return false;
     891              :     }
     892              : 
     893            3 :     bool status{false};
     894            6 :     m_backendQueue->callInEventLoop([&]() { status = m_clientBackend->getStreamSyncMode(streamSyncMode); });
     895            3 :     return status;
     896              : }
     897              : 
     898           44 : ClientState GStreamerMSEMediaPlayerClient::getClientState()
     899              : {
     900           44 :     ClientState state{ClientState::IDLE};
     901           44 :     m_backendQueue->callInEventLoop([&]() { state = m_clientState; });
     902           44 :     return state;
     903              : }
     904              : 
     905          174 : void GStreamerMSEMediaPlayerClient::handleStreamCollection(int32_t audioStreams, int32_t videoStreams,
     906              :                                                            int32_t subtitleStreams)
     907              : {
     908          348 :     m_backendQueue->callInEventLoop(
     909          174 :         [&]()
     910              :         {
     911          174 :             if (m_audioStreams == UNKNOWN_STREAMS_NUMBER)
     912          171 :                 m_audioStreams = audioStreams;
     913          174 :             if (m_videoStreams == UNKNOWN_STREAMS_NUMBER)
     914          170 :                 m_videoStreams = videoStreams;
     915          174 :             if (m_subtitleStreams == UNKNOWN_STREAMS_NUMBER)
     916          170 :                 m_subtitleStreams = subtitleStreams;
     917              : 
     918          174 :             GST_INFO("Updated number of streams. New streams' numbers; video=%d, audio=%d, text=%d", m_videoStreams,
     919              :                      m_audioStreams, m_subtitleStreams);
     920          174 :         });
     921              : }
     922              : 
     923            3 : void GStreamerMSEMediaPlayerClient::setBufferingLimit(uint32_t limitBufferingMs)
     924              : {
     925            3 :     if (!m_clientBackend)
     926              :     {
     927            0 :         return;
     928              :     }
     929            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setBufferingLimit(limitBufferingMs); });
     930              : }
     931              : 
     932            2 : uint32_t GStreamerMSEMediaPlayerClient::getBufferingLimit()
     933              : {
     934            2 :     if (!m_clientBackend)
     935              :     {
     936            0 :         return kDefaultBufferingLimit;
     937              :     }
     938              : 
     939            2 :     uint32_t result{kDefaultBufferingLimit};
     940            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getBufferingLimit(result); });
     941            2 :     return result;
     942              : }
     943              : 
     944            3 : void GStreamerMSEMediaPlayerClient::setUseBuffering(bool useBuffering)
     945              : {
     946            3 :     if (!m_clientBackend)
     947              :     {
     948            0 :         return;
     949              :     }
     950            6 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->setUseBuffering(useBuffering); });
     951              : }
     952              : 
     953            2 : bool GStreamerMSEMediaPlayerClient::getUseBuffering()
     954              : {
     955            2 :     if (!m_clientBackend)
     956              :     {
     957            0 :         return kDefaultUseBuffering;
     958              :     }
     959              : 
     960            2 :     bool result{kDefaultUseBuffering};
     961            4 :     m_backendQueue->callInEventLoop([&]() { m_clientBackend->getUseBuffering(result); });
     962            2 :     return result;
     963              : }
     964              : 
     965            3 : bool GStreamerMSEMediaPlayerClient::switchSource(const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSource> &source)
     966              : {
     967            3 :     bool result = false;
     968            6 :     m_backendQueue->callInEventLoop([&]() { result = m_clientBackend->switchSource(source); });
     969              : 
     970            3 :     return result;
     971              : }
     972              : 
     973           36 : IFlushAndDataSynchronizer &GStreamerMSEMediaPlayerClient::getFlushAndDataSynchronizer()
     974              : {
     975           36 :     return m_flushAndDataSynchronizer;
     976              : }
     977              : 
     978          337 : bool GStreamerMSEMediaPlayerClient::checkIfAllAttachedSourcesInStates(const std::vector<ClientState> &states)
     979              : {
     980          674 :     return std::all_of(m_attachedSources.begin(), m_attachedSources.end(), [states](const auto &source)
     981         1026 :                        { return std::find(states.begin(), states.end(), source.second.m_state) != states.end(); });
     982              : }
     983              : 
     984          215 : bool GStreamerMSEMediaPlayerClient::areAllStreamsAttached()
     985              : {
     986          215 :     int32_t attachedVideoSources = 0;
     987          215 :     int32_t attachedAudioSources = 0;
     988          215 :     int32_t attachedSubtitleSources = 0;
     989          441 :     for (auto &source : m_attachedSources)
     990              :     {
     991          226 :         if (source.second.getType() == firebolt::rialto::MediaSourceType::VIDEO)
     992              :         {
     993           43 :             attachedVideoSources++;
     994              :         }
     995          183 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::AUDIO)
     996              :         {
     997          172 :             attachedAudioSources++;
     998              :         }
     999           11 :         else if (source.second.getType() == firebolt::rialto::MediaSourceType::SUBTITLE)
    1000              :         {
    1001           11 :             attachedSubtitleSources++;
    1002              :         }
    1003              :     }
    1004              : 
    1005          372 :     return attachedVideoSources == m_videoStreams && attachedAudioSources == m_audioStreams &&
    1006          372 :            attachedSubtitleSources == m_subtitleStreams;
    1007              : }
    1008              : 
    1009           10 : bool GStreamerMSEMediaPlayerClient::requestPullBuffer(int streamId, size_t frameCount, unsigned int needDataRequestId)
    1010              : {
    1011           10 :     bool result = false;
    1012           20 :     m_backendQueue->callInEventLoop(
    1013           10 :         [&]()
    1014              :         {
    1015           10 :             auto sourceIt = m_attachedSources.find(streamId);
    1016           10 :             if (sourceIt == m_attachedSources.end())
    1017              :             {
    1018            1 :                 GST_ERROR("There's no attached source with id %d", streamId);
    1019              : 
    1020            1 :                 result = false;
    1021            1 :                 return;
    1022              :             }
    1023            9 :             result = sourceIt->second.m_bufferPuller->requestPullBuffer(streamId, frameCount, needDataRequestId, this);
    1024              :         });
    1025              : 
    1026           10 :     return result;
    1027              : }
    1028              : 
    1029            5 : bool GStreamerMSEMediaPlayerClient::handleQos(int sourceId, firebolt::rialto::QosInfo qosInfo)
    1030              : {
    1031            5 :     bool result = false;
    1032           10 :     m_backendQueue->callInEventLoop(
    1033            5 :         [&]()
    1034              :         {
    1035            5 :             auto sourceIt = m_attachedSources.find(sourceId);
    1036            5 :             if (sourceIt == m_attachedSources.end())
    1037              :             {
    1038            1 :                 result = false;
    1039            1 :                 return;
    1040              :             }
    1041            4 :             sourceIt->second.m_delegate->handleQos(qosInfo.processed, qosInfo.dropped);
    1042            4 :             result = true;
    1043              :         });
    1044              : 
    1045            5 :     return result;
    1046              : }
    1047              : 
    1048            2 : bool GStreamerMSEMediaPlayerClient::handleBufferUnderflow(int sourceId)
    1049              : {
    1050            2 :     bool result = false;
    1051            4 :     m_backendQueue->callInEventLoop(
    1052            2 :         [&]()
    1053              :         {
    1054            2 :             auto sourceIt = m_attachedSources.find(sourceId);
    1055            2 :             if (sourceIt == m_attachedSources.end())
    1056              :             {
    1057            1 :                 result = false;
    1058            1 :                 return;
    1059              :             }
    1060              : 
    1061            1 :             rialto_mse_base_handle_rialto_server_sent_buffer_underflow(sourceIt->second.m_rialtoSink);
    1062              : 
    1063            1 :             result = true;
    1064              :         });
    1065              : 
    1066            2 :     return result;
    1067              : }
    1068              : 
    1069            2 : bool GStreamerMSEMediaPlayerClient::handleFirstFrameReceived(int sourceId)
    1070              : {
    1071            2 :     bool result = false;
    1072            4 :     m_backendQueue->callInEventLoop(
    1073            2 :         [&]()
    1074              :         {
    1075            2 :             auto sourceIt = m_attachedSources.find(sourceId);
    1076            2 :             if (sourceIt == m_attachedSources.end())
    1077              :             {
    1078            1 :                 result = false;
    1079            1 :                 return;
    1080              :             }
    1081            1 :             if (sourceIt->second.getType() == firebolt::rialto::MediaSourceType::VIDEO)
    1082              :             {
    1083            1 :                 rialto_mse_base_handle_rialto_server_sent_first_video_frame_received(sourceIt->second.m_rialtoSink);
    1084            1 :                 result = true;
    1085              :             }
    1086              :         });
    1087              : 
    1088            2 :     return result;
    1089              : }
    1090              : 
    1091            7 : bool GStreamerMSEMediaPlayerClient::handlePlaybackError(int sourceId, firebolt::rialto::PlaybackError error)
    1092              : {
    1093            7 :     bool result = false;
    1094           14 :     m_backendQueue->callInEventLoop(
    1095            7 :         [&]()
    1096              :         {
    1097            7 :             auto sourceIt = m_attachedSources.find(sourceId);
    1098            7 :             if (sourceIt == m_attachedSources.end())
    1099              :             {
    1100            2 :                 result = false;
    1101            2 :                 return;
    1102              :             }
    1103              : 
    1104              :             // OUTPUT_PROTECTION is handled separately by posting an application message (not a pipeline error)
    1105            5 :             if (firebolt::rialto::PlaybackError::OUTPUT_PROTECTION == error)
    1106              :             {
    1107            1 :                 GST_WARNING("HDCP output protection failure, posting HDCPProtectionFailure application message");
    1108            1 :                 GstStructure *hdcpMsg = gst_structure_new("HDCPProtectionFailure", "message", G_TYPE_STRING,
    1109              :                                                           "HDCP Output Protection Error", "error", G_TYPE_STRING,
    1110              :                                                           toString(error), nullptr);
    1111              :                 GstMessage *message =
    1112            1 :                     gst_message_new_application(GST_OBJECT_CAST(sourceIt->second.m_rialtoSink), hdcpMsg);
    1113            1 :                 result = gst_element_post_message(GST_ELEMENT_CAST(sourceIt->second.m_rialtoSink), message);
    1114            1 :                 if (!result)
    1115              :                 {
    1116            0 :                     GST_WARNING("Failed to post HDCPProtectionFailure application message");
    1117              :                 }
    1118              :             }
    1119              :             else
    1120              :             {
    1121              :                 // For other playback errors, fail the pipeline from rialto-gstreamer
    1122            4 :                 GST_ERROR("Received Playback error '%s', posting error on %s sink", toString(error),
    1123              :                           toString(sourceIt->second.getType()));
    1124            4 :                 if (firebolt::rialto::PlaybackError::DECRYPTION == error)
    1125              :                 {
    1126            6 :                     sourceIt->second.m_delegate->handleError("Rialto dropped a frame that failed to decrypt",
    1127              :                                                              GST_STREAM_ERROR_DECRYPT);
    1128              :                 }
    1129              :                 else
    1130              :                 {
    1131            6 :                     sourceIt->second.m_delegate->handleError("Rialto server playback failed");
    1132              :                 }
    1133              : 
    1134            4 :                 result = true;
    1135              :             }
    1136              :         });
    1137              : 
    1138            7 :     return result;
    1139              : }
    1140              : 
    1141            4 : firebolt::rialto::AddSegmentStatus GStreamerMSEMediaPlayerClient::addSegment(
    1142              :     unsigned int needDataRequestId, const std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> &mediaSegment)
    1143              : {
    1144              :     // rialto client's addSegment call is MT safe, so it's ok to call it from the Puller's thread
    1145            4 :     return m_clientBackend->addSegment(needDataRequestId, mediaSegment);
    1146              : }
    1147              : 
    1148          211 : BufferPuller::BufferPuller(const std::shared_ptr<IMessageQueueFactory> &messageQueueFactory, GstElement *rialtoSink,
    1149              :                            const std::shared_ptr<BufferParser> &bufferParser,
    1150          211 :                            const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
    1151          211 :     : m_queue{messageQueueFactory->createMessageQueue()}, m_rialtoSink(rialtoSink), m_bufferParser(bufferParser),
    1152          211 :       m_delegate{delegate}
    1153              : {
    1154              : }
    1155              : 
    1156          211 : void BufferPuller::start()
    1157              : {
    1158          211 :     m_queue->start();
    1159              : }
    1160              : 
    1161           68 : void BufferPuller::stop()
    1162              : {
    1163           68 :     m_queue->stop();
    1164              : }
    1165              : 
    1166            9 : bool BufferPuller::requestPullBuffer(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1167              :                                      GStreamerMSEMediaPlayerClient *player)
    1168              : {
    1169           27 :     return m_queue->postMessage(std::make_shared<PullBufferMessage>(sourceId, frameCount, needDataRequestId, m_rialtoSink,
    1170           27 :                                                                     m_bufferParser, *m_queue, player, m_delegate));
    1171              : }
    1172              : 
    1173           10 : HaveDataMessage::HaveDataMessage(firebolt::rialto::MediaSourceStatus status, int sourceId,
    1174           10 :                                  unsigned int needDataRequestId, GStreamerMSEMediaPlayerClient *player)
    1175           10 :     : m_status(status), m_sourceId(sourceId), m_needDataRequestId(needDataRequestId), m_player(player)
    1176              : {
    1177              : }
    1178              : 
    1179           10 : void HaveDataMessage::handle()
    1180              : {
    1181           10 :     if (m_player->m_attachedSources.find(m_sourceId) == m_player->m_attachedSources.end())
    1182              :     {
    1183            1 :         GST_WARNING("Source id %d is invalid", m_sourceId);
    1184            1 :         return;
    1185              :     }
    1186              : 
    1187            9 :     m_player->m_clientBackend->haveData(m_status, m_needDataRequestId);
    1188              : }
    1189              : 
    1190            9 : PullBufferMessage::PullBufferMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1191              :                                      GstElement *rialtoSink, const std::shared_ptr<BufferParser> &bufferParser,
    1192              :                                      IMessageQueue &pullerQueue, GStreamerMSEMediaPlayerClient *player,
    1193            9 :                                      const std::shared_ptr<IPullModePlaybackDelegate> &delegate)
    1194            9 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_rialtoSink(rialtoSink),
    1195            9 :       m_bufferParser(bufferParser), m_pullerQueue(pullerQueue), m_player(player), m_delegate{delegate}
    1196              : {
    1197              : }
    1198              : 
    1199            8 : void PullBufferMessage::handle()
    1200              : {
    1201            8 :     bool isEos = false;
    1202            8 :     unsigned int addedSegments = 0;
    1203              : 
    1204           10 :     for (unsigned int frame = 0; frame < m_frameCount; ++frame)
    1205              :     {
    1206            8 :         if (!m_delegate->isReadyToSendData())
    1207              :         {
    1208            2 :             GST_INFO_OBJECT(m_rialtoSink, "Not ready to send data - segment or eos not received yet");
    1209            6 :             break;
    1210              :         }
    1211            6 :         GstRefSample sample = m_delegate->getFrontSample();
    1212            6 :         if (!sample)
    1213              :         {
    1214            3 :             if (m_delegate->isEos())
    1215              :             {
    1216            1 :                 isEos = true;
    1217              :             }
    1218              :             else
    1219              :             {
    1220              :                 // it's not a critical issue. It might be caused by receiving too many need data requests.
    1221            2 :                 GST_INFO_OBJECT(m_rialtoSink, "Could not get a sample");
    1222              :             }
    1223            3 :             break;
    1224              :         }
    1225              : 
    1226              :         // we pass GstMapInfo's pointers on data buffers to RialtoClient
    1227              :         // so we need to hold it until RialtoClient copies them to shm
    1228            3 :         GstBuffer *buffer = sample.getBuffer();
    1229              :         GstMapInfo map;
    1230            3 :         if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
    1231              :         {
    1232            0 :             GST_ERROR_OBJECT(m_rialtoSink, "Could not map buffer");
    1233            0 :             m_delegate->popSample();
    1234            0 :             continue;
    1235              :         }
    1236              : 
    1237              :         std::unique_ptr<firebolt::rialto::IMediaPipeline::MediaSegment> mseData =
    1238            3 :             m_bufferParser->parseBuffer(sample, buffer, map, m_sourceId);
    1239            3 :         if (!mseData)
    1240              :         {
    1241            0 :             GST_ERROR_OBJECT(m_rialtoSink, "No data returned from the parser");
    1242            0 :             gst_buffer_unmap(buffer, &map);
    1243            0 :             m_delegate->popSample();
    1244            0 :             continue;
    1245              :         }
    1246              : 
    1247            3 :         firebolt::rialto::AddSegmentStatus addSegmentStatus = m_player->addSegment(m_needDataRequestId, mseData);
    1248            3 :         if (addSegmentStatus == firebolt::rialto::AddSegmentStatus::NO_SPACE)
    1249              :         {
    1250            1 :             gst_buffer_unmap(buffer, &map);
    1251            1 :             GST_INFO_OBJECT(m_rialtoSink, "There's no space to add sample");
    1252            1 :             break;
    1253              :         }
    1254              : 
    1255            2 :         gst_buffer_unmap(buffer, &map);
    1256            2 :         m_delegate->popSample();
    1257            2 :         addedSegments++;
    1258            7 :     }
    1259              : 
    1260            8 :     firebolt::rialto::MediaSourceStatus status = firebolt::rialto::MediaSourceStatus::OK;
    1261            8 :     if (isEos)
    1262              :     {
    1263            1 :         status = firebolt::rialto::MediaSourceStatus::EOS;
    1264              :     }
    1265            7 :     else if (addedSegments == 0)
    1266              :     {
    1267            5 :         status = firebolt::rialto::MediaSourceStatus::NO_AVAILABLE_SAMPLES;
    1268              :     }
    1269              : 
    1270            8 :     if (firebolt::rialto::MediaSourceStatus::OK == status || firebolt::rialto::MediaSourceStatus::EOS == status)
    1271              :     {
    1272            3 :         m_player->getFlushAndDataSynchronizer().notifyDataPushed(m_sourceId);
    1273              :     }
    1274              : 
    1275           16 :     m_player->m_backendQueue->postMessage(
    1276           16 :         std::make_shared<HaveDataMessage>(status, m_sourceId, m_needDataRequestId, m_player));
    1277            8 : }
    1278              : 
    1279           10 : NeedDataMessage::NeedDataMessage(int sourceId, size_t frameCount, unsigned int needDataRequestId,
    1280           10 :                                  GStreamerMSEMediaPlayerClient *player)
    1281           10 :     : m_sourceId(sourceId), m_frameCount(frameCount), m_needDataRequestId(needDataRequestId), m_player(player)
    1282              : {
    1283              : }
    1284              : 
    1285           10 : void NeedDataMessage::handle()
    1286              : {
    1287           10 :     if (!m_player->requestPullBuffer(m_sourceId, m_frameCount, m_needDataRequestId))
    1288              :     {
    1289            2 :         GST_ERROR("Failed to pull buffer for sourceId=%d and NeedDataRequestId %u", m_sourceId, m_needDataRequestId);
    1290            4 :         m_player->m_backendQueue->postMessage(
    1291            2 :             std::make_shared<HaveDataMessage>(firebolt::rialto::MediaSourceStatus::ERROR, m_sourceId,
    1292            2 :                                               m_needDataRequestId, m_player));
    1293              :     }
    1294           10 : }
    1295              : 
    1296           61 : PlaybackStateMessage::PlaybackStateMessage(firebolt::rialto::PlaybackState state, GStreamerMSEMediaPlayerClient *player)
    1297           61 :     : m_state(state), m_player(player)
    1298              : {
    1299              : }
    1300              : 
    1301           61 : void PlaybackStateMessage::handle()
    1302              : {
    1303           61 :     m_player->handlePlaybackStateChange(m_state);
    1304              : }
    1305              : 
    1306            5 : QosMessage::QosMessage(int sourceId, firebolt::rialto::QosInfo qosInfo, GStreamerMSEMediaPlayerClient *player)
    1307            5 :     : m_sourceId(sourceId), m_qosInfo(qosInfo), m_player(player)
    1308              : {
    1309              : }
    1310              : 
    1311            5 : void QosMessage::handle()
    1312              : {
    1313            5 :     if (!m_player->handleQos(m_sourceId, m_qosInfo))
    1314              :     {
    1315            1 :         GST_ERROR("Failed to handle qos for sourceId=%d", m_sourceId);
    1316              :     }
    1317            5 : }
    1318              : 
    1319            2 : BufferUnderflowMessage::BufferUnderflowMessage(int sourceId, GStreamerMSEMediaPlayerClient *player)
    1320            2 :     : m_sourceId(sourceId), m_player(player)
    1321              : {
    1322              : }
    1323              : 
    1324            2 : void BufferUnderflowMessage::handle()
    1325              : {
    1326            2 :     if (!m_player->handleBufferUnderflow(m_sourceId))
    1327              :     {
    1328            1 :         GST_ERROR("Failed to handle buffer underflow for sourceId=%d", m_sourceId);
    1329              :     }
    1330            2 : }
    1331              : 
    1332            2 : FirstFrameReceivedMessage::FirstFrameReceivedMessage(int sourceId, GStreamerMSEMediaPlayerClient *player)
    1333            2 :     : m_sourceId(sourceId), m_player(player)
    1334              : {
    1335              : }
    1336              : 
    1337            2 : void FirstFrameReceivedMessage::handle()
    1338              : {
    1339            2 :     if (!m_player->handleFirstFrameReceived(m_sourceId))
    1340              :     {
    1341            1 :         GST_ERROR("Failed to handle first frame received for sourceId=%d", m_sourceId);
    1342              :     }
    1343            2 : }
    1344              : 
    1345            7 : PlaybackErrorMessage::PlaybackErrorMessage(int sourceId, firebolt::rialto::PlaybackError error,
    1346            7 :                                            GStreamerMSEMediaPlayerClient *player)
    1347            7 :     : m_sourceId(sourceId), m_error(error), m_player(player)
    1348              : {
    1349              : }
    1350              : 
    1351            7 : void PlaybackErrorMessage::handle()
    1352              : {
    1353            7 :     if (!m_player->handlePlaybackError(m_sourceId, m_error))
    1354              :     {
    1355            2 :         GST_ERROR("Failed to handle playback error for sourceId=%d, error %s", m_sourceId, toString(m_error));
    1356              :     }
    1357            7 : }
    1358              : 
    1359            1 : SetPositionMessage::SetPositionMessage(int64_t newPosition, std::unordered_map<int32_t, AttachedSource> &attachedSources)
    1360            1 :     : m_newPosition(newPosition), m_attachedSources(attachedSources)
    1361              : {
    1362              : }
    1363              : 
    1364            1 : void SetPositionMessage::handle()
    1365              : {
    1366            2 :     for (auto &source : m_attachedSources)
    1367              :     {
    1368            1 :         source.second.setPosition(m_newPosition);
    1369              :     }
    1370              : }
    1371              : 
    1372            1 : SetDurationMessage::SetDurationMessage(int64_t newDuration, int64_t &targetDuration)
    1373            1 :     : m_newDuration(newDuration), m_targetDuration(targetDuration)
    1374              : {
    1375              : }
    1376              : 
    1377            1 : void SetDurationMessage::handle()
    1378              : {
    1379            1 :     m_targetDuration = m_newDuration;
    1380              : }
    1381              : 
    1382            9 : SourceFlushedMessage::SourceFlushedMessage(int32_t sourceId, GStreamerMSEMediaPlayerClient *player)
    1383            9 :     : m_sourceId{sourceId}, m_player{player}
    1384              : {
    1385              : }
    1386              : 
    1387            9 : void SourceFlushedMessage::handle()
    1388              : {
    1389            9 :     m_player->handleSourceFlushed(m_sourceId);
    1390              : }
        

Generated by: LCOV version 2.0-1