28 #include <ArmarXCore/interface/core/Log.h>
31 #include <mongo/client/dbclient.h>
32 #include <mongo/client/dbclientinterface.h>
34 #include <IceUtil/UUID.h>
35 #include <Ice/ObjectAdapter.h>
51 return "CommonStorage";
60 mongo::client::initialize();
62 accessGridFSFilesMutex.reset(
new std::mutex());
63 if (getProperty<std::string>(
"MongoHost").isSet())
65 hostAndPort = getProperty<std::string>(
"MongoHost").getValue();
71 else if (getenv(
"MONGODB_HOST"))
73 hostAndPort = getenv(
"MONGODB_HOST");
77 hostAndPort = getProperty<std::string>(
"MongoHost").getValue();
80 if (hostAndPort.find(
':') == std::string::npos)
86 else if (getenv(
"MONGODB_PORT"))
88 hostAndPort +=
":" + std::string(getenv(
"MONGODB_PORT"));
95 useAuth = getProperty<bool>(
"MongoAuth").getValue();
96 userName = getProperty<std::string>(
"MongoUser").getValue();
110 pwdDigest = createPasswordDigest(userName, getProperty<std::string>(
"MongoPassword").getValue());
111 connectionCheckerTask->start();
117 connectionCheckerTask->stop();
119 std::unique_lock l(openedDatabasesMutex);
120 openedDatabases.clear();
123 std::unique_lock l(openedCollectionsMutex);
124 openedCollections.clear();
127 std::unique_lock l(openedFilesMutex);
131 std::unique_lock l(openedGridFSMutex);
132 openedGridFS.clear();
135 mongo::client::shutdown();
// Re-creates the shared `conn` object as a fresh mongo::DBClientConnection
// and connects it to `hostAndPort`.
// The returned bool is not visible in this excerpt; presumably it reports
// whether the connection attempt succeeded - TODO confirm.
143 bool CommonStorage::connect()
// Drop every cached GridFS handle: getGridFS() constructs them from *conn,
// which is replaced just below.
147 openedGridFS.clear();
148 conn.reset(
new mongo::DBClientConnection);
149 conn->connect(hostAndPort);
// Driver-specific failures and any other std::exception are both only logged
// with the same message here.
151 catch (
const mongo::DBException& e)
153 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
156 catch (
const std::exception& e)
158 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
162 ARMARX_INFO <<
"Connected to Mongo: host = " << hostAndPort;
// Probes the server at `hostAndPort` with a throwaway connection: connects
// and requests the list of database names, discarding the result.
// How a failure is reported is not visible in this excerpt.
168 void CommonStorage::checkConnection()
// Local object; it is unrelated to the pointer-like `conn` used by connect().
172 mongo::DBClientConnection conn;
173 conn.connect(hostAndPort);
174 conn.getDatabaseNames();
// Hands out a MongoDB connection wrapped in a ConnectionWrapper.
// The visible lines show two sources for the connection: a newly created
// DBClientConnection connected to `hostAndPort`, or the front element of
// `pool`. The condition choosing between them is not visible here
// (presumably "pool is empty" - TODO confirm).
183 CommonStorage::ConnectionWrapper CommonStorage::getConnection()
185 std::shared_ptr<mongo::DBClientConnection> conn;
// serverSettingsMutex is the same mutex ~ConnectionWrapper() takes before it
// touches `pool` and `hostAndPort`.
187 std::lock_guard<std::mutex> guard {serverSettingsMutex};
194 conn.reset(
new mongo::DBClientConnection());
195 conn->connect(hostAndPort);
// A failed connect is logged; what happens to `conn` afterwards is not
// visible in this excerpt.
197 catch (std::exception& e)
199 ARMARX_ERROR <<
"Can't connect to MongoDB: " << e.what();
207 conn = std::move(pool.front());
// Ownership of the connection moves into the returned wrapper.
211 return {*
this, std::move(conn)};
// Computes the password digest for (username, password) by delegating to
// createPasswordDigest() of a connection obtained from getConnection().
// The return statement is not visible here; presumably `digest` is returned.
214 std::string CommonStorage::createPasswordDigest(
const std::string& username,
const std::string& password)
// The temporary ConnectionWrapper lives only for this full expression.
218 std::string digest = getConnection().conn().createPasswordDigest(username, password);
// Extracts the database part of a namespace string.
// @param ns  namespace, e.g. "db.collection"
// @return    everything before the first '.' ("db" for "db.collection",
//            "a" for "a.b.c"); an empty string when `ns` contains no '.'.
222 std::string CommonStorage::extractDBNameFromNS(
const std::string& ns)
224 const size_t found = ns.find_first_of(
'.');
225 return (found != std::string::npos) ? ns.substr(0, found) :
"";
// Authenticates against `dbName` on a connection from getConnection(),
// without consulting the `authDBs` cache first (authenticateDB() does that).
// NOTE(review): the `password` parameter does not appear in the visible
// lines; auth() is called with `pwdDigest` instead. Both visible callers pass
// the digest as `password`, so confirm whether the parameter is intentionally
// ignored.
228 bool CommonStorage::forceAuthenticate(
const std::string& dbName,
229 const std::string& userName,
const std::string& password)
// NOTE(review): the trailing `false` presumably tells the driver that the
// password is already digested - confirm against the driver's auth() docs.
233 result = getConnection().conn().auth(dbName, userName, pwdDigest, errmsg,
false);
// Remember the database so authenticateDB() can find it via authDBs.count().
// The guarding condition is not visible here (presumably only on success).
237 authDBs.insert(dbName);
// Authenticates for the database that a namespace string belongs to.
// @param ns  namespace in "<dbName>.<collectionName>" form
// @return    the result of authenticateDB() for the extracted database name
// @throws DBNotSpecifiedException  the guarding condition is not visible in
//         this excerpt; presumably thrown when no database name could be
//         extracted (extractDBNameFromNS() yields "" if `ns` has no '.').
243 bool CommonStorage::authenticateNS(
const std::string& ns)
245 const std::string dbName = extractDBNameFromNS(ns);
250 throw DBNotSpecifiedException(
"Database name not specified for collection: " + ns +
251 ". Please use <dbName>.<collectionName> format!");
254 return authenticateDB(dbName);
// Authenticates against `dbName` using the stored `userName` / `pwdDigest`.
// `authDBs` is checked first; the body of that branch is not visible here
// (presumably an early return for already-authenticated databases).
257 bool CommonStorage::authenticateDB(
const std::string& dbName)
// NOTE(review): this statement streams the password digest together with the
// user name. The start of the statement is not visible, but if it is a log
// line, the digest should probably not be written out.
265 ", user = " << userName <<
", pwd = " << pwdDigest << std::endl;
267 if (authDBs.count(dbName))
// Not cached: perform the actual authentication round trip.
273 return forceAuthenticate(dbName, userName, pwdDigest);
278 const std::string& userName,
const std::string& password,
281 pwdDigest = createPasswordDigest(userName, password);
282 return forceAuthenticate(dbName, userName, this->pwdDigest);
292 authenticateDB(
"admin");
295 std::list<std::string> result = getConnection().conn().getDatabaseNames();
296 return NameList(result.begin(), result.end());
302 std::list<std::string> result = getConnection().conn().getCollectionNames(dbName);
303 return NameList(result.begin(), result.end());
312 const ::Ice::Current&)
314 this->hostAndPort = hostAndPort;
315 this->userName = userName;
316 this->pwdDigest = createPasswordDigest(userName, password);
317 this->useAuth = !userName.empty();
324 if (authenticateDB(dbName))
329 std::unique_lock l(openedDatabasesMutex);
330 openedDatabases[dbId] = db;
332 Ice::ObjectPrx node =
c.adapter->add(db, dbId);
333 return DatabaseInterfacePrx::uncheckedCast(node);
337 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
338 ", database = " + dbName +
")");
344 std::unique_lock l(openedDatabasesMutex);
345 openedDatabases.erase(db->ice_getIdentity());
351 if (authenticateNS(collectionNS))
356 std::unique_lock l(openedCollectionsMutex);
357 openedCollections[collId] = coll;
359 Ice::ObjectPrx node =
c.adapter->add(coll, collId);
360 return CollectionInterfacePrx::uncheckedCast(node);
364 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
365 ", collection = " + collectionNS +
")");
371 std::unique_lock l(openedCollectionsMutex);
372 openedCollections.erase(coll->ice_getIdentity());
378 if (authenticateNS(collectionNS))
380 getConnection().conn().dropCollection(collectionNS);
384 throw MongoAuthenticationException(
"Mongo authentication failed (user = " + userName +
385 ", collection = " + collectionNS +
")");
389 std::string CommonStorage::getDocumentId(
const mongo::BSONObj& doc)
// Returns the value of field `fieldName` of `doc` as a string.
// The conversion depends on the BSON element type; the case labels of the
// switch are not visible in this excerpt. The result for a missing field is
// not visible either.
394 std::string CommonStorage::getDocumentField(
const mongo::BSONObj& doc,
const std::string& fieldName)
396 if (doc.hasField(fieldName.c_str()))
398 const mongo::BSONElement field = doc[fieldName.c_str()];
400 switch (field.type())
// Object-id elements are returned via OID::toString().
403 return field.OID().toString();
// Other visible path: generic element-to-string conversion; the `false`
// argument presumably suppresses the field name - TODO confirm.
409 return field.toString(
false);
// Returns the GridFS handle for database `dbName`, using `openedGridFS` as a
// per-database cache.
418 GridFSPtr CommonStorage::getGridFS(
const std::string& dbName)
// Lookup and insertion both happen under openedGridFSMutex.
420 std::unique_lock l(openedGridFSMutex);
421 std::map<std::string, GridFSPtr>::const_iterator it = openedGridFS.find(dbName);
// Cache hit; the branch body is not visible (presumably returns it->second).
423 if (it != openedGridFS.end())
// Cache miss: the new handle is bound to the shared connection *conn, which
// is why connect() clears openedGridFS when it replaces `conn`.
429 GridFSPtr gridFS(
new mongo::GridFS(*conn, dbName));
430 openedGridFS[dbName] = gridFS;
436 const ::std::string& gridFSName ,
const Ice::Current&
c)
442 if (!std::filesystem::exists(fileName))
444 throw FileNotFoundException(
"File could not be found: " + fileName, fileName);
446 std::unique_lock l(*accessGridFSFilesMutex);
447 mongo::GridFile oldFile = gridfs->findFileByName((gridFSName.empty()) ? fileName : gridFSName);
448 const mongo::BSONObj newFileDoc = gridfs->storeFile(fileName, gridFSName);
449 mongo::GridFile newFile = gridfs->findFileByName((gridFSName.empty()) ? fileName : gridFSName);
452 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
457 return getDocumentId(newFileDoc);
462 const std::string& bufferToStore,
const std::string& gridFSName,
463 const Ice::Current&
c)
467 if (gridFSName.empty())
469 throw armarx::LocalException(
"gridFSName must not be empty");
471 std::unique_lock l(*accessGridFSFilesMutex);
472 mongo::GridFile oldFile = gridfs->findFileByName(gridFSName);
473 const mongo::BSONObj newFileDoc = gridfs->storeFile(bufferToStore.c_str(), bufferToStore.size(), gridFSName);
474 mongo::GridFile newFile = gridfs->findFileByName(gridFSName);
477 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
481 return getDocumentId(newFileDoc);
486 const memoryx::Blob& bufferToStore,
const std::string& gridFSName,
487 const Ice::Current&
c)
491 if (gridFSName.empty())
493 throw armarx::LocalException(
"gridFSName must not be empty");
495 std::unique_lock l(*accessGridFSFilesMutex);
496 mongo::GridFile oldFile = gridfs->findFileByName(gridFSName);
498 const mongo::BSONObj newFileDoc = gridfs->storeFile(
reinterpret_cast<const char*
>(&bufferToStore[0]), bufferToStore.size(), gridFSName);
499 mongo::GridFile newFile = gridfs->findFileByName(gridFSName);
502 if (keepOldFileIfEqual(oldFile, newFile, newFileDoc, dbName, oldId))
507 return getDocumentId(newFileDoc);
// Compares a previously stored GridFS file with a newly stored one by MD5.
// Only the checks are visible in this excerpt: the old file must exist, its
// "_id" must convert to a non-empty string, and both MD5 sums must match.
// What is done on a match (and how `oldId`, `newFileDoc` and `dbName` are
// used) is not visible here. The store* callers return getDocumentId(newFileDoc)
// on at least one path after calling this.
// NOTE(review): `newFile` and `dbName` are taken by value, so each call
// copies them.
511 bool CommonStorage::keepOldFileIfEqual(
const mongo::GridFile& oldFile,
const mongo::GridFile newFile,
const mongo::BSONObj& newFileDoc,
const std::string dbName, std::string& oldId)
513 if (oldFile.exists())
515 std::string docId = (oldFile.getFileField(
"_id").OID().toString());
517 if (!docId.empty() && newFile.getMD5() == oldFile.getMD5())
// Looks up a single GridFS file in database `dbName` matching `query`.
// Where `gfs` comes from is not visible in this excerpt (presumably
// getGridFS(dbName)); the return statement is not visible either.
528 mongo::GridFile CommonStorage::getFileByQuery(
const std::string& dbName,
529 const mongo::BSONObj& query)
// The GridFS lookup runs while holding the shared GridFS file-access mutex.
534 std::unique_lock l(*accessGridFSFilesMutex);
537 mongo::GridFile gf = gfs->findFile(query);
// Wraps `gridFile` in a GridFileWrapper servant, registers it with the Ice
// object adapter of the current call and returns a proxy to it.
// @return a GridFileInterfacePrx for the new servant, or a default-constructed
//         (empty) proxy when `gridFile` does not exist.
// How `fileIceId` is produced is not visible in this excerpt.
557 GridFileInterfacePrx CommonStorage::createFileProxy(mongo::GridFile gridFile,
const Ice::Current&
c)
559 if (gridFile.exists())
// The wrapper receives the same accessGridFSFilesMutex that this class locks
// around its own GridFS calls.
561 GridFileWrapperPtr fileWrapper =
new GridFileWrapper(gridFile, accessGridFSFilesMutex);
// Track the servant in `openedFiles`, guarded by openedFilesMutex.
564 std::unique_lock l(openedFilesMutex);
565 openedFiles[fileIceId] = fileWrapper;
567 Ice::ObjectPrx node =
c.adapter->add(fileWrapper, fileIceId);
568 return GridFileInterfacePrx::uncheckedCast(node);
// NOTE(review): "exit" in the message below looks like a typo for "exist".
572 ARMARX_WARNING <<
"Grid file does not exit " << gridFile.getFilename();
573 return GridFileInterfacePrx();
578 const std::string& fileId,
const Ice::Current&
c)
580 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
581 mongo::GridFile gridFile = getFileByQuery(dbName, query);
582 return createFileProxy(gridFile,
c);
586 const std::string& gridFSName,
const Ice::Current&
c)
588 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
589 mongo::GridFile gridFile = getFileByQuery(dbName, query);
590 return createFileProxy(gridFile,
c);
594 const Ice::Current&
c)
598 std::unique_lock l(openedFilesMutex);
599 c.adapter->remove(fileProxy->ice_getIdentity());
600 openedFiles.erase(fileProxy->ice_getIdentity());
// Reads the content of `gridFile` as text. Only the existence check and the
// creation of a string stream are visible in this excerpt; the remaining
// parameter(s), how the stream is filled and the returned value are not
// visible (callers pass a std::string buffer as second argument).
604 bool CommonStorage::readTextFile(mongo::GridFile& gridFile,
607 if (gridFile.exists())
609 std::ostringstream ss;
// Reads the content of `gridFile` into `buffer` through a stream buffer that
// is pointed at the storage of `buffer`. The declaration of `sb`, the actual
// write and the returned value are not visible in this excerpt.
620 bool CommonStorage::readBinaryFile(mongo::GridFile& gridFile,
621 memoryx::Blob& buffer)
623 if (gridFile.exists())
// NOTE(review): if memoryx::Blob is a std::vector, reserve() changes only the
// capacity, not size(). Then `&buffer[0]` on an empty vector is out of range,
// and bytes written through `sb` are not reflected in buffer.size() unless a
// resize happens in lines not visible here. Confirm, and consider resize().
625 buffer.reserve(gridFile.getContentLength());
627 sb.pubsetbuf((
char*) &buffer[0], buffer.capacity());
628 std::iostream os(&sb);
641 const std::string& fileId, std::string& buffer,
const Ice::Current&
c)
643 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
644 mongo::GridFile gridFile = getFileByQuery(dbName, query);
645 return readTextFile(gridFile, buffer);
649 const std::string& fileId, memoryx::Blob& buffer,
650 const Ice::Current&
c)
652 const mongo::BSONObj query = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
653 mongo::GridFile gridFile = getFileByQuery(dbName, query);
654 return readBinaryFile(gridFile, buffer);
658 const std::string& gridFSName, std::string& buffer,
659 const Ice::Current&
c)
661 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
662 mongo::GridFile gridFile = getFileByQuery(dbName, query);
663 return readTextFile(gridFile, buffer);
667 const std::string& gridFSName, memoryx::Blob& buffer,
668 const Ice::Current&
c)
670 const mongo::BSONObj query = BSON(
"filename" << gridFSName);
671 mongo::GridFile gridFile = getFileByQuery(dbName, query);
672 return readBinaryFile(gridFile, buffer);
678 std::string GridFsFilesNamespace = dbName +
".fs.files";
679 std::string GridFsChunksNamespace = dbName +
".fs.chunks";
683 auto conn = getConnection();
684 std::unique_ptr<mongo::DBClientCursor>
files = conn.conn().query(GridFsFilesNamespace, fileQuery);
686 while (
files->more())
688 mongo::BSONObj file =
files->next();
689 mongo::BSONElement
id = file[
"_id"];
691 conn.conn().remove(GridFsFilesNamespace, BSON(
"_id" <<
id));
692 conn.conn().remove(GridFsChunksNamespace, BSON(
"files_id" <<
id));
695 catch (mongo::DBException& e)
697 ARMARX_ERROR <<
"Error removing file by query " << fileQuery <<
": " << e.what();
704 auto gridfs = getGridFS(dbName);
705 std::unique_ptr<mongo::DBClientCursor>
list = gridfs->list();
709 mongo::BSONObj query =
list->nextSafe();
710 auto file = getFileByQuery(dbName, query);
712 result.push_back(file.getFilename());
721 auto gridfs = getGridFS(dbName);
723 std::unique_lock l(*accessGridFSFilesMutex);
725 std::unique_ptr<mongo::DBClientCursor>
list = gridfs->list();
729 mongo::BSONObj query =
list->nextSafe();
730 result.push_back(getDocumentId(query));
738 const std::string& fileId,
const Ice::Current&)
740 const mongo::BSONObj fileQuery = BSON(
MONGO_ID_FIELD << mongo::OID(fileId));
741 auto gridfs = getGridFS(dbName);
743 std::unique_lock l(*accessGridFSFilesMutex);
744 mongo::GridFile gridFile = gridfs->findFile(fileQuery);
746 if (!gridFile.exists())
757 const std::string& gridFSName,
const Ice::Current&
c)
759 auto gridfs = getGridFS(dbName);
761 std::unique_lock l(*accessGridFSFilesMutex);
762 gridfs->removeFile(gridFSName);
774 count = getConnection().conn().count(ns);
776 catch (mongo::DBException& e)
778 ARMARX_ERROR <<
"Error on db.count(" << ns <<
"): " << e.what();
793 catch (mongo::AssertionException& e)
795 ARMARX_ERROR <<
"findByMongoId failed for id " <<
id <<
": " << e.what() <<
"\nNS: " << ns;
796 throw InvalidMongoIdException(e.what(),
id);
800 DBStorableDataList result = findByMongoQuery(ns, query,
true);
802 return result.size() > 0 ? result[0] : DBStorableData();
807 const mongo::Query query = mongo::Query(BSON(fieldName << fieldValue));
808 return findByMongoQuery(ns, query,
false);
813 mongo::BSONArrayBuilder b;
815 for (
const auto& it : fieldValueList)
820 const mongo::Query query(BSON(fieldName << mongo::Query(BSON(
"$in" << b.arr()))));
821 return findByMongoQuery(ns, query,
false);
826 const mongo::Query query = mongo::Query(BSON(fieldName << fieldValue));
827 const DBStorableDataList result = findByMongoQuery(ns, query,
false);
828 return result.size() > 0 ? result[0] : DBStorableData();
833 mongo::Query
q(query);
840 return findByMongoQuery(ns,
q,
false);
845 const DBStorableDataList result = findByMongoQuery(ns, mongo::Query(query),
false);
846 return result.size() > 0 ? result[0] : DBStorableData();
851 const mongo::Query query;
852 return findByMongoQuery(ns, query,
false);
857 auto conn = getConnection();
858 mongo::BSONObj fetch;
859 std::size_t dotPosition = ns.find_first_of(
".");
860 std::string databaseName = ns.substr(0, dotPosition);
861 std::string collectionName = ns.substr(dotPosition + 1, ns.size());
862 conn.conn().runCommand(databaseName, BSON(
"distinct" << collectionName <<
"key" << fieldName), fetch);
863 mongo::BSONObj fetchedValues = fetch.getObjectField(
"values");
864 DBStorableData result;
865 result.JSON = fetchedValues.jsonString();
871 const mongo::Query query;
872 return (EntityIdList) findFieldByMongoQuery(ns, query,
MONGO_ID_FIELD);
877 const mongo::Query query;
878 return findFieldByMongoQuery(ns, query, fieldName);
// Runs `query` on namespace `ns` and collects, for every returned document,
// the string value of field `fieldName` (via getDocumentField()).
// The declaration of `result` and the return statement are not visible in
// this excerpt.
882 NameList CommonStorage::findFieldByMongoQuery(
const std::string& ns,
const mongo::Query& query,
const std::string& fieldName)
// Keep the wrapper in a named local so the connection stays checked out for
// as long as the cursor is being iterated.
887 auto conn = getConnection();
// NOTE(review): `cursor` is dereferenced below without a visible null check;
// confirm query() cannot return an empty pointer on failure.
888 boost::scoped_ptr<mongo::DBClientCursor> cursor(conn.conn().query(ns, query));
890 while (cursor->more())
892 result.push_back(getDocumentField(cursor->next(), fieldName));
// Driver errors are logged here; the value returned afterwards is not
// visible in this excerpt.
895 catch (mongo::DBException& e)
897 ARMARX_ERROR <<
"Error fetching field values by query: " << e.what();
// Runs `query` on namespace `ns` and returns the matching documents, each
// serialised to JSON in DBStorableData::JSON.
// The remaining parameter line (`justOne`, used below) and the declaration of
// `obj` are not visible in this excerpt.
903 DBStorableDataList CommonStorage::findByMongoQuery(
const std::string& ns,
const mongo::Query& query,
906 DBStorableDataList result;
909 auto conn = getConnection();
// The third query() argument is 1 when `justOne` is set and 0 otherwise;
// presumably it is the driver's nToReturn limit with 0 meaning "no limit" -
// TODO confirm against the driver docs.
910 boost::scoped_ptr<mongo::DBClientCursor> cursor(conn.conn().query(ns, query, justOne ? 1 : 0));
912 while (cursor->more())
915 obj.JSON = cursor->nextSafe().jsonString();
916 result.push_back(obj);
// Driver errors are logged here; what is returned afterwards is not visible
// in this excerpt.
919 catch (mongo::DBException& e)
921 ARMARX_ERROR <<
"Error fetching objects by query: " << e.what();
930 std::string result =
"";
934 mongo::BSONObj bsonObj = mongo::fromjson(obj.JSON);
939 result = getDocumentId(bsonObj);
944 mongo::BSONObjBuilder builder;
945 mongo::OID newID = mongo::OID::gen();
947 builder.appendElements(bsonObj);
948 bsonObj = builder.obj();
949 result = newID.toString();
955 getConnection().conn().update(ns, query, bsonObj,
true);
959 getConnection().conn().insert(ns, bsonObj);
963 catch (mongo::DBException& e)
973 std::vector<std::string> result(objectList.size(),
"");
977 std::vector<mongo::BSONObj> bsonObjects;
978 bsonObjects.reserve(objectList.size());
980 for (
size_t i = 0; i < objectList.size(); i++)
982 bsonObjects.push_back(mongo::fromjson(objectList[i].JSON));
983 result[i] = getDocumentId(bsonObjects[i]);
989 mongo::OID newID = mongo::OID::gen();
990 result[i] = newID.toString();
992 mongo::BSONObjBuilder builder;
994 builder.appendElements(bsonObjects[i]);
995 bsonObjects[i] = builder.obj();
999 getConnection().conn().insert(ns, bsonObjects);
1001 catch (mongo::DBException& e)
1003 ARMARX_ERROR <<
"Error inserting object: " << e.what();
1012 bool result =
false;
1016 const mongo::BSONObj mongoObj = mongo::fromjson(obj.JSON);
1018 if (!mongoObj.hasField(keyField.c_str()))
1020 throw FieldNotFoundException(
"field not found in supplied JSON object", keyField);
1023 mongo::Query query(BSON(keyField << mongoObj[keyField]));
1025 getConnection().conn().update(ns, query, mongoObj, upsert);
1029 catch (
const mongo::DBException& e)
1039 bool result =
false;
1043 conn->update(ns, query, obj);
1046 catch (mongo::DBException& e)
1063 catch (mongo::AssertionException& e)
1065 throw InvalidMongoIdException(e.what(),
id);
1069 return removeByMongoQuery(ns, query);
1074 const mongo::Query query(BSON(fieldName << fieldValue));
1075 return removeByMongoQuery(ns, query);
1080 return removeByMongoQuery(ns, mongo::Query(query));
1085 return removeByMongoQuery(ns, mongo::Query());
// Removes documents matching `query` from namespace `ns`.
// The removeBy* entry points visible in this file all funnel into this
// function. The returned bool is not visible in this excerpt; presumably it
// is false when the driver throws - TODO confirm.
1088 bool CommonStorage::removeByMongoQuery(
const std::string& ns,
const mongo::Query& query)
1094 getConnection().conn().remove(ns, query);
// Driver errors are logged rather than propagated by this handler.
1097 catch (mongo::DBException& e)
1099 ARMARX_ERROR <<
"Error deleting objects by query: " << e.what();
1111 const mongo::BSONObj keys = BSON(fieldName << 1);
1115 #ifdef MONGOCLIENT_VERSION
1116 getConnection().conn().createIndex(ns, keys);
1118 getConnection().conn().ensureIndex(ns, keys, unique);
1123 catch (mongo::DBException& e)
// Takes ownership of `connPtr` and remembers the owning storage.
// storage.hostAndPort is copied at construction time; the destructor compares
// this copy with the storage's current value before pooling the connection.
1132 CommonStorage::ConnectionWrapper::ConnectionWrapper(
CommonStorage& storage, std::shared_ptr<mongo::DBClientConnection> connPtr):
1133 connPtr{std::move(connPtr)}, hostAndPort{storage.hostAndPort}, storage{&storage}
// Gives the wrapped connection back to the storage's pool.
1136 CommonStorage::ConnectionWrapper::~ConnectionWrapper()
// Same mutex getConnection() holds while it reads `pool` and `hostAndPort`.
1138 std::lock_guard<std::mutex> guard {storage->serverSettingsMutex};
// Only pool the connection if the storage still targets the host this
// connection was created for. No other branch is visible in this excerpt;
// presumably the connection is otherwise released together with connPtr.
1139 if (hostAndPort == storage->hostAndPort)
1141 storage->pool.emplace_back(std::move(connPtr));
1145 mongo::DBClientConnection& CommonStorage::ConnectionWrapper::conn()