28 #include <Ice/Initialize.h>
29 #include <Ice/ObjectAdapter.h>
30 #include <IceUtil/UUID.h>
41 class RemoteReferenceCounter;
53 Instance().remove(ptr);
65 std::atomic_bool shutdown {
false};
66 HeartbeatMonitor() =
default;
67 using PqEntry = std::pair<IceUtil::Time, RemoteReferenceCounter*>;
70 bool operator()(
const PqEntry& lhs,
const PqEntry& rhs)
const
72 return lhs.first > rhs.first;
75 std::priority_queue<PqEntry,
77 PqEntryCompare> entries;
78 std::set<void*> entriesToDelete;
81 const IceUtil::Time HeartbeatMonitor::maximalSleepTime = IceUtil::Time::milliSeconds(100);
88 : RemoteReferenceCounterBase(prx, id, heartBeatMs)
98 block->removeCounter(counterId);
103 RemoteReferenceCounterBasePtr
copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
107 std::string
getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
113 block->addCounter(counterId);
118 block->heartbeat(counterId);
119 return now + IceUtil::Time::milliSeconds(heartBeatMs);
122 const std::string counterId
124 IceUtil::generateUUID()
132 SimpleRemoteReferenceCounter(
const SimpleRemoteReferenceCountControlBlockInterfacePrx& prx,
const std::string&
id) : SimpleRemoteReferenceCounterBase(prx, id)
142 block->removeCounter(counterId);
146 SimpleRemoteReferenceCounterBasePtr
copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
150 std::string
getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
156 block->addCounter(counterId);
159 const std::string counterId
161 IceUtil::generateUUID()
165 HeartbeatMonitor& HeartbeatMonitor::Instance()
167 static HeartbeatMonitor monitor;
170 void HeartbeatMonitor::add(RemoteReferenceCounter* ptr)
172 std::lock_guard<std::mutex> guard {mtx};
176 t = std::thread {[
this]{heartbeatTask();}};
178 entries.emplace(ptr->heartbeat(IceUtil::Time::now()), ptr);
180 void HeartbeatMonitor::remove(RemoteReferenceCounter* ptr)
182 std::lock_guard<std::mutex> guard {mtx};
183 entriesToDelete.emplace(ptr);
185 HeartbeatMonitor::~HeartbeatMonitor()
187 std::lock_guard<std::mutex> guard {mtx};
194 void HeartbeatMonitor::heartbeatTask()
203 std::lock_guard<std::mutex> guard {mtx};
204 now = IceUtil::Time::now();
205 while (!shutdown && !entries.empty() && entries.top().first <= now)
207 RemoteReferenceCounter* ptr;
208 std::tie(std::ignore, ptr) = entries.top();
210 if (entriesToDelete.count(ptr))
217 entries.emplace(ptr->heartbeat(now), ptr);
224 if (!entries.empty())
226 msToSleep =
std::min(maximalSleepTime, entries.top().first - now);
228 std::this_thread::sleep_for(std::chrono::milliseconds {msToSleep.toMilliSeconds()});
234 if (!shutdown.exchange(
true))
236 if (thread.joinable())
243 void RemoteReferenceCountControlBlockManager::manage()
249 std::lock_guard<std::mutex> guard {stateMutex};
250 now = IceUtil::Time::now();
253 auto newPendingForActivation = pendingForActivation;
254 for (
const auto& pending : pendingForActivation)
256 if (pending->isCountingActivated())
258 rrccbs.emplace(pending->nextCheckTimePoint(), pending);
259 newPendingForActivation.erase(pending);
262 pendingForActivation = std::move(newPendingForActivation);
265 while (!shutdown && !rrccbs.empty() && rrccbs.top().first <= now)
268 std::tie(std::ignore, ptr) = rrccbs.top();
270 const auto newT = ptr->nextCheckTimePoint();
275 ptr->countReachedZero();
279 ARMARX_WARNING <<
" function countReachedZero threw an exception";
283 rrccbs.emplace(newT, std::move(ptr));
286 std::this_thread::sleep_for(std::chrono::milliseconds {period.toMilliSeconds()});
290 std::lock_guard<std::mutex> guard {stateMutex};
296 <<
" counting activated : " << ptr->isCountingActivated() <<
"\n"
297 <<
" identity (cat/name): " << ptr->getProxy()->ice_getIdentity().category <<
" / " << ptr->getProxy()->ice_getIdentity().name <<
"\n"
298 <<
" type id : " << ptr->getProxy()->ice_id();
301 ptr->countReachedZero();
307 while (!rrccbs.empty())
310 std::tie(std::ignore, ptr) = rrccbs.top();
312 processRemovedEntry(ptr);
314 for (
const auto& ptr : pendingForActivation)
316 processRemovedEntry(ptr);
318 pendingForActivation.clear();
325 std::lock_guard<std::mutex> guard {stateMutex};
327 if (!thread.joinable())
329 thread = std::thread {[
this]{manage();}};
331 pendingForActivation.emplace(ptr);
339 : armarXManager {manager},
id {
id}
342 const auto iceId = Ice::stringToIdentity(
"RRCB_" + IceUtil::generateUUID());
344 selfProxy = armarXManager->getAdapter()->add(
this, iceId);
345 __setNoDelete(
false);
352 ARMARX_FATAL <<
"armarXManager NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__ <<
" in " << __FILE__
")";
357 ARMARX_FATAL <<
"selfProxy NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__ <<
" in " << __FILE__
")";
365 catch (Ice::ObjectAdapterDeactivatedException&) {}
378 std::lock_guard<std::mutex> guard {
mtx};
379 if (counterIds.count(counterId))
385 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
386 <<
"New number of counters: " << counterIds.size() + 1;
388 counterIds[counterId] = IceUtil::Time::now();
392 std::lock_guard<std::mutex> guard {
mtx};
393 if (!counterIds.count(counterId))
395 ARMARX_WARNING <<
"heartbeat of nonexistent counter '" << counterId <<
"'. "
396 <<
"The counter was added.";
398 counterIds[counterId] = IceUtil::Time::now();
402 std::lock_guard<std::mutex> guard {
mtx};
403 if (counterIds.count(counterId))
405 counterIds.erase(counterId);
406 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
407 <<
"Remaining number of counters: " << counterIds.size();
413 if (counterIds.empty())
422 deletionDelay {deletionDelay}, orphantDeletionDelay {orphantDeletionDelay}, heartBeatMs {heartBeatMs}
428 IceUtil::Time AbstractRemoteReferenceCountControlBlock::nextCheckTimePoint()
430 std::lock_guard<std::mutex> guard {
mtx};
431 if (counterIds.empty())
438 for (
const auto& entry : counterIds)
442 return max + orphantDeletionDelay;
447 ARMARX_DEBUG <<
"creating RemoteReferenceCounterBasePtr";
448 auto proxy = RemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
456 std::lock_guard<std::mutex> guard {
mtx};
457 if (counterIds.count(counterId))
463 counterIds.emplace(counterId);
464 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
465 <<
"New number of counters: " << counterIds.size();
470 std::lock_guard<std::mutex> guard {
mtx};
471 if (counterIds.count(counterId))
473 counterIds.erase(counterId);
474 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
475 <<
"Remaining number of counters: " << counterIds.size();
481 if (counterIds.empty())
495 ARMARX_DEBUG <<
"creating SimpleRemoteReferenceCounterBasePtr";
496 auto proxy = SimpleRemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
500 IceUtil::Time AbstractSimpleRemoteReferenceCountControlBlock::nextCheckTimePoint()
502 std::lock_guard<std::mutex> guard {
mtx};
503 if (counterIds.empty())
507 return IceUtil::Time::now() + deletionDelay;
519 add<armarx::RemoteReferenceCounterBase, armarx::RemoteReferenceCounter>(map);
520 add<armarx::SimpleRemoteReferenceCounterBase, armarx::SimpleRemoteReferenceCounter>(map);