some more transaction methods, method for valueReference in cache, some errors fix

This commit is contained in:
Blue 2023-04-06 02:01:24 +03:00
parent f99d5559cd
commit 181a645efc
Signed by: blue
GPG Key ID: 9B203B252A63EE38
4 changed files with 231 additions and 100 deletions

View File

@ -84,14 +84,19 @@ public:
virtual void removeRecord(const K& key, TransactionID txn) override; virtual void removeRecord(const K& key, TransactionID txn) override;
virtual bool checkRecord(const K& key) const override; virtual bool checkRecord(const K& key) const override;
virtual bool checkRecord(const K& key, TransactionID txn) const override; virtual bool checkRecord(const K& key, TransactionID txn) const override;
virtual void getRecord(const K& key, V& out) const override;
virtual void getRecord(const K& key, V& out, TransactionID txn) const override;
virtual V getRecord(const K& key) const override; virtual V getRecord(const K& key) const override;
virtual V getRecord(const K& key, TransactionID txn) const override; virtual V getRecord(const K& key, TransactionID txn) const override;
virtual SizeType count() const override; virtual SizeType count() const override;
virtual SizeType count(TransactionID txn) const override; virtual SizeType count(TransactionID txn) const override;
virtual std::map<K, V> readAll() const override; virtual std::map<K, V> readAll() const override;
virtual std::map<K, V> readAll(TransactionID txn) const override;
virtual void replaceAll(const std::map<K, V>& data) override; virtual void replaceAll(const std::map<K, V>& data) override;
virtual void replaceAll(const std::map<K, V>& data, TransactionID txn) override;
virtual SizeType addRecords(const std::map<K, V>& data, bool overwrite = false) override; virtual SizeType addRecords(const std::map<K, V>& data, bool overwrite = false) override;
virtual SizeType addRecords(const std::map<K, V>& data, TransactionID txn, bool overwrite = false) override;
protected: protected:
Mode* mode; Mode* mode;

View File

