a bit better appoach

This commit is contained in:
Blue 2019-04-29 00:34:28 +03:00
parent 88ea4cf3ac
commit d09fef5a34
3 changed files with 168 additions and 104 deletions

View File

@ -206,6 +206,62 @@ Shared::Message Core::Archive::oldest()
return getElement(oldestId()); return getElement(oldestId());
} }
unsigned int Core::Archive::addElements(const std::list<Shared::Message>& messages)
{
if (!opened) {
throw Closed("addElements", jid.toStdString());
}
int success = 0;
int rc = 0;
MDB_val lmdbKey, lmdbData;
MDB_txn *txn;
mdb_txn_begin(environment, NULL, 0, &txn);
std::list<Shared::Message>::const_iterator itr = messages.begin();
while (rc == 0 && itr != messages.end()) {
const Shared::Message& message = *itr;
QByteArray ba;
QDataStream ds(&ba, QIODevice::WriteOnly);
message.serialize(ds);
quint64 stamp = message.getTime().toMSecsSinceEpoch();
const std::string& id = message.getId().toStdString();
lmdbKey.mv_size = id.size();
lmdbKey.mv_data = (char*)id.c_str();
lmdbData.mv_size = ba.size();
lmdbData.mv_data = (uint8_t*)ba.data();
rc = mdb_put(txn, main, &lmdbKey, &lmdbData, MDB_NOOVERWRITE);
if (rc == 0) {
MDB_val orderKey;
orderKey.mv_size = 8;
orderKey.mv_data = (uint8_t*) &stamp;
rc = mdb_put(txn, order, &orderKey, &lmdbKey, 0);
if (rc) {
qDebug() << "An element couldn't be inserted into the index, aborting the transaction" << mdb_strerror(rc);
} else {
success++;
}
} else {
if (rc == MDB_KEYEXIST) {
rc = 0;
} else {
qDebug() << "An element couldn't been added to the archive, aborting the transaction" << mdb_strerror(rc);
}
}
}
if (rc != 0) {
mdb_txn_abort(txn);
success = 0;
} else {
mdb_txn_commit(txn);
}
return success;
}
QString Core::Archive::oldestId() QString Core::Archive::oldestId()
{ {
if (!opened) { if (!opened) {

View File

@ -108,7 +108,9 @@ unsigned int Core::Contact::groupsCount() const
void Core::Contact::addMessageToArchive(const Shared::Message& msg) void Core::Contact::addMessageToArchive(const Shared::Message& msg)
{ {
hisoryCache.emplace_back(msg); if (msg.getId().size() > 0 && msg.getBody().size() > 0) {
hisoryCache.emplace_back(msg);
}
} }
void Core::Contact::requestHistory(int count, const QString& before) void Core::Contact::requestHistory(int count, const QString& before)
@ -150,57 +152,37 @@ void Core::Contact::performRequest(int count, const QString& before)
switch (archiveState) { switch (archiveState) {
case empty: case empty:
emit needEarlierHistory(before, "", QDateTime(), QDateTime()); emit needHistory(before, "", QDateTime(), QDateTime());
break; break;
case chunk: case chunk:
case beginning: { case beginning:
bool found = false; if (count != -1) {
if (appendCache.size() != 0) { requestCache.emplace_back(requestedCount, before);
for (std::list<Shared::Message>::const_reverse_iterator itr = appendCache.rbegin(), end = appendCache.rend(); itr != end; ++itr) { requestedCount = -1;
const Shared::Message& msg = *itr;
if (found) {
responseCache.emplace_front(msg);
} else {
if (msg.getId() == before) {
found = true;
responseCache.emplace_front(*itr);
}
}
if (responseCache.size() == count) {
break;
}
}
if (responseCache.size() == count) {
nextRequest();
break;
}
}
if (found) {
requestedBefore = responseCache.front().getId();
emit needEarlierHistory(requestedBefore, "", QDateTime(), QDateTime());
} else {
if (requestFromArchive(before)) {
nextRequest();
}
}
} }
emit needHistory("", archive->newestId(), QDateTime(), QDateTime());
break; break;
case end: { case end:
if (count != -1) {
bool found = requestFromArchive(before); bool found = requestFromArchive(before);
if (found) { if (found) {
int rSize = responseCache.size(); int rSize = responseCache.size();
if (rSize < count) { if (rSize < count) {
if (rSize != 0) { if (rSize != 0) {
requestedBefore = responseCache.front().getId(); emit needHistory(responseCache.front().getId(), "", QDateTime(), QDateTime());
emit needEarlierHistory(responseCache.front().getId(), "", QDateTime(), QDateTime());
} else { } else {
requestedBefore = before; emit needHistory(before, "", QDateTime(), QDateTime());
emit needEarlierHistory(before, "", QDateTime(), QDateTime());
} }
} else { } else {
nextRequest(); nextRequest();
} }
} else {
requestCache.emplace_back(requestedCount, before);
requestedCount = -1;
emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime());
} }
} else {
emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime());
} }
break; break;
case complete: case complete:
@ -212,8 +194,7 @@ void Core::Contact::performRequest(int count, const QString& before)
} }
} }
bool Core::Contact::requestFromArchive(const QString& before) bool Core::Contact::requestFromArchive(const QString& before) {
{
std::list<Shared::Message> arc; std::list<Shared::Message> arc;
QString lBefore; QString lBefore;
if (responseCache.size() > 0) { if (responseCache.size() > 0) {
@ -229,7 +210,7 @@ bool Core::Contact::requestFromArchive(const QString& before)
} catch (Archive::NotFound e) { } catch (Archive::NotFound e) {
requestCache.emplace_back(requestedCount, before); requestCache.emplace_back(requestedCount, before);
requestedCount = -1; requestedCount = -1;
requestEarlierToSync(); emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime());
return false; return false;
} }
} else { } else {
@ -239,79 +220,107 @@ bool Core::Contact::requestFromArchive(const QString& before)
//may be even it's a signal that the history is now complete? //may be even it's a signal that the history is now complete?
return true; return true;
} catch (Archive::NotFound e) { } catch (Archive::NotFound e) {
requestEarlierToSync(); emit needHistory(archive->oldestId(), "", QDateTime(), QDateTime());
return false; return false;
} }
} }
} }
void Core::Contact::requestEarlierToSync()
{
switch (archiveState) {
case empty:
break;
case beginning: //need to reach complete
case chunk: //need to reach end
emit needEarlierHistory("", archive->newestId(), QDateTime(), QDateTime());
break;
case end: //need to reach complete
emit needEarlierHistory(archive->oldestId(), "", QDateTime(), QDateTime());
break;
case complete: //nothing to sync
break;
}
}
void Core::Contact::appendMessageToArchive(const Shared::Message& msg) void Core::Contact::appendMessageToArchive(const Shared::Message& msg)
{ {
if (msg.getId().size() > 0 && msg.getBody().size() > 0) { const QString& id = msg.getId();
switch (archiveState) { if (id.size() > 0) {
case empty: if (msg.getBody().size() > 0) {
if (archive->addElement(msg)) { switch (archiveState) {
archiveState = end; case empty:
}; if (archive->addElement(msg)) {
requestHistory(-1, msg.getId()); archiveState = end;
break; };
case beginning: if (!syncronizing) {
appendCache.emplace_back(msg); requestHistory(-1, id);
requestHistory(-1, msg.getId()); }
break; break;
case end: case beginning:
archive->addElement(msg); appendCache.emplace_back(msg);
break; if (!syncronizing) {
case chunk: requestHistory(-1, id);
appendCache.emplace_back(msg); }
requestHistory(-1, msg.getId()); break;
break; case end:
case complete: archive->addElement(msg);
archive->addElement(msg); break;
break; case chunk:
appendCache.emplace_back(msg);
if (!syncronizing) {
requestHistory(-1, id);
}
break;
case complete:
archive->addElement(msg);
break;
}
} else if (!syncronizing && archiveState == empty) {
requestHistory(-1, id);
} }
} }
} }
void Core::Contact::flushMessagesToArchive(bool finished, const QString& lastId) void Core::Contact::flushMessagesToArchive(bool finished, const QString& firstId, const QString& lastId)
{ {
unsigned int amount(0);
if (hisoryCache.size() > 0) { if (hisoryCache.size() > 0) {
amount = archive->addElements(hisoryCache); archive->addElements(hisoryCache);
hisoryCache.clear();
} }
if (requestedCount == -1) { switch (archiveState) {
if (amount >= requestedCount - responseCache.size()) { break;
if (requestFromArchive(requestedBefore)){ case beginning:
if (finished) {
archiveState = complete;
nextRequest(); nextRequest();
return; } else {
emit needHistory("", lastId, QDateTime(), QDateTime());
} }
} case chunk:
if (!finished) { if (finished) {
if (lastId.size() != 0) { archiveState = end;
nextRequest();
} else {
emit needHistory("", lastId, QDateTime(), QDateTime());
} }
} break;
case empty:
} else { archiveState = end;
case end:
if (finished) {
archiveState = complete;
archive->setFromTheBeginning(true);
}
if (requestedCount != -1) {
QString before;
if (responseCache.size() > 0) {
before = responseCache.front().getId();
} else {
before = requestedBefore;
}
if (!requestFromArchive(before)) {
qDebug("Something went terrible wrong flushing messages to the archive");
}
if (requestedCount < responseCache.size()) {
if (archiveState == complete) {
nextRequest();
} else {
emit needHistory(firstId, "", QDateTime(), QDateTime());
}
} else {
nextRequest();
}
} else {
nextRequest();
}
break;
case complete:
nextRequest();
break;
} }
} }

