30#include <Ice/Initialize.h>
31#include <Ice/ObjectAdapter.h>
32#include <IceUtil/UUID.h>
44 class HeartbeatMonitor
56 Instance().remove(ptr);
69 std::atomic_bool shutdown{
false};
70 HeartbeatMonitor() =
default;
71 using PqEntry = std::pair<IceUtil::Time, RemoteReferenceCounter*>;
76 operator()(
const PqEntry& lhs,
const PqEntry& rhs)
const
78 return lhs.first > rhs.first;
82 std::priority_queue<PqEntry, std::vector<PqEntry>, PqEntryCompare> entries;
83 std::set<void*> entriesToDelete;
84 static const IceUtil::Time maximalSleepTime;
87 const IceUtil::Time HeartbeatMonitor::maximalSleepTime = IceUtil::Time::milliSeconds(100);
95 const std::string&
id,
96 Ice::Long heartBeatMs) :
97 RemoteReferenceCounterBase(prx, id, heartBeatMs)
111 block->removeCounter(counterId);
116 RemoteReferenceCounterBasePtr
117 copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
123 getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
131 block->addCounter(counterId);
138 block->heartbeat(counterId);
139 return now + IceUtil::Time::milliSeconds(heartBeatMs);
143 const std::string counterId{IceUtil::generateUUID()};
152 const std::string&
id) :
153 SimpleRemoteReferenceCounterBase(prx, id)
167 block->removeCounter(counterId);
171 SimpleRemoteReferenceCounterBasePtr
172 copy(
const Ice::Current& = Ice::emptyCurrent)
const final override
178 getId(
const Ice::Current& = Ice::emptyCurrent)
const final override
186 block->addCounter(counterId);
190 const std::string counterId{IceUtil::generateUUID()};
194 HeartbeatMonitor::Instance()
196 static HeartbeatMonitor monitor;
203 std::lock_guard<std::mutex> guard{mtx};
207 t = std::thread{[
this] { heartbeatTask(); }};
209 entries.emplace(ptr->heartbeat(IceUtil::Time::now()), ptr);
215 std::lock_guard<std::mutex> guard{mtx};
216 entriesToDelete.emplace(ptr);
219 HeartbeatMonitor::~HeartbeatMonitor()
221 std::lock_guard<std::mutex> guard{mtx};
230 HeartbeatMonitor::heartbeatTask()
239 std::lock_guard<std::mutex> guard{mtx};
240 now = IceUtil::Time::now();
241 while (!shutdown && !entries.empty() && entries.top().first <= now)
243 RemoteReferenceCounter* ptr;
244 std::tie(std::ignore, ptr) = entries.top();
246 if (entriesToDelete.count(ptr))
253 entries.emplace(ptr->heartbeat(now), ptr);
260 IceUtil::Time msToSleep = maximalSleepTime;
261 if (!entries.empty())
263 msToSleep = std::min(maximalSleepTime, entries.top().first - now);
265 std::this_thread::sleep_for(std::chrono::milliseconds{msToSleep.toMilliSeconds()});
272 if (!shutdown.exchange(
true))
274 if (thread.joinable())
282 RemoteReferenceCountControlBlockManager::manage()
288 std::lock_guard<std::mutex> guard{stateMutex};
289 now = IceUtil::Time::now();
292 auto newPendingForActivation = pendingForActivation;
293 for (
const auto& pending : pendingForActivation)
295 if (pending->isCountingActivated())
297 rrccbs.emplace(pending->nextCheckTimePoint(), pending);
298 newPendingForActivation.erase(pending);
301 pendingForActivation = std::move(newPendingForActivation);
304 while (!shutdown && !rrccbs.empty() && rrccbs.top().first <= now)
307 std::tie(std::ignore, ptr) = rrccbs.top();
309 const auto newT = ptr->nextCheckTimePoint();
314 ptr->countReachedZero();
318 ARMARX_WARNING <<
" function countReachedZero threw an exception";
322 rrccbs.emplace(newT, std::move(ptr));
325 std::this_thread::sleep_for(std::chrono::milliseconds{period.toMilliSeconds()});
329 std::lock_guard<std::mutex> guard{stateMutex};
330 auto processRemovedEntry =
336 <<
" counting activated : " << ptr->isCountingActivated() <<
"\n"
337 <<
" identity (cat/name): "
338 << ptr->getProxy()->ice_getIdentity().category <<
" / "
339 << ptr->getProxy()->ice_getIdentity().name <<
"\n"
340 <<
" type id : " << ptr->getProxy()->ice_id();
343 ptr->countReachedZero();
350 while (!rrccbs.empty())
353 std::tie(std::ignore, ptr) = rrccbs.top();
355 processRemovedEntry(ptr);
357 for (
const auto& ptr : pendingForActivation)
359 processRemovedEntry(ptr);
361 pendingForActivation.clear();
370 std::lock_guard<std::mutex> guard{stateMutex};
372 if (!thread.joinable())
374 thread = std::thread{[
this] { manage(); }};
376 pendingForActivation.emplace(ptr);
385 const std::string&
id) :
389 const auto iceId = Ice::stringToIdentity(
"RRCB_" + IceUtil::generateUUID());
392 __setNoDelete(
false);
400 ARMARX_FATAL <<
"armarXManager NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__
401 <<
" in " << __FILE__
")";
407 ARMARX_FATAL <<
"selfProxy NULL\n(line " << __LINE__ <<
" in " << __FUNCTION__ <<
" in "
417 catch (Ice::ObjectAdapterDeactivatedException&)
435 std::lock_guard<std::mutex> guard{
mtx};
436 if (counterIds.count(counterId))
442 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
443 <<
"New number of counters: " << counterIds.size() + 1;
445 counterIds[counterId] = IceUtil::Time::now();
452 std::lock_guard<std::mutex> guard{
mtx};
453 if (!counterIds.count(counterId))
455 ARMARX_WARNING <<
"heartbeat of nonexistent counter '" << counterId <<
"'. "
456 <<
"The counter was added.";
458 counterIds[counterId] = IceUtil::Time::now();
465 std::lock_guard<std::mutex> guard{
mtx};
466 if (counterIds.count(counterId))
468 counterIds.erase(counterId);
469 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
470 <<
"Remaining number of counters: " << counterIds.size();
476 if (counterIds.empty())
484 const std::string&
id,
485 IceUtil::Time deletionDelay,
486 IceUtil::Time orphantDeletionDelay,
489 deletionDelay{deletionDelay},
490 orphantDeletionDelay{orphantDeletionDelay},
491 heartBeatMs{heartBeatMs}
499 AbstractRemoteReferenceCountControlBlock::nextCheckTimePoint()
501 std::lock_guard<std::mutex> guard{
mtx};
502 if (counterIds.empty())
509 for (
const auto& entry : counterIds)
511 max = std::max(
max, entry.second);
513 return max + orphantDeletionDelay;
517 RemoteReferenceCounterBasePtr
520 ARMARX_DEBUG <<
"creating RemoteReferenceCounterBasePtr";
521 auto proxy = RemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
531 std::lock_guard<std::mutex> guard{
mtx};
532 if (counterIds.count(counterId))
538 counterIds.emplace(counterId);
539 ARMARX_DEBUG <<
"adding counter id '" << counterId <<
"'. "
540 <<
"New number of counters: " << counterIds.size();
548 std::lock_guard<std::mutex> guard{
mtx};
549 if (counterIds.count(counterId))
551 counterIds.erase(counterId);
552 ARMARX_DEBUG <<
"deleteing counter id '" << counterId <<
"'. "
553 <<
"Remaining number of counters: " << counterIds.size();
559 if (counterIds.empty())
567 const std::string&
id,
568 IceUtil::Time deletionDelay) :
570 deletionDelay{deletionDelay}
575 SimpleRemoteReferenceCounterBasePtr
578 ARMARX_DEBUG <<
"creating SimpleRemoteReferenceCounterBasePtr";
579 auto proxy = SimpleRemoteReferenceCountControlBlockInterfacePrx::checkedCast(
selfProxy);
585 AbstractSimpleRemoteReferenceCountControlBlock::nextCheckTimePoint()
587 std::lock_guard<std::mutex> guard{
mtx};
588 if (counterIds.empty())
592 return IceUtil::Time::now() + deletionDelay;
596namespace armarx::ObjectFactories
static FactoryCollectionBaseCleanUp addToPreregistration(FactoryCollectionBasePtr factoryCollection)
void add(ObjectFactoryMap &map)
static void Remove(RemoteReferenceCounter *ptr)
static void Add(RemoteReferenceCounter *ptr)
ObjectFactoryMap getFactories() override
static const Ice::Long DefaultOrphantDeletionDelayMs
static const Ice::Long DefaultDeletionDelayMs
void add(detail::RemoteReferenceCountControlBlockManagementInterfacePtr ptr)
void ice_postUnmarshal() override
IceUtil::Time heartbeat(IceUtil::Time now)
RemoteReferenceCounter(const RemoteReferenceCounter &other)
std::string getId(const Ice::Current &=Ice::emptyCurrent) const final override
~RemoteReferenceCounter() override
RemoteReferenceCounterBasePtr copy(const Ice::Current &=Ice::emptyCurrent) const final override
RemoteReferenceCounter(const RemoteReferenceCountControlBlockInterfacePrx &prx, const std::string &id, Ice::Long heartBeatMs)
RemoteReferenceCounter()=default
void ice_postUnmarshal() override
std::string getId(const Ice::Current &=Ice::emptyCurrent) const final override
~SimpleRemoteReferenceCounter() override
SimpleRemoteReferenceCounter(const SimpleRemoteReferenceCounter &other)
SimpleRemoteReferenceCounter(const SimpleRemoteReferenceCountControlBlockInterfacePrx &prx, const std::string &id)
SimpleRemoteReferenceCounterBasePtr copy(const Ice::Current &=Ice::emptyCurrent) const final override
SimpleRemoteReferenceCounter()=default
AbstractRemoteReferenceCountControlBlock(const ArmarXManagerPtr &manager, const std::string &id, IceUtil::Time deletionDelay, IceUtil::Time orphantDeletionDelay, long heartBeatMs)
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
RemoteReferenceCounterBasePtr getReferenceCounter()
void heartbeat(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void removeCounter(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void addCounter(const std::string &counterId, const Ice::Current &=Ice::emptyCurrent) final override
void addCounter(const std::string &counterId, const Ice::Current &) final override
void removeCounter(const std::string &counterId, const Ice::Current &) final override
SimpleRemoteReferenceCounterBasePtr getReferenceCounter()
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
AbstractSimpleRemoteReferenceCountControlBlock(const ArmarXManagerPtr &manager, const std::string &id, IceUtil::Time deletionDelay)
IceUtil::Time lastTimeReachedZero
virtual void onCountReachedZero()=0
~RemoteReferenceCountControlBlockManagementInterface() override
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
bool isCountingActivated() const
ArmarXManagerPtr armarXManager
RemoteReferenceCountControlBlockManagementInterface(const ArmarXManagerPtr &manager, const std::string &id)
#define ARMARX_CHECK_GREATER(lhs, rhs)
This macro evaluates whether lhs is greater (>) than rhs and if it turns out to be false it will thro...
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_FATAL
The logging level for unexpected behaviour, that will lead to a seriously malfunctioning program and ...
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
const FactoryCollectionBaseCleanUp RemoteReferenceCounterObjectFactoriesVar
IceUtil::Handle< detail::RemoteReferenceCountControlBlockManagementInterface > RemoteReferenceCountControlBlockManagementInterfacePtr
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< RemoteReferenceCounter > RemoteReferenceCounterPtr
std::map< std::string, Ice::ValueFactoryPtr > ObjectFactoryMap
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)