Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "GstDispatcherThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace firebolt::rialto::server
24 : {
25 2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
26 : IGstDispatcherThreadClient &client, GstElement *pipeline,
27 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
28 : {
29 2 : return std::make_unique<GstDispatcherThread>(client, pipeline, gstWrapper);
30 : }
31 :
32 7 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
33 7 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
34 7 : : m_client{client}, m_gstWrapper{gstWrapper}, m_isGstreamerDispatcherActive{true}
35 : {
36 7 : RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
37 7 : m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
38 : }
39 :
40 14 : GstDispatcherThread::~GstDispatcherThread()
41 : {
42 7 : RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
43 7 : m_isGstreamerDispatcherActive = false;
44 7 : if (m_gstBusDispatcherThread.joinable())
45 : {
46 7 : m_gstBusDispatcherThread.join();
47 : }
48 14 : }
49 :
50 7 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
51 : {
52 7 : GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
53 7 : if (!bus)
54 : {
55 2 : RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
56 2 : return;
57 : }
58 :
59 18 : while (m_isGstreamerDispatcherActive)
60 : {
61 : GstMessage *message =
62 8 : m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
63 : static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
64 : GST_MESSAGE_QOS | GST_MESSAGE_EOS |
65 : GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
66 :
67 8 : if (message)
68 : {
69 7 : bool shouldHandleMessage{true};
70 7 : if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
71 : {
72 6 : switch (GST_MESSAGE_TYPE(message))
73 : {
74 2 : case GST_MESSAGE_STATE_CHANGED:
75 : {
76 : GstState oldState, newState, pending;
77 2 : m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
78 2 : switch (newState)
79 : {
80 1 : case GST_STATE_NULL:
81 : {
82 1 : m_isGstreamerDispatcherActive = false;
83 : }
84 2 : case GST_STATE_READY:
85 : case GST_STATE_PAUSED:
86 : case GST_STATE_PLAYING:
87 : case GST_STATE_VOID_PENDING:
88 : {
89 2 : break;
90 : }
91 : }
92 2 : break;
93 : }
94 4 : case GST_MESSAGE_ERROR:
95 : {
96 4 : m_isGstreamerDispatcherActive = false;
97 4 : break;
98 : }
99 0 : default:
100 : {
101 0 : break;
102 : }
103 : }
104 : }
105 1 : else if (GST_MESSAGE_STATE_CHANGED == GST_MESSAGE_TYPE(message))
106 : {
107 : // Skip handling GST_MESSAGE_STATE_CHANGED for non-pipeline objects.
108 : // It signifficantly slows down rialto gst worker thread
109 1 : shouldHandleMessage = false;
110 : }
111 :
112 7 : if (shouldHandleMessage)
113 : {
114 6 : m_client.handleBusMessage(message);
115 : }
116 : }
117 : }
118 :
119 5 : RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
120 5 : m_gstWrapper->gstObjectUnref(bus);
121 : }
122 : } // namespace firebolt::rialto::server
|