RemoteReferenceCount.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2017, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarX
19 * @author Raphael Grimm (raphael dot grimm at kit dot edu)
20 * @date 2017
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
25
26#include <chrono>
27#include <queue>
28#include <thread>
29
30#include <Ice/Initialize.h> // for stringToIdentity
31#include <Ice/ObjectAdapter.h>
32#include <IceUtil/UUID.h>
33
34#include "ArmarXManager.h"
37
38namespace armarx
39{
40
43
44 class HeartbeatMonitor
45 {
46 public:
47 static void
49 {
50 Instance().add(ptr);
51 }
52
53 static void
55 {
56 Instance().remove(ptr);
57 }
58
59 private:
60 void add(RemoteReferenceCounter* ptr);
61 void remove(RemoteReferenceCounter* ptr);
62 static HeartbeatMonitor& Instance();
64
65 void heartbeatTask();
66
67 std::mutex mtx;
68 std::thread t;
69 std::atomic_bool shutdown{false};
70 HeartbeatMonitor() = default;
71 using PqEntry = std::pair<IceUtil::Time, RemoteReferenceCounter*>;
72
73 struct PqEntryCompare
74 {
75 bool
76 operator()(const PqEntry& lhs, const PqEntry& rhs) const
77 {
78 return lhs.first > rhs.first;
79 }
80 };
81
82 std::priority_queue<PqEntry, std::vector<PqEntry>, PqEntryCompare> entries;
83 std::set<void*> entriesToDelete;
84 static const IceUtil::Time maximalSleepTime;
85 };
86
87 const IceUtil::Time HeartbeatMonitor::maximalSleepTime = IceUtil::Time::milliSeconds(100);
88
89 class RemoteReferenceCounter : virtual public RemoteReferenceCounterBase
90 {
91 public:
93
94 RemoteReferenceCounter(const RemoteReferenceCountControlBlockInterfacePrx& prx,
95 const std::string& id,
96 Ice::Long heartBeatMs) :
97 RemoteReferenceCounterBase(prx, id, heartBeatMs)
98 {
100 }
101
103 RemoteReferenceCounter(other.block, other.id, other.heartBeatMs)
104 {
105 }
106
108 {
109 if (block)
110 {
111 block->removeCounter(counterId);
112 }
114 }
115
116 RemoteReferenceCounterBasePtr
117 copy(const Ice::Current& = Ice::emptyCurrent) const final override
118 {
119 return new RemoteReferenceCounter{*this};
120 }
121
122 std::string
123 getId(const Ice::Current& = Ice::emptyCurrent) const final override
124 {
125 return id;
126 }
127
128 void
130 {
131 block->addCounter(counterId);
133 }
134
135 IceUtil::Time
136 heartbeat(IceUtil::Time now)
137 {
138 block->heartbeat(counterId);
139 return now + IceUtil::Time::milliSeconds(heartBeatMs);
140 }
141
142 private:
143 const std::string counterId{IceUtil::generateUUID()};
144 };
145
146 class SimpleRemoteReferenceCounter : virtual public SimpleRemoteReferenceCounterBase
147 {
148 public:
150
151 SimpleRemoteReferenceCounter(const SimpleRemoteReferenceCountControlBlockInterfacePrx& prx,
152 const std::string& id) :
153 SimpleRemoteReferenceCounterBase(prx, id)
154 {
156 }
157
159 SimpleRemoteReferenceCounter(other.block, other.id)
160 {
161 }
162
164 {
165 if (block)
166 {
167 block->removeCounter(counterId);
168 }
169 }
170
171 SimpleRemoteReferenceCounterBasePtr
172 copy(const Ice::Current& = Ice::emptyCurrent) const final override
173 {
174 return new SimpleRemoteReferenceCounter{*this};
175 }
176
177 std::string
178 getId(const Ice::Current& = Ice::emptyCurrent) const final override
179 {
180 return id;
181 }
182
183 void
185 {
186 block->addCounter(counterId);
187 }
188
189 private:
190 const std::string counterId{IceUtil::generateUUID()};
191 };
192
194 HeartbeatMonitor::Instance()
195 {
196 static HeartbeatMonitor monitor;
197 return monitor;
198 }
199
200 void
201 HeartbeatMonitor::add(RemoteReferenceCounter* ptr)
202 {
203 std::lock_guard<std::mutex> guard{mtx};
204
205 if (!t.joinable())
206 {
207 t = std::thread{[this] { heartbeatTask(); }};
208 }
209 entries.emplace(ptr->heartbeat(IceUtil::Time::now()), ptr);
210 }
211
212 void
213 HeartbeatMonitor::remove(RemoteReferenceCounter* ptr)
214 {
215 std::lock_guard<std::mutex> guard{mtx};
216 entriesToDelete.emplace(ptr);
217 }
218
219 HeartbeatMonitor::~HeartbeatMonitor()
220 {
221 std::lock_guard<std::mutex> guard{mtx};
222 shutdown = true;
223 if (t.joinable())
224 {
225 t.join();
226 }
227 }
228
229 void
230 HeartbeatMonitor::heartbeatTask()
231 {
232 while (!shutdown)
233 {
234 IceUtil::Time now;
235 {
236 // this stops all dtors of RemoteReferenceCounter
237 // -> if the ptr is not already in entriesToDelete,
238 // the object can't be deleted
239 std::lock_guard<std::mutex> guard{mtx};
240 now = IceUtil::Time::now();
241 while (!shutdown && !entries.empty() && entries.top().first <= now)
242 {
243 RemoteReferenceCounter* ptr;
244 std::tie(std::ignore, ptr) = entries.top();
245 entries.pop();
246 if (entriesToDelete.count(ptr))
247 {
248 continue;
249 }
250 try
251 {
252 // the remote may be dead!
253 entries.emplace(ptr->heartbeat(now), ptr);
254 }
255 catch (...)
256 {
257 }
258 }
259 }
260 IceUtil::Time msToSleep = maximalSleepTime;
261 if (!entries.empty())
262 {
263 msToSleep = std::min(maximalSleepTime, entries.top().first - now);
264 }
265 std::this_thread::sleep_for(std::chrono::milliseconds{msToSleep.toMilliSeconds()});
266 }
267 }
268
269 void
271 {
272 if (!shutdown.exchange(true))
273 {
274 if (thread.joinable())
275 {
276 thread.join();
277 }
278 }
279 }
280
281 void
282 RemoteReferenceCountControlBlockManager::manage()
283 {
284 while (!shutdown)
285 {
286 IceUtil::Time now;
287 {
288 std::lock_guard<std::mutex> guard{stateMutex};
289 now = IceUtil::Time::now();
290 //sweep pendingForActivation
291 {
292 auto newPendingForActivation = pendingForActivation;
293 for (const auto& pending : pendingForActivation)
294 {
295 if (pending->isCountingActivated())
296 {
297 rrccbs.emplace(pending->nextCheckTimePoint(), pending);
298 newPendingForActivation.erase(pending);
299 }
300 }
301 pendingForActivation = std::move(newPendingForActivation);
302 }
303 //sweep rrccbs
304 while (!shutdown && !rrccbs.empty() && rrccbs.top().first <= now)
305 {
307 std::tie(std::ignore, ptr) = rrccbs.top();
308 rrccbs.pop();
309 const auto newT = ptr->nextCheckTimePoint();
310 if (newT < now)
311 {
312 try
313 {
314 ptr->countReachedZero();
315 }
316 catch (...)
317 {
318 ARMARX_WARNING << " function countReachedZero threw an exception";
319 }
320 continue;
321 }
322 rrccbs.emplace(newT, std::move(ptr));
323 }
324 }
325 std::this_thread::sleep_for(std::chrono::milliseconds{period.toMilliSeconds()});
326 }
327 //exiting -> call all reach 0 + warn
328 ARMARX_CHECK_EXPRESSION(shutdown);
329 std::lock_guard<std::mutex> guard{stateMutex};
330 auto processRemovedEntry =
332 {
334 ARMARX_CHECK_EXPRESSION(ptr->getProxy());
335 ARMARX_WARNING << "REMOVING RemoteReferenceCount on shutdown!:\n"
336 << " counting activated : " << ptr->isCountingActivated() << "\n"
337 << " identity (cat/name): "
338 << ptr->getProxy()->ice_getIdentity().category << " / "
339 << ptr->getProxy()->ice_getIdentity().name << "\n"
340 << " type id : " << ptr->getProxy()->ice_id();
341 try
342 {
343 ptr->countReachedZero();
344 }
345 catch (...)
346 {
347 }
348 };
349
350 while (!rrccbs.empty())
351 {
353 std::tie(std::ignore, ptr) = rrccbs.top();
354 rrccbs.pop();
355 processRemovedEntry(ptr);
356 }
357 for (const auto& ptr : pendingForActivation)
358 {
359 processRemovedEntry(ptr);
360 }
361 pendingForActivation.clear();
362 }
363
364 void
367 {
369 ARMARX_CHECK_EXPRESSION(ptr->getProxy());
370 std::lock_guard<std::mutex> guard{stateMutex};
371 ARMARX_CHECK_EXPRESSION(!shutdown);
372 if (!thread.joinable())
373 {
374 thread = std::thread{[this] { manage(); }};
375 }
376 pendingForActivation.emplace(ptr);
377 }
378} // namespace armarx
379
380namespace armarx::detail
381{
382 //RemoteReferenceCountControlBlockManagementInterface
385 const std::string& id) :
386 armarXManager{manager}, id{id}
387 {
389 const auto iceId = Ice::stringToIdentity("RRCB_" + IceUtil::generateUUID());
390 __setNoDelete(true);
391 selfProxy = armarXManager->getAdapter()->add(this, iceId);
392 __setNoDelete(false);
393 }
394
397 {
398 if (!armarXManager)
399 {
400 ARMARX_FATAL << "armarXManager NULL\n(line " << __LINE__ << " in " << __FUNCTION__
401 << " in " << __FILE__ ")";
402 std::
403 terminate(); //throwing an exception calls terminate anyways (the dtor is noexcept) -> call terminate directly";
404 }
405 if (!selfProxy)
406 {
407 ARMARX_FATAL << "selfProxy NULL\n(line " << __LINE__ << " in " << __FUNCTION__ << " in "
408 << __FILE__ ")";
409 std::
410 terminate(); //throwing an exception calls terminate anyways (the dtor is noexcept) -> call terminate directly";
411 }
412 try
413 {
414 //removing may fail if the adapter was deactivated in a different thread
415 armarXManager->getAdapter()->remove(selfProxy->ice_getIdentity());
416 }
417 catch (Ice::ObjectAdapterDeactivatedException&)
418 {
419 }
420 }
421
422 void
429
430 //AbstractRemoteReferenceCountControlBlock
431 void
433 const Ice::Current&)
434 {
435 std::lock_guard<std::mutex> guard{mtx};
436 if (counterIds.count(counterId))
437 {
438 ARMARX_WARNING << "an already existing counter id was added! " << VAROUT(counterId);
439 }
440 else
441 {
442 ARMARX_DEBUG << "adding counter id '" << counterId << "'. "
443 << "New number of counters: " << counterIds.size() + 1;
444 }
445 counterIds[counterId] = IceUtil::Time::now();
446 }
447
448 void
450 const Ice::Current&)
451 {
452 std::lock_guard<std::mutex> guard{mtx};
453 if (!counterIds.count(counterId))
454 {
455 ARMARX_WARNING << "heartbeat of nonexistent counter '" << counterId << "'. "
456 << "The counter was added.";
457 }
458 counterIds[counterId] = IceUtil::Time::now();
459 }
460
461 void
463 const Ice::Current&)
464 {
465 std::lock_guard<std::mutex> guard{mtx};
466 if (counterIds.count(counterId))
467 {
468 counterIds.erase(counterId);
469 ARMARX_DEBUG << "deleteing counter id '" << counterId << "'. "
470 << "Remaining number of counters: " << counterIds.size();
471 }
472 else
473 {
474 ARMARX_WARNING << "an non-existent counter id was deleted! " << VAROUT(counterId);
475 }
476 if (counterIds.empty())
477 {
478 lastTimeReachedZero = IceUtil::Time::now();
479 }
480 }
481
483 const ArmarXManagerPtr& manager,
484 const std::string& id,
485 IceUtil::Time deletionDelay,
486 IceUtil::Time orphantDeletionDelay,
487 long heartBeatMs) :
489 deletionDelay{deletionDelay},
490 orphantDeletionDelay{orphantDeletionDelay},
491 heartBeatMs{heartBeatMs}
492 {
493 ARMARX_CHECK_GREATER(heartBeatMs, 0);
494 ARMARX_CHECK_GREATER(deletionDelay.toMicroSeconds(), 0);
495 ARMARX_CHECK_GREATER(orphantDeletionDelay.toMicroSeconds(), 0);
496 }
497
498 IceUtil::Time
499 AbstractRemoteReferenceCountControlBlock::nextCheckTimePoint()
500 {
501 std::lock_guard<std::mutex> guard{mtx};
502 if (counterIds.empty())
503 {
504 return lastTimeReachedZero + deletionDelay;
505 }
506 else
507 {
508 IceUtil::Time max;
509 for (const auto& entry : counterIds)
510 {
511 max = std::max(max, entry.second);
512 }
513 return max + orphantDeletionDelay;
514 }
515 }
516
517 RemoteReferenceCounterBasePtr
519 {
520 ARMARX_DEBUG << "creating RemoteReferenceCounterBasePtr";
521 auto proxy = RemoteReferenceCountControlBlockInterfacePrx::checkedCast(selfProxy);
523 return new RemoteReferenceCounter{proxy, id, heartBeatMs};
524 }
525
526 //AbstractSimpleRemoteReferenceCountControlBlock
527 void
529 const Ice::Current&)
530 {
531 std::lock_guard<std::mutex> guard{mtx};
532 if (counterIds.count(counterId))
533 {
534 ARMARX_WARNING << "an already existing counter id was added! " << VAROUT(counterId);
535 }
536 else
537 {
538 counterIds.emplace(counterId);
539 ARMARX_DEBUG << "adding counter id '" << counterId << "'. "
540 << "New number of counters: " << counterIds.size();
541 }
542 }
543
544 void
546 const Ice::Current&)
547 {
548 std::lock_guard<std::mutex> guard{mtx};
549 if (counterIds.count(counterId))
550 {
551 counterIds.erase(counterId);
552 ARMARX_DEBUG << "deleteing counter id '" << counterId << "'. "
553 << "Remaining number of counters: " << counterIds.size();
554 }
555 else
556 {
557 ARMARX_WARNING << "an non-existent counter id was deleted! " << VAROUT(counterId);
558 }
559 if (counterIds.empty())
560 {
561 lastTimeReachedZero = IceUtil::Time::now();
562 }
563 }
564
566 const ArmarXManagerPtr& manager,
567 const std::string& id,
568 IceUtil::Time deletionDelay) :
570 deletionDelay{deletionDelay}
571 {
572 ARMARX_CHECK_GREATER(deletionDelay.toMicroSeconds(), 0);
573 }
574
575 SimpleRemoteReferenceCounterBasePtr
577 {
578 ARMARX_DEBUG << "creating SimpleRemoteReferenceCounterBasePtr";
579 auto proxy = SimpleRemoteReferenceCountControlBlockInterfacePrx::checkedCast(selfProxy);
581 return new SimpleRemoteReferenceCounter{proxy, id};
582 }
583
584 IceUtil::Time
585 AbstractSimpleRemoteReferenceCountControlBlock::nextCheckTimePoint()
586 {
587 std::lock_guard<std::mutex> guard{mtx};
588 if (counterIds.empty())
589 {
590 return lastTimeReachedZero + deletionDelay;
591 }
592 return IceUtil::Time::now() + deletionDelay;
593 }
594} // namespace armarx::detail
595
596namespace armarx::ObjectFactories
597{
611
614} // namespace armarx::ObjectFactories
615
616namespace armarx
617{
620} // namespace armarx
#define VAROUT(x)
static FactoryCollectionBaseCleanUp addToPreregistration(FactoryCollectionBasePtr factoryCollection)
void add(ObjectFactoryMap &map)
static void Remove(RemoteReferenceCounter *ptr)
static void Add(RemoteReferenceCounter *ptr)
void add(detail::RemoteReferenceCountControlBlockManagementInterfacePtr ptr)
IceUtil::Time heartbeat(IceUtil::Time now)
RemoteReferenceCounter(const RemoteReferenceCounter &other)
std::string getId(const Ice::Current &=Ice::emptyCurrent) const final override
RemoteReferenceCounterBasePtr copy(const Ice::Current &=Ice::emptyCurrent) const final override
RemoteReferenceCounter(const RemoteReferenceCountControlBlockInterfacePrx &prx, const std::string &id, Ice::Long heartBeatMs)
std::string getId(const Ice::Current &=Ice::emptyCurrent) const final override
SimpleRemoteReferenceCounter(const SimpleRemoteReferenceCounter &other)
SimpleRemoteReferenceCounter(const SimpleRemoteReferenceCountControlBlockInterfacePrx &prx, const std::string &id)
SimpleRemoteReferenceCounterBasePtr copy(const Ice::Current &=Ice::emptyCurrent) const final override
AbstractRemoteReferenceCountControlBlock(const ArmarXManagerPtr &manager, const std::string &id, IceUtil::Time deletionDelay, IceUtil::Time orphantDeletionDelay, long heartBeatMs)
void heartbeat(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void removeCounter(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void addCounter(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void addCounter(const std::string &counterId, const Ice::Current &) final override
void removeCounter(const std::string &counterId, const Ice::Current &) final override
AbstractSimpleRemoteReferenceCountControlBlock(const ArmarXManagerPtr &manager, const std::string &id, IceUtil::Time deletionDelay)
RemoteReferenceCountControlBlockManagementInterface(const ArmarXManagerPtr &manager, const std::string &id)
T max(T t1, T t2)
Definition gdiam.h:51
#define ARMARX_CHECK_GREATER(lhs, rhs)
This macro evaluates whether lhs is greater (>) than rhs and if it turns out to be false it will thro...
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_FATAL
The logging level for unexpected behaviour, that will lead to a seriously malfunctioning program and ...
Definition Logging.h:199
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
const FactoryCollectionBaseCleanUp RemoteReferenceCounterObjectFactoriesVar
IceUtil::Handle< detail::RemoteReferenceCountControlBlockManagementInterface > RemoteReferenceCountControlBlockManagementInterfacePtr
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< RemoteReferenceCounter > RemoteReferenceCounterPtr
std::map< std::string, Ice::ValueFactoryPtr > ObjectFactoryMap
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)