Dobby 3.0
Dobby “Docker based Thingy” is a tool for managing and running OCI containers using crun
Loading...
Searching...
No Matches
Notifier.h
1/*
2* If not stated otherwise in this file or this component's LICENSE file the
3* following copyright and licenses apply:
4*
5* Copyright 2014 Sky UK
6*
7* Licensed under the Apache License, Version 2.0 (the "License");
8* you may not use this file except in compliance with the License.
9* You may obtain a copy of the License at
10*
11* http://www.apache.org/licenses/LICENSE-2.0
12*
13* Unless required by applicable law or agreed to in writing, software
14* distributed under the License is distributed on an "AS IS" BASIS,
15* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16* See the License for the specific language governing permissions and
17* limitations under the License.
18*/
19/*
20 * File: Notifier.h
21 * Author: jarek.dziedzic@bskyb.com
22 *
23 * Created on 26 June 2014
24 *
25 */
26#ifndef NOTIFIER_H
27#define NOTIFIER_H
28
29#include "Polymorphic.h"
30#include "IDispatcher.h"
31#include <memory>
32#include <deque>
33#include <functional>
34#include <mutex>
35#include <algorithm>
36#include <utility>
37#include <vector>
38#include <thread>
39#include <condition_variable>
40#include <stdexcept>
41
42namespace AICommon
43{
44
50template<typename T>
51class Notifier : virtual public Polymorphic
52{
53public:
54
55 Notifier() : notifyingObservers(false), waiteeCount(0) {}
56
60 void addObserver(const std::shared_ptr<T>& observer)
61 {
62 std::lock_guard<std::mutex> lock(m);
63 observers.push_back(observer);
64 }
65
69 void removeObserver(const std::shared_ptr<T>& observer)
70 {
71 std::unique_lock<std::mutex> lock(m);
72
73#if (AI_BUILD_TYPE == AI_DEBUG)
74 if (dispatcher && dispatcher->invokedFromDispatcherThread())
75 {
76 throw std::logic_error("AI notifier: potential deadlock as this method should not be called from the dispatcher call");
77 }
78#endif
79 //find matching observers
80 for(size_t i = 0; i < observers.size(); ++i)
81 {
82 if(observers[i].lock() == observer)
83 {
84 observers.erase(observers.begin() + i);
85 //if addObserver was called 3 times for one object, you need
86 //to call remove 3 times too.
87 break;
88 }
89 }
90
91 if (notifyingObservers)
92 {
93 waiteeCount++;
94
95 do {
96 cv.wait(lock);
97 } while(notifyingObservers);
98
99 waiteeCount--;
100 }
101 }
102
106 void setDispatcher(const std::shared_ptr<IDispatcher>& dispatcher_)
107 {
108 std::lock_guard<std::mutex> lock(m);
109 dispatcher = dispatcher_;
110 }
111
112protected:
113
114 template<typename F, typename... Args>
115 void notify(F f, Args&&... args)
116 {
117 notify_impl(std::bind(f, std::placeholders::_1, std::forward<Args>(args)...));
118 }
119
120 template<typename F>
121 void notify(F f)
122 {
123 notify_impl(f);
124 }
125
126private:
127
128 void notify_impl(std::function<void (const std::shared_ptr<T>&)> fun)
129 {
130 std::unique_lock<std::mutex> lock(m);
131
132 if(!dispatcher)
133 {
134 throw std::logic_error("You must set a dispatcher before you can produce events.");
135 }
136
137 //don't want to lock adding new observers while callbacks are executed - will make a copy instead.
138 decltype(observers) observersCopy;
139 //scrub them for expired listeners. Only copy if (bool)use_count == true. Happens when use_count is positive.
140 //(wishing for C++ lambdas here)
141 using namespace std;
142 using namespace std::placeholders;
143 copy_if(observers.begin(), observers.end(), back_inserter(observersCopy), bind(&weak_ptr<T>::use_count, _1));
144 //in the unlikely event that there were some expired observers, replace
145 //the original observers with the updated copy.
146 if(observers.size() != observersCopy.size())
147 {
148 observers = observersCopy;
149 }
150
151 notifyingObservers = true;
152
153 lock.unlock();
154
155 //--------------------------------------------------------------------------------------------
156 //---------------------------------------- NOTE ----------------------------------------------
157 //We maintain vector of strong pointers pointing to observer objects as otherwise bad things
158 //can happen. Lets consider, the observer object point backs to the notifier object itself.
159 //That means, there is a circular dependency between the notifier and the observer, but we
160 //break that by using a combination of shared and weak pointers. However, imagine, within the
161 //notify_impl() method, we gets a shared pointer of observer object out of weak_ptr. After the
162 //shared pointer is constructed (bit still in use), now the owner of the observer resets its
163 //pointer that is pointing to the observer object. This might result one to one references
164 //between the notifier and the observer, i.e., as soon as the observer will be destroyed the
165 //notifier will also be destroyed. It means, if now the observer object is destroyed from
166 //the notify_imp() method, it will cause the notifier object itself to be destroyed, where
167 //the notify_impl can still continue to access its member variable (e.g. dispatcher). This
168 //might result an undefined behaviour.
169 //--------------------------------------------------------------------------------------------
170 //--------------------------------------------------------------------------------------------
171 std::vector<std::shared_ptr<T>> observerStrongPtrs;
172
173 for(auto o = observersCopy.cbegin(); o != observersCopy.cend(); ++o)
174 {
175 std::shared_ptr<T> strong = o->lock();
176 if(strong)
177 {
178 dispatcher->post(std::bind(fun, strong));
179 }
180 observerStrongPtrs.push_back(strong);
181 }
182
183 lock.lock();
184 if (dispatcher && (waiteeCount > 0))
185 {
186 // We are unregistering an observer so make sure we will not notify unregistered observers
187 lock.unlock();
188 dispatcher->sync();
189 lock.lock();
190 }
191
192 notifyingObservers = false;
193
194 if (waiteeCount > 0)
195 {
196 cv.notify_all();
197 }
198 lock.unlock();
199 }
200
201protected:
202 std::shared_ptr<IDispatcher> dispatcher;
203
204private:
205 std::mutex m;
206 std::deque<std::weak_ptr<T>> observers;
207
208 std::condition_variable cv;
209 bool notifyingObservers;
210 unsigned int waiteeCount;
211};
212
213} //AICommon
214
215#endif /* NOTIFIER_H */
216
A template of observable objects that send notifications defined in interface T.
Definition Notifier.h:52
void setDispatcher(const std::shared_ptr< IDispatcher > &dispatcher_)
Definition Notifier.h:106
void removeObserver(const std::shared_ptr< T > &observer)
Unregister from updates.
Definition Notifier.h:69
void addObserver(const std::shared_ptr< T > &observer)
Register interest in receiving updates.
Definition Notifier.h:60
Inherit from this from all types that have virtual functions.
Definition Polymorphic.h:39