diff --git a/core/account.cpp b/core/account.cpp index 9517c39..d6d36e4 100644 --- a/core/account.cpp +++ b/core/account.cpp @@ -427,8 +427,30 @@ void Core::Account::onMamMessageReceived(const QString& bareJid, const QXmppMess handleChatMessage(msg, false, true, true); } -void Core::Account::requestAchive(const QString& jid) +void Core::Account::requestArchive(const QString& jid, int count, const QString& before) { + std::map::const_iterator itr = contacts.find(jid); + if (itr == contacts.end()) { + qDebug() << "An attempt to request archive for" << jid << "in account" << name << ", but the contact with such id wasn't found, skipping"; + return; + } + Contact* contact = itr->second; + + contact->requestHistory(count, before); + Contact::ArchiveState as = contact->getArchiveState(); + switch (as) { + case Contact::empty: + break; + case Contact::beginning: + break; + case Contact::chunk: + break; + case Contact::complete: + break; + case Contact::end: + break; + } + QXmppResultSetQuery query; query.setMax(100); QDateTime from = QDateTime::currentDateTime().addDays(-7); diff --git a/core/account.h b/core/account.h index 72abb55..9b41cbf 100644 --- a/core/account.h +++ b/core/account.h @@ -41,7 +41,7 @@ public: void setAvailability(Shared::Availability avail); QString getFullJid() const; void sendMessage(const Shared::Message& data); - void requestAchive(const QString& jid); + void requestArchive(const QString& jid, int count, const QString& before); signals: void connectionStateChanged(int); diff --git a/core/archive.cpp b/core/archive.cpp index 70327a7..49df1c0 100644 --- a/core/archive.cpp +++ b/core/archive.cpp @@ -28,6 +28,7 @@ Core::Archive::Archive(const QString& p_jid, QObject* parent): QObject(parent), jid(p_jid), opened(false), + fromTheBeginning(false), environment(), main(), order() @@ -54,14 +55,16 @@ void Core::Archive::open(const QString& account) } } - mdb_env_set_maxdbs(environment, 2); + mdb_env_set_maxdbs(environment, 3); mdb_env_open(environment, path.toStdString().c_str(), 0, 0664); MDB_txn *txn; mdb_txn_begin(environment, NULL, 0, &txn); mdb_dbi_open(txn, "main", MDB_CREATE, &main); mdb_dbi_open(txn, "order", MDB_CREATE | MDB_INTEGERKEY, &order); + mdb_dbi_open(txn, "order", MDB_CREATE, &stats); mdb_txn_commit(txn); + fromTheBeginning = _isFromTheBeginning(); opened = true; } } @@ -69,6 +72,7 @@ void Core::Archive::open(const QString& account) void Core::Archive::close() { if (opened) { + mdb_dbi_close(environment, stats); mdb_dbi_close(environment, order); mdb_dbi_close(environment, main); mdb_env_close(environment); @@ -76,7 +80,7 @@ void Core::Archive::close() } } -void Core::Archive::addElement(const Shared::Message& message) +bool Core::Archive::addElement(const Shared::Message& message) { if (!opened) { throw Closed("addElement", jid.toStdString()); @@ -95,7 +99,7 @@ void Core::Archive::addElement(const Shared::Message& message) MDB_txn *txn; mdb_txn_begin(environment, NULL, 0, &txn); int rc; - rc = mdb_put(txn, main, &lmdbKey, &lmdbData, 0); + rc = mdb_put(txn, main, &lmdbKey, &lmdbData, MDB_NOOVERWRITE); if (rc == 0) { MDB_val orderKey; orderKey.mv_size = 8; @@ -105,15 +109,19 @@ void Core::Archive::addElement(const Shared::Message& message) if (rc) { qDebug() << "An element couldn't be inserted into the index" << mdb_strerror(rc); mdb_txn_abort(txn); + return false; } else { rc = mdb_txn_commit(txn); if (rc) { qDebug() << "A transaction error: " << mdb_strerror(rc); + return false; } + return true; } } else { qDebug() << "An element couldn't been added to the archive, skipping" << mdb_strerror(rc); mdb_txn_abort(txn); + return false; } } @@ -237,3 +245,150 @@ long unsigned int Core::Archive::size() const mdb_txn_abort(txn); return stat.ms_entries; } + +std::list Core::Archive::getBefore(int count, const QString& id) +{ + if (!opened) { + throw Closed("getBefore", jid.toStdString()); + } + std::list res; + MDB_cursor* cursor; + MDB_txn *txn; + MDB_val lmdbKey, lmdbData; + int rc; + rc = mdb_txn_begin(environment, NULL, MDB_RDONLY, &txn); + if (id == "") { + rc = mdb_cursor_open(txn, order, &cursor); + rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_LAST); + if (rc) { + qDebug() << "Error geting before newest newest " << mdb_strerror(rc); + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + throw new Empty(jid.toStdString()); + } else { + std::string sId((char*)lmdbData.mv_data, lmdbData.mv_size); + } + } else { + lmdbKey.mv_size = id.size(); + lmdbKey.mv_data = (char*)id.toStdString().c_str(); + rc = mdb_get(txn, main, &lmdbKey, &lmdbData); + if (rc) { + qDebug() <<"Error getting before: " << mdb_strerror(rc); + mdb_txn_abort(txn); + throw NotFound(id.toStdString(), jid.toStdString()); + } else { + QByteArray ba((char*)lmdbData.mv_data, lmdbData.mv_size); + QDataStream ds(&ba, QIODevice::ReadOnly); + + Shared::Message msg; + msg.deserialize(ds); + quint64 stamp = msg.getTime().toMSecsSinceEpoch(); + lmdbKey.mv_data = (quint8*)&stamp; + lmdbKey.mv_size = 8; + + rc = mdb_cursor_open(txn, order, &cursor); + rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_SET); + + if (rc) { + qDebug() << "Error getting before " << mdb_strerror(rc); + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + throw new NotFound(id.toStdString(), jid.toStdString()); + } else { + rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_PREV); + if (rc) { + qDebug() << "Error getting before " << mdb_strerror(rc); + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + throw new NotFound(id.toStdString(), jid.toStdString()); + } + } + } + } + + do { + QByteArray ba((char*)lmdbData.mv_data, lmdbData.mv_size); + QDataStream ds(&ba, QIODevice::ReadOnly); + + res.emplace_back(); + Shared::Message& msg = res.back(); + msg.deserialize(ds); + + --count; + + } while (count > 0 && mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_PREV) == 0); + + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + return res; +} + +bool Core::Archive::_isFromTheBeginning() +{ + std::string strKey = "beginning"; + + MDB_val lmdbKey, lmdbData; + lmdbKey.mv_size = strKey.size(); + lmdbKey.mv_data = (char*)strKey.c_str(); + + MDB_txn *txn; + int rc; + mdb_txn_begin(environment, NULL, MDB_RDONLY, &txn); + rc = mdb_get(txn, stats, &lmdbKey, &lmdbData); + if (rc == MDB_NOTFOUND) { + mdb_txn_abort(txn); + return false; + } else if (rc) { + qDebug() <<"isFromTheBeginning error: " << mdb_strerror(rc); + mdb_txn_abort(txn); + throw NotFound(strKey, jid.toStdString()); + } else { + uint8_t value = *(uint8_t*)(lmdbData.mv_data); + bool is; + if (value == 144) { + is = false; + } else if (value == 72) { + is = true; + } else { + qDebug() <<"isFromTheBeginning error: stored value doesn't match any magic number, the answer is most probably wrong"; + } + return is; + } +} + +bool Core::Archive::isFromTheBeginning() +{ + if (!opened) { + throw Closed("isFromTheBeginning", jid.toStdString()); + } + return fromTheBeginning; +} + +bool Core::Archive::setFromTheBeginning(bool is) +{ + if (!opened) { + throw Closed("setFromTheBeginning", jid.toStdString()); + } + if (fromTheBeginning != is) { + fromTheBeginning = is; + const std::string& id = "beginning"; + uint8_t value = 144; + if (is) { + value = 72; + } + + MDB_val lmdbKey, lmdbData; + lmdbKey.mv_size = id.size(); + lmdbKey.mv_data = (char*)id.c_str(); + lmdbData.mv_size = sizeof value; + lmdbData.mv_data = &value; + MDB_txn *txn; + mdb_txn_begin(environment, NULL, 0, &txn); + int rc; + rc = mdb_put(txn, stats, &lmdbKey, &lmdbData, 0); + if (rc != 0) { + qDebug() << "Couldn't store beginning key into stat database:" << mdb_strerror(rc); + mdb_txn_abort(txn); + } + } +} diff --git a/core/archive.h b/core/archive.h index 614ce40..025d927 100644 --- a/core/archive.h +++ b/core/archive.h @@ -23,6 +23,7 @@ #include "../global.h" #include #include "../exception.h" +#include namespace Core { @@ -36,7 +37,7 @@ public: void open(const QString& account); void close(); - void addElement(const Shared::Message& message); + bool addElement(const Shared::Message& message); Shared::Message getElement(const QString& id); Shared::Message oldest(); QString oldestId(); @@ -44,6 +45,9 @@ public: QString newestId(); void clear(); long unsigned int size() const; + std::list getBefore(int count, const QString& id); + bool isFromTheBeginning(); + bool setFromTheBeginning(bool is); public: const QString jid; @@ -97,9 +101,13 @@ public: private: bool opened; + bool fromTheBeginning; MDB_env* environment; MDB_dbi main; MDB_dbi order; + MDB_dbi stats; + + bool _isFromTheBeginning(); }; } diff --git a/core/contact.cpp b/core/contact.cpp index 1cc7e8a..8525f96 100644 --- a/core/contact.cpp +++ b/core/contact.cpp @@ -25,9 +25,22 @@ Core::Contact::Contact(const QString& pJid, const QString& account, QObject* par groups(), archiveState(empty), archive(new Archive(jid)), - subscriptionState(Shared::unknown) + subscriptionState(Shared::unknown), + syncronizing(false), + requestedCount(0), + receivedCount(0), + hisoryCache(), + appendCache(), + requestCache() { archive->open(account); + if (archive->isFromTheBeginning()) { + archiveState = beginning; + } else { + if (archive->size() != 0) { + archiveState = chunk; + } + } } Core::Contact::~Contact() @@ -91,3 +104,42 @@ unsigned int Core::Contact::groupsCount() const { return groups.size(); } + +void Core::Contact::addMessageToArchive(const Shared::Message& msg) +{ + +} + +void Core::Contact::requestHistory(int count, const QString& before) +{ + if (syncronizing) { + requestCache.emplace_back(count, before); + } else { + switch (archiveState) { + case empty: + if (appendCache.size() != 0) { + //from last + } else { + //search + } + break; + case beginning: + //from last + break; + case end: + //try + break; + case chunk: + //from last + break; + case complete: + //just give + break; + } + syncronizing = true; + requestedCount = count; + receivedCount = 0; + hisoryCache.clear(); + } +} + diff --git a/core/contact.h b/core/contact.h index b245d45..cc3f413 100644 --- a/core/contact.h +++ b/core/contact.h @@ -23,6 +23,7 @@ #include #include "archive.h" #include "../global.h" +#include namespace Core { @@ -48,12 +49,18 @@ public: void setSubscriptionState(Shared::SubscriptionState state); Shared::SubscriptionState getSubscriptionState() const; unsigned int groupsCount() const; + void addMessageToArchive(const Shared::Message& msg); + void appendMessageToArchive(const Shared::Message& msg); + void flushMessagesToArchive(); + void requestHistory(int count, const QString& before); signals: void groupAdded(const QString& name); void groupRemoved(const QString& name); void nameChanged(const QString& name); void subscriptionStateChanged(Shared::SubscriptionState state); + void historyResponse(const std::list& messages); + void needEarlierHistory(const QString& before, const QString& after, int count, const QDateTime& from, const QDateTime& to); public: const QString jid; @@ -64,6 +71,13 @@ private: ArchiveState archiveState; Archive* archive; Shared::SubscriptionState subscriptionState; + + bool syncronizing; + int requestedCount; + int receivedCount; + std::list hisoryCache; + std::list appendCache; + std::list> requestCache; }; }