some transaction methods for cache, some more tests for transactions

This commit is contained in:
Blue 2023-04-05 02:27:31 +03:00
parent 150a0b0da9
commit f99d5559cd
Signed by: blue
GPG key ID: 9B203B252A63EE38
6 changed files with 571 additions and 15 deletions

View file

@ -75,12 +75,19 @@ public:
using Storage<K, V>::drop;
virtual int drop(TransactionID transaction) override;
virtual void addRecord(const K& key, const V& value) override;
virtual void addRecord(const K& key, const V& value, TransactionID txn) override;
virtual bool forceRecord(const K& key, const V& value) override;
virtual bool forceRecord(const K& key, const V& value, TransactionID txn) override;
virtual void changeRecord(const K& key, const V& value) override;
virtual void changeRecord(const K& key, const V& value, TransactionID txn) override;
virtual void removeRecord(const K& key) override;
virtual void removeRecord(const K& key, TransactionID txn) override;
virtual bool checkRecord(const K& key) const override;
virtual bool checkRecord(const K& key, TransactionID txn) const override;
virtual V getRecord(const K& key) const override;
virtual V getRecord(const K& key, TransactionID txn) const override;
virtual SizeType count() const override;
virtual SizeType count(TransactionID txn) const override;
virtual std::map<K, V> readAll() const override;
virtual void replaceAll(const std::map<K, V>& data) override;

View file

