Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "GstDispatcherThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace firebolt::rialto::server
24 : {
25 2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
26 : IGstDispatcherThreadClient &client, GstElement *pipeline,
27 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
28 : {
29 2 : return std::make_unique<GstDispatcherThread>(client, pipeline, gstWrapper);
30 : }
31 :
32 8 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
33 8 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
34 8 : : m_client{client}, m_gstWrapper{gstWrapper}, m_isGstreamerDispatcherActive{true}
35 : {
36 8 : RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
37 8 : m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
38 : }
39 :
40 16 : GstDispatcherThread::~GstDispatcherThread()
41 : {
42 8 : RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
43 8 : m_isGstreamerDispatcherActive = false;
44 8 : if (m_gstBusDispatcherThread.joinable())
45 : {
46 8 : m_gstBusDispatcherThread.join();
47 : }
48 16 : }
49 :
50 8 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
51 : {
52 8 : GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
53 8 : if (!bus)
54 : {
55 2 : RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
56 2 : return;
57 : }
58 :
59 21 : while (m_isGstreamerDispatcherActive)
60 : {
61 : GstMessage *message =
62 9 : m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
63 : static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
64 : GST_MESSAGE_QOS | GST_MESSAGE_EOS |
65 : GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
66 :
67 9 : if (message)
68 : {
69 8 : bool shouldHandleMessage{true};
70 8 : bool isPrioritised{false};
71 8 : if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
72 : {
73 7 : switch (GST_MESSAGE_TYPE(message))
74 : {
75 2 : case GST_MESSAGE_STATE_CHANGED:
76 : {
77 2 : isPrioritised = true;
78 : GstState oldState, newState, pending;
79 2 : m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
80 2 : switch (newState)
81 : {
82 1 : case GST_STATE_NULL:
83 : {
84 1 : m_isGstreamerDispatcherActive = false;
85 : }
86 2 : case GST_STATE_READY:
87 : case GST_STATE_PAUSED:
88 : case GST_STATE_PLAYING:
89 : case GST_STATE_VOID_PENDING:
90 : {
91 2 : break;
92 : }
93 : }
94 2 : break;
95 : }
96 5 : case GST_MESSAGE_ERROR:
97 : {
98 5 : m_isGstreamerDispatcherActive = false;
99 5 : break;
100 : }
101 0 : default:
102 : {
103 0 : break;
104 : }
105 : }
106 : }
107 1 : else if (GST_MESSAGE_STATE_CHANGED == GST_MESSAGE_TYPE(message))
108 : {
109 : // Skip handling GST_MESSAGE_STATE_CHANGED for non-pipeline objects.
110 : // It signifficantly slows down rialto gst worker thread
111 1 : shouldHandleMessage = false;
112 : }
113 :
114 8 : if (shouldHandleMessage)
115 : {
116 7 : m_client.handleBusMessage(message, isPrioritised);
117 : }
118 : }
119 : }
120 :
121 6 : RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
122 6 : m_gstWrapper->gstObjectUnref(bus);
123 : }
124 : } // namespace firebolt::rialto::server
|