localHistoryStorage #5

Merged
blue merged 6 commits from localHistoryStorage into master 2019-05-15 17:39:54 +00:00
6 changed files with 258 additions and 7 deletions
Showing only changes of commit aa1f728da9 - Show all commits

View File

@ -427,8 +427,30 @@ void Core::Account::onMamMessageReceived(const QString& bareJid, const QXmppMess
handleChatMessage(msg, false, true, true); handleChatMessage(msg, false, true, true);
} }
void Core::Account::requestAchive(const QString& jid) void Core::Account::requestArchive(const QString& jid, int count, const QString& before)
{ {
std::map<QString, Contact*>::const_iterator itr = contacts.find(jid);
if (itr == contacts.end()) {
qDebug() << "An attempt to request archive for" << jid << "in account" << name << ", but the contact with such id wasn't found, skipping";
return;
}
Contact* contact = itr->second;
contact->requestHistory(count, before);
Contact::ArchiveState as = contact->getArchiveState();
switch (as) {
case Contact::empty:
break;
case Contact::beginning:
break;
case Contact::chunk:
break;
case Contact::complete:
break;
case Contact::end:
break;
}
QXmppResultSetQuery query; QXmppResultSetQuery query;
query.setMax(100); query.setMax(100);
QDateTime from = QDateTime::currentDateTime().addDays(-7); QDateTime from = QDateTime::currentDateTime().addDays(-7);

View File

@ -41,7 +41,7 @@ public:
void setAvailability(Shared::Availability avail); void setAvailability(Shared::Availability avail);
QString getFullJid() const; QString getFullJid() const;
void sendMessage(const Shared::Message& data); void sendMessage(const Shared::Message& data);
void requestAchive(const QString& jid); void requestArchive(const QString& jid, int count, const QString& before);
signals: signals:
void connectionStateChanged(int); void connectionStateChanged(int);

View File

@ -28,6 +28,7 @@ Core::Archive::Archive(const QString& p_jid, QObject* parent):
QObject(parent), QObject(parent),
jid(p_jid), jid(p_jid),
opened(false), opened(false),
fromTheBeginning(false),
environment(), environment(),
main(), main(),
order() order()
@ -54,14 +55,16 @@ void Core::Archive::open(const QString& account)
} }
} }
mdb_env_set_maxdbs(environment, 2); mdb_env_set_maxdbs(environment, 3);
mdb_env_open(environment, path.toStdString().c_str(), 0, 0664); mdb_env_open(environment, path.toStdString().c_str(), 0, 0664);
MDB_txn *txn; MDB_txn *txn;
mdb_txn_begin(environment, NULL, 0, &txn); mdb_txn_begin(environment, NULL, 0, &txn);
mdb_dbi_open(txn, "main", MDB_CREATE, &main); mdb_dbi_open(txn, "main", MDB_CREATE, &main);
mdb_dbi_open(txn, "order", MDB_CREATE | MDB_INTEGERKEY, &order); mdb_dbi_open(txn, "order", MDB_CREATE | MDB_INTEGERKEY, &order);
mdb_dbi_open(txn, "order", MDB_CREATE, &stats);
mdb_txn_commit(txn); mdb_txn_commit(txn);
fromTheBeginning = _isFromTheBeginning();
opened = true; opened = true;
} }
} }
@ -69,6 +72,7 @@ void Core::Archive::open(const QString& account)
void Core::Archive::close() void Core::Archive::close()
{ {
if (opened) { if (opened) {
mdb_dbi_close(environment, stats);
mdb_dbi_close(environment, order); mdb_dbi_close(environment, order);
mdb_dbi_close(environment, main); mdb_dbi_close(environment, main);
mdb_env_close(environment); mdb_env_close(environment);
@ -76,7 +80,7 @@ void Core::Archive::close()
} }
} }
void Core::Archive::addElement(const Shared::Message& message) bool Core::Archive::addElement(const Shared::Message& message)
{ {
if (!opened) { if (!opened) {
throw Closed("addElement", jid.toStdString()); throw Closed("addElement", jid.toStdString());
@ -95,7 +99,7 @@ void Core::Archive::addElement(const Shared::Message& message)
MDB_txn *txn; MDB_txn *txn;
mdb_txn_begin(environment, NULL, 0, &txn); mdb_txn_begin(environment, NULL, 0, &txn);
int rc; int rc;
rc = mdb_put(txn, main, &lmdbKey, &lmdbData, 0); rc = mdb_put(txn, main, &lmdbKey, &lmdbData, MDB_NOOVERWRITE);
if (rc == 0) { if (rc == 0) {
MDB_val orderKey; MDB_val orderKey;
orderKey.mv_size = 8; orderKey.mv_size = 8;
@ -105,15 +109,19 @@ void Core::Archive::addElement(const Shared::Message& message)
if (rc) { if (rc) {
qDebug() << "An element couldn't be inserted into the index" << mdb_strerror(rc); qDebug() << "An element couldn't be inserted into the index" << mdb_strerror(rc);
mdb_txn_abort(txn); mdb_txn_abort(txn);
return false;
} else { } else {
rc = mdb_txn_commit(txn); rc = mdb_txn_commit(txn);
if (rc) { if (rc) {
qDebug() << "A transaction error: " << mdb_strerror(rc); qDebug() << "A transaction error: " << mdb_strerror(rc);
return false;
} }
return true;
} }
} else { } else {
qDebug() << "An element couldn't been added to the archive, skipping" << mdb_strerror(rc); qDebug() << "An element couldn't been added to the archive, skipping" << mdb_strerror(rc);
mdb_txn_abort(txn); mdb_txn_abort(txn);
return false;
} }
} }
@ -237,3 +245,150 @@ long unsigned int Core::Archive::size() const
mdb_txn_abort(txn); mdb_txn_abort(txn);
return stat.ms_entries; return stat.ms_entries;
} }
std::list<Shared::Message> Core::Archive::getBefore(int count, const QString& id)
{
if (!opened) {
throw Closed("getBefore", jid.toStdString());
}
std::list<Shared::Message> res;
MDB_cursor* cursor;
MDB_txn *txn;
MDB_val lmdbKey, lmdbData;
int rc;
rc = mdb_txn_begin(environment, NULL, MDB_RDONLY, &txn);
if (id == "") {
rc = mdb_cursor_open(txn, order, &cursor);
rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_LAST);
if (rc) {
qDebug() << "Error geting before newest newest " << mdb_strerror(rc);
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
throw new Empty(jid.toStdString());
} else {
std::string sId((char*)lmdbData.mv_data, lmdbData.mv_size);
}
} else {
lmdbKey.mv_size = id.size();
lmdbKey.mv_data = (char*)id.toStdString().c_str();
rc = mdb_get(txn, main, &lmdbKey, &lmdbData);
if (rc) {
qDebug() <<"Error getting before: " << mdb_strerror(rc);
mdb_txn_abort(txn);
throw NotFound(id.toStdString(), jid.toStdString());
} else {
QByteArray ba((char*)lmdbData.mv_data, lmdbData.mv_size);
QDataStream ds(&ba, QIODevice::ReadOnly);
Shared::Message msg;
msg.deserialize(ds);
quint64 stamp = msg.getTime().toMSecsSinceEpoch();
lmdbKey.mv_data = (quint8*)&stamp;
lmdbKey.mv_size = 8;
rc = mdb_cursor_open(txn, order, &cursor);
rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_SET);
if (rc) {
qDebug() << "Error getting before " << mdb_strerror(rc);
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
throw new NotFound(id.toStdString(), jid.toStdString());
} else {
rc = mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_PREV);
if (rc) {
qDebug() << "Error getting before " << mdb_strerror(rc);
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
throw new NotFound(id.toStdString(), jid.toStdString());
}
}
}
}
do {
QByteArray ba((char*)lmdbData.mv_data, lmdbData.mv_size);
QDataStream ds(&ba, QIODevice::ReadOnly);
res.emplace_back();
Shared::Message& msg = res.back();
msg.deserialize(ds);
--count;
} while (count > 0 && mdb_cursor_get(cursor, &lmdbKey, &lmdbData, MDB_PREV) == 0);
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
return res;
}
bool Core::Archive::_isFromTheBeginning()
{
std::string strKey = "beginning";
MDB_val lmdbKey, lmdbData;
lmdbKey.mv_size = strKey.size();
lmdbKey.mv_data = (char*)strKey.c_str();
MDB_txn *txn;
int rc;
mdb_txn_begin(environment, NULL, MDB_RDONLY, &txn);
rc = mdb_get(txn, stats, &lmdbKey, &lmdbData);
if (rc == MDB_NOTFOUND) {
mdb_txn_abort(txn);
return false;
} else if (rc) {
qDebug() <<"isFromTheBeginning error: " << mdb_strerror(rc);
mdb_txn_abort(txn);
throw NotFound(strKey, jid.toStdString());
} else {
uint8_t value = *(uint8_t*)(lmdbData.mv_data);
bool is;
if (value == 144) {
is = false;
} else if (value == 72) {
is = true;
} else {
qDebug() <<"isFromTheBeginning error: stored value doesn't match any magic number, the answer is most probably wrong";
}
return is;
}
}
bool Core::Archive::isFromTheBeginning()
{
if (!opened) {
throw Closed("isFromTheBeginning", jid.toStdString());
}
return fromTheBeginning;
}
bool Core::Archive::setFromTheBeginning(bool is)
{
if (!opened) {
throw Closed("setFromTheBeginning", jid.toStdString());
}
if (fromTheBeginning != is) {
fromTheBeginning = is;
const std::string& id = "beginning";
uint8_t value = 144;
if (is) {
value = 72;
}
MDB_val lmdbKey, lmdbData;
lmdbKey.mv_size = id.size();
lmdbKey.mv_data = (char*)id.c_str();
lmdbData.mv_size = sizeof value;
lmdbData.mv_data = &value;
MDB_txn *txn;
mdb_txn_begin(environment, NULL, 0, &txn);
int rc;
rc = mdb_put(txn, stats, &lmdbKey, &lmdbData, 0);
if (rc != 0) {
qDebug() << "Couldn't store beginning key into stat database:" << mdb_strerror(rc);
mdb_txn_abort(txn);
}
}
}

