LCOV - code coverage report
Current view: top level - media/client/ipc/source - IpcClient.cpp (source / functions) Coverage Total Hit
Test: coverage.info Lines: 76.1 % 92 70
Test Date: 2026-05-08 12:58:51 Functions: 91.7 % 12 11

            Line data    Source code
       1              : /*
       2              :  * If not stated otherwise in this file or this component's LICENSE file the
       3              :  * following copyright and licenses apply:
       4              :  *
       5              :  * Copyright 2023 Sky UK
       6              :  *
       7              :  * Licensed under the Apache License, Version 2.0 (the "License");
       8              :  * you may not use this file except in compliance with the License.
       9              :  * You may obtain a copy of the License at
      10              :  *
      11              :  * http://www.apache.org/licenses/LICENSE-2.0
      12              :  *
      13              :  * Unless required by applicable law or agreed to in writing, software
      14              :  * distributed under the License is distributed on an "AS IS" BASIS,
      15              :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      16              :  * See the License for the specific language governing permissions and
      17              :  * limitations under the License.
      18              :  */
      19              : 
      20              : #include "IpcClient.h"
      21              : #include "RialtoClientLogging.h"
      22              : #include <utility>
      23              : 
      24              : namespace firebolt::rialto::client
      25              : {
      26            3 : IIpcClientAccessor &IIpcClientAccessor::instance()
      27              : {
      28            3 :     static IpcClientAccessor factory;
      29            3 :     return factory;
      30              : }
      31              : 
      32            3 : IIpcClient &IpcClientAccessor::getIpcClient() const
      33              : {
      34           12 :     static IpcClient ipcClient{ipc::IChannelFactory::createFactory(), ipc::IControllerFactory::createFactory(),
      35           12 :                                ipc::IBlockingClosureFactory::createFactory()};
      36            0 :     return ipcClient;
      37              : }
      38              : 
      39            9 : IpcClient::IpcClient(const std::shared_ptr<ipc::IChannelFactory> &ipcChannelFactory,
      40              :                      const std::shared_ptr<ipc::IControllerFactory> &ipcControllerFactory,
      41            9 :                      const std::shared_ptr<ipc::IBlockingClosureFactory> &blockingClosureFactory)
      42            9 :     : m_ipcControllerFactory(ipcControllerFactory), m_ipcChannelFactory(ipcChannelFactory),
      43           18 :       m_blockingClosureFactory(blockingClosureFactory), m_disconnecting(false)
      44              : {
      45              :     // For now, always connect the client on construction
      46            9 :     if (!connect())
      47              :     {
      48            4 :         throw std::runtime_error("Could not connect client");
      49              :     }
      50           33 : }
      51              : 
      52           10 : IpcClient::~IpcClient()
      53              : {
      54            5 :     if (!disconnect())
      55              :     {
      56            0 :         RIALTO_CLIENT_LOG_WARN("Could not disconnect client");
      57              :     }
      58           10 : }
      59              : 
      60            9 : bool IpcClient::connect()
      61              : {
      62            9 :     if (m_ipcChannel)
      63              :     {
      64            0 :         RIALTO_CLIENT_LOG_INFO("Client already connected");
      65            0 :         return true;
      66              :     }
      67              : 
      68              :     // Verify that the version of the library that we linked against is
      69              :     // compatible with the version of the headers we compiled against.
      70            9 :     GOOGLE_PROTOBUF_VERIFY_VERSION;
      71              : 
      72              :     // check if either of following env vars are set to determine the location of the rialto socket
      73              :     //  - RIALTO_SOCKET_PATH should specify the absolute path to the socket to connect to
      74              :     //  - RIALTO_SOCKET_FD should specify the number of a file descriptor of the socket to connect to
      75            9 :     const char *kRialtoPath = getenv("RIALTO_SOCKET_PATH");
      76            9 :     const char *kRialtoFd = getenv("RIALTO_SOCKET_FD");
      77            9 :     if (kRialtoFd)
      78              :     {
      79            0 :         char *end = nullptr;
      80            0 :         int fd = strtol(kRialtoFd, &end, 10);
      81            0 :         if ((errno != 0) || (kRialtoFd == end) || (*end != '\0'))
      82              :         {
      83            0 :             RIALTO_CLIENT_LOG_SYS_ERROR(errno, "Invalid value set in RIALTO_SOCKET_FD env var");
      84            0 :             return false;
      85              :         }
      86              : 
      87            0 :         m_ipcChannel = m_ipcChannelFactory->createChannel(fd);
      88              :     }
      89            9 :     else if (kRialtoPath)
      90              :     {
      91           27 :         m_ipcChannel = m_ipcChannelFactory->createChannel(kRialtoPath);
      92              :     }
      93              :     else
      94              :     {
      95            0 :         RIALTO_CLIENT_LOG_ERROR("No rialto socket specified");
      96            0 :         return false;
      97              :     }
      98              : 
      99              :     // check if the channel was opened
     100            9 :     if (!m_ipcChannel)
     101              :     {
     102            4 :         RIALTO_CLIENT_LOG_ERROR("Failed to open a connection to the ipc socket");
     103            4 :         return false;
     104              :     }
     105              : 
     106              :     // spin up the thread that runs the IPC event loop
     107            5 :     m_ipcThread = std::thread(&IpcClient::processIpcThread, this);
     108            5 :     if (!m_ipcThread.joinable())
     109              :     {
     110            0 :         RIALTO_CLIENT_LOG_ERROR("Failed to create thread for IPC");
     111            0 :         return false;
     112              :     }
     113              : 
     114            5 :     return true;
     115              : }
     116              : 
     117            5 : bool IpcClient::disconnect()
     118              : {
     119              :     // Increase reference in case client disconnects from another thread
     120            5 :     std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
     121            5 :     if (!ipcChannel)
     122              :     {
     123              :         // The ipc channel may have disconnected unexpectedly, join the ipc thread if possible
     124            2 :         if (m_ipcThread.joinable())
     125            2 :             m_ipcThread.join();
     126              : 
     127            2 :         RIALTO_CLIENT_LOG_INFO("Client already disconnect");
     128            2 :         return true;
     129              :     }
     130              : 
     131            3 :     RIALTO_CLIENT_LOG_INFO("closing IPC channel");
     132            3 :     m_disconnecting = true;
     133              : 
     134              :     // disconnect from the server, this should terminate the thread so join that too
     135            3 :     ipcChannel->disconnect();
     136              : 
     137            3 :     if (m_ipcThread.joinable())
     138            3 :         m_ipcThread.join();
     139              : 
     140              :     // destroy the IPC channel
     141            3 :     m_ipcChannel.reset();
     142              : 
     143            3 :     m_disconnecting = false;
     144              : 
     145            3 :     return true;
     146            5 : }
     147              : 
     148            5 : void IpcClient::processIpcThread()
     149              : {
     150            5 :     pthread_setname_np(pthread_self(), "rialto-ipc");
     151              : 
     152            5 :     RIALTO_CLIENT_LOG_INFO("started ipc thread");
     153              : 
     154            8 :     while (m_ipcChannel->process())
     155              :     {
     156            3 :         m_ipcChannel->wait(-1);
     157              :     }
     158              : 
     159            5 :     if (!m_disconnecting)
     160              :     {
     161            2 :         RIALTO_CLIENT_LOG_ERROR("The ipc channel unexpectedly disconnected, destroying the channel");
     162              : 
     163              :         // Safe to destroy the ipc objects in the ipc thread as the client has already disconnected.
     164              :         // This ensures the channel is destructed and that all ongoing ipc calls are unblocked.
     165            2 :         m_ipcChannel.reset();
     166              : 
     167            2 :         auto connectionObserver{m_connectionObserver.lock()};
     168            2 :         if (connectionObserver)
     169              :         {
     170            1 :             connectionObserver->onConnectionBroken();
     171              :         }
     172            2 :     }
     173              : 
     174            5 :     RIALTO_CLIENT_LOG_INFO("exiting ipc thread");
     175              : }
     176              : 
     177            3 : std::weak_ptr<::firebolt::rialto::ipc::IChannel> IpcClient::getChannel() const
     178              : {
     179            3 :     return m_ipcChannel;
     180              : }
     181              : 
     182            1 : std::shared_ptr<ipc::IBlockingClosure> IpcClient::createBlockingClosure()
     183              : {
     184              :     // Increase reference in case client disconnects from another thread
     185            1 :     std::shared_ptr<ipc::IChannel> ipcChannel = m_ipcChannel;
     186            1 :     if (!ipcChannel)
     187              :     {
     188            0 :         RIALTO_CLIENT_LOG_ERROR("ipc channel not connected");
     189            0 :         return nullptr;
     190              :     }
     191              : 
     192              :     // check which thread we're being called from, this determines if we pump
     193              :     // event loop from within the wait() method or not
     194            1 :     if (m_ipcThread.get_id() == std::this_thread::get_id())
     195            0 :         return m_blockingClosureFactory->createBlockingClosurePoll(std::move(ipcChannel));
     196              :     else
     197            1 :         return m_blockingClosureFactory->createBlockingClosureSemaphore();
     198              : }
     199              : 
     200            1 : std::shared_ptr<google::protobuf::RpcController> IpcClient::createRpcController()
     201              : {
     202            1 :     return m_ipcControllerFactory->create();
     203              : }
     204              : 
     205            0 : bool IpcClient::reconnect()
     206              : {
     207            0 :     RIALTO_CLIENT_LOG_INFO("Trying to reconnect channel");
     208            0 :     if (disconnect())
     209              :     {
     210            0 :         return connect();
     211              :     }
     212            0 :     return false;
     213              : }
     214              : 
     215            1 : void IpcClient::registerConnectionObserver(const std::weak_ptr<IConnectionObserver> &observer)
     216              : {
     217            1 :     m_connectionObserver = observer;
     218              : }
     219              : 
     220              : }; // namespace firebolt::rialto::client
        

Generated by: LCOV version 2.0-1