@ -112,7 +112,8 @@ void LMDBAL::Cache<K, V>::handleForceRecord(const K& key, const V& value, bool a
if (added) if (added)
abscent->erase(key); abscent->erase(key);
std::pair<typename std::map<K, V>::iterator, bool> result = cache->insert(std::make_pair(key, value)); std::pair<typename std::map<K, V>::iterator, bool> result =
cache->insert(std::make_pair(key, value));
if (!result.second) if (!result.second)
result.first->second = value; result.first->second = value;
else if (!added) //this way database had value but cache didn't, so, need to decrease sizeDifference else if (!added) //this way database had value but cache didn't, so, need to decrease sizeDifference
@ -137,7 +138,8 @@ void LMDBAL::Cache<K, V>::changeRecord(const K& key, const V& value) {
try { try {
Storage<K, V>::changeRecord(key, value); Storage<K, V>::changeRecord(key, value);
typename std::pair<typename std::map<K, V>::iterator, bool> res = cache->insert(std::make_pair(key, value)); typename std::pair<typename std::map<K, V>::iterator, bool> res =
cache->insert(std::make_pair(key, value));
if (!res.second) if (!res.second)
res.first->second = value; res.first->second = value;
else else
@ -183,7 +185,8 @@ void LMDBAL::Cache<K, V>::handleChangeRecord(const K& key, const V& value) {
if (*mode == Mode::full) { if (*mode == Mode::full) {
cache->at(key) = value; cache->at(key) = value;
} else { } else {
typename std::pair<typename std::map<K, V>::iterator, bool> res = cache->insert(std::make_pair(key, value)); typename std::pair<typename std::map<K, V>::iterator, bool> res =
cache->insert(std::make_pair(key, value));
if (!res.second) if (!res.second)
res.first->second = value; res.first->second = value;
else else
@ -195,18 +198,29 @@ template<class K, class V>
V LMDBAL::Cache<K, V>::getRecord(const K& key) const { V LMDBAL::Cache<K, V>::getRecord(const K& key) const {
iStorage::ensureOpened(iStorage::getRecordMethodName); iStorage::ensureOpened(iStorage::getRecordMethodName);
V value;
Cache<K, V>::getRecord(key, value);
return value;
}
template<class K, class V>
void LMDBAL::Cache<K, V>::getRecord(const K& key, V& out) const {
iStorage::ensureOpened(iStorage::getRecordMethodName);
typename std::map<K, V>::const_iterator itr = cache->find(key); typename std::map<K, V>::const_iterator itr = cache->find(key);
if (itr != cache->end()) if (itr != cache->end()) {
return itr->second; out = itr->second;
return;
}
if (*mode == Mode::full || abscent->count(key) != 0) if (*mode == Mode::full || abscent->count(key) != 0)
iStorage::throwNotFound(iStorage::toString(key)); iStorage::throwNotFound(iStorage::toString(key));
try { try {
V value = Storage<K, V>::getRecord(key); Storage<K, V>::getRecord(key, out);
cache->insert(std::make_pair(key, value)); cache->insert(std::make_pair(key, out));
handleMode(); handleMode();
return value; return;
} catch (const NotFound& error) { } catch (const NotFound& error) {
if (*mode != Mode::full) if (*mode != Mode::full)
abscent->insert(key); abscent->insert(key);
@ -215,68 +229,79 @@ V LMDBAL::Cache<K, V>::getRecord(const K& key) const {
} }
} }
template<class K, class V> template<class K, class V>
V LMDBAL::Cache<K, V>::getRecord(const K& key, TransactionID txn) const { V LMDBAL::Cache<K, V>::getRecord(const K& key, TransactionID txn) const {
iStorage::ensureOpened(iStorage::getRecordMethodName); iStorage::ensureOpened(iStorage::getRecordMethodName);
V value;
Cache<K, V>::getRecord(key, value, txn);
return value;
}
template<class K, class V>
void LMDBAL::Cache<K, V>::getRecord(const K& key, V& out, TransactionID txn) const {
iStorage::ensureOpened(iStorage::getRecordMethodName);
//if there are any changes made within this transaction //if there are any changes made within this transaction
//I will be able to see them among pending changes //I will be able to see them among pending changes
//so, I'm going to go through them in reverse order //so, I'm going to go through them in reverse order
//and check every key. If it has anything to do this requested key //and check every key. If it has anything to do this requested key
//there is a way to tell... //there is a way to tell...
std::optional<V> candidate = std::nullopt; bool currentTransaction = false;
typename TransactionCache::const_iterator tc = transactionCache->find(txn); typename TransactionCache::const_iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) { if (tc != transactionCache->end()) {
currentTransaction = true;
const Queue& queue = tc->second; const Queue& queue = tc->second;
for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) { for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) {
const Entry& entry = *i; const Entry& entry = *i;
switch (entry.first) { switch (entry.first) {
case Operation::add: case Operation::add:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key) if (static_cast<std::pair<K, V>*>(entry.second)->first == key) {
return static_cast<std::pair<K, V>*>(entry.second)->second; out = static_cast<std::pair<K, V>*>(entry.second)->second;
break; return;
case Operation::remove:
if (*static_cast<K*>(entry.second) == key) {
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key));
} }
break; break;
case Operation::remove:
iStorage::throwNotFound(iStorage::toString(key));
break;
case Operation::change: case Operation::change:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key) if (static_cast<std::pair<K, V>*>(entry.second)->first == key) {
return static_cast<std::pair<K, V>*>(entry.second)->second; out = static_cast<std::pair<K, V>*>(entry.second)->second;
return;
}
break; break;
case Operation::force: case Operation::force:
if (std::get<1>(*static_cast<std::tuple<bool, K, V>*>(entry.second)) == key) if (std::get<1>(*static_cast<std::tuple<bool, K, V>*>(entry.second)) == key) {
return std::get<2>(*static_cast<std::tuple<bool, K, V>*>(entry.second)); out = std::get<2>(*static_cast<std::tuple<bool, K, V>*>(entry.second));
return;
}
break; break;
case Operation::drop: case Operation::drop:
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key)); iStorage::throwNotFound(iStorage::toString(key));
break; break;
case Operation::replace: { case Operation::replace: {
std::map<K, V>* newMap = static_cast<std::map<K, V>*>(entry.second); std::map<K, V>* newMap = static_cast<std::map<K, V>*>(entry.second);
typename std::map<K, V>::const_iterator vitr = newMap->find(key); typename std::map<K, V>::const_iterator vitr = newMap->find(key);
if (vitr != newMap->end()) if (vitr != newMap->end()) {
return vitr->second; out = vitr->second;
return;
} else {
iStorage::throwNotFound(iStorage::toString(key));
}
} }
break; break;
case Operation::addMany: { case Operation::addMany: {
const std::tuple<bool, SizeType, std::map<K, V>>& tuple = *static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second); const std::tuple<bool, SizeType, std::map<K, V>>& tuple =
*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second);
const std::map<K, V>& newElements = std::get<2>(tuple); const std::map<K, V>& newElements = std::get<2>(tuple);
typename std::map<K, V>::const_iterator vitr = newElements.find(key); typename std::map<K, V>::const_iterator vitr = newElements.find(key);
if (vitr != newElements.end()) { if (vitr != newElements.end()) {
if (std::get<0>(tuple)) //if the command was to overwrite - out = vitr->second;
return vitr->second; //it's clear, current value is the actual return;
//but if it wasn't, I'm going to remember
if (!candidate.has_value()) //only the last (which is the first, keeping in mind reverse order)
candidate = vitr->second; //occurance and return it in case I meet any NotFound condition
} }
} }
break; break;
} }
@ -286,35 +311,24 @@ V LMDBAL::Cache<K, V>::getRecord(const K& key, TransactionID txn) const {
//which caused the changes i just need to check it among local cache //which caused the changes i just need to check it among local cache
typename std::map<K, V>::const_iterator itr = cache->find(key); typename std::map<K, V>::const_iterator itr = cache->find(key);
if (itr != cache->end()) if (itr != cache->end()) {
return itr->second; out = itr->second;
return;
if (*mode == Mode::full || abscent->count(key) != 0) {
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key));
} }
if (*mode == Mode::full || abscent->count(key) != 0)
iStorage::throwNotFound(iStorage::toString(key));
try { try {
V value = Storage<K, V>::getRecord(key); Storage<K, V>::getRecord(key, out, txn);
cache->insert(std::make_pair(key, value)); if (!currentTransaction) {
cache->insert(std::make_pair(key, out));
handleMode(); handleMode();
return value;
} catch (const NotFound& error) {
if (*mode != Mode::full)
abscent->insert(key);
if (candidate.has_value()) {
throw Unknown(iStorage::dbName(),
"Something completely wrong have happened: \
cache has a pending addition transaction \
(probably as a result of calling addRecords \
method with overwrite parameter == false, \
(default is false)), but the database reports \
that there is no such element in the database under current transaction",
iStorage::name);
} }
return;
} catch (const NotFound& error) {
if (! currentTransaction && *mode != Mode::full)
abscent->insert(key);
throw error; throw error;
} }
@ -353,8 +367,10 @@ bool LMDBAL::Cache<K, V>::checkRecord(const K& key, TransactionID txn) const {
//so, I'm going to go through them in reverse order //so, I'm going to go through them in reverse order
//and check every key. If it has anything to do this requested key //and check every key. If it has anything to do this requested key
//there is a way to tell... //there is a way to tell...
bool currentTransaction = false;
typename TransactionCache::const_iterator tc = transactionCache->find(txn); typename TransactionCache::const_iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) { if (tc != transactionCache->end()) {
currentTransaction = true;
const Queue& queue = tc->second; const Queue& queue = tc->second;
for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) { for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) {
const Entry& entry = *i; const Entry& entry = *i;
@ -382,9 +398,13 @@ bool LMDBAL::Cache<K, V>::checkRecord(const K& key, TransactionID txn) const {
case Operation::replace: case Operation::replace:
if (static_cast<std::map<K, V>*>(entry.second)->count(key) > 0) if (static_cast<std::map<K, V>*>(entry.second)->count(key) > 0)
return true; return true;
else
return false;
break; break;
case Operation::addMany: case Operation::addMany:
if (std::get<2>(*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second)).count(key) > 0) if (std::get<2>(
*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second)
).count(key) > 0)
return true; return true;
break; break;
} }
@ -402,11 +422,13 @@ bool LMDBAL::Cache<K, V>::checkRecord(const K& key, TransactionID txn) const {
try { try {
V value = Storage<K, V>::getRecord(key, txn); V value = Storage<K, V>::getRecord(key, txn);
if (!currentTransaction) {
cache->insert(std::make_pair(key, value)); cache->insert(std::make_pair(key, value));
handleMode(); handleMode();
}
return true; return true;
} catch (const NotFound& error) { } catch (const NotFound& error) {
if (*mode != Mode::full) if (!currentTransaction && *mode != Mode::full)
abscent->insert(key); abscent->insert(key);
return false; return false;
@ -427,6 +449,81 @@ std::map<K, V> LMDBAL::Cache<K, V>::readAll() const {
return *cache; return *cache;
} }
template<class K, class V>
std::map<K, V> LMDBAL::Cache<K, V>::readAll(TransactionID txn) const {
iStorage::ensureOpened(iStorage::readAllMethodName);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
Queue& queue = tc->second;
if (*mode != Mode::full) {
std::map<K, V> result = *cache;
for (typename Queue::const_iterator i = queue.begin(), end = queue.end(); i != end; ++i) {
const Entry& entry = *i;
switch (entry.first) {
case Operation::add:
result.insert(*static_cast<std::pair<K, V>*>(entry.second));
break;
case Operation::remove:
result.erase(*static_cast<K*>(entry.second));
break;
case Operation::change: {
std::pair<K, V>* pair = static_cast<std::pair<K, V>*>(entry.second);
result.at(pair->first) = pair->second;
}
break;
case Operation::force:{
const std::tuple<bool, K, V>& tuple =
*static_cast<std::tuple<bool, K, V>*>(entry.second);
result[std::get<1>(tuple)] = std::get<2>(tuple);
}
break;
case Operation::drop:
result.clear();
break;
case Operation::replace:
result = *static_cast<std::map<K, V>*>(entry.second);
break;
case Operation::addMany: {
const std::tuple<bool, SizeType, std::map<K, V>>& t =
*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second);
const std::map<K, V>& added = std::get<2>(t);
bool overwrite = std::get<0>(t);
for (const std::pair<const K, V>& pair : added) {
if (overwrite)
result[pair.first] = pair.second;
else
result.insert(pair);
}
}
break;
}
}
return result;
} else {
std::map<K, V>* result = new std::map<K, V>();
Storage<K, V>::readAll(*result, txn);
//queue.clear(); //since I'm getting a complete state of the database
queue.emplace_back(Operation::replace, result); //I can as well erase all previous cache entries
return *result;
}
} else {
if (*mode != Mode::full) { //there is a room for optimization
*mode = Mode::full; //I can read and deserialize only those values
*cache = Storage<K, V>::readAll(txn); //that are missing in the cache
abscent->clear();
*sizeDifference = 0;
}
return *cache;
}
}
template<class K, class V> template<class K, class V>
void LMDBAL::Cache<K, V>::replaceAll(const std::map<K, V>& data) { void LMDBAL::Cache<K, V>::replaceAll(const std::map<K, V>& data) {
Storage<K, V>::replaceAll(data); Storage<K, V>::replaceAll(data);
@ -439,6 +536,18 @@ void LMDBAL::Cache<K, V>::replaceAll(const std::map<K, V>& data) {
} }
} }
template<class K, class V>
void LMDBAL::Cache<K, V>::replaceAll(const std::map<K, V>& data, TransactionID txn) {
Storage<K, V>::replaceAll(data, txn);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
//queue.clear();
std::map<K, V>* map = new std::map<K, V>(data); //since I'm getting a complete state of the database
tc->second.emplace_back(Operation::replace, map); //I can as well erase all previous cache entries
}
}
template<class K, class V> template<class K, class V>
void LMDBAL::Cache<K, V>::handleReplaceAll(std::map<K, V>* data) { void LMDBAL::Cache<K, V>::handleReplaceAll(std::map<K, V>* data) {
delete cache; delete cache;
@ -459,6 +568,20 @@ LMDBAL::SizeType LMDBAL::Cache<K, V>::addRecords(const std::map<K, V>& data, boo
return newSize; return newSize;
} }
template<class K, class V>
LMDBAL::SizeType LMDBAL::Cache<K, V>::addRecords(const std::map<K, V>& data, TransactionID txn, bool overwrite) {
SizeType newSize = Storage<K, V>::addRecords(data, txn, overwrite);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
std::tuple<bool, SizeType, std::map<K, V>>* tuple =
new std::tuple<bool, SizeType, std::map<K, V>>(overwrite, newSize, data);
tc->second.emplace_back(Operation::addMany, tuple);
}
return newSize;
}
template<class K, class V> template<class K, class V>
void LMDBAL::Cache<K, V>::handleAddRecords(const std::map<K, V>& data, bool overwrite, SizeType newSize) { void LMDBAL::Cache<K, V>::handleAddRecords(const std::map<K, V>& data, bool overwrite, SizeType newSize) {
Mode& m = *mode; Mode& m = *mode;
@ -579,7 +702,7 @@ uint32_t LMDBAL::Cache<K, V>::count(TransactionID txn) const {
break; break;
case Operation::force: case Operation::force:
if (std::get<0>(*static_cast<std::tuple<bool, K, V>*>(entry.second))) if (std::get<0>(*static_cast<std::tuple<bool, K, V>*>(entry.second)))
return diff; ++diff;
break; break;
case Operation::drop: case Operation::drop:
return false; return false;
@ -588,7 +711,7 @@ uint32_t LMDBAL::Cache<K, V>::count(TransactionID txn) const {
return static_cast<std::map<K, V>*>(entry.second)->size() + diff; return static_cast<std::map<K, V>*>(entry.second)->size() + diff;
break; break;
case Operation::addMany: case Operation::addMany:
return Storage<K, V>::count(txn); //it's just close to impossible to tell return std::get<1>(*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second)) + diff;
break; break;
} }
} }