View File

@ -23,6 +23,7 @@
#include "../global.h" #include "../global.h"
#include <lmdb.h> #include <lmdb.h>
#include "../exception.h" #include "../exception.h"
#include <list>
namespace Core { namespace Core {
@ -36,7 +37,7 @@ public:
void open(const QString& account); void open(const QString& account);
void close(); void close();
void addElement(const Shared::Message& message); bool addElement(const Shared::Message& message);
Shared::Message getElement(const QString& id); Shared::Message getElement(const QString& id);
Shared::Message oldest(); Shared::Message oldest();
QString oldestId(); QString oldestId();
@ -44,6 +45,9 @@ public:
QString newestId(); QString newestId();
void clear(); void clear();
long unsigned int size() const; long unsigned int size() const;
std::list<Shared::Message> getBefore(int count, const QString& id);
bool isFromTheBeginning();
bool setFromTheBeginning(bool is);
public: public:
const QString jid; const QString jid;
@ -97,9 +101,13 @@ public:
private: private:
bool opened; bool opened;
bool fromTheBeginning;
MDB_env* environment; MDB_env* environment;
MDB_dbi main; MDB_dbi main;
MDB_dbi order; MDB_dbi order;
MDB_dbi stats;
bool _isFromTheBeginning();
}; };
} }