@ -55,6 +55,22 @@ void LMDBAL::Cache<K, V>::addRecord(const K& key, const V& value) {
handleAddRecord(key, value);
}
template<class K, class V>
void LMDBAL::Cache<K, V>::addRecord(const K& key, const V& value, TransactionID txn) {
iStorage::ensureOpened(iStorage::addRecordMethodName);
if (cache->count(key) > 0)
iStorage::throwDuplicate(iStorage::toString(key));
Storage<K, V>::addRecord(key, value, txn);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
std::pair<K, V>* pair = new std::pair<K, V>(key, value);
tc->second.emplace_back(Operation::add, pair);
}
}
template<class K, class V>
void LMDBAL::Cache<K, V>::handleAddRecord(const K& key, const V& value) {
cache->insert(std::make_pair(key, value));
@ -73,6 +89,21 @@ bool LMDBAL::Cache<K, V>::forceRecord(const K& key, const V& value) {
return added;
}
template<class K, class V>
bool LMDBAL::Cache<K, V>::forceRecord(const K& key, const V& value, TransactionID txn) {
iStorage::ensureOpened(iStorage::forceRecordMethodName);
bool added = Storage<K, V>::forceRecord(key, value, txn);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
std::tuple<bool, K, V>* t = new std::tuple<bool, K, V>(added, key, value);
tc->second.emplace_back(Operation::force, t);
}
return added;
}
template<class K, class V>
void LMDBAL::Cache<K, V>::handleForceRecord(const K& key, const V& value, bool added) {
if (*mode == Mode::full) {
@ -118,6 +149,35 @@ void LMDBAL::Cache<K, V>::changeRecord(const K& key, const V& value) {
}
}
template<class K, class V>
void LMDBAL::Cache<K, V>::changeRecord(const K& key, const V& value, TransactionID txn) {
iStorage::ensureOpened(iStorage::changeRecordMethodName);
if (*mode == Mode::full) {
typename std::map<K, V>::iterator itr = cache->find(key);
if (itr == cache->end())
iStorage::throwNotFound(iStorage::toString(key));
Storage<K, V>::changeRecord(key, value, txn);
} else {
if (abscent->count(key) > 0)
iStorage::throwNotFound(iStorage::toString(key));
try {
Storage<K, V>::changeRecord(key, value, txn);
} catch (const NotFound& error) {
abscent->insert(key);
throw error;
}
}
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
std::pair<K, V>* pair = new std::pair<K, V>(key, value);
tc->second.emplace_back(Operation::add, pair);
}
}
template<class K, class V>
void LMDBAL::Cache<K, V>::handleChangeRecord(const K& key, const V& value) {
if (*mode == Mode::full) {
@ -155,6 +215,111 @@ V LMDBAL::Cache<K, V>::getRecord(const K& key) const {
}
}
template<class K, class V>
V LMDBAL::Cache<K, V>::getRecord(const K& key, TransactionID txn) const {
iStorage::ensureOpened(iStorage::getRecordMethodName);
//if there are any changes made within this transaction
//I will be able to see them among pending changes
//so, I'm going to go through them in reverse order
//and check every key. If it has anything to do this requested key
//there is a way to tell...
std::optional<V> candidate = std::nullopt;
typename TransactionCache::const_iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
const Queue& queue = tc->second;
for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) {
const Entry& entry = *i;
switch (entry.first) {
case Operation::add:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key)
return static_cast<std::pair<K, V>*>(entry.second)->second;
break;
case Operation::remove:
if (*static_cast<K*>(entry.second) == key) {
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key));
}
break;
case Operation::change:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key)
return static_cast<std::pair<K, V>*>(entry.second)->second;
break;
case Operation::force:
if (std::get<1>(*static_cast<std::tuple<bool, K, V>*>(entry.second)) == key)
return std::get<2>(*static_cast<std::tuple<bool, K, V>*>(entry.second));
break;
case Operation::drop:
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key));
break;
case Operation::replace: {
std::map<K, V>* newMap = static_cast<std::map<K, V>*>(entry.second);
typename std::map<K, V>::const_iterator vitr = newMap->find(key);
if (vitr != newMap->end())
return vitr->second;
}
break;
case Operation::addMany: {
const std::tuple<bool, SizeType, std::map<K, V>>& tuple = *static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second);
const std::map<K, V>& newElements = std::get<2>(tuple);
typename std::map<K, V>::const_iterator vitr = newElements.find(key);
if (vitr != newElements.end()) {
if (std::get<0>(tuple)) //if the command was to overwrite -
return vitr->second; //it's clear, current value is the actual
//but if it wasn't, I'm going to remember
if (!candidate.has_value()) //only the last (which is the first, keeping in mind reverse order)
candidate = vitr->second; //occurance and return it in case I meet any NotFound condition
}
}
break;
}
}
}
//... but if nothing was found or if the transaction is not the one
//which caused the changes i just need to check it among local cache
typename std::map<K, V>::const_iterator itr = cache->find(key);
if (itr != cache->end())
return itr->second;
if (*mode == Mode::full || abscent->count(key) != 0) {
if (candidate.has_value())
return candidate.value();
else
iStorage::throwNotFound(iStorage::toString(key));
}
try {
V value = Storage<K, V>::getRecord(key);
cache->insert(std::make_pair(key, value));
handleMode();
return value;
} catch (const NotFound& error) {
if (*mode != Mode::full)
abscent->insert(key);
if (candidate.has_value()) {
throw Unknown(iStorage::dbName(),
"Something completely wrong have happened: \
cache has a pending addition transaction \
(probably as a result of calling addRecords \
method with overwrite parameter == false, \
(default is false)), but the database reports \
that there is no such element in the database under current transaction",
iStorage::name);
}
throw error;
}
}
template<class K, class V>
bool LMDBAL::Cache<K, V>::checkRecord(const K& key) const {
iStorage::ensureOpened(iStorage::checkRecordMethodName);
@ -179,6 +344,75 @@ bool LMDBAL::Cache<K, V>::checkRecord(const K& key) const {
}
}
template<class K, class V>
bool LMDBAL::Cache<K, V>::checkRecord(const K& key, TransactionID txn) const {
iStorage::ensureOpened(iStorage::checkRecordMethodName);
//if there are any changes made within this transaction
//I will be able to see them among pending changes
//so, I'm going to go through them in reverse order
//and check every key. If it has anything to do this requested key
//there is a way to tell...
typename TransactionCache::const_iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
const Queue& queue = tc->second;
for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) {
const Entry& entry = *i;
switch (entry.first) {
case Operation::add:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key)
return true;
break;
case Operation::remove:
if (*static_cast<K*>(entry.second) == key)
return false;
break;
case Operation::change:
if (static_cast<std::pair<K, V>*>(entry.second)->first == key)
return true;
break;
case Operation::force:
if (std::get<1>(*static_cast<std::tuple<bool, K, V>*>(entry.second)) == key)
return true;
break;
case Operation::drop:
return false;
break;
case Operation::replace:
if (static_cast<std::map<K, V>*>(entry.second)->count(key) > 0)
return true;
break;
case Operation::addMany:
if (std::get<2>(*static_cast<std::tuple<bool, SizeType, std::map<K, V>>*>(entry.second)).count(key) > 0)
return true;
break;
}
}
}
//... but if nothing was found or if the transaction is not the one
//which caused the changes i just need to check it among local cache
typename std::map<K, V>::const_iterator itr = cache->find(key);
if (itr != cache->end())
return true;
if (*mode == Mode::full || abscent->count(key) != 0)
return false;
try {
V value = Storage<K, V>::getRecord(key, txn);
cache->insert(std::make_pair(key, value));
handleMode();
return true;
} catch (const NotFound& error) {
if (*mode != Mode::full)
abscent->insert(key);
return false;
}
}
template<class K, class V>
std::map<K, V> LMDBAL::Cache<K, V>::readAll() const {
iStorage::ensureOpened(iStorage::readAllMethodName);
@ -269,6 +503,26 @@ void LMDBAL::Cache<K, V>::removeRecord(const K& key) {
handleRemoveRecord(key);
}
template<class K, class V>
void LMDBAL::Cache<K, V>::removeRecord(const K& key, TransactionID txn) {
iStorage::ensureOpened(iStorage::removeRecordMethodName);
bool noKey = false;
if (*mode != Mode::full)
noKey = cache->count(key) == 0;
else
noKey = abscent->count(key) > 0;
if (noKey)
iStorage::throwNotFound(iStorage::toString(key));
Storage<K, V>::removeRecord(key, txn);
typename TransactionCache::iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end())
tc->second.emplace_back(Operation::remove, new K(key));
}
template<class K, class V>
void LMDBAL::Cache<K, V>::handleRemoveRecord(const K& key) {
if (cache->erase(key) == 0) //if it was not cached and we are now in size mode then the sizeDifference would decrease
@ -302,6 +556,67 @@ uint32_t LMDBAL::Cache<K, V>::count() const {
}
}
template<class K, class V>
uint32_t LMDBAL::Cache<K, V>::count(TransactionID txn) const {
int32_t diff = 0;
bool currentTransaction = false;
typename TransactionCache::const_iterator tc = transactionCache->find(txn);
if (tc != transactionCache->end()) {
currentTransaction = true;
const Queue& queue = tc->second;
for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) {
const Entry& entry = *i;
switch (entry.first) {
case Operation::add:
++diff;
break;
case Operation::remove:
--diff;
break;
case Operation::change:
break;
case Operation::force:
if (std::get<0>(*static_cast<std::tuple<bool, K, V>*>(entry.second)))
return diff;
break;
case Operation::drop:
return false;
break;
case Operation::replace:
return static_cast<std::map<K, V>*>(entry.second)->size() + diff;
break;
case Operation::addMany:
return Storage<K, V>::count(txn); //it's just close to impossible to tell
break;
}
}
}
switch (*mode) {
case Mode::nothing: {
uint32_t sz = Storage<K, V>::count(txn);
if (!currentTransaction) {
*sizeDifference = sz - cache->size();
if (sz == 0) {
*mode = Mode::full;
abscent->clear();
} else {
*mode = Mode::size;
}
}
return sz;
}
case Mode::size:
return cache->size() + *sizeDifference + diff;
case Mode::full:
return cache->size() + diff;
default:
return 0; //unreachable, no such state, just to suppress the waring
}
}
template<class K, class V>
void LMDBAL::Cache<K, V>::handleMode() const {
if (*mode == Mode::size) {
@ -317,7 +632,9 @@ template<class K, class V>
int LMDBAL::Cache<K, V>::drop(TransactionID transaction) {
int res = Storage<K, V>::drop(transaction);
transactionCache->at(transaction).emplace_back(Operation::drop, nullptr);
typename TransactionCache::iterator tc = transactionCache->find(transaction);
if (tc != transactionCache->end())
tc->second.emplace_back(Operation::drop, nullptr);
return res;
}

View file

@ -32,7 +32,7 @@ void LMDBAL::iStorage::drop() {
ensureOpened(dropMethodName);
TransactionID txn = db->beginTransaction();
int rc = drop(txn);
int rc = iStorage::drop(txn);
if (rc != MDB_SUCCESS) {
abortTransaction(txn);
throw Unknown(db->name, mdb_strerror(rc), name);

View file

@ -40,7 +40,7 @@ void LMDBAL::Storage<K, V>::addRecord(const K& key, const V& value) {
ensureOpened(addRecordMethodName);
TransactionID txn = beginTransaction();
try {
addRecord(key, value, txn);
Storage<K, V>::addRecord(key, value, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -68,7 +68,7 @@ bool LMDBAL::Storage<K, V>::forceRecord(const K& key, const V& value) {
TransactionID txn = beginTransaction();
bool added;
try {
added = forceRecord(key, value, txn);
added = Storage<K, V>::forceRecord(key, value, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -113,7 +113,7 @@ void LMDBAL::Storage<K, V>::changeRecord(const K& key, const V& value) {
TransactionID txn = beginTransaction();
try {
changeRecord(key, value, txn);
Storage<K, V>::changeRecord(key, value, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -139,7 +139,7 @@ V LMDBAL::Storage<K, V>::getRecord(const K& key) const {
ensureOpened(getRecordMethodName);
V value;
getRecord(key, value);
Storage<K, V>::getRecord(key, value);
return value;
}
@ -149,7 +149,7 @@ void LMDBAL::Storage<K, V>::getRecord(const K& key, V& value) const {
TransactionID txn = beginReadOnlyTransaction();
try {
getRecord(key, value, txn);
Storage<K, V>::getRecord(key, value, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -163,7 +163,7 @@ V LMDBAL::Storage<K, V>::getRecord(const K& key, TransactionID txn) const {
ensureOpened(getRecordMethodName);
V value;
getRecord(key, value, txn);
Storage<K, V>::getRecord(key, value, txn);
return value;
}
@ -188,7 +188,7 @@ bool LMDBAL::Storage<K, V>::checkRecord(const K& key) const {
TransactionID txn = beginReadOnlyTransaction();
bool result;
try {
result = checkRecord(key, txn);
result = Storage<K, V>::checkRecord(key, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -220,7 +220,7 @@ std::map<K, V> LMDBAL::Storage<K, V>::readAll() const {
ensureOpened(readAllMethodName);
std::map<K, V> result;
readAll(result);
Storage<K, V>::readAll(result);
return result;
}
@ -230,7 +230,7 @@ void LMDBAL::Storage<K, V>::readAll(std::map<K, V>& result) const {
TransactionID txn = beginReadOnlyTransaction();
try {
readAll(result, txn);
Storage<K, V>::readAll(result, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -244,7 +244,7 @@ std::map<K, V> LMDBAL::Storage<K, V>::readAll(TransactionID txn) const {
ensureOpened(readAllMethodName);
std::map<K, V> result;
readAll(result, txn);
Storage<K, V>::readAll(result, txn);
return result;
}
@ -278,7 +278,7 @@ void LMDBAL::Storage<K, V>::replaceAll(const std::map<K, V>& data) {
TransactionID txn = beginTransaction();
try {
replaceAll(data, txn);
Storage<K, V>::replaceAll(data, txn);
} catch (...) {
abortTransaction(txn);
throw;
@ -313,7 +313,7 @@ uint32_t LMDBAL::Storage<K, V>::addRecords(const std::map<K, V>& data, bool over
TransactionID txn = beginTransaction();
uint32_t amount;
try {
amount = addRecords(data, txn, overwrite);
amount = Storage<K, V>::addRecords(data, txn, overwrite);
} catch (...) {
abortTransaction(txn);
throw;
@ -352,7 +352,7 @@ void LMDBAL::Storage<K, V>::removeRecord(const K& key) {
TransactionID txn = beginTransaction();
try {
removeRecord(key, txn);
Storage<K, V>::removeRecord(key, txn);
} catch (...) {
abortTransaction(txn);
throw;