View File

@ -334,6 +334,9 @@ uint32_t LMDBAL::Storage<K, V>::addRecords(const std::map<K, V>& data, Transacti
lmdbData = valueSerializer->setData(pair.second); lmdbData = valueSerializer->setData(pair.second);
rc = mdb_put(txn, dbi, &lmdbKey, &lmdbData, overwrite ? 0 : MDB_NOOVERWRITE); rc = mdb_put(txn, dbi, &lmdbKey, &lmdbData, overwrite ? 0 : MDB_NOOVERWRITE);
if (rc == MDB_KEYEXIST)
throwDuplicate(toString(pair.first));
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
throwUnknown(rc); throwUnknown(rc);
} }

View File

@ -132,41 +132,41 @@ TEST_F(CacheTransactionsTest, Reading) {
} }
TEST_F(CacheTransactionsTest, ConcurentReading) { TEST_F(CacheTransactionsTest, ConcurentReading) {
// EXPECT_EQ(db->ready(), true); EXPECT_EQ(db->ready(), true);
//
// LMDBAL::SizeType size = c1->count(); LMDBAL::SizeType size = c1->count();
// LMDBAL::TransactionID txn = db->beginTransaction(); LMDBAL::TransactionID txn = db->beginTransaction();
// EXPECT_EQ(c1->getRecord(5, txn), 13); EXPECT_EQ(c1->getRecord(5, txn), 13);
// EXPECT_EQ(c1->getRecord(5), 13); EXPECT_EQ(c1->getRecord(5), 13);
//
// c1->removeRecord(5, txn); c1->removeRecord(5, txn);
//
// EXPECT_FALSE(c1->checkRecord(5, txn)); EXPECT_FALSE(c1->checkRecord(5, txn));
// EXPECT_EQ(c1->getRecord(5), 13); EXPECT_EQ(c1->getRecord(5), 13);
//
// c1->addRecord(5, 571, txn); c1->addRecord(5, 571, txn);
// EXPECT_EQ(c1->getRecord(5, txn), 571); EXPECT_EQ(c1->getRecord(5, txn), 571);
// EXPECT_EQ(c1->getRecord(5), 13); EXPECT_EQ(c1->getRecord(5), 13);
//
// c1->forceRecord(5, -472, txn); c1->forceRecord(5, -472, txn);
// EXPECT_EQ(c1->getRecord(5, txn), -472); EXPECT_EQ(c1->getRecord(5, txn), -472);
// EXPECT_EQ(c1->getRecord(5), 13); EXPECT_EQ(c1->getRecord(5), 13);
//
// c1->replaceAll({ c1->replaceAll({
// {1, 75} {1, 75}
// }, txn); }, txn);
// EXPECT_FALSE(c1->checkRecord(5, txn)); EXPECT_FALSE(c1->checkRecord(5, txn));
// EXPECT_EQ(c1->getRecord(5), 13); EXPECT_EQ(c1->getRecord(5), 13);
// EXPECT_EQ(c1->count(txn), 1); EXPECT_EQ(c1->count(txn), 1);
// EXPECT_EQ(c1->count(), size); EXPECT_EQ(c1->count(), size);
//
// db->commitTransaction(txn); db->commitTransaction(txn);
//
// EXPECT_FALSE(c1->checkRecord(5)); EXPECT_FALSE(c1->checkRecord(5));
// EXPECT_EQ(c1->count(), 1); EXPECT_EQ(c1->count(), 1);
} }
/*
TEST_F(CacheTransactionsTest, ConcurentModification) { TEST_F(CacheTransactionsTest, ConcurentModification) {
EXPECT_EQ(db->ready(), true); EXPECT_EQ(db->ready(), true);
@ -228,4 +228,4 @@ TEST_F(CacheTransactionsTest, ConcurentModification) {
std::cout << "checking final result" << std::endl; std::cout << "checking final result" << std::endl;
EXPECT_EQ(c1->getRecord(5), -46); EXPECT_EQ(c1->getRecord(5), -46);
} }
*/