Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "GstDispatcherThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace firebolt::rialto::server
24 : {
25 2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
26 : IGstDispatcherThreadClient &client, GstElement *pipeline,
27 : const std::shared_ptr<IFlushOnPrerollController> &flushOnPrerollController,
28 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
29 : {
30 2 : return std::make_unique<GstDispatcherThread>(client, pipeline, flushOnPrerollController, gstWrapper);
31 : }
32 :
33 9 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
34 : const std::shared_ptr<IFlushOnPrerollController> &flushOnPrerollController,
35 9 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
36 9 : : m_client{client}, m_flushOnPrerollController{flushOnPrerollController}, m_gstWrapper{gstWrapper},
37 18 : m_isGstreamerDispatcherActive{true}
38 : {
39 9 : RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
40 9 : m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
41 : }
42 :
43 18 : GstDispatcherThread::~GstDispatcherThread()
44 : {
45 9 : RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
46 9 : m_isGstreamerDispatcherActive = false;
47 9 : if (m_gstBusDispatcherThread.joinable())
48 : {
49 9 : m_gstBusDispatcherThread.join();
50 : }
51 18 : }
52 :
53 9 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
54 : {
55 9 : GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
56 9 : if (!bus)
57 : {
58 2 : RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
59 2 : return;
60 : }
61 :
62 26 : while (m_isGstreamerDispatcherActive)
63 : {
64 : GstMessage *message =
65 12 : m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
66 : static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
67 : GST_MESSAGE_QOS | GST_MESSAGE_EOS |
68 : GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
69 :
70 12 : if (message)
71 : {
72 11 : bool shouldHandleMessage{true};
73 11 : if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
74 : {
75 10 : switch (GST_MESSAGE_TYPE(message))
76 : {
77 4 : case GST_MESSAGE_STATE_CHANGED:
78 : {
79 : GstState oldState, newState, pending;
80 4 : m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
81 4 : switch (newState)
82 : {
83 1 : case GST_STATE_NULL:
84 : {
85 1 : m_isGstreamerDispatcherActive = false;
86 1 : if (m_flushOnPrerollController)
87 : {
88 1 : m_flushOnPrerollController->reset();
89 : }
90 1 : break;
91 : }
92 2 : case GST_STATE_PAUSED:
93 : {
94 2 : if (m_flushOnPrerollController && pending != GST_STATE_PAUSED)
95 : {
96 1 : m_flushOnPrerollController->stateReached(newState);
97 : }
98 1 : else if (m_flushOnPrerollController && pending == GST_STATE_PAUSED)
99 : {
100 1 : m_flushOnPrerollController->setPrerolling();
101 : }
102 2 : break;
103 : }
104 1 : case GST_STATE_PLAYING:
105 : {
106 1 : if (m_flushOnPrerollController)
107 : {
108 1 : m_flushOnPrerollController->stateReached(newState);
109 : }
110 1 : break;
111 : }
112 0 : case GST_STATE_READY:
113 : case GST_STATE_VOID_PENDING:
114 : {
115 0 : break;
116 : }
117 : }
118 4 : break;
119 : }
120 6 : case GST_MESSAGE_ERROR:
121 : {
122 6 : m_isGstreamerDispatcherActive = false;
123 6 : break;
124 : }
125 0 : default:
126 : {
127 0 : break;
128 : }
129 : }
130 : }
131 1 : else if (GST_MESSAGE_STATE_CHANGED == GST_MESSAGE_TYPE(message))
132 : {
133 : // Skip handling GST_MESSAGE_STATE_CHANGED for non-pipeline objects.
134 : // It signifficantly slows down rialto gst worker thread
135 1 : shouldHandleMessage = false;
136 1 : m_gstWrapper->gstMessageUnref(message);
137 : }
138 :
139 11 : if (shouldHandleMessage)
140 : {
141 10 : m_client.handleBusMessage(message);
142 : }
143 : }
144 : }
145 :
146 7 : RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
147 7 : m_gstWrapper->gstObjectUnref(bus);
148 : }
149 : } // namespace firebolt::rialto::server
|