Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "GstDispatcherThread.h"
21 : #include "RialtoServerLogging.h"
22 :
23 : namespace firebolt::rialto::server
24 : {
25 2 : std::unique_ptr<IGstDispatcherThread> GstDispatcherThreadFactory::createGstDispatcherThread(
26 : IGstDispatcherThreadClient &client, GstElement *pipeline,
27 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper) const
28 : {
29 2 : return std::make_unique<GstDispatcherThread>(client, pipeline, gstWrapper);
30 : }
31 :
32 6 : GstDispatcherThread::GstDispatcherThread(IGstDispatcherThreadClient &client, GstElement *pipeline,
33 6 : const std::shared_ptr<firebolt::rialto::wrappers::IGstWrapper> &gstWrapper)
34 6 : : m_client{client}, m_gstWrapper{gstWrapper}, m_isGstreamerDispatcherActive{true}
35 : {
36 6 : RIALTO_SERVER_LOG_INFO("GstDispatcherThread is starting");
37 6 : m_gstBusDispatcherThread = std::thread(&GstDispatcherThread::gstBusEventHandler, this, pipeline);
38 : }
39 :
40 12 : GstDispatcherThread::~GstDispatcherThread()
41 : {
42 6 : RIALTO_SERVER_LOG_INFO("Stopping GstDispatcherThread");
43 6 : m_isGstreamerDispatcherActive = false;
44 6 : if (m_gstBusDispatcherThread.joinable())
45 : {
46 6 : m_gstBusDispatcherThread.join();
47 : }
48 12 : }
49 :
50 6 : void GstDispatcherThread::gstBusEventHandler(GstElement *pipeline)
51 : {
52 6 : GstBus *bus = m_gstWrapper->gstPipelineGetBus(GST_PIPELINE(pipeline));
53 6 : if (!bus)
54 : {
55 2 : RIALTO_SERVER_LOG_ERROR("Failed to get gst bus");
56 2 : return;
57 : }
58 :
59 10 : while (m_isGstreamerDispatcherActive)
60 : {
61 : GstMessage *message =
62 6 : m_gstWrapper->gstBusTimedPopFiltered(bus, 100 * GST_MSECOND,
63 : static_cast<GstMessageType>(GST_MESSAGE_STATE_CHANGED |
64 : GST_MESSAGE_QOS | GST_MESSAGE_EOS |
65 : GST_MESSAGE_ERROR | GST_MESSAGE_WARNING));
66 :
67 6 : if (message)
68 : {
69 5 : if (GST_MESSAGE_SRC(message) == GST_OBJECT(pipeline))
70 : {
71 5 : switch (GST_MESSAGE_TYPE(message))
72 : {
73 2 : case GST_MESSAGE_STATE_CHANGED:
74 : {
75 : GstState oldState, newState, pending;
76 2 : m_gstWrapper->gstMessageParseStateChanged(message, &oldState, &newState, &pending);
77 2 : switch (newState)
78 : {
79 1 : case GST_STATE_NULL:
80 : {
81 1 : m_isGstreamerDispatcherActive = false;
82 : }
83 2 : case GST_STATE_READY:
84 : case GST_STATE_PAUSED:
85 : case GST_STATE_PLAYING:
86 : case GST_STATE_VOID_PENDING:
87 : {
88 2 : break;
89 : }
90 : }
91 2 : break;
92 : }
93 3 : case GST_MESSAGE_ERROR:
94 : {
95 3 : m_isGstreamerDispatcherActive = false;
96 3 : break;
97 : }
98 0 : default:
99 : {
100 0 : break;
101 : }
102 : }
103 : }
104 :
105 5 : m_client.handleBusMessage(message);
106 : }
107 : }
108 :
109 4 : RIALTO_SERVER_LOG_INFO("Gstbus dispatcher exitting");
110 4 : m_gstWrapper->gstObjectUnref(bus);
111 : }
112 : } // namespace firebolt::rialto::server
|