Line data Source code
1 : /*
2 : * Copyright (C) 2026 Sky UK
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Lesser General Public
6 : * License as published by the Free Software Foundation;
7 : * version 2.1 of the License.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Lesser General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Lesser General Public
15 : * License along with this library; if not, write to the Free Software
16 : * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 : */
18 :
19 : #include "FlushAndDataSynchronizer.h"
20 : #include "GStreamerMSEMediaPlayerClient.h"
21 : #include "GstreamerCatLog.h"
22 : #include <algorithm>
23 : #include <gst/gst.h>
24 :
25 : #define GST_CAT_DEFAULT rialtoGStreamerCat
26 :
27 213 : void FlushAndDataSynchronizer::addSource(int32_t sourceId)
28 : {
29 213 : std::unique_lock lock(m_mutex);
30 213 : m_sourceStates[sourceId] = {FlushState::IDLE, DataState::NO_DATA};
31 213 : GST_INFO("Added source %d to FlushAndDataSynchronizer", sourceId);
32 : }
33 :
34 159 : void FlushAndDataSynchronizer::removeSource(int32_t sourceId)
35 : {
36 159 : std::unique_lock lock(m_mutex);
37 159 : m_sourceStates.erase(sourceId);
38 159 : m_cv.notify_all();
39 159 : GST_INFO("Removed source %d from FlushAndDataSynchronizer", sourceId);
40 : }
41 :
42 17 : void FlushAndDataSynchronizer::notifyFlushStarted(int32_t sourceId)
43 : {
44 17 : std::unique_lock lock(m_mutex);
45 17 : m_sourceStates[sourceId].flushState = FlushState::FLUSHING;
46 17 : m_sourceStates[sourceId].dataState = DataState::NO_DATA;
47 17 : GST_INFO("FlushAndDataSynchronizer: Flush started for source %d", sourceId);
48 : }
49 :
50 10 : void FlushAndDataSynchronizer::notifyFlushCompleted(int32_t sourceId)
51 : {
52 10 : std::unique_lock lock(m_mutex);
53 10 : m_sourceStates[sourceId].flushState = FlushState::FLUSHED;
54 10 : m_cv.notify_all();
55 10 : GST_INFO("FlushAndDataSynchronizer: Flush completed for source %d", sourceId);
56 : }
57 :
58 33 : void FlushAndDataSynchronizer::notifyDataReceived(int32_t sourceId)
59 : {
60 33 : std::unique_lock lock(m_mutex);
61 33 : if (m_sourceStates[sourceId].dataState == DataState::NO_DATA)
62 : {
63 8 : m_sourceStates[sourceId].dataState = DataState::DATA_RECEIVED;
64 8 : GST_INFO("FlushAndDataSynchronizer: Data received for source %d", sourceId);
65 : }
66 33 : }
67 :
68 7 : void FlushAndDataSynchronizer::notifyDataPushed(int32_t sourceId)
69 : {
70 7 : std::unique_lock lock(m_mutex);
71 7 : m_sourceStates[sourceId].dataState = DataState::DATA_PUSHED;
72 7 : m_sourceStates[sourceId].flushState = FlushState::IDLE;
73 7 : m_cv.notify_all();
74 7 : GST_INFO("FlushAndDataSynchronizer: Data pushed for source %d", sourceId);
75 : }
76 :
77 16 : void FlushAndDataSynchronizer::waitIfRequired(int32_t sourceId)
78 : {
79 16 : std::unique_lock lock(m_mutex);
80 16 : GST_INFO("FlushAndDataSynchronizer: waitIfRequired enter for source %d", sourceId);
81 16 : m_cv.wait(lock,
82 20 : [&]()
83 : {
84 20 : return std::none_of(m_sourceStates.begin(), m_sourceStates.end(),
85 32 : [&](const auto &state)
86 : {
87 52 : return (state.first == sourceId &&
88 62 : state.second.flushState == FlushState::FLUSHING) ||
89 30 : (state.second.flushState != FlushState::IDLE &&
90 38 : state.second.dataState == DataState::DATA_RECEIVED);
91 20 : });
92 : });
93 16 : GST_INFO("FlushAndDataSynchronizer: waitIfRequired exit for source %d", sourceId);
94 : }
95 :
96 2 : bool FlushAndDataSynchronizer::isAnySourceFlushing() const
97 : {
98 2 : std::unique_lock lock(m_mutex);
99 2 : return std::any_of(m_sourceStates.begin(), m_sourceStates.end(),
100 7 : [](const auto &state) { return state.second.flushState == FlushState::FLUSHING; });
101 2 : }
|