Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "tasks/generic/HandleBusMessage.h"
21 : #include "GenericPlayerContext.h"
22 : #include "IGstGenericPlayerClient.h"
23 : #include "IGstWrapper.h"
24 : #include "RialtoServerLogging.h"
25 :
26 : namespace firebolt::rialto::server::tasks::generic
27 : {
28 36 : HandleBusMessage::HandleBusMessage(GenericPlayerContext &context, IGstGenericPlayerPrivate &player,
29 : IGstGenericPlayerClient *client,
30 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper,
31 : const std::shared_ptr<firebolt::rialto::wrappers::IGlibWrapper> &glibWrapper,
32 36 : GstMessage *message, const IFlushWatcher &flushWatcher)
33 36 : : m_context{context}, m_player{player}, m_gstPlayerClient{client}, m_gstWrapper{gstWrapper},
34 36 : m_glibWrapper{glibWrapper}, m_message{message}, m_flushWatcher{flushWatcher},
35 36 : m_isFlushOngoingDuringCreation{flushWatcher.isFlushOngoing()},
36 72 : m_isAsyncFlushOngoingDuringCreation{flushWatcher.isAsyncFlushOngoing()}
37 : {
38 36 : RIALTO_SERVER_LOG_DEBUG("Constructing HandleBusMessage");
39 : }
40 :
41 37 : HandleBusMessage::~HandleBusMessage()
42 : {
43 36 : RIALTO_SERVER_LOG_DEBUG("HandleBusMessage finished");
44 37 : }
45 :
46 35 : void HandleBusMessage::execute() const
47 : {
48 35 : RIALTO_SERVER_LOG_DEBUG("Executing HandleBusMessage");
49 35 : switch (GST_MESSAGE_TYPE(m_message))
50 : {
51 14 : case GST_MESSAGE_STATE_CHANGED:
52 : {
53 14 : if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
54 : {
55 : GstState oldState, newState, pending;
56 12 : m_gstWrapper->gstMessageParseStateChanged(m_message, &oldState, &newState, &pending);
57 12 : const char *oldStateName = m_gstWrapper->gstElementStateGetName(oldState);
58 12 : const char *newStateName = m_gstWrapper->gstElementStateGetName(newState);
59 12 : const char *pendingStateName = m_gstWrapper->gstElementStateGetName(pending);
60 12 : RIALTO_SERVER_LOG_MIL("State changed (old: %s, new: %s, pending: %s)", oldStateName, newStateName,
61 : pendingStateName);
62 48 : auto recordId = m_context.gstProfiler->createRecord("Pipeline State Changed", newStateName);
63 12 : if (recordId)
64 0 : m_context.gstProfiler->logRecord(recordId.value());
65 12 : if (newState == GST_STATE_PLAYING)
66 5 : m_context.gstProfiler->logPipelineSummary();
67 :
68 36 : std::string filename = std::string(oldStateName) + "-" + std::string(newStateName);
69 12 : m_gstWrapper->gstDebugBinToDotFileWithTs(GST_BIN(m_context.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
70 : filename.c_str());
71 12 : if (!m_gstPlayerClient)
72 : {
73 1 : break;
74 : }
75 11 : switch (newState)
76 : {
77 1 : case GST_STATE_NULL:
78 : {
79 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::STOPPED);
80 1 : break;
81 : }
82 5 : case GST_STATE_PAUSED:
83 : {
84 5 : m_player.startNotifyPlaybackInfoTimer();
85 5 : m_player.stopPositionReportingAndCheckAudioUnderflowTimer();
86 5 : if (pending != GST_STATE_PAUSED)
87 : {
88 : // If async flush was requested before HandleBusMessage task creation (but it was not executed yet)
89 : // or if async flush was created after HandleBusMessage task creation (but before its execution)
90 : // we can't report playback state, because async flush causes state loss - reported state is probably invalid.
91 4 : if (m_isAsyncFlushOngoingDuringCreation || m_flushWatcher.isAsyncFlushOngoing())
92 : {
93 2 : RIALTO_SERVER_LOG_WARN("Skip PAUSED notification - flush is ongoing");
94 2 : break;
95 : }
96 : // newState==GST_STATE_PAUSED, pending==GST_STATE_PAUSED state transition is received as a result of
97 : // waiting for preroll after seek.
98 : // Subsequent newState==GST_STATE_PAUSED, pending!=GST_STATE_PAUSED transition will
99 : // indicate that the pipeline is prerolled and it reached GST_STATE_PAUSED state after seek.
100 2 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::PAUSED);
101 : }
102 :
103 3 : if (m_player.hasSourceType(MediaSourceType::SUBTITLE))
104 : {
105 0 : m_player.stopSubtitleClockResyncTimer();
106 : }
107 3 : break;
108 : }
109 5 : case GST_STATE_PLAYING:
110 : {
111 : // If async flush was requested before HandleBusMessage task creation (but it was not executed yet)
112 : // or if async flush was created after HandleBusMessage task creation (but before its execution)
113 : // we can't report playback state, because async flush causes state loss - reported state is probably invalid.
114 5 : if (m_isAsyncFlushOngoingDuringCreation || m_flushWatcher.isAsyncFlushOngoing())
115 : {
116 2 : RIALTO_SERVER_LOG_WARN("Skip PLAYING notification - flush is ongoing");
117 2 : break;
118 : }
119 3 : if (m_context.pendingPlaybackRate != kNoPendingPlaybackRate)
120 : {
121 1 : m_player.setPendingPlaybackRate();
122 : }
123 3 : m_player.startPositionReportingAndCheckAudioUnderflowTimer();
124 3 : if (m_player.hasSourceType(MediaSourceType::SUBTITLE))
125 : {
126 0 : m_player.startSubtitleClockResyncTimer();
127 : }
128 :
129 3 : m_context.isPlaying = true;
130 3 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::PLAYING);
131 3 : break;
132 : }
133 0 : case GST_STATE_VOID_PENDING:
134 : {
135 0 : break;
136 : }
137 0 : case GST_STATE_READY:
138 : {
139 0 : m_player.stopNotifyPlaybackInfoTimer();
140 0 : break;
141 : }
142 : }
143 12 : }
144 13 : break;
145 : }
146 6 : case GST_MESSAGE_EOS:
147 : {
148 : // If flush was requested before HandleBusMessage task creation (but it was not executed yet)
149 : // or if flush was created after HandleBusMessage task creation (but before its execution)
150 : // we can't report EOS, because flush clears EOS.
151 6 : if (m_isFlushOngoingDuringCreation || m_flushWatcher.isFlushOngoing())
152 : {
153 2 : RIALTO_SERVER_LOG_WARN("Skip EOS notification - flush is ongoing");
154 2 : break;
155 : }
156 4 : if (m_context.pipeline && GST_MESSAGE_SRC(m_message) == GST_OBJECT(m_context.pipeline))
157 : {
158 2 : RIALTO_SERVER_LOG_MIL("End of stream reached.");
159 2 : if (!m_context.eosNotified && m_gstPlayerClient)
160 : {
161 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
162 1 : m_context.eosNotified = true;
163 : }
164 : }
165 4 : break;
166 : }
167 4 : case GST_MESSAGE_QOS:
168 : {
169 : GstFormat format;
170 4 : gboolean isLive = FALSE;
171 4 : guint64 runningTime = 0;
172 4 : guint64 streamTime = 0;
173 4 : guint64 timestamp = 0;
174 4 : guint64 duration = 0;
175 4 : guint64 dropped = 0;
176 4 : guint64 processed = 0;
177 :
178 4 : m_gstWrapper->gstMessageParseQos(m_message, &isLive, &runningTime, &streamTime, ×tamp, &duration);
179 4 : m_gstWrapper->gstMessageParseQosStats(m_message, &format, &processed, &dropped);
180 :
181 4 : if (GST_FORMAT_BUFFERS == format || GST_FORMAT_DEFAULT == format)
182 : {
183 3 : RIALTO_SERVER_LOG_INFO("QOS message: runningTime %" G_GUINT64_FORMAT ", streamTime %" G_GUINT64_FORMAT
184 : ", timestamp %" G_GUINT64_FORMAT ", duration %" G_GUINT64_FORMAT
185 : ", format %u, processed %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
186 : runningTime, streamTime, timestamp, duration, format, processed, dropped);
187 :
188 3 : if (m_gstPlayerClient)
189 : {
190 3 : firebolt::rialto::QosInfo qosInfo = {processed, dropped};
191 : const gchar *klass;
192 3 : klass = m_gstWrapper->gstElementClassGetMetadata(GST_ELEMENT_GET_CLASS(GST_MESSAGE_SRC(m_message)),
193 : GST_ELEMENT_METADATA_KLASS);
194 :
195 3 : if (g_strrstr(klass, "Video"))
196 : {
197 1 : m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::VIDEO, qosInfo);
198 : }
199 2 : else if (g_strrstr(klass, "Audio"))
200 : {
201 1 : m_gstPlayerClient->notifyQos(firebolt::rialto::MediaSourceType::AUDIO, qosInfo);
202 : }
203 : else
204 : {
205 1 : RIALTO_SERVER_LOG_WARN("Unknown source type for class '%s', ignoring QOS Message", klass);
206 : }
207 : }
208 3 : }
209 : else
210 : {
211 1 : RIALTO_SERVER_LOG_WARN("Received a QOS_MESSAGE with unhandled format %s",
212 : m_gstWrapper->gstFormatGetName(format));
213 : }
214 4 : break;
215 : }
216 6 : case GST_MESSAGE_ERROR:
217 : {
218 6 : GError *err = nullptr;
219 6 : gchar *debug = nullptr;
220 6 : m_gstWrapper->gstMessageParseError(m_message, &err, &debug);
221 :
222 6 : if ((err->domain == GST_STREAM_ERROR) && (allSourcesEos()))
223 : {
224 2 : RIALTO_SERVER_LOG_WARN("Got stream error from %s. But all streams are ended, so reporting EOS. Error code "
225 : "%d: %s "
226 : "(%s).",
227 : GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
228 2 : if (!m_context.eosNotified && m_gstPlayerClient)
229 : {
230 1 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::END_OF_STREAM);
231 1 : m_context.eosNotified = true;
232 : }
233 : }
234 : else
235 : {
236 4 : RIALTO_SERVER_LOG_ERROR("Error from %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
237 : err->code, err->message, debug);
238 4 : m_gstPlayerClient->notifyPlaybackState(PlaybackState::FAILURE);
239 : }
240 :
241 6 : m_glibWrapper->gFree(debug);
242 6 : m_glibWrapper->gErrorFree(err);
243 6 : break;
244 : }
245 4 : case GST_MESSAGE_WARNING:
246 : {
247 4 : PlaybackError rialtoError = PlaybackError::UNKNOWN;
248 4 : GError *err = nullptr;
249 4 : gchar *debug = nullptr;
250 4 : m_gstWrapper->gstMessageParseWarning(m_message, &err, &debug);
251 :
252 4 : if ((err->domain == GST_STREAM_ERROR) && (err->code == GST_STREAM_ERROR_DECRYPT))
253 : {
254 3 : RIALTO_SERVER_LOG_WARN("Decrypt error %s - %d: %s (%s)", GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)),
255 : err->code, err->message, debug);
256 3 : rialtoError = PlaybackError::DECRYPTION;
257 : }
258 : else
259 : {
260 1 : RIALTO_SERVER_LOG_WARN("Unknown warning, ignoring %s - %d: %s (%s)",
261 : GST_OBJECT_NAME(GST_MESSAGE_SRC(m_message)), err->code, err->message, debug);
262 : }
263 :
264 4 : if ((PlaybackError::UNKNOWN != rialtoError) && (m_gstPlayerClient))
265 : {
266 3 : const gchar *kName = GST_ELEMENT_NAME(GST_ELEMENT(GST_MESSAGE_SRC(m_message)));
267 3 : if (g_strrstr(kName, "video"))
268 : {
269 1 : m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::VIDEO,
270 : PlaybackError::DECRYPTION);
271 : }
272 2 : else if (g_strrstr(kName, "audio"))
273 : {
274 1 : m_gstPlayerClient->notifyPlaybackError(firebolt::rialto::MediaSourceType::AUDIO,
275 : PlaybackError::DECRYPTION);
276 : }
277 : else
278 : {
279 1 : RIALTO_SERVER_LOG_WARN("Unknown source type for element '%s', not propagating error", kName);
280 : }
281 : }
282 :
283 4 : m_glibWrapper->gFree(debug);
284 4 : m_glibWrapper->gErrorFree(err);
285 4 : break;
286 : }
287 1 : default:
288 1 : break;
289 : }
290 :
291 35 : m_gstWrapper->gstMessageUnref(m_message);
292 : }
293 :
294 4 : bool HandleBusMessage::allSourcesEos() const
295 : {
296 8 : for (const auto &streamInfo : m_context.streamInfo)
297 : {
298 6 : const auto eosInfoIt = m_context.endOfStreamInfo.find(streamInfo.first);
299 6 : if (eosInfoIt == m_context.endOfStreamInfo.end() || eosInfoIt->second != EosState::SET)
300 : {
301 2 : return false;
302 : }
303 : }
304 2 : return true;
305 : }
306 : } // namespace firebolt::rialto::server::tasks::generic
|