diff --git a/core/archive.cpp b/core/archive.cpp index ac8a852..5efe17e 100644 --- a/core/archive.cpp +++ b/core/archive.cpp @@ -206,6 +206,62 @@ Shared::Message Core::Archive::oldest() return getElement(oldestId()); } +unsigned int Core::Archive::addElements(const std::list& messages) +{ + if (!opened) { + throw Closed("addElements", jid.toStdString()); + } + + int success = 0; + int rc = 0; + MDB_val lmdbKey, lmdbData; + MDB_txn *txn; + mdb_txn_begin(environment, NULL, 0, &txn); + std::list::const_iterator itr = messages.begin(); + while (rc == 0 && itr != messages.end()) { + const Shared::Message& message = *itr; + QByteArray ba; + QDataStream ds(&ba, QIODevice::WriteOnly); + message.serialize(ds); + quint64 stamp = message.getTime().toMSecsSinceEpoch(); + const std::string& id = message.getId().toStdString(); + + lmdbKey.mv_size = id.size(); + lmdbKey.mv_data = (char*)id.c_str(); + lmdbData.mv_size = ba.size(); + lmdbData.mv_data = (uint8_t*)ba.data(); + rc = mdb_put(txn, main, &lmdbKey, &lmdbData, MDB_NOOVERWRITE); + if (rc == 0) { + MDB_val orderKey; + orderKey.mv_size = 8; + orderKey.mv_data = (uint8_t*) &stamp; + + rc = mdb_put(txn, order, &orderKey, &lmdbKey, 0); + if (rc) { + qDebug() << "An element couldn't be inserted into the index, aborting the transaction" << mdb_strerror(rc); + } else { + success++; + } + } else { + if (rc == MDB_KEYEXIST) { + rc = 0; + } else { + qDebug() << "An element couldn't been added to the archive, aborting the transaction" << mdb_strerror(rc); + } + } + } + + if (rc != 0) { + mdb_txn_abort(txn); + success = 0; + } else { + mdb_txn_commit(txn); + } + + return success; +} + + QString Core::Archive::oldestId() { if (!opened) { diff --git a/core/contact.cpp b/core/contact.cpp index 777ec6c..2b29003 100644 --- a/core/contact.cpp +++ b/core/contact.cpp @@ -108,7 +108,9 @@ unsigned int Core::Contact::groupsCount() const void Core::Contact::addMessageToArchive(const Shared::Message& msg) { - hisoryCache.emplace_back(msg); + if (msg.getId().size() > 0 && msg.getBody().size() > 0) { + hisoryCache.emplace_back(msg); + } } void Core::Contact::requestHistory(int count, const QString& before) @@ -150,57 +152,37 @@ void Core::Contact::performRequest(int count, const QString& before) switch (archiveState) { case empty: - emit needEarlierHistory(before, "", QDateTime(), QDateTime()); + emit needHistory(before, "", QDateTime(), QDateTime()); break; case chunk: - case beginning: { - bool found = false; - if (appendCache.size() != 0) { - for (std::list::const_reverse_iterator itr = appendCache.rbegin(), end = appendCache.rend(); itr != end; ++itr) { - const Shared::Message& msg = *itr; - if (found) { - responseCache.emplace_front(msg); - } else { - if (msg.getId() == before) { - found = true; - responseCache.emplace_front(*itr); - } - } - if (responseCache.size() == count) { - break; - } - } - if (responseCache.size() == count) { - nextRequest(); - break; - } - } - if (found) { - requestedBefore = responseCache.front().getId(); - emit needEarlierHistory(requestedBefore, "", QDateTime(), QDateTime()); - } else { - if (requestFromArchive(before)) { - nextRequest(); - } - } + case beginning: + if (count != -1) { + requestCache.emplace_back(requestedCount, before); + requestedCount = -1; } + emit needHistory("", archive->newestId(), QDateTime(), QDateTime()); break; - case end: { + case end: + if (count != -1) { bool found = requestFromArchive(before); if (found) { int rSize = responseCache.size(); if (rSize < count) { if (rSize != 0) { - requestedBefore = responseCache.front().getId(); - emit needEarlierHistory(responseCache.front().getId(), "", QDateTime(), QDateTime()); + emit needHistory(responseCache.front().getId(), "", QDateTime(), QDateTime()); } else { - requestedBefore = before; - emit needEarlierHistory(before, "", QDateTime(), QDateTime()); + emit needHistory(before, "", QDateTime(), QDateTime()); } } else { nextRequest(); } + } else { + requestCache.emplace_back(requestedCount, before); + requestedCount = -1; + emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime()); } + } else { + emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime()); } break; case complete: @@ -212,8 +194,7 @@ void Core::Contact::performRequest(int count, const QString& before) } } -bool Core::Contact::requestFromArchive(const QString& before) -{ +bool Core::Contact::requestFromArchive(const QString& before) { std::list arc; QString lBefore; if (responseCache.size() > 0) { @@ -229,7 +210,7 @@ bool Core::Contact::requestFromArchive(const QString& before) } catch (Archive::NotFound e) { requestCache.emplace_back(requestedCount, before); requestedCount = -1; - requestEarlierToSync(); + emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime()); return false; } } else { @@ -239,79 +220,107 @@ bool Core::Contact::requestFromArchive(const QString& before) //may be even it's a signal that the history is now complete? return true; } catch (Archive::NotFound e) { - requestEarlierToSync(); + emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime()); return false; } } } -void Core::Contact::requestEarlierToSync() -{ - switch (archiveState) { - case empty: - break; - case beginning: //need to reach complete - case chunk: //need to reach end - emit needEarlierHistory("", archive->newestId(), QDateTime(), QDateTime()); - break; - case end: //need to reach complete - emit needEarlierHistory(archive->oldestId(), "", QDateTime(), QDateTime()); - break; - case complete: //nothing to sync - break; - } -} - void Core::Contact::appendMessageToArchive(const Shared::Message& msg) { - if (msg.getId().size() > 0 && msg.getBody().size() > 0) { - switch (archiveState) { - case empty: - if (archive->addElement(msg)) { - archiveState = end; - }; - requestHistory(-1, msg.getId()); - break; - case beginning: - appendCache.emplace_back(msg); - requestHistory(-1, msg.getId()); - break; - case end: - archive->addElement(msg); - break; - case chunk: - appendCache.emplace_back(msg); - requestHistory(-1, msg.getId()); - break; - case complete: - archive->addElement(msg); - break; + const QString& id = msg.getId(); + if (id.size() > 0) { + if (msg.getBody().size() > 0) { + switch (archiveState) { + case empty: + if (archive->addElement(msg)) { + archiveState = end; + }; + if (!syncronizing) { + requestHistory(-1, id); + } + break; + case beginning: + appendCache.emplace_back(msg); + if (!syncronizing) { + requestHistory(-1, id); + } + break; + case end: + archive->addElement(msg); + break; + case chunk: + appendCache.emplace_back(msg); + if (!syncronizing) { + requestHistory(-1, id); + } + break; + case complete: + archive->addElement(msg); + break; + } + } else if (!syncronizing && archiveState == empty) { + requestHistory(-1, id); } - } } -void Core::Contact::flushMessagesToArchive(bool finished, const QString& lastId) +void Core::Contact::flushMessagesToArchive(bool finished, const QString& firstId, const QString& lastId) { - unsigned int amount(0); if (hisoryCache.size() > 0) { - amount = archive->addElements(hisoryCache); + archive->addElements(hisoryCache); + hisoryCache.clear(); } - if (requestedCount == -1) { - if (amount >= requestedCount - responseCache.size()) { - if (requestFromArchive(requestedBefore)){ + switch (archiveState) { + break; + case beginning: + if (finished) { + archiveState = complete; nextRequest(); - return; + } else { + emit needHistory("", lastId, QDateTime(), QDateTime()); } - } - if (!finished) { - if (lastId.size() != 0) { - + case chunk: + if (finished) { + archiveState = end; + nextRequest(); + } else { + emit needHistory("", lastId, QDateTime(), QDateTime()); } - } - - } else { - + break; + case empty: + archiveState = end; + case end: + if (finished) { + archiveState = complete; + archive->setFromTheBeginning(true); + } + if (requestedCount != -1) { + QString before; + if (responseCache.size() > 0) { + before = responseCache.front().getId(); + } else { + before = requestedBefore; + } + if (!requestFromArchive(before)) { + qDebug("Something went terrible wrong flushing messages to the archive"); + } + if (requestedCount < responseCache.size()) { + if (archiveState == complete) { + nextRequest(); + } else { + emit needHistory(firstId, "", QDateTime(), QDateTime()); + } + } else { + nextRequest(); + } + } else { + nextRequest(); + } + break; + case complete: + nextRequest(); + break; } } diff --git a/core/contact.h b/core/contact.h index 8e2d35a..80c2db0 100644 --- a/core/contact.h +++ b/core/contact.h @@ -41,7 +41,7 @@ public: }; Contact(const QString& pJid, const QString& account, QObject* parent = 0); ~Contact(); - + ArchiveState getArchiveState() const; QString getName() const; void setName(const QString& n); @@ -52,17 +52,17 @@ public: unsigned int groupsCount() const; void addMessageToArchive(const Shared::Message& msg); void appendMessageToArchive(const Shared::Message& msg); - void flushMessagesToArchive(bool finished, const QString& lastId); + void flushMessagesToArchive(bool finished, const QString& firstId, const QString& lastId); void requestHistory(int count, const QString& before); - + signals: void groupAdded(const QString& name); void groupRemoved(const QString& name); void nameChanged(const QString& name); void subscriptionStateChanged(Shared::SubscriptionState state); void historyResponse(const std::list& messages); - void needEarlierHistory(const QString& before, const QString& after, const QDateTime& from, const QDateTime& to); - + void needHistory(const QString& before, const QString& after, const QDateTime& from, const QDateTime& to); + public: const QString jid; @@ -72,7 +72,7 @@ private: ArchiveState archiveState; Archive* archive; Shared::SubscriptionState subscriptionState; - + bool syncronizing; int requestedCount; QString requestedBefore; @@ -80,12 +80,11 @@ private: std::list appendCache; std::list responseCache; std::list> requestCache; - + private: void nextRequest(); void performRequest(int count, const QString& before); bool requestFromArchive(const QString& before); - void requestEarlierToSync(); }; }