29 #include <Ice/ObjectAdapter.h>
30 #include <IceUtil/UUID.h>
33 #include <ArmarXCore/interface/core/Log.h>
38 #include <mongo/client/dbclient.h>
39 #include <mongo/client/dbclientinterface.h>
48 return "CommonStorage";
58 mongo::client::initialize();
60 accessGridFSFilesMutex.reset(
new std::mutex());
61 if (getProperty<std::string>(
"MongoHost").isSet())
63 hostAndPort = getProperty<std::string>(
"MongoHost").getValue();
69 else if (getenv(
"MONGODB_HOST"))
71 hostAndPort = getenv(
"MONGODB_HOST");
75 hostAndPort = getProperty<std::string>(
"MongoHost").getValue();
78 if (hostAndPort.find(
':') == std::string::npos)
84 else if (getenv(
"MONGODB_PORT"))
86 hostAndPort +=
":" + std::string(getenv(
"MONGODB_PORT"));
93 useAuth = getProperty<bool>(
"MongoAuth").getValue();
94 userName = getProperty<std::string>(
"MongoUser").getValue();
96 connectionCheckerTask =
111 createPasswordDigest(userName, getProperty<std::string>(
"MongoPassword").getValue());
112 connectionCheckerTask->start();
119 connectionCheckerTask->stop();
121 std::unique_lock l(openedDatabasesMutex);
122 openedDatabases.clear();
125 std::unique_lock l(openedCollectionsMutex);
126 openedCollections.clear();
129 std::unique_lock l(openedFilesMutex);
133 std::unique_lock l(openedGridFSMutex);
134 openedGridFS.clear();
137 mongo::client::shutdown();
148 CommonStorage::connect()
152 openedGridFS.clear();
153 conn.reset(
new mongo::DBClientConnection);
154 conn->connect(hostAndPort);
156 catch (
const mongo::DBException& e)
158 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
161 catch (
const std::exception& e)
163 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
167 ARMARX_INFO <<
"Connected to Mongo: host = " << hostAndPort;
173 CommonStorage::checkConnection()
177 mongo::DBClientConnection conn;
178 conn.connect(hostAndPort);
179 conn.getDatabaseNames();
188 CommonStorage::ConnectionWrapper
189 CommonStorage::getConnection()
191 std::shared_ptr<mongo::DBClientConnection> conn;
193 std::lock_guard<std::mutex> guard{serverSettingsMutex};
200 conn.reset(
new mongo::DBClientConnection());
201 conn->connect(hostAndPort);
203 catch (std::exception& e)
205 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
213 conn = std::move(pool.front());
217 return {*
this, std::move(conn)};
221 CommonStorage::createPasswordDigest(
const std::string& username,
const std::string& password)
225 std::string digest = getConnection().conn().createPasswordDigest(username, password);
230 CommonStorage::extractDBNameFromNS(
const std::string& ns)
232 const size_t found = ns.find_first_of(
'.');
233 return (found != std::string::npos) ? ns.substr(0, found) :
"";
237 CommonStorage::forceAuthenticate(
const std::string& dbName,
238 const std::string& userName,
239 const std::string& password)
243 result = getConnection().conn().auth(dbName, userName, pwdDigest, errmsg,
false);
247 authDBs.insert(dbName);
254 CommonStorage::authenticateNS(
const std::string& ns)
256 const std::string dbName = extractDBNameFromNS(ns);
261 throw DBNotSpecifiedException(
"Database name not specified for collection: " + ns +
262 ". Please use <dbName>.<collectionName> format!");
265 return authenticateDB(dbName);
269 CommonStorage::authenticateDB(
const std::string& dbName)
276 ARMARX_INFO <<
"Try to Auth: db = " << dbName <<
", user = " << userName
277 <<
", pwd = " << pwdDigest << std::endl;
279 if (authDBs.count(dbName))
285 return forceAuthenticate(dbName, userName, pwdDigest);
291 const std::string& userName,
292 const std::string& password,
295 pwdDigest = createPasswordDigest(userName, password);
296 return forceAuthenticate(dbName, userName, this->pwdDigest);
308 authenticateDB(
"admin");
311 std::list<std::string> result = getConnection().conn().getDatabaseNames();
312 return NameList(result.begin(), result.end());
319 std::list<std::string> result = getConnection().conn().getCollectionNames(dbName);
320 return NameList(result.begin(), result.end());
331 const ::std::string& userName,
332 const ::std::string& password,
333 const ::Ice::Current&)
335 this->hostAndPort = hostAndPort;
336 this->userName = userName;
337 this->pwdDigest = createPasswordDigest(userName, password);
338 this->useAuth = !userName.empty();
346 if (authenticateDB(dbName))
351 std::unique_lock l(openedDatabasesMutex);
352 openedDatabases[dbId] = db;
354 Ice::ObjectPrx node =
c.adapter->add(db, dbId);
355 return DatabaseInterfacePrx::uncheckedCast(node);
359 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
360 ", database = " + dbName +
")");
367 std::unique_lock l(openedDatabasesMutex);
368 openedDatabases.erase(db->ice_getIdentity());
371 CollectionInterfacePrx
375 if (authenticateNS(collectionNS))
380 std::unique_lock l(openedCollectionsMutex);
381 openedCollections[collId] = coll;
383 Ice::ObjectPrx node =
c.adapter->add(coll, collId);
384 return CollectionInterfacePrx::uncheckedCast(node);
388 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
389 ", collection = " + collectionNS +
")");
396 std::unique_lock l(openedCollectionsMutex);
397 openedCollections.erase(coll->ice_getIdentity());
404 if (authenticateNS(collectionNS))
406 getConnection().conn().dropCollection(collectionNS);
410 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
411 ", collection = " + collectionNS +
")");
416 CommonStorage::getDocumentId(
const mongo::BSONObj& doc)
422 CommonStorage::getDocumentField(
const mongo::BSONObj& doc,
const std::string& fieldName)
424 if (doc.hasField(fieldName.c_str()))
426 const mongo::BSONElement field = doc[fieldName.c_str()];
428 switch (field.type())
431 return field.OID().toString();
437 return field.toString(
false);
447 CommonStorage::getGridFS(
const std::string& dbName)
449 std::unique_lock l(openedGridFSMutex);
450 std::map<std::string, GridFSPtr>::const_iterator it = openedGridFS.find(dbName);
452 if (it != openedGridFS.end())
458 GridFSPtr gridFS(
new mongo::GridFS(*conn, dbName));
459 openedGridFS[dbName] = gridFS;
466 const std::string& fileName,
467 const ::std::string& gridFSName ,
468 const Ice::Current&
c)
474 if (!std::filesystem::exists(fileName))
476 throw FileNotFoundException(
"File could not be found: " + fileName, fileName);
478 std::unique_lock l(*accessGridFSFilesMutex);
479 mongo::GridFile oldFile =
480 gridfs->findFileByName((gridFSName.empty()) ? fileName : gridFSName);
481 const mongo::BSONObj newFileDoc = gridfs->storeFile(fileName, gridFSName);
482 mongo::GridFile newFile =
483 gridfs->findFileByName((gridFSName.empty()) ? fileName : gridFSName);
486 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
491 return getDocumentId(newFileDoc);
497 const std::string& bufferToStore,
498 const std::string& gridFSName,
499 const Ice::Current&
c)
503 if (gridFSName.empty())
505 throw armarx::LocalException(
"gridFSName must not be empty");
507 std::unique_lock l(*accessGridFSFilesMutex);
508 mongo::GridFile oldFile = gridfs->findFileByName(gridFSName);
509 const mongo::BSONObj newFileDoc =
510 gridfs->storeFile(bufferToStore.c_str(), bufferToStore.size(), gridFSName);
511 mongo::GridFile newFile = gridfs->findFileByName(gridFSName);
514 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
518 return getDocumentId(newFileDoc);
524 const memoryx::Blob& bufferToStore,
525 const std::string& gridFSName,
526 const Ice::Current&
c)
530 if (gridFSName.empty())
532 throw armarx::LocalException(
"gridFSName must not be empty");
534 std::unique_lock l(*accessGridFSFilesMutex);
535 mongo::GridFile oldFile = gridfs->findFileByName(gridFSName);
537 const mongo::BSONObj newFileDoc = gridfs->storeFile(
538 reinterpret_cast<const char*
>(&bufferToStore[0]), bufferToStore.size(), gridFSName);
539 mongo::GridFile newFile = gridfs->findFileByName(gridFSName);
542 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
547 return getDocumentId(newFileDoc);
552 CommonStorage::keepOldFileIfEqual(
const mongo::GridFile& oldFile,
553 const mongo::GridFile newFile,
554 const mongo::BSONObj& newFileDoc,
555 const std::string dbName,
558 if (oldFile.exists())
560 std::string docId = (oldFile.getFileField(
"_id").OID().toString());
562 if (!docId.empty() && newFile.getMD5() == oldFile.getMD5())
574 CommonStorage::getFileByQuery(
const std::string& dbName,
const mongo::BSONObj& query)
579 std::unique_lock l(*accessGridFSFilesMutex);
582 mongo::GridFile gf = gfs->findFile(query);
604 CommonStorage::createFileProxy(mongo::GridFile gridFile,
const Ice::Current&
c)
606 if (gridFile.exists())
608 GridFileWrapperPtr fileWrapper =
new GridFileWrapper(gridFile, accessGridFSFilesMutex);
611 std::unique_lock l(openedFilesMutex);
612 openedFiles[fileIceId] = fileWrapper;
614 Ice::ObjectPrx node =
c.adapter->add(fileWrapper, fileIceId);
615 return GridFileInterfacePrx::uncheckedCast(node);
619 ARMARX_WARNING <<
"Grid file does not exit " << gridFile.getFilename();
620 return GridFileInterfacePrx();
626 const std::string& fileId,
627 const Ice::Current&
c)
629 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
630 mongo::GridFile gridFile = getFileByQuery(dbName, query);
631 return createFileProxy(gridFile,
c);
636 const std::string& gridFSName,
637 const Ice::Current&
c)
639 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
640 mongo::GridFile gridFile = getFileByQuery(dbName, query);
641 return createFileProxy(gridFile,
c);
649 std::unique_lock l(openedFilesMutex);
650 c.adapter->remove(fileProxy->ice_getIdentity());
651 openedFiles.erase(fileProxy->ice_getIdentity());
656 CommonStorage::readTextFile(mongo::GridFile& gridFile, std::string& buffer)
658 if (gridFile.exists())
660 std::ostringstream ss;
672 CommonStorage::readBinaryFile(mongo::GridFile& gridFile, memoryx::Blob& buffer)
674 if (gridFile.exists())
676 buffer.reserve(gridFile.getContentLength());
678 sb.pubsetbuf((
char*)&buffer[0], buffer.capacity());
679 std::iostream os(&sb);
692 const std::string& fileId,
694 const Ice::Current&
c)
696 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
697 mongo::GridFile gridFile = getFileByQuery(dbName, query);
698 return readTextFile(gridFile, buffer);
703 const std::string& fileId,
704 memoryx::Blob& buffer,
705 const Ice::Current&
c)
707 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
708 mongo::GridFile gridFile = getFileByQuery(dbName, query);
709 return readBinaryFile(gridFile, buffer);
714 const std::string& gridFSName,
716 const Ice::Current&
c)
718 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
719 mongo::GridFile gridFile = getFileByQuery(dbName, query);
720 return readTextFile(gridFile, buffer);
725 const std::string& gridFSName,
726 memoryx::Blob& buffer,
727 const Ice::Current&
c)
729 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
730 mongo::GridFile gridFile = getFileByQuery(dbName, query);
731 return readBinaryFile(gridFile, buffer);
737 std::string GridFsFilesNamespace = dbName +
".fs.files";
738 std::string GridFsChunksNamespace = dbName +
".fs.chunks";
742 auto conn = getConnection();
743 std::unique_ptr<mongo::DBClientCursor>
files =
744 conn.conn().query(GridFsFilesNamespace, fileQuery);
746 while (
files->more())
748 mongo::BSONObj file =
files->next();
749 mongo::BSONElement
id = file[
"_id"];
751 conn.conn().remove(GridFsFilesNamespace, BSON(
"_id" <<
id));
752 conn.conn().remove(GridFsChunksNamespace, BSON(
"files_id" <<
id));
755 catch (mongo::DBException& e)
757 ARMARX_ERROR <<
"Error removing file by query " << fileQuery <<
": " << e.what();
765 auto gridfs = getGridFS(dbName);
766 std::unique_ptr<mongo::DBClientCursor>
list = gridfs->list();
770 mongo::BSONObj query =
list->nextSafe();
771 auto file = getFileByQuery(dbName, query);
773 result.push_back(file.getFilename());
783 auto gridfs = getGridFS(dbName);
785 std::unique_lock l(*accessGridFSFilesMutex);
787 std::unique_ptr<mongo::DBClientCursor>
list = gridfs->list();
791 mongo::BSONObj query =
list->nextSafe();
792 result.push_back(getDocumentId(query));
800 const std::string& fileId,
803 const mongo::BSONObj fileQuery = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
804 auto gridfs = getGridFS(dbName);
806 std::unique_lock l(*accessGridFSFilesMutex);
807 mongo::GridFile gridFile = gridfs->findFile(fileQuery);
809 if (!gridFile.exists())
821 const std::string& gridFSName,
822 const Ice::Current&
c)
824 auto gridfs = getGridFS(dbName);
826 std::unique_lock l(*accessGridFSFilesMutex);
827 gridfs->removeFile(gridFSName);
839 count = getConnection().conn().count(ns);
841 catch (mongo::DBException& e)
843 ARMARX_ERROR <<
"Error on db.count(" << ns <<
"): " << e.what();
859 catch (mongo::AssertionException& e)
861 ARMARX_ERROR <<
"findByMongoId failed for id " <<
id <<
": " << e.what()
863 throw InvalidMongoIdException(e.what(),
id);
867 DBStorableDataList result = findByMongoQuery(ns, query,
true);
869 return result.size() > 0 ? result[0] : DBStorableData();
874 const std::string& fieldName,
875 const ::std::string& fieldValue)
877 const mongo::Query query = mongo::Query(BSON(fieldName << fieldValue));
878 return findByMongoQuery(ns, query,
false);
883 const std::string& fieldName,
884 const NameList& fieldValueList)
886 mongo::BSONArrayBuilder b;
888 for (
const auto& it : fieldValueList)
893 const mongo::Query query(BSON(fieldName << mongo::Query(BSON(
"$in" << b.arr()))));
894 return findByMongoQuery(ns, query,
false);
899 const std::string& fieldName,
900 const ::std::string& fieldValue)
902 const mongo::Query query = mongo::Query(BSON(fieldName << fieldValue));
903 const DBStorableDataList result = findByMongoQuery(ns, query,
false);
904 return result.size() > 0 ? result[0] : DBStorableData();
909 const std::string& query,
910 const std::string& where)
912 mongo::Query
q(query);
919 return findByMongoQuery(ns,
q,
false);
925 const DBStorableDataList result = findByMongoQuery(ns, mongo::Query(query),
false);
926 return result.size() > 0 ? result[0] : DBStorableData();
932 const mongo::Query query;
933 return findByMongoQuery(ns, query,
false);
939 auto conn = getConnection();
940 mongo::BSONObj fetch;
941 std::size_t dotPosition = ns.find_first_of(
".");
942 std::string databaseName = ns.substr(0, dotPosition);
943 std::string collectionName = ns.substr(dotPosition + 1, ns.size());
944 conn.conn().runCommand(
945 databaseName, BSON(
"distinct" << collectionName <<
"key" << fieldName), fetch);
946 mongo::BSONObj fetchedValues = fetch.getObjectField(
"values");
947 DBStorableData result;
948 result.JSON = fetchedValues.jsonString();
955 const mongo::Query query;
956 return (EntityIdList)findFieldByMongoQuery(ns, query,
MONGO_ID_FIELD);
962 const mongo::Query query;
963 return findFieldByMongoQuery(ns, query, fieldName);
967 CommonStorage::findFieldByMongoQuery(
const std::string& ns,
968 const mongo::Query& query,
969 const std::string& fieldName)
974 auto conn = getConnection();
975 boost::scoped_ptr<mongo::DBClientCursor> cursor(conn.conn().query(ns, query));
977 while (cursor->more())
979 result.push_back(getDocumentField(cursor->next(), fieldName));
982 catch (mongo::DBException& e)
984 ARMARX_ERROR <<
"Error fetching field values by query: " << e.what();
991 CommonStorage::findByMongoQuery(
const std::string& ns,
992 const mongo::Query& query,
995 DBStorableDataList result;
998 auto conn = getConnection();
999 boost::scoped_ptr<mongo::DBClientCursor> cursor(
1000 conn.conn().query(ns, query, justOne ? 1 : 0));
1002 while (cursor->more())
1005 obj.JSON = cursor->nextSafe().jsonString();
1006 result.push_back(obj);
1009 catch (mongo::DBException& e)
1011 ARMARX_ERROR <<
"Error fetching objects by query: " << e.what();
1019 const DBStorableData& obj,
1022 std::string result =
"";
1026 mongo::BSONObj bsonObj = mongo::fromjson(obj.JSON);
1031 result = getDocumentId(bsonObj);
1036 mongo::BSONObjBuilder builder;
1037 mongo::OID newID = mongo::OID::gen();
1039 builder.appendElements(bsonObj);
1040 bsonObj = builder.obj();
1041 result = newID.toString();
1047 getConnection().conn().update(ns, query, bsonObj,
true);
1051 getConnection().conn().insert(ns, bsonObj);
1054 catch (mongo::DBException& e)
1056 ARMARX_ERROR <<
"Error inserting object: " << e.what();
1062 std::vector<std::string>
1065 std::vector<std::string> result(objectList.size(),
"");
1069 std::vector<mongo::BSONObj> bsonObjects;
1070 bsonObjects.reserve(objectList.size());
1072 for (
size_t i = 0; i < objectList.size(); i++)
1074 bsonObjects.push_back(mongo::fromjson(objectList[i].JSON));
1075 result[i] = getDocumentId(bsonObjects[i]);
1081 mongo::OID newID = mongo::OID::gen();
1082 result[i] = newID.toString();
1084 mongo::BSONObjBuilder builder;
1086 builder.appendElements(bsonObjects[i]);
1087 bsonObjects[i] = builder.obj();
1091 getConnection().conn().insert(ns, bsonObjects);
1093 catch (mongo::DBException& e)
1095 ARMARX_ERROR <<
"Error inserting object: " << e.what();
1103 const DBStorableData& obj,
1104 const std::string& keyField,
1107 bool result =
false;
1111 const mongo::BSONObj mongoObj = mongo::fromjson(obj.JSON);
1113 if (!mongoObj.hasField(keyField.c_str()))
1115 throw FieldNotFoundException(
"field not found in supplied JSON object", keyField);
1118 mongo::Query query(BSON(keyField << mongoObj[keyField]));
1120 getConnection().conn().update(ns, query, mongoObj, upsert);
1124 catch (
const mongo::DBException& e)
1134 const std::string& query,
1135 const mongo::BSONObj& obj)
1137 bool result =
false;
1141 conn->update(ns, query, obj);
1144 catch (mongo::DBException& e)
1162 catch (mongo::AssertionException& e)
1164 throw InvalidMongoIdException(e.what(),
id);
1168 return removeByMongoQuery(ns, query);
1173 const std::string& fieldName,
1174 const std::string& fieldValue)
1176 const mongo::Query query(BSON(fieldName << fieldValue));
1177 return removeByMongoQuery(ns, query);
1183 return removeByMongoQuery(ns, mongo::Query(query));
1189 return removeByMongoQuery(ns, mongo::Query());
1193 CommonStorage::removeByMongoQuery(
const std::string& ns,
const mongo::Query& query)
1199 getConnection().conn().remove(ns, query);
1202 catch (mongo::DBException& e)
1204 ARMARX_ERROR <<
"Error deleting objects by query: " << e.what();
1217 const mongo::BSONObj keys = BSON(fieldName << 1);
1221 #ifdef MONGOCLIENT_VERSION
1222 getConnection().conn().createIndex(ns, keys);
1224 getConnection().conn().ensureIndex(ns, keys, unique);
1229 catch (mongo::DBException& e)
1237 CommonStorage::ConnectionWrapper::ConnectionWrapper(
1239 std::shared_ptr<mongo::DBClientConnection> connPtr) :
1240 connPtr{std::move(connPtr)}, hostAndPort{storage.hostAndPort}, storage{&storage}
1244 CommonStorage::ConnectionWrapper::~ConnectionWrapper()
1246 std::lock_guard<std::mutex> guard{storage->serverSettingsMutex};
1247 if (hostAndPort == storage->hostAndPort)
1249 storage->pool.emplace_back(std::move(connPtr));
1253 mongo::DBClientConnection&
1254 CommonStorage::ConnectionWrapper::conn()