View File

@ -25,9 +25,22 @@ Core::Contact::Contact(const QString& pJid, const QString& account, QObject* par
groups(), groups(),
archiveState(empty), archiveState(empty),
archive(new Archive(jid)), archive(new Archive(jid)),
subscriptionState(Shared::unknown) subscriptionState(Shared::unknown),
syncronizing(false),
requestedCount(0),
receivedCount(0),
hisoryCache(),
appendCache(),
requestCache()
{ {
archive->open(account); archive->open(account);
if (archive->isFromTheBeginning()) {
archiveState = beginning;
} else {
if (archive->size() != 0) {
archiveState = chunk;
}
}
} }
Core::Contact::~Contact() Core::Contact::~Contact()
@ -91,3 +104,42 @@ unsigned int Core::Contact::groupsCount() const
{ {
return groups.size(); return groups.size();
} }
void Core::Contact::addMessageToArchive(const Shared::Message& msg)
{
}
void Core::Contact::requestHistory(int count, const QString& before)
{
if (syncronizing) {
requestCache.emplace_back(count, before);
} else {
switch (archiveState) {
case empty:
if (appendCache.size() != 0) {
//from last
} else {
//search
}
break;
case beginning:
//from last
break;
case end:
//try
break;
case chunk:
//from last
break;
case complete:
//just give
break;
}
syncronizing = true;
requestedCount = count;
receivedCount = 0;
hisoryCache.clear();
}
}

View File

@ -23,6 +23,7 @@
#include <QSet> #include <QSet>
#include "archive.h" #include "archive.h"
#include "../global.h" #include "../global.h"
#include <list>
namespace Core { namespace Core {
@ -48,12 +49,18 @@ public:
void setSubscriptionState(Shared::SubscriptionState state); void setSubscriptionState(Shared::SubscriptionState state);
Shared::SubscriptionState getSubscriptionState() const; Shared::SubscriptionState getSubscriptionState() const;
unsigned int groupsCount() const; unsigned int groupsCount() const;
void addMessageToArchive(const Shared::Message& msg);
void appendMessageToArchive(const Shared::Message& msg);
void flushMessagesToArchive();
void requestHistory(int count, const QString& before);
signals: signals:
void groupAdded(const QString& name); void groupAdded(const QString& name);
void groupRemoved(const QString& name); void groupRemoved(const QString& name);
void nameChanged(const QString& name); void nameChanged(const QString& name);
void subscriptionStateChanged(Shared::SubscriptionState state); void subscriptionStateChanged(Shared::SubscriptionState state);
void historyResponse(const std::list<Shared::Message>& messages);
void needEarlierHistory(const QString& before, const QString& after, int count, const QDateTime& from, const QDateTime& to);
public: public:
const QString jid; const QString jid;
@ -64,6 +71,13 @@ private:
ArchiveState archiveState; ArchiveState archiveState;
Archive* archive; Archive* archive;
Shared::SubscriptionState subscriptionState; Shared::SubscriptionState subscriptionState;
bool syncronizing;
int requestedCount;
int receivedCount;
std::list<Shared::Message> hisoryCache;
std::list<Shared::Message> appendCache;
std::list<std::pair<int, QString>> requestCache;
}; };
} }