Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2023 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "IpcClient.h"
21 : #include "RialtoClientLogging.h"
22 : #include <utility>
23 :
24 : namespace firebolt::rialto::client
25 : {
26 3 : IIpcClientAccessor &IIpcClientAccessor::instance()
27 : {
28 3 : static IpcClientAccessor factory;
29 3 : return factory;
30 : }
31 :
32 3 : IIpcClient &IpcClientAccessor::getIpcClient() const
33 : {
34 12 : static IpcClient ipcClient{ipc::IChannelFactory::createFactory(), ipc::IControllerFactory::createFactory(),
35 12 : ipc::IBlockingClosureFactory::createFactory()};
36 0 : return ipcClient;
37 : }
38 :
39 9 : IpcClient::IpcClient(const std::shared_ptr<ipc::IChannelFactory> &ipcChannelFactory,
40 : const std::shared_ptr<ipc::IControllerFactory> &ipcControllerFactory,
41 9 : const std::shared_ptr<ipc::IBlockingClosureFactory> &blockingClosureFactory)
42 9 : : m_ipcControllerFactory(ipcControllerFactory), m_ipcChannelFactory(ipcChannelFactory),
43 18 : m_blockingClosureFactory(blockingClosureFactory), m_disconnecting(false)
44 : {
45 : // For now, always connect the client on construction
46 9 : if (!connect())
47 : {
48 4 : throw std::runtime_error("Could not connect client");
49 : }
50 33 : }
51 :
52 10 : IpcClient::~IpcClient()
53 : {
54 5 : if (!disconnect())
55 : {
56 0 : RIALTO_CLIENT_LOG_WARN("Could not disconnect client");
57 : }
58 10 : }
59 :
60 9 : bool IpcClient::connect()
61 : {
62 9 : if (m_ipcChannel)
63 : {
64 0 : RIALTO_CLIENT_LOG_INFO("Client already connected");
65 0 : return true;
66 : }
67 :
68 : // Verify that the version of the library that we linked against is
69 : // compatible with the version of the headers we compiled against.
70 9 : GOOGLE_PROTOBUF_VERIFY_VERSION;
71 :
72 : // check if either of following env vars are set to determine the location of the rialto socket
73 : // - RIALTO_SOCKET_PATH should specify the absolute path to the socket to connect to
74 : // - RIALTO_SOCKET_FD should specify the number of a file descriptor of the socket to connect to
75 9 : const char *kRialtoPath = getenv("RIALTO_SOCKET_PATH");
76 9 : const char *kRialtoFd = getenv("RIALTO_SOCKET_FD");
77 9 : if (kRialtoFd)
78 : {
79 0 : char *end = nullptr;
80 0 : int fd = strtol(kRialtoFd, &end, 10);
81 0 : if ((errno != 0) || (kRialtoFd == end) || (*end != '\0'))
82 : {
83 0 : RIALTO_CLIENT_LOG_SYS_ERROR(errno, "Invalid value set in RIALTO_SOCKET_FD env var");
84 0 : return false;
85 : }
86 :
87 0 : m_ipcChannel = m_ipcChannelFactory->createChannel(fd);
88 : }
89 9 : else if (kRialtoPath)
90 : {
91 27 : m_ipcChannel = m_ipcChannelFactory->createChannel(kRialtoPath);
92 : }
93 : else
94 : {
95 0 : RIALTO_CLIENT_LOG_ERROR("No rialto socket specified");
96 0 : return false;
97 : }
98 :
99 : // check if the channel was opened
100 9 : if (!m_ipcChannel)
101 : {
102 4 : RIALTO_CLIENT_LOG_ERROR("Failed to open a connection to the ipc socket");
103 4 : return false;
104 : }
105 :
106 : // spin up the thread that runs the IPC event loop
107 5 : m_ipcThread = std::thread(&IpcClient::processIpcThread, this);
108 5 : if (!m_ipcThread.joinable())
109 : {
110 0 : RIALTO_CLIENT_LOG_ERROR("Failed to create thread for IPC");
111 0 : return false;
112 : }
113 :
114 5 : return true;
115 : }
116 :
117 5 : bool IpcClient::disconnect()
118 : {
119 : // Increase reference in case client disconnects from another thread
120 5 : std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
121 5 : if (!ipcChannel)
122 : {
123 : // The ipc channel may have disconnected unexpectedly, join the ipc thread if possible
124 2 : if (m_ipcThread.joinable())
125 2 : m_ipcThread.join();
126 :
127 2 : RIALTO_CLIENT_LOG_INFO("Client already disconnect");
128 2 : return true;
129 : }
130 :
131 3 : RIALTO_CLIENT_LOG_INFO("closing IPC channel");
132 3 : m_disconnecting = true;
133 :
134 : // disconnect from the server, this should terminate the thread so join that too
135 3 : ipcChannel->disconnect();
136 :
137 3 : if (m_ipcThread.joinable())
138 3 : m_ipcThread.join();
139 :
140 : // destroy the IPC channel
141 3 : m_ipcChannel.reset();
142 :
143 3 : m_disconnecting = false;
144 :
145 3 : return true;
146 5 : }
147 :
148 5 : void IpcClient::processIpcThread()
149 : {
150 5 : pthread_setname_np(pthread_self(), "rialto-ipc");
151 :
152 5 : RIALTO_CLIENT_LOG_INFO("started ipc thread");
153 :
154 8 : while (m_ipcChannel->process())
155 : {
156 3 : m_ipcChannel->wait(-1);
157 : }
158 :
159 5 : if (!m_disconnecting)
160 : {
161 2 : RIALTO_CLIENT_LOG_ERROR("The ipc channel unexpectedly disconnected, destroying the channel");
162 :
163 : // Safe to destroy the ipc objects in the ipc thread as the client has already disconnected.
164 : // This ensures the channel is destructed and that all ongoing ipc calls are unblocked.
165 2 : m_ipcChannel.reset();
166 :
167 2 : auto connectionObserver{m_connectionObserver.lock()};
168 2 : if (connectionObserver)
169 : {
170 1 : connectionObserver->onConnectionBroken();
171 : }
172 2 : }
173 :
174 5 : RIALTO_CLIENT_LOG_INFO("exiting ipc thread");
175 : }
176 :
177 3 : std::weak_ptr<::firebolt::rialto::ipc::IChannel> IpcClient::getChannel() const
178 : {
179 3 : return m_ipcChannel;
180 : }
181 :
182 1 : std::shared_ptr<ipc::IBlockingClosure> IpcClient::createBlockingClosure()
183 : {
184 : // Increase reference in case client disconnects from another thread
185 1 : std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
186 1 : if (!ipcChannel)
187 : {
188 0 : RIALTO_CLIENT_LOG_ERROR("ipc channel not connected");
189 0 : return nullptr;
190 : }
191 :
192 : // check which thread we're being called from, this determines if we pump
193 : // event loop from within the wait() method or not
194 1 : if (m_ipcThread.get_id() == std::this_thread::get_id())
195 0 : return m_blockingClosureFactory->createBlockingClosurePoll(std::move(ipcChannel));
196 : else
197 1 : return m_blockingClosureFactory->createBlockingClosureSemaphore();
198 : }
199 :
200 1 : std::shared_ptr<google::protobuf::RpcController> IpcClient::createRpcController()
201 : {
202 1 : return m_ipcControllerFactory->create();
203 : }
204 :
205 0 : bool IpcClient::reconnect()
206 : {
207 0 : RIALTO_CLIENT_LOG_INFO("Trying to reconnect channel");
208 0 : if (disconnect())
209 : {
210 0 : return connect();
211 : }
212 0 : return false;
213 : }
214 :
215 1 : void IpcClient::registerConnectionObserver(const std::weak_ptr<IConnectionObserver> &observer)
216 : {
217 1 : m_connectionObserver = observer;
218 : }
219 :
220 : }; // namespace firebolt::rialto::client
|