Line data Source code
1 : /*
2 : * If not stated otherwise in this file or this component's LICENSE file the
3 : * following copyright and licenses apply:
4 : *
5 : * Copyright 2022 Sky UK
6 : *
7 : * Licensed under the Apache License, Version 2.0 (the "License");
8 : * you may not use this file except in compliance with the License.
9 : * You may obtain a copy of the License at
10 : *
11 : * http://www.apache.org/licenses/LICENSE-2.0
12 : *
13 : * Unless required by applicable law or agreed to in writing, software
14 : * distributed under the License is distributed on an "AS IS" BASIS,
15 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 : * See the License for the specific language governing permissions and
17 : * limitations under the License.
18 : */
19 :
20 : #include "EventThread.h"
21 : #include "RialtoCommonLogging.h"
22 :
23 : #include <pthread.h>
24 : #include <semaphore.h>
25 : #include <unistd.h>
26 :
27 : namespace firebolt::rialto::common
28 : {
29 5 : std::shared_ptr<IEventThreadFactory> IEventThreadFactory::createFactory()
30 : {
31 5 : std::shared_ptr<IEventThreadFactory> factory;
32 :
33 : try
34 : {
35 5 : factory = std::make_shared<EventThreadFactory>();
36 : }
37 0 : catch (const std::exception &e)
38 : {
39 0 : RIALTO_COMMON_LOG_ERROR("Failed to create the event thread factory, reason: %s", e.what());
40 : }
41 :
42 5 : return factory;
43 : }
44 :
45 5 : std::unique_ptr<IEventThread> EventThreadFactory::createEventThread(std::string threadName) const
46 : {
47 5 : return std::make_unique<EventThread>(threadName);
48 : }
49 :
50 5 : EventThread::EventThread(std::string threadName) : m_kThreadName(std::move(threadName)), m_shutdown(false)
51 : {
52 5 : m_thread = std::thread(&EventThread::threadExecutor, this);
53 : }
54 :
55 15 : EventThread::~EventThread()
56 : {
57 5 : std::unique_lock<std::mutex> locker(m_lock);
58 :
59 5 : m_shutdown = true;
60 :
61 5 : m_cond.notify_all();
62 :
63 5 : locker.unlock();
64 :
65 5 : if (m_thread.joinable())
66 5 : m_thread.join();
67 10 : }
68 :
69 5 : void EventThread::threadExecutor()
70 : {
71 5 : if (!m_kThreadName.empty())
72 : {
73 5 : pthread_setname_np(pthread_self(), m_kThreadName.c_str());
74 : }
75 :
76 5 : std::unique_lock<std::mutex> locker(m_lock);
77 :
78 : while (true)
79 : {
80 23 : while (!m_shutdown && m_funcs.empty())
81 5 : m_cond.wait(locker);
82 :
83 18 : if (m_shutdown)
84 5 : break;
85 :
86 13 : std::function<void()> func = std::move(m_funcs.front());
87 13 : m_funcs.pop_front();
88 :
89 13 : m_lock.unlock();
90 :
91 13 : if (func)
92 13 : func();
93 :
94 13 : m_lock.lock();
95 : }
96 5 : }
97 :
98 1 : void EventThread::flush()
99 : {
100 : sem_t semaphore;
101 1 : sem_init(&semaphore, 0, 0);
102 :
103 : // add a simple function to release the semaphore in the context of the event thread
104 1 : addImpl(
105 1 : [sem = &semaphore]()
106 : {
107 1 : if (sem_post(sem) != 0)
108 0 : RIALTO_COMMON_LOG_SYS_ERROR(errno, "failed to signal semaphore");
109 1 : });
110 :
111 : // wait for the above call to unblock the semaphore
112 1 : TEMP_FAILURE_RETRY(sem_wait(&semaphore));
113 : }
114 :
115 13 : void EventThread::addImpl(std::function<void()> &&func)
116 : {
117 13 : std::lock_guard<std::mutex> locker(m_lock);
118 13 : m_funcs.emplace_back(std::move(func));
119 13 : m_cond.notify_all();
120 : }
121 :
122 : }; // namespace firebolt::rialto::common
|