FrequencyFilter.cpp
Go to the documentation of this file.
1#include "FrequencyFilter.h"
2
3#include <chrono>
4
6{
7
// Decides whether the snapshot `e` passes the frequency filter.
//
// Visible behavior:
//  - If the timestamp type is still NOT_SET it is determined from `e` first.
//  - If the type is ALL_CORRUPTED the snapshot is accepted immediately.
//  - If the entity id of `e` has no entry in lastTimesPerEntity, the snapshot is accepted and the
//    timestamp of its first instance is stored for that entity.
//  - Otherwise the snapshot is accepted when at least one instance timestamp differs from the
//    stored one by more than maxDifference; the stored timestamp is then updated.
// Returns true when the snapshot is accepted. `stats` is updated on every path except the
// ALL_CORRUPTED early return.
//
// NOTE(review): this listing lost the qualified function name/parameter line and the line that
// invokes the lambda below (presumably e.forEachInstance(...)) together with the lambda's
// parameter list -- confirm against the real source file.
8 bool
10 {
11 // Thread safety: lock the filter mutex for all mutable state access
12 std::lock_guard<std::mutex> lock(filterMutex_);
13
14 // accepting too many elements makes the filter slow and causes problems with the buffer itself!
15
16 // set default values for used variables:
17 auto start = std::chrono::high_resolution_clock::now();
18 bool instances_accepted = false;
19 std::int64_t current = 0;
20 // get entity id of this snapshot:
21 auto memID = e.id().getEntityID();
22
23 // find out which timestamp is usable (done only while the type is still NOT_SET):
24 if (this->nonCorruptedType == TimestampType::NOT_SET)
25 {
26 ARMARX_DEBUG << "Setting timestamp type";
27 this->setNonCorruptedTimestampType(e);
28 ARMARX_DEBUG << VAROUT(this->nonCorruptedType);
29 }
30
// No usable timestamp at all: accept everything. This path returns before any of the
// stats updates below, so neither `accepted` nor `additional_time` is touched here.
31 if (this->nonCorruptedType == TimestampType::ALL_CORRUPTED)
32 {
33 return true;
34 }
35
36 if (this->lastTimesPerEntity.end() == this->lastTimesPerEntity.find(memID))
37 {
38 // this happens if the key is not in the map, which means this is the first time this
39 // entity tries to save a snapshot
40 ARMARX_DEBUG << "First time this entity is saved";
// NOTE(review): element [0] is read without checking that the snapshot has any instance.
41 auto firstIndex = e.getInstanceIndices()[0];
42 auto firstInsance = e.getInstance(firstIndex);
43 auto lastT = this->getNonCorruptedTimestamp(firstInsance, simulatedVersion);
44 // for the statistics:
45 auto end = std::chrono::high_resolution_clock::now();
46 stats.end_time = end;
47 stats.additional_time += (end - start);
48 stats.accepted += 1;
49 // remember this timestamp as the last accepted one for the entity:
50 this->lastTimesPerEntity[memID] = lastT;
51 return true; // the first one is always accepted
52 }
53 else
54 {
55 auto lastTime = this->lastTimesPerEntity.at(memID);
56
57 // check whether any instance of this snapshot has a timestamp that is more than
58 // maxDifference away from the timestamp stored when this entity was accepted last:
60 [this, &instances_accepted, &current, &lastTime, &simulatedVersion](
62 {
63 auto t = this->getNonCorruptedTimestamp(i, simulatedVersion);
// NOTE(review): the absolute difference is narrowed to int; the type of lastTime is not
// visible in this listing -- confirm that this cannot overflow.
64 int difference = std::abs(t - lastTime);
65 if (difference > this->maxDifference)
66 { // std::abs makes this direction-agnostic: the instance is far enough from the last saved one
67 instances_accepted = true;
// `current` is overwritten by every qualifying instance, so the last one visited wins.
68 current = this->getNonCorruptedTimestamp(i, simulatedVersion);
69 }
70 });
71
72 if (instances_accepted)
73 { // if one of the instances was accepted, the time at which an
74 // instance was accepted last needs to be updated
75 this->lastTimesPerEntity[memID] = current;
76 }
77 }
78
79 // set stats:
80 auto end = std::chrono::high_resolution_clock::now();
81 stats.end_time = end;
82 stats.additional_time += (end - start);
83
84 if (instances_accepted)
85 {
86 this->stats.accepted += 1;
87 }
88 else
89 {
90 this->stats.rejected += 1;
91 }
92
93 return instances_accepted;
94 }
95
96 void
97 SnapshotFrequencyFilter::setNonCorruptedTimestampType(const armem::wm::EntitySnapshot& e)
98 {
99 auto firstIndex = e.getInstanceIndices()[0];
100 auto firstInsance = e.getInstance(firstIndex);
101 auto sentTime = firstInsance.metadata().sentTime;
102 auto arrivedTime = firstInsance.metadata().arrivedTime;
103 auto referencedTime = firstInsance.metadata().referencedTime;
104
105 ARMARX_DEBUG << VAROUT(sentTime.toMilliSecondsSinceEpoch());
106 ARMARX_DEBUG << VAROUT(arrivedTime.toMilliSecondsSinceEpoch());
107 ARMARX_DEBUG << VAROUT(referencedTime.toMilliSecondsSinceEpoch());
108
109 //we want the referenced time, if not set correctly we take sent time, then arrvived time:
110 if (referencedTime.toMilliSecondsSinceEpoch() < 946688400000)
111 {
112 //timestamp corrupted:
113 if (sentTime.toMilliSecondsSinceEpoch() < 946688400000)
114 {
115 //sent also corrupted:
116 if (arrivedTime.toMilliSecondsSinceEpoch() < 946688400000)
117 {
118 //arrived also corrupted:
119 if (!corruptedWarningGiven)
120 {
121 ARMARX_WARNING << "LTM recording does not work correctly, as frequency "
122 "filter is used, but "
123 << "time sent, arrived and referenced are all corrupted. \n"
124 << "Accepting all snapshots.";
125 this->nonCorruptedType = TimestampType::ALL_CORRUPTED;
126 corruptedWarningGiven = true;
127 }
128 }
129 else
130 {
131 if (!corruptedWarningGiven)
132 { //only print this warning once
133 ARMARX_INFO << "Time referenced and sent for snapshot corrupted, using "
134 "time arrived for filtering";
135 corruptedWarningGiven = true;
136 }
137 this->nonCorruptedType = TimestampType::ARRIVED;
138 }
139 }
140 else
141 {
142 if (!corruptedWarningGiven)
143 { //only print this warning once
145 << "Time referenced snapshot corrupted, using time sent for filtering";
146 corruptedWarningGiven = true;
147 }
148 this->nonCorruptedType = TimestampType::SENT;
149 }
150 }
151 else
152 {
153 this->nonCorruptedType = TimestampType::REFERENCED;
154 }
155
156 /**
157 if(sentTime.toMilliSecondsSinceEpoch() < 946688400000){
158 // we assume the timestamp does not make sense if it is older than the year 2000
159 if(arrivedTime.toMilliSecondsSinceEpoch() < 946688400000){
160 if(referencedTime.toMilliSecondsSinceEpoch() < 946688400000){
161 ARMARX_WARNING << "LTM recording does not work correctly, as frequency filter is used, but "
162 << "time sent, arrived and referenced are all corrupted. \n"
163 << "Accepting all snapshots.";
164 this->nonCorruptedType = TimestampType::ALL_CORRUPTED;
165 } else {
166 if(!corruptedWarningGiven){ //only print this warning once
167 ARMARX_INFO << "Time sent and arrived for snapshot corrupted, using time referenced for filtering";
168 corruptedWarningGiven = true;
169 }
170 //use time referenced from now on
171 this->nonCorruptedType = TimestampType::REFERENCED;
172 }
173 } else {
174 if(!corruptedWarningGiven){ //only print this warning once
175 ARMARX_INFO << "Time sent for snapshot corrupted, using time arrived for filtering";
176 corruptedWarningGiven = true;
177 }
178 //use time arrived from now on
179 this->nonCorruptedType = TimestampType::ARRIVED;
180 }
181 } else {
182 this->nonCorruptedType = TimestampType::SENT;
183 }
184 */
185 }
186
// Returns a timestamp for instance `i`, chosen according to this->nonCorruptedType.
// Any type that no case handles ends up in `default` and yields -1.
//
// NOTE(review): this listing lost the `case` labels and the return statements of the switch
// (Doxygen lines 193, 196 and 202-205), so which metadata field each type maps to cannot be
// read from here. Judging by the visible if/else, one case returns early when
// `simulatedVersion` is false and otherwise falls through to the following case -- confirm
// against the real source file.
// NOTE(review): the return type is int; if the missing returns forward a
// milliseconds-since-epoch value (64-bit elsewhere in this file), it would be narrowed -- confirm.
187 int
188 SnapshotFrequencyFilter::getNonCorruptedTimestamp(const armem::wm::EntityInstance& i,
189 bool simulatedVersion)
190 {
191 switch (this->nonCorruptedType)
192 {
194 if (!simulatedVersion)
195 {
197 }
198 else
199 {
// simulated version: deliberately continue with the next case
200 [[fallthrough]];
201 }
206 default:
// unknown or unset timestamp type
207 return -1;
208 }
209 }
210
211 void
212 SnapshotFrequencyFilter::configure(const nlohmann::json& json)
213 {
214 std::lock_guard<std::mutex> lock(filterMutex_);
215 if (json.find(PARAM_WAITING_TIME) != json.end())
216 {
217 this->maxDifference = json.at(PARAM_WAITING_TIME);
218 ARMARX_INFO << VAROUT(maxDifference);
219 stats.additional_info = "Max Difference in ms: " + std::to_string(this->maxDifference);
220 }
221 stats.start_time = std::chrono::high_resolution_clock::now();
222 stats.number_of_compared_objects = 1;
223 stats.similarity_type =
224 aron::similarity::NDArraySimilarity::Type::NONE; //information for statistics export
225 }
226
// Body of the statistics getter: under filterMutex_, stamps stats.end_time with the current
// time and returns this->stats.
// NOTE(review): the return type and function name lines (Doxygen lines 227-228) are missing
// from this listing -- presumably SnapshotFrequencyFilter::getFilterStatistics(); confirm.
229 {
230 std::lock_guard<std::mutex> lock(filterMutex_);
// the end time is refreshed on every call
231 stats.end_time = std::chrono::high_resolution_clock::now();
232 return this->stats;
233 }
234
// Returns this filter's NAME member as a std::string.
// NOTE(review): the line with the qualified function name (Doxygen line 236) is missing from
// this listing -- presumably SnapshotFrequencyFilter::getName(); confirm.
235 std::string
237 {
238 return this->NAME;
239 }
240} // namespace armarx::armem::server::ltm::processor::filter
#define VAROUT(x)
MemoryID getEntityID() const
Definition MemoryID.cpp:310
bool forEachInstance(InstanceFunctionT &&func)
EntityInstanceT & getInstance(int index)
Get the given instance.
std::vector< int > getInstanceIndices() const
struct armarx::armem::server::ltm::processor::SnapshotFilter::FilterStatistics stats
std::mutex filterMutex_
Mutex for thread-safe access to filter state (stats and derived class state) Derived classes should l...
Definition Filter.h:61
virtual bool accept(const armem::wm::EntitySnapshot &e, bool simulatedVersion) override
Client-side working entity instance.
Client-side working memory entity snapshot.
std::int64_t toMilliSecondsSinceEpoch() const
Definition DateTime.cpp:93
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
Time arrivedTime
Time when this value has arrived at the memory.
Time referencedTime
Time this instance refers to.
Time sentTime
Time when this value was sent to the memory.