Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "IpcLoop.h"
21 : #include "Client.h"
22 : #include "RialtoServerManagerLogging.h"
23 : #include <utility>
24 :
25 : namespace rialto::servermanager::ipc
26 : {
27 15 : std::shared_ptr<IpcLoop> IpcLoop::create(int socket, const Client &client)
28 : {
29 : // Verify that the version of the library that we linked against is
30 : // compatible with the version of the headers we compiled against.
31 15 : GOOGLE_PROTOBUF_VERIFY_VERSION;
32 :
33 15 : auto factory = firebolt::rialto::ipc::IChannelFactory::createFactory();
34 15 : auto channel = factory->createChannel(socket);
35 15 : if (!channel)
36 : {
37 1 : RIALTO_SERVER_MANAGER_LOG_ERROR("Connection failed");
38 1 : return nullptr;
39 : }
40 :
41 : // Wrap the channel in an object that runs the thread loop
42 14 : return std::shared_ptr<IpcLoop>(new IpcLoop(std::move(channel), client));
43 15 : }
44 :
45 14 : IpcLoop::IpcLoop(std::shared_ptr<::firebolt::rialto::ipc::IChannel> channel, const Client &client)
46 14 : : m_ipcChannel(std::move(channel)),
47 14 : m_ipcControllerFactory(firebolt::rialto::ipc::IControllerFactory::createFactory()),
48 14 : m_kClient(client), m_isClosing{false}
49 : {
50 : // spin up the thread that runs the IPC event loop
51 14 : m_ipcThread = std::thread(&IpcLoop::ipcThread, this);
52 14 : if (!m_ipcThread.joinable())
53 : {
54 0 : RIALTO_SERVER_MANAGER_LOG_ERROR("Failed to create thread for IPC");
55 0 : return;
56 : }
57 : }
58 :
59 14 : IpcLoop::~IpcLoop()
60 : {
61 14 : RIALTO_SERVER_MANAGER_LOG_INFO("closing IPC channel");
62 14 : m_isClosing = true;
63 :
64 : // disconnect from the server, this should terminate the thread so join that too
65 14 : if (m_ipcChannel)
66 14 : m_ipcChannel->disconnect();
67 :
68 14 : RIALTO_SERVER_MANAGER_LOG_INFO("terminating IPC thread");
69 :
70 14 : if (m_ipcThread.joinable())
71 14 : m_ipcThread.join();
72 :
73 : // destroy the IPC channel
74 14 : m_ipcChannel.reset();
75 : }
76 :
77 : /**
78 : * @brief Runs the poll loop for reading events and replies from the
79 : * IPC channel / socket.
80 : *
81 : *
82 : */
83 14 : void IpcLoop::ipcThread()
84 : {
85 14 : pthread_setname_np(pthread_self(), "rialtoservermanager-ipc");
86 :
87 14 : RIALTO_SERVER_MANAGER_LOG_INFO("started ipc thread");
88 :
89 37 : while (m_ipcChannel->process())
90 : {
91 23 : m_ipcChannel->wait(-1);
92 : }
93 :
94 14 : RIALTO_SERVER_MANAGER_LOG_INFO("exiting ipc thread");
95 14 : if (!m_isClosing)
96 : {
97 1 : m_kClient.onDisconnected();
98 : }
99 14 : }
100 :
101 42 : ::firebolt::rialto::ipc::IChannel *IpcLoop::channel() const
102 : {
103 42 : return m_ipcChannel.get();
104 : }
105 :
106 : /**
107 : * @brief Returns either a polling or semaphore based blocking closure
108 : * depending on the thread the function is called from.
109 : *
110 : * @param[in] channel : the channel that the closure is used for
111 : *
112 : */
113 8 : std::shared_ptr<firebolt::rialto::ipc::IBlockingClosure> IpcLoop::createBlockingClosure()
114 : {
115 8 : if (!m_ipcChannel)
116 : {
117 0 : RIALTO_SERVER_MANAGER_LOG_ERROR("ipc channel not connected");
118 0 : return nullptr;
119 : }
120 :
121 : // check which thread we're being called from, this determines if we pump
122 : // event loop from within the wait() method or not
123 8 : auto factory = firebolt::rialto::ipc::IBlockingClosureFactory::createFactory();
124 8 : if (m_ipcThread.get_id() == std::this_thread::get_id())
125 0 : return factory->createBlockingClosurePoll(m_ipcChannel);
126 : else
127 8 : return factory->createBlockingClosureSemaphore();
128 : }
129 :
130 : /**
131 : * @brief Returns a RpcController object that can be used for sending
132 : * RPC requests.
133 : *
134 : * This is just a wrapper around the ::firebolt::rialto::ipc::ControllerFactory
135 : * object.
136 : *
137 : * @param[in] channel : the channel that the closure is used for
138 : *
139 : */
140 8 : std::shared_ptr<google::protobuf::RpcController> IpcLoop::createRpcController()
141 : {
142 8 : return m_ipcControllerFactory->create();
143 : }
144 :
145 : } // namespace rialto::servermanager::ipc
|