Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
#include "tasks/generic/HandleBusMessage.h"
#include "GenericPlayerContext.h"
#include "IGstGenericPlayerClient.h"
#include "IGstWrapper.h"
#include "RialtoServerLogging.h"
#include <string>
#include <utility>
25 :
26 : namespace firebolt::rialto::server::tasks::generic
27 : {
28 29 : HandleBusMessage::HandleBusMessage(GenericPlayerContext &context, IGstGenericPlayerPrivate &player,
29 : IGstGenericPlayerClient *client,
30 : std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> gstWrapper,
31 : std::shared_ptr<firebolt::rialto::wrappers::IGlibWrapper> glibWrapper,
32 29 : GstMessage *message, bool isFlushOngoing)
33 29 : : m_context{context}, m_player{player}, m_gstPlayerClient{client}, m_gstWrapper{gstWrapper},
34 29 : m_glibWrapper{glibWrapper}, m_message{message}, m_isFlushOngoing{isFlushOngoing}
35 : {
36 29 : RIALTO_SERVER_LOG_DEBUG("Constructing HandleBusMessage");
37 : }
38 :
39 30 : HandleBusMessage::~HandleBusMessage()
40 : {
41 29 : RIALTO_SERVER_LOG_DEBUG("HandleBusMessage finished");
42 30 : }
43 :
44 28 : void HandleBusMessage::execute() const
45 : {
46 28 : RIALTO_SERVER_LOG_DEBUG("Executing HandleBusMessage");
47 28 : switch (GST_MESSAGE_TYPE(m_message))
48 : {
49 8 : case GST_MESSAGE_STATE_CHANGED:
50 : {
51 8 : if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
52 : {
53 : GstState oldState, newState, pending;
54 6 : m_gstWrapper->gstMessageParseStateChanged(m_message, &oldState, &newState, &pending);
55 6 : RIALTO_SERVER_LOG_INFO("State changed (old: %s, new: %s, pending: %s)",
56 : m_gstWrapper->gstElementStateGetName(oldState),
57 : m_gstWrapper->gstElementStateGetName(newState),
58 : m_gstWrapper->gstElementStateGetName(pending));
59 :
60 12 : std::string filename = std::string(m_gstWrapper->gstElementStateGetName(oldState)) + "-" +
61 18 : std::string(m_gstWrapper->gstElementStateGetName(newState));
62 6 : m_gstWrapper->gstDebugBinToDotFileWithTs(GST_BIN(m_context.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
63 : filename.c_str());
64 6 : if (!m_gstPlayerClient)
65 : {
66 1 : break;
67 : }
68 5 : switch (newState)
69 : {
70 1 : case GST_STATE_NULL:
71 : {
72 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::STOPPED);
73 1 : break;
74 : }
75 2 : case GST_STATE_PAUSED:
76 : {
77 2 : if (pending != GST_STATE_PAUSED)
78 : {
79 : // newState==GST_STATE_PAUSED, pending==GST_STATE_PAUSED state transition is received as a result of
80 : // waiting for preroll after seek.
81 : // Subsequent newState==GST_STATE_PAUSED, pending!=GST_STATE_PAUSED transition will
82 : // indicate that the pipeline is prerolled and it reached GST_STATE_PAUSED state after seek.
83 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::PAUSED);
84 : }
85 2 : break;
86 : }
87 2 : case GST_STATE_PLAYING:
88 : {
89 2 : if (m_context.pendingPlaybackRate != kNoPendingPlaybackRate)
90 : {
91 1 : m_player.setPendingPlaybackRate();
92 : }
93 2 : m_player.startPositionReportingAndCheckAudioUnderflowTimer();
94 :
95 2 : m_context.isPlaying = true;
96 2 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::PLAYING);
97 2 : break;
98 : }
99 0 : case GST_STATE_VOID_PENDING:
100 : case GST_STATE_READY:
101 : {
102 0 : break;
103 : }
104 : }
105 6 : }
106 7 : break;
107 : }
108 5 : case GST_MESSAGE_EOS:
109 : {
110 5 : if (m_isFlushOngoing)
111 : {
112 1 : RIALTO_SERVER_LOG_WARN("Skip EOS notification - flush is ongoing");
113 1 : break;
114 : }
115 4 : if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
116 : {
117 2 : RIALTO_SERVER_LOG_MIL("End of stream reached.");
118 2 : if (!m_context.eosNotified && m_gstPlayerClient)
119 : {
120 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
121 1 : m_context.eosNotified = true;
122 : }
123 : }
124 4 : break;
125 : }
126 4 : case GST_MESSAGE_QOS:
127 : {
128 : GstFormat format;
129 4 : gboolean isLive = FALSE;
130 4 : guint64 runningTime = 0;
131 4 : guint64 streamTime = 0;
132 4 : guint64 timestamp = 0;
133 4 : guint64 duration = 0;
134 4 : guint64 dropped = 0;
135 4 : guint64 processed = 0;
136 :
137 4 : m_gstWrapper->gstMessageParseQos(m_message, &isLive, &runningTime, &streamTime, ×tamp, &duration);
138 4 : m_gstWrapper->gstMessageParseQosStats(m_message, &format, &processed, &dropped);
139 :
140 4 : if (GST_FORMAT_BUFFERS == format || GST_FORMAT_DEFAULT == format)
141 : {
142 3 : RIALTO_SERVER_LOG_INFO("QOS message: runningTime %" G_GUINT64_FORMAT ", streamTime %" G_GUINT64_FORMAT
143 : ", timestamp %" G_GUINT64_FORMAT ", duration %" G_GUINT64_FORMAT
144 : ", format %u, processed %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
145 : runningTime, streamTime, timestamp, duration, format, processed, dropped);
146 :
147 3 : if (m_gstPlayerClient)
148 : {
149 3 : firebolt::rialto::QosInfo qosInfo = {processed, dropped};
150 : const gchar *klass;
151 3 : klass = m_gstWrapper->gstElementClassGetMetadata(GST_ELEMENT_GET_CLASS(GST_MESSAGE_SRC(m_message)),
152 : GST_ELEMENT_METADATA_KLASS);
153 :
154 3 : if (g_strrstr(klass, "Video"))
155 : {
156 1 : m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::VIDEO, qosInfo);
157 : }
158 2 : else if (g_strrstr(klass, "Audio"))
159 : {
160 1 : m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::AUDIO, qosInfo);
161 : }
162 : else
163 : {
164 1 : RIALTO_SERVER_LOG_WARN("Unknown source type for class '%s', ignoring QOS Message", klass);
165 : }
166 : }
167 3 : }
168 : else
169 : {
170 1 : RIALTO_SERVER_LOG_WARN("Received a QOS_MESSAGE with unhandled format %s",
171 : m_gstWrapper->gstFormatGetName(format));
172 : }
173 4 : break;
174 : }
175 6 : case GST_MESSAGE_ERROR:
176 : {
177 6 : GError *err = nullptr;
178 6 : gchar *debug = nullptr;
179 6 : m_gstWrapper->gstMessageParseError(m_message, &err, &debug);
180 :
181 6 : if ((err->domain == GST_STREAM_ERROR) && (allSourcesEos()))
182 : {
183 2 : RIALTO_SERVER_LOG_WARN("Got stream error from %s. But all streams are ended, so reporting EOS. Error code "
184 : "%d: %s "
185 : "(%s).",
186 : GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
187 2 : if (!m_context.eosNotified && m_gstPlayerClient)
188 : {
189 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
190 1 : m_context.eosNotified = true;
191 : }
192 : }
193 : else
194 : {
195 4 : RIALTO_SERVER_LOG_ERROR("Error from %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
196 : err->code, err->message, debug);
197 4 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::FAILURE);
198 : }
199 :
200 6 : m_glibWrapper->gFree(debug);
201 6 : m_glibWrapper->gErrorFree(err);
202 6 : break;
203 : }
204 4 : case GST_MESSAGE_WARNING:
205 : {
206 4 : PlaybackError rialtoError = PlaybackError::UNKNOWN;
207 4 : GError *err = nullptr;
208 4 : gchar *debug = nullptr;
209 4 : m_gstWrapper->gstMessageParseWarning(m_message, &err, &debug);
210 :
211 4 : if ((err->domain == GST_STREAM_ERROR) && (err->code == GST_STREAM_ERROR_DECRYPT))
212 : {
213 3 : RIALTO_SERVER_LOG_WARN("Decrypt error %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
214 : err->code, err->message, debug);
215 3 : rialtoError = PlaybackError::DECRYPTION;
216 : }
217 : else
218 : {
219 1 : RIALTO_SERVER_LOG_WARN("Unknown warning, ignoring %s - %d: %s (%s)",
220 : GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
221 : }
222 :
223 4 : if ((PlaybackError::UNKNOWN != rialtoError) && (m_gstPlayerClient))
224 : {
225 3 : const gchar *kName = GST_ELEMENT_NAME(GST_ELEMENT(GST_MESSAGE_SRC(m_message)));
226 3 : if (g_strrstr(kName, "video"))
227 : {
228 1 : m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::VIDEO,
229 : PlaybackError::DECRYPTION);
230 : }
231 2 : else if (g_strrstr(kName, "audio"))
232 : {
233 1 : m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::AUDIO,
234 : PlaybackError::DECRYPTION);
235 : }
236 : else
237 : {
238 1 : RIALTO_SERVER_LOG_WARN("Unknown source type for element '%s', not propagating error", kName);
239 : }
240 : }
241 :
242 4 : m_glibWrapper->gFree(debug);
243 4 : m_glibWrapper->gErrorFree(err);
244 4 : break;
245 : }
246 1 : default:
247 1 : break;
248 : }
249 :
250 28 : m_gstWrapper->gstMessageUnref(m_message);
251 : }
252 :
253 4 : bool HandleBusMessage::allSourcesEos() const
254 : {
255 8 : for (const auto &streamInfo : m_context.streamInfo)
256 : {
257 6 : const auto eosInfoIt = m_context.endOfStreamInfo.find(streamInfo.first);
258 6 : if (eosInfoIt == m_context.endOfStreamInfo.end() || eosInfoIt->second != EosState::SET)
259 : {
260 2 : return false;
261 : }
262 : }
263 2 : return true;
264 : }
265 : } // namespace firebolt::rialto::server::tasks::generic
|