LCOV - code coverage report
Current view: top level - media/client/ipc/source - IpcClient.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 76.1 % 92 70
Test Date: 2025-03-21 11:02:39 Functions: 91.7 % 12 11

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2023 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "IpcClient.h"
      21              : #include "RialtoClientLogging.h"
      22              : 
      23              : namespace firebolt::rialto::client
      24              : {
      25            3 : IIpcClientAccessor &IIpcClientAccessor::instance()
      26              : {
      27            3 :     static IpcClientAccessor factory;
      28            3 :     return factory;
      29              : }
      30              : 
      31            3 : IIpcClient &IpcClientAccessor::getIpcClient() const
      32              : {
      33           12 :     static IpcClient ipcClient{ipc::IChannelFactory::createFactory(), ipc::IControllerFactory::createFactory(),
      34           12 :                                ipc::IBlockingClosureFactory::createFactory()};
      35            0 :     return ipcClient;
      36              : }
      37              : 
      38            9 : IpcClient::IpcClient(const std::shared_ptr<ipc::IChannelFactory> &ipcChannelFactory,
      39              :                      const std::shared_ptr<ipc::IControllerFactory> &ipcControllerFactory,
      40            9 :                      const std::shared_ptr<ipc::IBlockingClosureFactory> &blockingClosureFactory)
      41            9 :     : m_ipcControllerFactory(ipcControllerFactory), m_ipcChannelFactory(ipcChannelFactory),
      42           18 :       m_blockingClosureFactory(blockingClosureFactory), m_disconnecting(false)
      43              : {
      44              :     // For now, always connect the client on construction
      45            9 :     if (!connect())
      46              :     {
      47            4 :         throw std::runtime_error("Could not connect client");
      48              :     }
      49           33 : }
      50              : 
      51           10 : IpcClient::~IpcClient()
      52              : {
      53            5 :     if (!disconnect())
      54              :     {
      55            0 :         RIALTO_CLIENT_LOG_WARN("Could not disconnect client");
      56              :     }
      57           10 : }
      58              : 
      59            9 : bool IpcClient::connect()
      60              : {
      61            9 :     if (m_ipcChannel)
      62              :     {
      63            0 :         RIALTO_CLIENT_LOG_INFO("Client already connected");
      64            0 :         return true;
      65              :     }
      66              : 
      67              :     // Verify that the version of the library that we linked against is
      68              :     // compatible with the version of the headers we compiled against.
      69            9 :     GOOGLE_PROTOBUF_VERIFY_VERSION;
      70              : 
      71              :     // check if either of following env vars are set to determine the location of the rialto socket
      72              :     //  - RIALTO_SOCKET_PATH should specify the absolute path to the socket to connect to
      73              :     //  - RIALTO_SOCKET_FD should specify the number of a file descriptor of the socket to connect to
      74            9 :     const char *kRialtoPath = getenv("RIALTO_SOCKET_PATH");
      75            9 :     const char *kRialtoFd = getenv("RIALTO_SOCKET_FD");
      76            9 :     if (kRialtoFd)
      77              :     {
      78            0 :         char *end = nullptr;
      79            0 :         int fd = strtol(kRialtoFd, &end, 10);
      80            0 :         if ((errno != 0) || (kRialtoFd == end) || (*end != '\0'))
      81              :         {
      82            0 :             RIALTO_CLIENT_LOG_SYS_ERROR(errno, "Invalid value set in RIALTO_SOCKET_FD env var");
      83            0 :             return false;
      84              :         }
      85              : 
      86            0 :         m_ipcChannel = m_ipcChannelFactory->createChannel(fd);
      87              :     }
      88            9 :     else if (kRialtoPath)
      89              :     {
      90           27 :         m_ipcChannel = m_ipcChannelFactory->createChannel(kRialtoPath);
      91              :     }
      92              :     else
      93              :     {
      94            0 :         RIALTO_CLIENT_LOG_ERROR("No rialto socket specified");
      95            0 :         return false;
      96              :     }
      97              : 
      98              :     // check if the channel was opened
      99            9 :     if (!m_ipcChannel)
     100              :     {
     101            4 :         RIALTO_CLIENT_LOG_ERROR("Failed to open a connection to the ipc socket");
     102            4 :         return false;
     103              :     }
     104              : 
     105              :     // spin up the thread that runs the IPC event loop
     106            5 :     m_ipcThread = std::thread(&IpcClient::processIpcThread, this);
     107            5 :     if (!m_ipcThread.joinable())
     108              :     {
     109            0 :         RIALTO_CLIENT_LOG_ERROR("Failed to create thread for IPC");
     110            0 :         return false;
     111              :     }
     112              : 
     113            5 :     return true;
     114              : }
     115              : 
     116            5 : bool IpcClient::disconnect()
     117              : {
     118              :     // Increase reference in case client disconnects from another thread
     119            5 :     std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
     120            5 :     if (!ipcChannel)
     121              :     {
     122              :         // The ipc channel may have disconnected unexpectedly, join the ipc thread if possible
     123            2 :         if (m_ipcThread.joinable())
     124            2 :             m_ipcThread.join();
     125              : 
     126            2 :         RIALTO_CLIENT_LOG_INFO("Client already disconnect");
     127            2 :         return true;
     128              :     }
     129              : 
     130            3 :     RIALTO_CLIENT_LOG_INFO("closing IPC channel");
     131            3 :     m_disconnecting = true;
     132              : 
     133              :     // disconnect from the server, this should terminate the thread so join that too
     134            3 :     ipcChannel->disconnect();
     135              : 
     136            3 :     if (m_ipcThread.joinable())
     137            3 :         m_ipcThread.join();
     138              : 
     139              :     // destroy the IPC channel
     140            3 :     m_ipcChannel.reset();
     141              : 
     142            3 :     m_disconnecting = false;
     143              : 
     144            3 :     return true;
     145            5 : }
     146              : 
     147            5 : void IpcClient::processIpcThread()
     148              : {
     149            5 :     pthread_setname_np(pthread_self(), "rialto-ipc");
     150              : 
     151            5 :     RIALTO_CLIENT_LOG_INFO("started ipc thread");
     152              : 
     153            8 :     while (m_ipcChannel->process())
     154              :     {
     155            3 :         m_ipcChannel->wait(-1);
     156              :     }
     157              : 
     158            5 :     if (!m_disconnecting)
     159              :     {
     160            2 :         RIALTO_CLIENT_LOG_ERROR("The ipc channel unexpectedly disconnected, destroying the channel");
     161              : 
     162              :         // Safe to destroy the ipc objects in the ipc thread as the client has already disconnected.
     163              :         // This ensures the channel is destructed and that all ongoing ipc calls are unblocked.
     164            2 :         m_ipcChannel.reset();
     165              : 
     166            2 :         auto connectionObserver{m_connectionObserver.lock()};
     167            2 :         if (connectionObserver)
     168              :         {
     169            1 :             connectionObserver->onConnectionBroken();
     170              :         }
     171            2 :     }
     172              : 
     173            5 :     RIALTO_CLIENT_LOG_INFO("exiting ipc thread");
     174              : }
     175              : 
     176            3 : std::weak_ptr<::firebolt::rialto::ipc::IChannel> IpcClient::getChannel() const
     177              : {
     178            3 :     return m_ipcChannel;
     179              : }
     180              : 
     181            1 : std::shared_ptr<ipc::IBlockingClosure> IpcClient::createBlockingClosure()
     182              : {
     183              :     // Increase reference in case client disconnects from another thread
     184            1 :     std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
     185            1 :     if (!ipcChannel)
     186              :     {
     187            0 :         RIALTO_CLIENT_LOG_ERROR("ipc channel not connected");
     188            0 :         return nullptr;
     189              :     }
     190              : 
     191              :     // check which thread we're being called from, this determines if we pump
     192              :     // event loop from within the wait() method or not
     193            1 :     if (m_ipcThread.get_id() == std::this_thread::get_id())
     194            0 :         return m_blockingClosureFactory->createBlockingClosurePoll(ipcChannel);
     195              :     else
     196            1 :         return m_blockingClosureFactory->createBlockingClosureSemaphore();
     197              : }
     198              : 
     199            1 : std::shared_ptr<google::protobuf::RpcController> IpcClient::createRpcController()
     200              : {
     201            1 :     return m_ipcControllerFactory->create();
     202              : }
     203              : 
     204            0 : bool IpcClient::reconnect()
     205              : {
     206            0 :     RIALTO_CLIENT_LOG_INFO("Trying to reconnect channel");
     207            0 :     if (disconnect())
     208              :     {
     209            0 :         return connect();
     210              :     }
     211            0 :     return false;
     212              : }
     213              : 
     214            1 : void IpcClient::registerConnectionObserver(const std::weak_ptr<IConnectionObserver> &observer)
     215              : {
     216            1 :     m_connectionObserver = observer;
     217              : }
     218              : 
     219              : }; // namespace firebolt::rialto::client
        

Generated by: LCOV version 2.0-1