LCOV - code coverage report
Current view: top level - serverManager/ipc/source - IpcLoop.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 89.4 % 47 42
Test Date: 2025-02-18 13:13:53 Functions: 100.0 % 7 7

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2022 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "IpcLoop.h"
      21              : #include "Client.h"
      22              : #include "RialtoServerManagerLogging.h"
      23              : #include <utility>
      24              : 
      25              : namespace rialto::servermanager::ipc
      26              : {
      27           15 : std::shared_ptr<IpcLoop> IpcLoop::create(int socket, const Client &client)
      28              : { // Factory: creates an IPC channel for `socket` and wraps it in an IpcLoop; returns nullptr when no channel is created.
      29              :     // Verify that the version of the library that we linked against is
      30              :     // compatible with the version of the headers we compiled against.
      31           15 :     GOOGLE_PROTOBUF_VERIFY_VERSION;
      32              : 
      33           15 :     auto factory = firebolt::rialto::ipc::IChannelFactory::createFactory();
      34           15 :     auto channel = factory->createChannel(socket);
      35           15 :     if (!channel) // createChannel() returned an empty handle: log and report failure to the caller
      36              :     {
      37            1 :         RIALTO_SERVER_MANAGER_LOG_ERROR("Connection failed");
      38            1 :         return nullptr;
      39              :     }
      40              : 
      41              :     // Wrap the channel in an object that runs the thread loop (the IpcLoop constructor starts the IPC thread)
      42           14 :     return std::shared_ptr<IpcLoop>(new IpcLoop(std::move(channel), client)); // NOTE(review): `new` rather than std::make_shared, presumably because the constructor is not public — confirm in IpcLoop.h
      43           15 : }
      44              : 
      45           14 : IpcLoop::IpcLoop(std::shared_ptr<::firebolt::rialto::ipc::IChannel> channel, const Client &client)
      46           14 :     : m_ipcChannel(std::move(channel)),
      47           14 :       m_ipcControllerFactory(firebolt::rialto::ipc::IControllerFactory::createFactory()),
      48           14 :       m_kClient(client), m_isClosing{false}
      49              : { // Stores `channel` and `client`, creates the controller factory, then starts the thread that services the channel.
      50              :     // spin up the thread that runs the IPC event loop (it uses m_ipcChannel, which is already initialised above)
      51           14 :     m_ipcThread = std::thread(&IpcLoop::ipcThread, this);
      52           14 :     if (!m_ipcThread.joinable()) // NOTE(review): std::thread's constructor throws std::system_error when the thread cannot be started, so this branch is likely unreachable (0 hits in this report) — confirm
      53              :     {
      54            0 :         RIALTO_SERVER_MANAGER_LOG_ERROR("Failed to create thread for IPC");
      55            0 :         return;
      56              :     }
      57              : }
      58              : 
      59           14 : IpcLoop::~IpcLoop()
      60              : { // Shutdown order: flag closing, disconnect the channel, join the IPC thread, then release the channel.
      61           14 :     RIALTO_SERVER_MANAGER_LOG_INFO("closing IPC channel");
      62           14 :     m_isClosing = true; // set before disconnecting so that ipcThread() skips the client's onDisconnected() callback
      63              : 
      64              :     // disconnect from the server, this should terminate the thread so join that too
      65           14 :     if (m_ipcChannel)
      66           14 :         m_ipcChannel->disconnect();
      67              : 
      68           14 :     RIALTO_SERVER_MANAGER_LOG_INFO("terminating IPC thread");
      69              : 
      70           14 :     if (m_ipcThread.joinable())
      71           14 :         m_ipcThread.join();
      72              : 
      73              :     // release this object's reference to the IPC channel; done after the join because ipcThread() dereferences m_ipcChannel
      74           14 :     m_ipcChannel.reset();
      75              : }
      76              : 
      77              : /**
      78              :  * @brief Runs the poll loop for reading events and replies from the
      79              :  * IPC channel / socket.
      80              :  *
      81              :  * Loops until process() returns false; unless the destructor has set m_isClosing, the client's onDisconnected() is then called.
      82              :  */
      83           14 : void IpcLoop::ipcThread()
      84              : {
      85           14 :     pthread_setname_np(pthread_self(), "rialto-sm-ipc"); // name must fit Linux's 16-byte limit (15 chars + NUL), otherwise the call fails with ERANGE
      86              : 
      87           14 :     RIALTO_SERVER_MANAGER_LOG_INFO("started ipc thread");
      88              : 
      89           37 :     while (m_ipcChannel->process())
      90              :     {
      91           23 :         m_ipcChannel->wait(-1); // NOTE(review): -1 presumably means "no timeout" — confirm against IChannel::wait
      92              :     }
      93              : 
      94           14 :     RIALTO_SERVER_MANAGER_LOG_INFO("exiting ipc thread");
      95           14 :     if (!m_isClosing) // the loop ended although the destructor did not request shutdown
      96              :     {
      97            1 :         m_kClient.onDisconnected();
      98              :     }
      99           14 : }
     100              : 
     101           42 : ::firebolt::rialto::ipc::IChannel *IpcLoop::channel() const
     102              : { // Returns a raw, non-owning pointer to the IPC channel (null when m_ipcChannel is empty).
     103           42 :     return m_ipcChannel.get();
     104              : }
     105              : 
     106              : /**
     107              :  * @brief Returns either a polling or semaphore based blocking closure
     108              :  * depending on the thread the function is called from.
     109              :  *
     110              :  * @return a poll closure bound to the channel when called on the IPC thread, a
     111              :  * semaphore closure on any other thread, or nullptr if there is no IPC channel
     112              :  */
     113            8 : std::shared_ptr<firebolt::rialto::ipc::IBlockingClosure> IpcLoop::createBlockingClosure()
     114              : {
     115            8 :     if (!m_ipcChannel) // no channel to create a closure for
     116              :     {
     117            0 :         RIALTO_SERVER_MANAGER_LOG_ERROR("ipc channel not connected");
     118            0 :         return nullptr;
     119              :     }
     120              : 
     121              :     // check which thread we're being called from, this determines if we pump
     122              :     // event loop from within the wait() method or not
     123            8 :     auto factory = firebolt::rialto::ipc::IBlockingClosureFactory::createFactory();
     124            8 :     if (m_ipcThread.get_id() == std::this_thread::get_id())
     125            0 :         return factory->createBlockingClosurePoll(m_ipcChannel); // caller is the IPC thread itself: the closure is given the channel to pump
     126              :     else
     127            8 :         return factory->createBlockingClosureSemaphore(); // any other thread: the IPC thread keeps running the event loop
     128              : }
     129              : 
     130              : /**
     131              :  * @brief Returns a RpcController object that can be used for sending
     132              :  * RPC requests.
     133              :  *
     134              :  * This is just a wrapper around the ::firebolt::rialto::ipc::IControllerFactory
     135              :  * object created in the constructor.
     136              :  *
     137              :  * @return the controller produced by m_ipcControllerFactory->create()
     138              :  *
     139              :  */
     140            8 : std::shared_ptr<google::protobuf::RpcController> IpcLoop::createRpcController()
     141              : {
     142            8 :     return m_ipcControllerFactory->create();
     143              : }
     144              : 
     145              : } // namespace rialto::servermanager::ipc
        

Generated by: LCOV version 2.0-1