View File

@ -41,7 +41,7 @@ public:
}; };
Contact(const QString& pJid, const QString& account, QObject* parent = 0); Contact(const QString& pJid, const QString& account, QObject* parent = 0);
~Contact(); ~Contact();
ArchiveState getArchiveState() const; ArchiveState getArchiveState() const;
QString getName() const; QString getName() const;
void setName(const QString& n); void setName(const QString& n);
@ -52,17 +52,17 @@ public:
unsigned int groupsCount() const; unsigned int groupsCount() const;
void addMessageToArchive(const Shared::Message& msg); void addMessageToArchive(const Shared::Message& msg);
void appendMessageToArchive(const Shared::Message& msg); void appendMessageToArchive(const Shared::Message& msg);
void flushMessagesToArchive(bool finished, const QString& lastId); void flushMessagesToArchive(bool finished, const QString& firstId, const QString& lastId);
void requestHistory(int count, const QString& before); void requestHistory(int count, const QString& before);
signals: signals:
void groupAdded(const QString& name); void groupAdded(const QString& name);
void groupRemoved(const QString& name); void groupRemoved(const QString& name);
void nameChanged(const QString& name); void nameChanged(const QString& name);
void subscriptionStateChanged(Shared::SubscriptionState state); void subscriptionStateChanged(Shared::SubscriptionState state);
void historyResponse(const std::list<Shared::Message>& messages); void historyResponse(const std::list<Shared::Message>& messages);
void needEarlierHistory(const QString& before, const QString& after, const QDateTime& from, const QDateTime& to); void needHistory(const QString& before, const QString& after, const QDateTime& from, const QDateTime& to);
public: public:
const QString jid; const QString jid;
@ -72,7 +72,7 @@ private:
ArchiveState archiveState; ArchiveState archiveState;
Archive* archive; Archive* archive;
Shared::SubscriptionState subscriptionState; Shared::SubscriptionState subscriptionState;
bool syncronizing; bool syncronizing;
int requestedCount; int requestedCount;
QString requestedBefore; QString requestedBefore;
@ -80,12 +80,11 @@ private:
std::list<Shared::Message> appendCache; std::list<Shared::Message> appendCache;
std::list<Shared::Message> responseCache; std::list<Shared::Message> responseCache;
std::list<std::pair<int, QString>> requestCache; std::list<std::pair<int, QString>> requestCache;
private: private:
void nextRequest(); void nextRequest();
void performRequest(int count, const QString& before); void performRequest(int count, const QString& before);
bool requestFromArchive(const QString& before); bool requestFromArchive(const QString& before);
void requestEarlierToSync();
}; };
} }