Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2023 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "IpcClient.h"
21 : #include "RialtoClientLogging.h"
22 :
23 : namespace firebolt::rialto::client
24 : {
25 3 : IIpcClientAccessor &IIpcClientAccessor::instance()
26 : {
27 3 : static IpcClientAccessor factory;
28 3 : return factory;
29 : }
30 :
31 3 : IIpcClient &IpcClientAccessor::getIpcClient() const
32 : {
33 12 : static IpcClient ipcClient{ipc::IChannelFactory::createFactory(), ipc::IControllerFactory::createFactory(),
34 12 : ipc::IBlockingClosureFactory::createFactory()};
35 0 : return ipcClient;
36 : }
37 :
38 9 : IpcClient::IpcClient(const std::shared_ptr<ipc::IChannelFactory> &ipcChannelFactory,
39 : const std::shared_ptr<ipc::IControllerFactory> &ipcControllerFactory,
40 9 : const std::shared_ptr<ipc::IBlockingClosureFactory> &blockingClosureFactory)
41 9 : : m_ipcControllerFactory(ipcControllerFactory), m_ipcChannelFactory(ipcChannelFactory),
42 18 : m_blockingClosureFactory(blockingClosureFactory), m_disconnecting(false)
43 : {
44 : // For now, always connect the client on construction
45 9 : if (!connect())
46 : {
47 4 : throw std::runtime_error("Could not connect client");
48 : }
49 33 : }
50 :
51 10 : IpcClient::~IpcClient()
52 : {
53 5 : if (!disconnect())
54 : {
55 0 : RIALTO_CLIENT_LOG_WARN("Could not disconnect client");
56 : }
57 10 : }
58 :
59 9 : bool IpcClient::connect()
60 : {
61 9 : if (m_ipcChannel)
62 : {
63 0 : RIALTO_CLIENT_LOG_INFO("Client already connected");
64 0 : return true;
65 : }
66 :
67 : // Verify that the version of the library that we linked against is
68 : // compatible with the version of the headers we compiled against.
69 9 : GOOGLE_PROTOBUF_VERIFY_VERSION;
70 :
71 : // check if either of following env vars are set to determine the location of the rialto socket
72 : // - RIALTO_SOCKET_PATH should specify the absolute path to the socket to connect to
73 : // - RIALTO_SOCKET_FD should specify the number of a file descriptor of the socket to connect to
74 9 : const char *kRialtoPath = getenv("RIALTO_SOCKET_PATH");
75 9 : const char *kRialtoFd = getenv("RIALTO_SOCKET_FD");
76 9 : if (kRialtoFd)
77 : {
78 0 : char *end = nullptr;
79 0 : int fd = strtol(kRialtoFd, &end, 10);
80 0 : if ((errno != 0) || (kRialtoFd == end) || (*end != '\0'))
81 : {
82 0 : RIALTO_CLIENT_LOG_SYS_ERROR(errno, "Invalid value set in RIALTO_SOCKET_FD env var");
83 0 : return false;
84 : }
85 :
86 0 : m_ipcChannel = m_ipcChannelFactory->createChannel(fd);
87 : }
88 9 : else if (kRialtoPath)
89 : {
90 27 : m_ipcChannel = m_ipcChannelFactory->createChannel(kRialtoPath);
91 : }
92 : else
93 : {
94 0 : RIALTO_CLIENT_LOG_ERROR("No rialto socket specified");
95 0 : return false;
96 : }
97 :
98 : // check if the channel was opened
99 9 : if (!m_ipcChannel)
100 : {
101 4 : RIALTO_CLIENT_LOG_ERROR("Failed to open a connection to the ipc socket");
102 4 : return false;
103 : }
104 :
105 : // spin up the thread that runs the IPC event loop
106 5 : m_ipcThread = std::thread(&IpcClient::processIpcThread, this);
107 5 : if (!m_ipcThread.joinable())
108 : {
109 0 : RIALTO_CLIENT_LOG_ERROR("Failed to create thread for IPC");
110 0 : return false;
111 : }
112 :
113 5 : return true;
114 : }
115 :
116 5 : bool IpcClient::disconnect()
117 : {
118 : // Increase reference in case client disconnects from another thread
119 5 : std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
120 5 : if (!ipcChannel)
121 : {
122 : // The ipc channel may have disconnected unexpectedly, join the ipc thread if possible
123 2 : if (m_ipcThread.joinable())
124 2 : m_ipcThread.join();
125 :
126 2 : RIALTO_CLIENT_LOG_INFO("Client already disconnect");
127 2 : return true;
128 : }
129 :
130 3 : RIALTO_CLIENT_LOG_INFO("closing IPC channel");
131 3 : m_disconnecting = true;
132 :
133 : // disconnect from the server, this should terminate the thread so join that too
134 3 : ipcChannel->disconnect();
135 :
136 3 : if (m_ipcThread.joinable())
137 3 : m_ipcThread.join();
138 :
139 : // destroy the IPC channel
140 3 : m_ipcChannel.reset();
141 :
142 3 : m_disconnecting = false;
143 :
144 3 : return true;
145 5 : }
146 :
147 5 : void IpcClient::processIpcThread()
148 : {
149 5 : pthread_setname_np(pthread_self(), "rialto-ipc");
150 :
151 5 : RIALTO_CLIENT_LOG_INFO("started ipc thread");
152 :
153 8 : while (m_ipcChannel->process())
154 : {
155 3 : m_ipcChannel->wait(-1);
156 : }
157 :
158 5 : if (!m_disconnecting)
159 : {
160 2 : RIALTO_CLIENT_LOG_ERROR("The ipc channel unexpectedly disconnected, destroying the channel");
161 :
162 : // Safe to destroy the ipc objects in the ipc thread as the client has already disconnected.
163 : // This ensures the channel is destructed and that all ongoing ipc calls are unblocked.
164 2 : m_ipcChannel.reset();
165 :
166 2 : auto connectionObserver{m_connectionObserver.lock()};
167 2 : if (connectionObserver)
168 : {
169 1 : connectionObserver->onConnectionBroken();
170 : }
171 2 : }
172 :
173 5 : RIALTO_CLIENT_LOG_INFO("exiting ipc thread");
174 : }
175 :
176 3 : std::weak_ptr<::firebolt::rialto::ipc::IChannel> IpcClient::getChannel() const
177 : {
178 3 : return m_ipcChannel;
179 : }
180 :
181 1 : std::shared_ptr<ipc::IBlockingClosure> IpcClient::createBlockingClosure()
182 : {
183 : // Increase reference in case client disconnects from another thread
184 1 : std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
185 1 : if (!ipcChannel)
186 : {
187 0 : RIALTO_CLIENT_LOG_ERROR("ipc channel not connected");
188 0 : return nullptr;
189 : }
190 :
191 : // check which thread we're being called from, this determines if we pump
192 : // event loop from within the wait() method or not
193 1 : if (m_ipcThread.get_id() == std::this_thread::get_id())
194 0 : return m_blockingClosureFactory->createBlockingClosurePoll(ipcChannel);
195 : else
196 1 : return m_blockingClosureFactory->createBlockingClosureSemaphore();
197 : }
198 :
199 1 : std::shared_ptr<google::protobuf::RpcController> IpcClient::createRpcController()
200 : {
201 1 : return m_ipcControllerFactory->create();
202 : }
203 :
204 0 : bool IpcClient::reconnect()
205 : {
206 0 : RIALTO_CLIENT_LOG_INFO("Trying to reconnect channel");
207 0 : if (disconnect())
208 : {
209 0 : return connect();
210 : }
211 0 : return false;
212 : }
213 :
214 1 : void IpcClient::registerConnectionObserver(const std::weak_ptr<IConnectionObserver> &observer)
215 : {
216 1 : m_connectionObserver = observer;
217 : }
218 :
219 : }; // namespace firebolt::rialto::client
|