зеркало из https://github.com/nextcloud/desktop.git
SyncEngine: Recover when the PUT reply (or chunkin's MOVE) is lost
This can happen if the upload of a file is finished, but we just got disconnected right before recieving the reply containing the etag. So nothing was save din the DB, and we are not sure if the server recieved the file properly or not. Further local update of the file will cause a conflict. In order to fix this, store the checksum of the uploading file in the uploadinfo table of the local db (even if there is no chunking involved). And when we have a conflict, check that it is not because of this situation by checking the entry in the uploadinfo table. Issue #5106
This commit is contained in:
Родитель
4369853ddb
Коммит
4dc49ff3b0
|
@ -389,6 +389,7 @@ bool SyncJournalDb::checkConnect()
|
|||
"errorcount INTEGER,"
|
||||
"size INTEGER(8),"
|
||||
"modtime INTEGER(8),"
|
||||
"contentChecksum TEXT,"
|
||||
"PRIMARY KEY(path)"
|
||||
");");
|
||||
|
||||
|
@ -611,15 +612,15 @@ bool SyncJournalDb::checkConnect()
|
|||
}
|
||||
|
||||
_getUploadInfoQuery.reset(new SqlQuery(_db));
|
||||
if (_getUploadInfoQuery->prepare("SELECT chunk, transferid, errorcount, size, modtime FROM "
|
||||
if (_getUploadInfoQuery->prepare("SELECT chunk, transferid, errorcount, size, modtime, contentChecksum FROM "
|
||||
"uploadinfo WHERE path=?1")) {
|
||||
return sqlFail("prepare _getUploadInfoQuery", *_getUploadInfoQuery);
|
||||
}
|
||||
|
||||
_setUploadInfoQuery.reset(new SqlQuery(_db));
|
||||
if (_setUploadInfoQuery->prepare("INSERT OR REPLACE INTO uploadinfo "
|
||||
"(path, chunk, transferid, errorcount, size, modtime) "
|
||||
"VALUES ( ?1 , ?2, ?3 , ?4 , ?5, ?6 )")) {
|
||||
"(path, chunk, transferid, errorcount, size, modtime, contentChecksum) "
|
||||
"VALUES ( ?1 , ?2, ?3 , ?4 , ?5, ?6 , ?7 )")) {
|
||||
return sqlFail("prepare _setUploadInfoQuery", *_setUploadInfoQuery);
|
||||
}
|
||||
|
||||
|
@ -849,6 +850,16 @@ bool SyncJournalDb::updateMetadataTableStructure()
|
|||
commitInternal("update database structure: add contentChecksumTypeId col");
|
||||
}
|
||||
|
||||
if (!tableColumns("uploadinfo").contains("contentChecksum")) {
|
||||
SqlQuery query(_db);
|
||||
query.prepare("ALTER TABLE uploadinfo ADD COLUMN contentChecksum TEXT;");
|
||||
if (!query.exec()) {
|
||||
sqlFail("updateMetadataTableStructure: add contentChecksum column", query);
|
||||
re = false;
|
||||
}
|
||||
commitInternal("update database structure: add contentChecksum col for uploadinfo");
|
||||
}
|
||||
|
||||
|
||||
return re;
|
||||
}
|
||||
|
@ -1472,6 +1483,7 @@ SyncJournalDb::UploadInfo SyncJournalDb::getUploadInfo(const QString &file)
|
|||
res._errorCount = _getUploadInfoQuery->intValue(2);
|
||||
res._size = _getUploadInfoQuery->int64Value(3);
|
||||
res._modtime = _getUploadInfoQuery->int64Value(4);
|
||||
res._contentChecksum = _getUploadInfoQuery->baValue(5);
|
||||
res._valid = ok;
|
||||
}
|
||||
}
|
||||
|
@ -1494,6 +1506,7 @@ void SyncJournalDb::setUploadInfo(const QString &file, const SyncJournalDb::Uplo
|
|||
_setUploadInfoQuery->bindValue(4, i._errorCount);
|
||||
_setUploadInfoQuery->bindValue(5, i._size);
|
||||
_setUploadInfoQuery->bindValue(6, i._modtime);
|
||||
_setUploadInfoQuery->bindValue(7, i._contentChecksum);
|
||||
|
||||
if (!_setUploadInfoQuery->exec()) {
|
||||
return;
|
||||
|
@ -2001,7 +2014,8 @@ bool operator==(const SyncJournalDb::UploadInfo &lhs,
|
|||
&& lhs._modtime == rhs._modtime
|
||||
&& lhs._valid == rhs._valid
|
||||
&& lhs._size == rhs._size
|
||||
&& lhs._transferid == rhs._transferid;
|
||||
&& lhs._transferid == rhs._transferid
|
||||
&& lhs._contentChecksum == rhs._contentChecksum;
|
||||
}
|
||||
|
||||
} // namespace OCC
|
||||
|
|
|
@ -112,6 +112,7 @@ public:
|
|||
qint64 _modtime;
|
||||
int _errorCount;
|
||||
bool _valid;
|
||||
QByteArray _contentChecksum;
|
||||
};
|
||||
|
||||
struct PollInfo
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "csync_rename.h"
|
||||
#include "common/c_jhash.h"
|
||||
#include "common/asserts.h"
|
||||
#include "common/syncjournalfilerecord.h"
|
||||
|
||||
#include <QLoggingCategory>
|
||||
Q_LOGGING_CATEGORY(lcReconcile, "sync.csync.reconciler", QtInfoMsg)
|
||||
|
@ -316,6 +317,35 @@ static int _csync_merge_algorithm_visitor(csync_file_stat_t *cur, CSYNC * ctx) {
|
|||
(ctx->current == REMOTE_REPLICA ? cur->checksumHeader : other->checksumHeader);
|
||||
if (!remoteChecksumHeader.isEmpty()) {
|
||||
is_conflict = true;
|
||||
|
||||
// Do we have an UploadInfo for this?
|
||||
// Maybe the Upload was completed, but the connection was broken just before
|
||||
// we recieved the etag (Issue #5106)
|
||||
auto up = ctx->statedb->getUploadInfo(cur->path);
|
||||
if (up._valid && up._contentChecksum == remoteChecksumHeader) {
|
||||
// Solve the conflict into an upload, or nothing
|
||||
auto remoteNode = ctx->current == REMOTE_REPLICA ? cur : other;
|
||||
auto localNode = ctx->current == REMOTE_REPLICA ? other : cur;
|
||||
remoteNode->instruction = CSYNC_INSTRUCTION_NONE;
|
||||
localNode->instruction = up._modtime == localNode->modtime ? CSYNC_INSTRUCTION_UPDATE_METADATA : CSYNC_INSTRUCTION_SYNC;
|
||||
|
||||
// Update the etag and other server metadata in the journal already
|
||||
// (We can't use a typical CSYNC_INSTRUCTION_UPDATE_METADATA because
|
||||
// we must not store the size/modtime from the file system)
|
||||
OCC::SyncJournalFileRecord rec;
|
||||
if (ctx->statedb->getFileRecord(remoteNode->path, &rec)) {
|
||||
rec._path = remoteNode->path;
|
||||
rec._etag = remoteNode->etag;
|
||||
rec._fileId = remoteNode->file_id;
|
||||
rec._modtime = remoteNode->modtime;
|
||||
rec._type = remoteNode->type;
|
||||
rec._fileSize = remoteNode->size;
|
||||
rec._remotePerm = remoteNode->remotePerm;
|
||||
rec._checksumHeader = remoteNode->checksumHeader;
|
||||
ctx->statedb->setFileRecordMetadata(rec);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// SO: If there is no checksum, we can have !is_conflict here
|
||||
|
|
|
@ -230,6 +230,7 @@ void PropagateUploadFileNG::startNewUpload()
|
|||
pi._valid = true;
|
||||
pi._transferid = _transferId;
|
||||
pi._modtime = _item->_modtime;
|
||||
pi._contentChecksum = _item->_checksumHeader;
|
||||
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||
propagator()->_journal->commit("Upload info");
|
||||
QMap<QByteArray, QByteArray> headers;
|
||||
|
|
|
@ -43,10 +43,24 @@ void PropagateUploadFileV1::doStartUpload()
|
|||
|
||||
const SyncJournalDb::UploadInfo progressInfo = propagator()->_journal->getUploadInfo(_item->_file);
|
||||
|
||||
if (progressInfo._valid && progressInfo._modtime == _item->_modtime) {
|
||||
if (progressInfo._valid && progressInfo._modtime == _item->_modtime
|
||||
&& (progressInfo._contentChecksum == _item->_checksumHeader || progressInfo._contentChecksum.isEmpty() || _item->_checksumHeader.isEmpty())) {
|
||||
_startChunk = progressInfo._chunk;
|
||||
_transferId = progressInfo._transferid;
|
||||
qCInfo(lcPropagateUpload) << _item->_file << ": Resuming from chunk " << _startChunk;
|
||||
} else if (_chunkCount <= 1 && !_item->_checksumHeader.isEmpty()) {
|
||||
// If there is only one chunk, write the checksum in the database, so if the PUT is sent
|
||||
// to the server, but the connection drops before we get the etag, we can check the checksum
|
||||
// in reconcile (issue #5106)
|
||||
SyncJournalDb::UploadInfo pi;
|
||||
pi._valid = true;
|
||||
pi._chunk = 0;
|
||||
pi._transferid = _transferId;
|
||||
pi._modtime = _item->_modtime;
|
||||
pi._errorCount = 0;
|
||||
pi._contentChecksum = _item->_checksumHeader;
|
||||
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||
propagator()->_journal->commit("Upload info");
|
||||
}
|
||||
|
||||
_currentChunk = 0;
|
||||
|
@ -274,6 +288,7 @@ void PropagateUploadFileV1::slotPutFinished()
|
|||
pi._transferid = _transferId;
|
||||
pi._modtime = _item->_modtime;
|
||||
pi._errorCount = 0; // successful chunk upload resets
|
||||
pi._contentChecksum = _item->_checksumHeader;
|
||||
propagator()->_journal->setUploadInfo(_item->_file, pi);
|
||||
propagator()->_journal->commit("Upload info");
|
||||
startNextChunk();
|
||||
|
|
|
@ -452,7 +452,11 @@ public:
|
|||
emit finished();
|
||||
}
|
||||
|
||||
void abort() override { }
|
||||
void abort() override
|
||||
{
|
||||
setError(OperationCanceledError, "abort");
|
||||
emit finished();
|
||||
}
|
||||
qint64 readData(char *, qint64) override { return 0; }
|
||||
};
|
||||
|
||||
|
@ -696,7 +700,12 @@ public:
|
|||
emit finished();
|
||||
}
|
||||
|
||||
void abort() override { }
|
||||
void abort() override
|
||||
{
|
||||
setError(OperationCanceledError, "abort");
|
||||
emit finished();
|
||||
}
|
||||
|
||||
qint64 readData(char *, qint64) override { return 0; }
|
||||
};
|
||||
|
||||
|
|
|
@ -161,17 +161,6 @@ private slots:
|
|||
QVERIFY(!moveChecksumHeader.isEmpty());
|
||||
fakeFolder.remoteModifier().find("A/a0")->checksums = moveChecksumHeader;
|
||||
|
||||
// This time it's a real conflict, we have a remote checksum!
|
||||
connection = connect(&fakeFolder.syncEngine(), &SyncEngine::aboutToPropagate, [&](SyncFileItemVector &items) {
|
||||
SyncFileItemPtr a0;
|
||||
for (auto &item : items) {
|
||||
if (item->_file == "A/a0")
|
||||
a0 = item;
|
||||
}
|
||||
|
||||
QVERIFY(a0);
|
||||
QCOMPARE(a0->_instruction, CSYNC_INSTRUCTION_CONFLICT);
|
||||
});
|
||||
QVERIFY(fakeFolder.syncOnce());
|
||||
disconnect(connection);
|
||||
QCOMPARE(nGET, 0); // no new download, just a metadata update!
|
||||
|
@ -378,6 +367,65 @@ private slots:
|
|||
QVERIFY(fakeFolder.uploadState().children.first().name != chunkingId);
|
||||
}
|
||||
|
||||
// Check what happens when the connection is dropped on the PUT (non-chunking) or MOVE (chunking)
|
||||
// for on the issue #5106
|
||||
void connectionDroppedBeforeEtagRecieved_data()
|
||||
{
|
||||
QTest::addColumn<bool>("chunking");
|
||||
QTest::newRow("big file") << true;
|
||||
QTest::newRow("small file") << false;
|
||||
}
|
||||
void connectionDroppedBeforeEtagRecieved()
|
||||
{
|
||||
QFETCH(bool, chunking);
|
||||
FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() };
|
||||
fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ { "chunking", "1.0" } } }, { "checksums", QVariantMap{ { "supportedTypes", QStringList() << "SHA1" } } } });
|
||||
const int size = chunking ? 150 * 1000 * 1000 : 300;
|
||||
|
||||
// Make the MOVE never reply, but trigger a client-abort and apply the change remotely
|
||||
QByteArray checksumHeader;
|
||||
int nGET = 0;
|
||||
QScopedValueRollback<int> setHttpTimeout(AbstractNetworkJob::httpTimeout, 1);
|
||||
int responseDelay = AbstractNetworkJob::httpTimeout * 1000 * 1000; // much bigger than http timeout (so a timeout will occur)
|
||||
// This will perform the operation on the server, but the reply will not come to the client
|
||||
fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * {
|
||||
if (!chunking && op == QNetworkAccessManager::PutOperation) {
|
||||
checksumHeader = request.rawHeader("OC-Checksum");
|
||||
return new DelayedReply<FakePutReply>(responseDelay, fakeFolder.remoteModifier(), op, request, outgoingData->readAll(), &fakeFolder.syncEngine());
|
||||
} else if (chunking && request.attribute(QNetworkRequest::CustomVerbAttribute) == "MOVE") {
|
||||
checksumHeader = request.rawHeader("OC-Checksum");
|
||||
return new DelayedReply<FakeChunkMoveReply>(responseDelay, fakeFolder.uploadState(), fakeFolder.remoteModifier(), op, request, &fakeFolder.syncEngine());
|
||||
} else if (op == QNetworkAccessManager::GetOperation) {
|
||||
nGET++;
|
||||
}
|
||||
return nullptr;
|
||||
});
|
||||
|
||||
|
||||
// Test 1: a NEW file
|
||||
fakeFolder.localModifier().insert("A/a0", size);
|
||||
QVERIFY(!fakeFolder.syncOnce()); // timeout!
|
||||
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); // but the upload succeeded
|
||||
QVERIFY(!checksumHeader.isEmpty());
|
||||
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader; // The test system don't do that automatically
|
||||
// Should be resolved properly
|
||||
QVERIFY(fakeFolder.syncOnce());
|
||||
QCOMPARE(nGET, 0);
|
||||
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
|
||||
|
||||
// Test 2: Modify the file further
|
||||
fakeFolder.localModifier().appendByte("A/a0");
|
||||
QVERIFY(!fakeFolder.syncOnce()); // timeout!
|
||||
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); // but the upload succeeded
|
||||
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader;
|
||||
// modify again, should not cause conflict
|
||||
fakeFolder.localModifier().appendByte("A/a0");
|
||||
QVERIFY(!fakeFolder.syncOnce()); // now it's trying to upload the modified file
|
||||
QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
|
||||
fakeFolder.remoteModifier().find("A/a0")->checksums = checksumHeader;
|
||||
QVERIFY(fakeFolder.syncOnce());
|
||||
QCOMPARE(nGET, 0);
|
||||
}
|
||||
};
|
||||
|
||||
QTEST_GUILESS_MAIN(TestChunkingNG)
|
||||
|
|
Загрузка…
Ссылка в новой задаче