30 #include <Ice/Initialize.h>
31 #include <Ice/ObjectAdapter.h>
32 #include <IceUtil/UUID.h>
41 class RemoteReferenceCounter;
56 Instance().remove(ptr);
69 std::atomic_bool shutdown{
false};
70 HeartbeatMonitor() =
default;
71 using PqEntry = std::pair<IceUtil::Time, RemoteReferenceCounter*>;
76 operator()(
const PqEntry& lhs,
const PqEntry& rhs)
const
78 return lhs.first > rhs.first;
82 std::priority_queue<PqEntry, std::vector<PqEntry>, PqEntryCompare> entries;
83 std::set<void*> entriesToDelete;
87 const IceUtil::Time HeartbeatMonitor::maximalSleepTime = IceUtil::Time::milliSeconds(100);
95 const std::string&
id,
97 RemoteReferenceCounterBase(prx, id, heartBeatMs)
111 block->removeCounter(counterId);
116 RemoteReferenceCounterBasePtr
117 copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
123 getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
131 block->addCounter(counterId);
138 block->heartbeat(counterId);
139 return now + IceUtil::Time::milliSeconds(heartBeatMs);
143 const std::string counterId{IceUtil::generateUUID()};
152 const std::string&
id) :
153 SimpleRemoteReferenceCounterBase(prx, id)
167 block->removeCounter(counterId);
171 SimpleRemoteReferenceCounterBasePtr
172 copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
178 getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
186 block->addCounter(counterId);
190 const std::string counterId{IceUtil::generateUUID()};
194 HeartbeatMonitor::Instance()
196 static HeartbeatMonitor monitor;
201 HeartbeatMonitor::add(RemoteReferenceCounter* ptr)
203 std::lock_guard<std::mutex> guard{mtx};
207 t = std::thread{[
this] { heartbeatTask(); }};
209 entries.emplace(ptr->heartbeat(IceUtil::Time::now()), ptr);
213 HeartbeatMonitor::remove(RemoteReferenceCounter* ptr)
215 std::lock_guard<std::mutex> guard{mtx};
216 entriesToDelete.emplace(ptr);
219 HeartbeatMonitor::~HeartbeatMonitor()
221 std::lock_guard<std::mutex> guard{mtx};
230 HeartbeatMonitor::heartbeatTask()
239 std::lock_guard<std::mutex> guard{mtx};
240 now = IceUtil::Time::now();
241 while (!shutdown && !entries.empty() && entries.top().first <= now)
243 RemoteReferenceCounter* ptr;
244 std::tie(std::ignore, ptr) = entries.top();
246 if (entriesToDelete.count(ptr))
253 entries.emplace(ptr->heartbeat(now), ptr);
261 if (!entries.empty())
263 msToSleep =
std::min(maximalSleepTime, entries.top().first - now);
265 std::this_thread::sleep_for(std::chrono::milliseconds{msToSleep.toMilliSeconds()});
272 if (!shutdown.exchange(
true))
274 if (thread.joinable())
282 RemoteReferenceCountControlBlockManager::manage()
288 std::lock_guard<std::mutex> guard{stateMutex};
289 now = IceUtil::Time::now();
292 auto newPendingForActivation = pendingForActivation;
293 for (
const auto& pending : pendingForActivation)
295 if (pending->isCountingActivated())
297 rrccbs.emplace(pending->nextCheckTimePoint(), pending);
298 newPendingForActivation.erase(pending);
301 pendingForActivation = std::move(newPendingForActivation);
304 while (!shutdown && !rrccbs.empty() && rrccbs.top().first <= now)
307 std::tie(std::ignore, ptr) = rrccbs.top();
309 const auto newT = ptr->nextCheckTimePoint();
314 ptr->countReachedZero();
318 ARMARX_WARNING <<
" function countReachedZero threw an exception";
322 rrccbs.emplace(newT, std::move(ptr));
325 std::this_thread::sleep_for(std::chrono::milliseconds{period.toMilliSeconds()});
329 std::lock_guard<std::mutex> guard{stateMutex};
330 auto processRemovedEntry =
336 <<
" counting activated : " << ptr->isCountingActivated() <<
"\n"
337 <<
" identity (cat/name): "
338 << ptr->getProxy()->ice_getIdentity().category <<
" / "
339 << ptr->getProxy()->ice_getIdentity().name <<
"\n"
340 <<
" type id : " << ptr->getProxy()->ice_id();
343 ptr->countReachedZero();
350 while (!rrccbs.empty())
353 std::tie(std::ignore, ptr) = rrccbs.top();
355 processRemovedEntry(ptr);
357 for (
const auto& ptr : pendingForActivation)
359 processRemovedEntry(ptr);
361 pendingForActivation.clear();
370 std::lock_guard<std::mutex> guard{stateMutex};
372 if (!thread.joinable())
374 thread = std::thread{[
this] { manage(); }};
376 pendingForActivation.emplace(ptr);
385 const std::string&
id) :
386 armarXManager{manager},
id{
id}
389 const auto iceId = Ice::stringToIdentity(
"RRCB_" + IceUtil::generateUUID());
391 selfProxy = armarXManager->getAdapter()->add(
this, iceId);
392 __setNoDelete(
false);
400 ARMARX_FATAL <<
"armarXManager NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__
401 <<
" in " << __FILE__
")";
407 ARMARX_FATAL <<
"selfProxy NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__ <<
" in "
417 catch (Ice::ObjectAdapterDeactivatedException&)
435 std::lock_guard<std::mutex> guard{
mtx};
436 if (counterIds.count(counterId))
442 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
443 <<
"New number of counters: " << counterIds.size() + 1;
445 counterIds[counterId] = IceUtil::Time::now();
452 std::lock_guard<std::mutex> guard{
mtx};
453 if (!counterIds.count(counterId))
455 ARMARX_WARNING <<
"heartbeat of nonexistent counter '" << counterId <<
"'. "
456 <<
"The counter was added.";
458 counterIds[counterId] = IceUtil::Time::now();
465 std::lock_guard<std::mutex> guard{
mtx};
466 if (counterIds.count(counterId))
468 counterIds.erase(counterId);
469 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
470 <<
"Remaining number of counters: " << counterIds.size();
476 if (counterIds.empty())
484 const std::string&
id,
489 deletionDelay{deletionDelay},
490 orphantDeletionDelay{orphantDeletionDelay},
491 heartBeatMs{heartBeatMs}
499 AbstractRemoteReferenceCountControlBlock::nextCheckTimePoint()
501 std::lock_guard<std::mutex> guard{
mtx};
502 if (counterIds.empty())
509 for (
const auto& entry : counterIds)
513 return max + orphantDeletionDelay;
517 RemoteReferenceCounterBasePtr
520 ARMARX_DEBUG <<
"creating RemoteReferenceCounterBasePtr";
521 auto proxy = RemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
531 std::lock_guard<std::mutex> guard{
mtx};
532 if (counterIds.count(counterId))
538 counterIds.emplace(counterId);
539 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
540 <<
"New number of counters: " << counterIds.size();
548 std::lock_guard<std::mutex> guard{
mtx};
549 if (counterIds.count(counterId))
551 counterIds.erase(counterId);
552 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
553 <<
"Remaining number of counters: " << counterIds.size();
559 if (counterIds.empty())
567 const std::string&
id,
570 deletionDelay{deletionDelay}
575 SimpleRemoteReferenceCounterBasePtr
578 ARMARX_DEBUG <<
"creating SimpleRemoteReferenceCounterBasePtr";
579 auto proxy = SimpleRemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
585 AbstractSimpleRemoteReferenceCountControlBlock::nextCheckTimePoint()
587 std::lock_guard<std::mutex> guard{
mtx};
588 if (counterIds.empty())
592 return IceUtil::Time::now() + deletionDelay;
605 add<armarx::RemoteReferenceCounterBase, armarx::RemoteReferenceCounter>(map);
606 add<armarx::SimpleRemoteReferenceCounterBase, armarx::SimpleRemoteReferenceCounter>(