diff --git a/src/cache.h b/src/cache.h index 2a460b6..39e7902 100644 --- a/src/cache.h +++ b/src/cache.h @@ -75,12 +75,19 @@ public: using Storage::drop; virtual int drop(TransactionID transaction) override; virtual void addRecord(const K& key, const V& value) override; + virtual void addRecord(const K& key, const V& value, TransactionID txn) override; virtual bool forceRecord(const K& key, const V& value) override; + virtual bool forceRecord(const K& key, const V& value, TransactionID txn) override; virtual void changeRecord(const K& key, const V& value) override; + virtual void changeRecord(const K& key, const V& value, TransactionID txn) override; virtual void removeRecord(const K& key) override; + virtual void removeRecord(const K& key, TransactionID txn) override; virtual bool checkRecord(const K& key) const override; + virtual bool checkRecord(const K& key, TransactionID txn) const override; virtual V getRecord(const K& key) const override; + virtual V getRecord(const K& key, TransactionID txn) const override; virtual SizeType count() const override; + virtual SizeType count(TransactionID txn) const override; virtual std::map readAll() const override; virtual void replaceAll(const std::map& data) override; diff --git a/src/cache.hpp b/src/cache.hpp index d92b0f6..5c0d9ca 100644 --- a/src/cache.hpp +++ b/src/cache.hpp @@ -55,6 +55,22 @@ void LMDBAL::Cache::addRecord(const K& key, const V& value) { handleAddRecord(key, value); } +template +void LMDBAL::Cache::addRecord(const K& key, const V& value, TransactionID txn) { + iStorage::ensureOpened(iStorage::addRecordMethodName); + + if (cache->count(key) > 0) + iStorage::throwDuplicate(iStorage::toString(key)); + + Storage::addRecord(key, value, txn); + + typename TransactionCache::iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + std::pair* pair = new std::pair(key, value); + tc->second.emplace_back(Operation::add, pair); + } +} + template void LMDBAL::Cache::handleAddRecord(const K& key, const V& value) { cache->insert(std::make_pair(key, value)); @@ -73,6 +89,21 @@ bool LMDBAL::Cache::forceRecord(const K& key, const V& value) { return added; } +template +bool LMDBAL::Cache::forceRecord(const K& key, const V& value, TransactionID txn) { + iStorage::ensureOpened(iStorage::forceRecordMethodName); + + bool added = Storage::forceRecord(key, value, txn); + + typename TransactionCache::iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + std::tuple* t = new std::tuple(added, key, value); + tc->second.emplace_back(Operation::force, t); + } + + return added; +} + template void LMDBAL::Cache::handleForceRecord(const K& key, const V& value, bool added) { if (*mode == Mode::full) { @@ -118,6 +149,35 @@ void LMDBAL::Cache::changeRecord(const K& key, const V& value) { } } +template +void LMDBAL::Cache::changeRecord(const K& key, const V& value, TransactionID txn) { + iStorage::ensureOpened(iStorage::changeRecordMethodName); + + if (*mode == Mode::full) { + typename std::map::iterator itr = cache->find(key); + if (itr == cache->end()) + iStorage::throwNotFound(iStorage::toString(key)); + + Storage::changeRecord(key, value, txn); + } else { + if (abscent->count(key) > 0) + iStorage::throwNotFound(iStorage::toString(key)); + + try { + Storage::changeRecord(key, value, txn); + } catch (const NotFound& error) { + abscent->insert(key); + throw error; + } + } + + typename TransactionCache::iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + std::pair* pair = new std::pair(key, value); + tc->second.emplace_back(Operation::add, pair); + } +} + template void LMDBAL::Cache::handleChangeRecord(const K& key, const V& value) { if (*mode == Mode::full) { @@ -155,6 +215,111 @@ V LMDBAL::Cache::getRecord(const K& key) const { } } +template +V LMDBAL::Cache::getRecord(const K& key, TransactionID txn) const { + iStorage::ensureOpened(iStorage::getRecordMethodName); + + //if there are any changes made within this transaction + //I will be able to see them among pending changes + //so, I'm going to go through them in reverse order + //and check every key. If it has anything to do this requested key + //there is a way to tell... + std::optional candidate = std::nullopt; + typename TransactionCache::const_iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + const Queue& queue = tc->second; + for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) { + const Entry& entry = *i; + + switch (entry.first) { + case Operation::add: + if (static_cast*>(entry.second)->first == key) + return static_cast*>(entry.second)->second; + break; + case Operation::remove: + if (*static_cast(entry.second) == key) { + if (candidate.has_value()) + return candidate.value(); + else + iStorage::throwNotFound(iStorage::toString(key)); + } + break; + case Operation::change: + if (static_cast*>(entry.second)->first == key) + return static_cast*>(entry.second)->second; + break; + case Operation::force: + if (std::get<1>(*static_cast*>(entry.second)) == key) + return std::get<2>(*static_cast*>(entry.second)); + break; + case Operation::drop: + if (candidate.has_value()) + return candidate.value(); + else + iStorage::throwNotFound(iStorage::toString(key)); + break; + case Operation::replace: { + std::map* newMap = static_cast*>(entry.second); + typename std::map::const_iterator vitr = newMap->find(key); + if (vitr != newMap->end()) + return vitr->second; + } + break; + case Operation::addMany: { + const std::tuple>& tuple = *static_cast>*>(entry.second); + const std::map& newElements = std::get<2>(tuple); + typename std::map::const_iterator vitr = newElements.find(key); + if (vitr != newElements.end()) { + if (std::get<0>(tuple)) //if the command was to overwrite - + return vitr->second; //it's clear, current value is the actual + //but if it wasn't, I'm going to remember + if (!candidate.has_value()) //only the last (which is the first, keeping in mind reverse order) + candidate = vitr->second; //occurance and return it in case I meet any NotFound condition + } + + } + break; + } + } + } + //... but if nothing was found or if the transaction is not the one + //which caused the changes i just need to check it among local cache + + typename std::map::const_iterator itr = cache->find(key); + if (itr != cache->end()) + return itr->second; + + if (*mode == Mode::full || abscent->count(key) != 0) { + if (candidate.has_value()) + return candidate.value(); + else + iStorage::throwNotFound(iStorage::toString(key)); + } + + try { + V value = Storage::getRecord(key); + cache->insert(std::make_pair(key, value)); + handleMode(); + return value; + } catch (const NotFound& error) { + if (*mode != Mode::full) + abscent->insert(key); + + if (candidate.has_value()) { + throw Unknown(iStorage::dbName(), +"Something completely wrong have happened: \ +cache has a pending addition transaction \ +(probably as a result of calling addRecords \ +method with overwrite parameter == false, \ +(default is false)), but the database reports \ +that there is no such element in the database under current transaction", + iStorage::name); + } + + throw error; + } +} + template bool LMDBAL::Cache::checkRecord(const K& key) const { iStorage::ensureOpened(iStorage::checkRecordMethodName); @@ -179,6 +344,75 @@ bool LMDBAL::Cache::checkRecord(const K& key) const { } } +template +bool LMDBAL::Cache::checkRecord(const K& key, TransactionID txn) const { + iStorage::ensureOpened(iStorage::checkRecordMethodName); + + //if there are any changes made within this transaction + //I will be able to see them among pending changes + //so, I'm going to go through them in reverse order + //and check every key. If it has anything to do this requested key + //there is a way to tell... + typename TransactionCache::const_iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + const Queue& queue = tc->second; + for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) { + const Entry& entry = *i; + + switch (entry.first) { + case Operation::add: + if (static_cast*>(entry.second)->first == key) + return true; + break; + case Operation::remove: + if (*static_cast(entry.second) == key) + return false; + break; + case Operation::change: + if (static_cast*>(entry.second)->first == key) + return true; + break; + case Operation::force: + if (std::get<1>(*static_cast*>(entry.second)) == key) + return true; + break; + case Operation::drop: + return false; + break; + case Operation::replace: + if (static_cast*>(entry.second)->count(key) > 0) + return true; + break; + case Operation::addMany: + if (std::get<2>(*static_cast>*>(entry.second)).count(key) > 0) + return true; + break; + } + } + } + //... but if nothing was found or if the transaction is not the one + //which caused the changes i just need to check it among local cache + + typename std::map::const_iterator itr = cache->find(key); + if (itr != cache->end()) + return true; + + if (*mode == Mode::full || abscent->count(key) != 0) + return false; + + try { + V value = Storage::getRecord(key, txn); + cache->insert(std::make_pair(key, value)); + handleMode(); + return true; + } catch (const NotFound& error) { + if (*mode != Mode::full) + abscent->insert(key); + + return false; + } +} + template std::map LMDBAL::Cache::readAll() const { iStorage::ensureOpened(iStorage::readAllMethodName); @@ -269,6 +503,26 @@ void LMDBAL::Cache::removeRecord(const K& key) { handleRemoveRecord(key); } +template +void LMDBAL::Cache::removeRecord(const K& key, TransactionID txn) { + iStorage::ensureOpened(iStorage::removeRecordMethodName); + + bool noKey = false; + if (*mode != Mode::full) + noKey = cache->count(key) == 0; + else + noKey = abscent->count(key) > 0; + + if (noKey) + iStorage::throwNotFound(iStorage::toString(key)); + + Storage::removeRecord(key, txn); + + typename TransactionCache::iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) + tc->second.emplace_back(Operation::remove, new K(key)); +} + template void LMDBAL::Cache::handleRemoveRecord(const K& key) { if (cache->erase(key) == 0) //if it was not cached and we are now in size mode then the sizeDifference would decrease @@ -302,6 +556,67 @@ uint32_t LMDBAL::Cache::count() const { } } +template +uint32_t LMDBAL::Cache::count(TransactionID txn) const { + + int32_t diff = 0; + bool currentTransaction = false; + typename TransactionCache::const_iterator tc = transactionCache->find(txn); + if (tc != transactionCache->end()) { + currentTransaction = true; + const Queue& queue = tc->second; + for (typename Queue::const_reverse_iterator i = queue.rbegin(), end = queue.rend(); i != end; ++i) { + const Entry& entry = *i; + + switch (entry.first) { + case Operation::add: + ++diff; + break; + case Operation::remove: + --diff; + break; + case Operation::change: + break; + case Operation::force: + if (std::get<0>(*static_cast*>(entry.second))) + return diff; + break; + case Operation::drop: + return false; + break; + case Operation::replace: + return static_cast*>(entry.second)->size() + diff; + break; + case Operation::addMany: + return Storage::count(txn); //it's just close to impossible to tell + break; + } + } + } + + switch (*mode) { + case Mode::nothing: { + uint32_t sz = Storage::count(txn); + if (!currentTransaction) { + *sizeDifference = sz - cache->size(); + if (sz == 0) { + *mode = Mode::full; + abscent->clear(); + } else { + *mode = Mode::size; + } + } + return sz; + } + case Mode::size: + return cache->size() + *sizeDifference + diff; + case Mode::full: + return cache->size() + diff; + default: + return 0; //unreachable, no such state, just to suppress the waring + } +} + template void LMDBAL::Cache::handleMode() const { if (*mode == Mode::size) { @@ -317,7 +632,9 @@ template int LMDBAL::Cache::drop(TransactionID transaction) { int res = Storage::drop(transaction); - transactionCache->at(transaction).emplace_back(Operation::drop, nullptr); + typename TransactionCache::iterator tc = transactionCache->find(transaction); + if (tc != transactionCache->end()) + tc->second.emplace_back(Operation::drop, nullptr); return res; } diff --git a/src/storage.cpp b/src/storage.cpp index 5949bc7..351ae9d 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -32,7 +32,7 @@ void LMDBAL::iStorage::drop() { ensureOpened(dropMethodName); TransactionID txn = db->beginTransaction(); - int rc = drop(txn); + int rc = iStorage::drop(txn); if (rc != MDB_SUCCESS) { abortTransaction(txn); throw Unknown(db->name, mdb_strerror(rc), name); diff --git a/src/storage.hpp b/src/storage.hpp index 52e4422..68ad50e 100644 --- a/src/storage.hpp +++ b/src/storage.hpp @@ -40,7 +40,7 @@ void LMDBAL::Storage::addRecord(const K& key, const V& value) { ensureOpened(addRecordMethodName); TransactionID txn = beginTransaction(); try { - addRecord(key, value, txn); + Storage::addRecord(key, value, txn); } catch (...) { abortTransaction(txn); throw; @@ -68,7 +68,7 @@ bool LMDBAL::Storage::forceRecord(const K& key, const V& value) { TransactionID txn = beginTransaction(); bool added; try { - added = forceRecord(key, value, txn); + added = Storage::forceRecord(key, value, txn); } catch (...) { abortTransaction(txn); throw; @@ -113,7 +113,7 @@ void LMDBAL::Storage::changeRecord(const K& key, const V& value) { TransactionID txn = beginTransaction(); try { - changeRecord(key, value, txn); + Storage::changeRecord(key, value, txn); } catch (...) { abortTransaction(txn); throw; @@ -139,7 +139,7 @@ V LMDBAL::Storage::getRecord(const K& key) const { ensureOpened(getRecordMethodName); V value; - getRecord(key, value); + Storage::getRecord(key, value); return value; } @@ -149,7 +149,7 @@ void LMDBAL::Storage::getRecord(const K& key, V& value) const { TransactionID txn = beginReadOnlyTransaction(); try { - getRecord(key, value, txn); + Storage::getRecord(key, value, txn); } catch (...) { abortTransaction(txn); throw; @@ -163,7 +163,7 @@ V LMDBAL::Storage::getRecord(const K& key, TransactionID txn) const { ensureOpened(getRecordMethodName); V value; - getRecord(key, value, txn); + Storage::getRecord(key, value, txn); return value; } @@ -188,7 +188,7 @@ bool LMDBAL::Storage::checkRecord(const K& key) const { TransactionID txn = beginReadOnlyTransaction(); bool result; try { - result = checkRecord(key, txn); + result = Storage::checkRecord(key, txn); } catch (...) { abortTransaction(txn); throw; @@ -220,7 +220,7 @@ std::map LMDBAL::Storage::readAll() const { ensureOpened(readAllMethodName); std::map result; - readAll(result); + Storage::readAll(result); return result; } @@ -230,7 +230,7 @@ void LMDBAL::Storage::readAll(std::map& result) const { TransactionID txn = beginReadOnlyTransaction(); try { - readAll(result, txn); + Storage::readAll(result, txn); } catch (...) { abortTransaction(txn); throw; @@ -244,7 +244,7 @@ std::map LMDBAL::Storage::readAll(TransactionID txn) const { ensureOpened(readAllMethodName); std::map result; - readAll(result, txn); + Storage::readAll(result, txn); return result; } @@ -278,7 +278,7 @@ void LMDBAL::Storage::replaceAll(const std::map& data) { TransactionID txn = beginTransaction(); try { - replaceAll(data, txn); + Storage::replaceAll(data, txn); } catch (...) { abortTransaction(txn); throw; @@ -313,7 +313,7 @@ uint32_t LMDBAL::Storage::addRecords(const std::map& data, bool over TransactionID txn = beginTransaction(); uint32_t amount; try { - amount = addRecords(data, txn, overwrite); + amount = Storage::addRecords(data, txn, overwrite); } catch (...) { abortTransaction(txn); throw; @@ -352,7 +352,7 @@ void LMDBAL::Storage::removeRecord(const K& key) { TransactionID txn = beginTransaction(); try { - removeRecord(key, txn); + Storage::removeRecord(key, txn); } catch (...) { abortTransaction(txn); throw; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e961886..43480dd 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable(runUnitTests basic.cpp serialization.cpp storagetransaction.cpp + cachetransaction.cpp ) target_compile_options(runUnitTests PRIVATE -fPIC) diff --git a/test/cachetransaction.cpp b/test/cachetransaction.cpp new file mode 100644 index 0000000..5ae7ea3 --- /dev/null +++ b/test/cachetransaction.cpp @@ -0,0 +1,231 @@ +#include + +#include + +#include "base.h" +#include "cache.h" + +class CacheTransactionsTest : public testing::Test { +protected: + CacheTransactionsTest(): + testing::Test(), + c1(db->getCache("cache1")), + c2(db->getCache("cache2")) {} + + ~CacheTransactionsTest() {} + + int waitForChildFork(int pid) { + int status; + if (0 > waitpid(pid, &status, 0)) { + std::cerr << "[----------] Waitpid error!" << std::endl; + return (-1); + } + if (WIFEXITED(status)) { + const int exit_status = WEXITSTATUS(status); + if (exit_status != 0) { + std::cerr << "[----------] Non-zero exit status " << exit_status << " from test!" << std::endl; + } + return exit_status; + } else { + std::cerr << "[----------] Non-normal exit from child!" << std::endl; + return (-2); + } + } + + static void SetUpTestSuite() { + if (db == nullptr) { + db = new LMDBAL::Base("storageTrnansactionsTestBase"); + db->addStorage("cache1"); + db->addStorage("cache2"); + } + + db->open(); + db->drop(); + } + + static void TearDownTestSuite() { + db->close(); + db->removeDirectory(); + delete db; + db = nullptr; + } + + static LMDBAL::Base* db; + + LMDBAL::Cache* c1; + LMDBAL::Cache* c2; +}; + + +LMDBAL::Base* CacheTransactionsTest::db = nullptr; + +TEST_F(CacheTransactionsTest, Adding) { + EXPECT_EQ(db->ready(), true); + EXPECT_EQ(c1->count(), 0); + EXPECT_EQ(c2->count(), 0); + + LMDBAL::TransactionID txn = db->beginTransaction(); + c1->addRecord(5, 13, txn); + c1->addRecord(-53, 782, txn); + c1->addRecord(5892, -37829, txn); + + c2->addRecord("lorem", 481, txn); + c2->addRecord("decallence", 8532.48, txn); + c2->addRecord("prevent recovery", -64.64, txn); + + EXPECT_EQ(c1->count(), 0); + EXPECT_EQ(c2->count(), 0); + + db->commitTransaction(txn); + + EXPECT_EQ(c1->count(), 3); + EXPECT_EQ(c1->getRecord(5), 13); + EXPECT_EQ(c1->getRecord(-53), 782); + EXPECT_EQ(c1->getRecord(5892), -37829); + + EXPECT_EQ(c2->count(), 3); + EXPECT_FLOAT_EQ(c2->getRecord("lorem"), 481); + EXPECT_FLOAT_EQ(c2->getRecord("decallence"), 8532.48); + EXPECT_FLOAT_EQ(c2->getRecord("prevent recovery"), -64.64); +} + +TEST_F(CacheTransactionsTest, Aborting) { + EXPECT_EQ(db->ready(), true); + + LMDBAL::SizeType s1 = c1->count(); + LMDBAL::SizeType s2 = c2->count(); + + LMDBAL::TransactionID txn = db->beginTransaction(); + c1->addRecord(18, 40, txn); + c1->addRecord(85, -4, txn); + c1->addRecord(-5, -3, txn); + + c2->addRecord("tapestry", .053, txn); + c2->addRecord("pepper plants are beautifull", -7, txn); + c2->addRecord("horrots", -23.976, txn); + + EXPECT_EQ(c1->count(), s1); + EXPECT_EQ(c2->count(), s2); + + db->abortTransaction(txn); + + EXPECT_EQ(c1->count(), s1); + EXPECT_EQ(c2->count(), s2); +} + +TEST_F(CacheTransactionsTest, Reading) { + EXPECT_EQ(db->ready(), true); + + LMDBAL::TransactionID txn = db->beginReadOnlyTransaction(); + + EXPECT_EQ(c1->count(txn), 3); + EXPECT_EQ(c1->getRecord(5, txn), 13); + EXPECT_EQ(c1->getRecord(-53, txn), 782); + EXPECT_EQ(c1->getRecord(5892, txn), -37829); + + EXPECT_EQ(c2->count(txn), 3); + EXPECT_FLOAT_EQ(c2->getRecord("lorem", txn), 481); + EXPECT_FLOAT_EQ(c2->getRecord("decallence", txn), 8532.48); + EXPECT_FLOAT_EQ(c2->getRecord("prevent recovery", txn), -64.64); + + db->abortTransaction(txn); +} + +TEST_F(CacheTransactionsTest, ConcurentReading) { + // EXPECT_EQ(db->ready(), true); + // + // LMDBAL::SizeType size = c1->count(); + // LMDBAL::TransactionID txn = db->beginTransaction(); + // EXPECT_EQ(c1->getRecord(5, txn), 13); + // EXPECT_EQ(c1->getRecord(5), 13); + // + // c1->removeRecord(5, txn); + // + // EXPECT_FALSE(c1->checkRecord(5, txn)); + // EXPECT_EQ(c1->getRecord(5), 13); + // + // c1->addRecord(5, 571, txn); + // EXPECT_EQ(c1->getRecord(5, txn), 571); + // EXPECT_EQ(c1->getRecord(5), 13); + // + // c1->forceRecord(5, -472, txn); + // EXPECT_EQ(c1->getRecord(5, txn), -472); + // EXPECT_EQ(c1->getRecord(5), 13); + // + // c1->replaceAll({ + // {1, 75} + // }, txn); + // EXPECT_FALSE(c1->checkRecord(5, txn)); + // EXPECT_EQ(c1->getRecord(5), 13); + // EXPECT_EQ(c1->count(txn), 1); + // EXPECT_EQ(c1->count(), size); + // + // db->commitTransaction(txn); + // + // EXPECT_FALSE(c1->checkRecord(5)); + // EXPECT_EQ(c1->count(), 1); +} + +/* +TEST_F(CacheTransactionsTest, ConcurentModification) { + EXPECT_EQ(db->ready(), true); + + //if you start one writable transaction after another + //in a single thread like so: + // + //LMDBAL::TransactionID txn1 = db->beginTransaction(); + //LMDBAL::TransactionID txn2 = db->beginTransaction(); + // + //the execution should block on the second transaction + //so this test should preform in a sequence + //first the parent, then the child + + int pid = fork(); + if (pid == 0) { // I am the child + std::cout << "beggining second transaction" << std::endl; + LMDBAL::TransactionID txn2 = db->beginTransaction(); //<--- this is where the execution should pause + //and wait for the first transaction to get finished + std::cout << "checking result of the first transaction value" << std::endl; + EXPECT_EQ(c1->getRecord(5, txn2), 812); + + std::cout << "forcing second transaction value" << std::endl; + c1->forceRecord(5, -46, txn2); + + std::cout << "checking second transaction value" << std::endl; + EXPECT_EQ(c1->getRecord(5, txn2), -46); + + std::cout << "checking value independently" << std::endl; + EXPECT_EQ(c1->getRecord(5), 812); + + std::cout << "commiting second transaction" << std::endl; + db->commitTransaction(txn2); + + std::cout << "quitting child thread" << std::endl; + exit(testing::Test::HasFailure()); + } else { // I am the parent + std::cout << "beggining first transaction" << std::endl; + LMDBAL::TransactionID txn1 = db->beginTransaction(); + + std::cout << "putting parent thread to sleep for 5 ms" << std::endl; + usleep(5); + + std::cout << "adding first transaction value" << std::endl; + c1->addRecord(5, 812, txn1); + + std::cout << "checking first transaction value" << std::endl; + EXPECT_EQ(c1->getRecord(5, txn1), 812); + + std::cout << "checking value independently" << std::endl; + EXPECT_FALSE(c1->checkRecord(5)); + + std::cout << "commiting first transaction" << std::endl; + db->commitTransaction(txn1); + + std::cout << "waiting for the other thread to finish" << std::endl; + ASSERT_EQ(0, waitForChildFork(pid)); //child process should have no problems + } + + std::cout << "checking final result" << std::endl; + EXPECT_EQ(c1->getRecord(5), -46); +} +*/