Multithreading: fix snapshotter (#5095)

This commit is contained in:
Julien Maffre 2023-03-10 16:02:24 +00:00 коммит произвёл GitHub
Родитель 8777d1be33
Коммит c818c38c3b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 120 добавлений и 61 удалений

Просмотреть файл

@ -1,4 +1,4 @@
___ ___
(- -) (o o) | Y & +
(- -) (o o) | Y } +
( V ) z O z O +---'---'
/--x-m- /--m-m---xXx--/--yy------

Просмотреть файл

@ -1 +1 @@
This looks like a "job" for Threading Canary!!
This looks like a "job" for Threading Canary!!!!

Просмотреть файл

@ -407,14 +407,17 @@ namespace ccf
if (index < begin_index())
{
throw std::logic_error(fmt::format(
"Cannot produce proof for {}: index is too old and has been "
"flushed from memory",
index));
"Cannot produce proof for {}: index is older than first index {}, "
"and has been flushed from memory",
index,
begin_index()));
}
if (index > end_index())
{
throw std::logic_error(fmt::format(
"Cannot produce proof for {}: index is not yet known", index));
"Cannot produce proof for {}: index is later than last index {}",
index,
end_index()));
}
return Proof(tree, index);
}

Просмотреть файл

@ -2526,6 +2526,17 @@ namespace ccf
return kv::ConsensusHookPtr(nullptr);
}));
network.tables->set_map_hook(
network.snapshot_evidence.get_name(),
network.snapshot_evidence.wrap_map_hook(
[s = this->snapshotter](
kv::Version version, const SnapshotEvidence::Write& w) {
assert(w.has_value());
auto snapshot_evidence = w.value();
s->record_snapshot_evidence_idx(version, snapshot_evidence);
return kv::ConsensusHookPtr(nullptr);
}));
network.tables->set_global_hook(
network.config.get_name(),
network.config.wrap_commit_hook(

Просмотреть файл

@ -35,39 +35,27 @@ namespace ccf
struct SnapshotInfo
{
consensus::Index idx;
consensus::Index evidence_idx;
crypto::Sha256Hash write_set_digest;
std::string commit_evidence;
crypto::Sha256Hash snapshot_digest;
std::optional<consensus::Index> evidence_idx = std::nullopt;
std::optional<NodeId> node_id = std::nullopt;
std::optional<crypto::Pem> node_cert = std::nullopt;
std::optional<std::vector<uint8_t>> sig = std::nullopt;
std::optional<std::vector<uint8_t>> tree = std::nullopt;
SnapshotInfo(
consensus::Index idx,
consensus::Index evidence_idx,
const crypto::Sha256Hash& write_set_digest_,
const std::string& commit_evidence_,
const crypto::Sha256Hash& snapshot_digest_) :
idx(idx),
evidence_idx(evidence_idx),
write_set_digest(write_set_digest_),
commit_evidence(commit_evidence_),
snapshot_digest(snapshot_digest_)
{}
SnapshotInfo() = default;
};
// Queue of pending snapshots that have been generated, but are not yet
// committed
std::deque<SnapshotInfo> pending_snapshots;
std::map<consensus::Index, SnapshotInfo> pending_snapshots;
// Initial snapshot index
static constexpr consensus::Index initial_snapshot_idx = 0;
// Index at which the lastest snapshot was generated
// Index at which the latest snapshot was generated
consensus::Index last_snapshot_idx = 0;
// Used to suspend snapshot generation during public recovery
@ -81,7 +69,6 @@ namespace ccf
bool forced;
bool done;
};
std::deque<SnapshotEntry> next_snapshot_indices;
void record_snapshot(
@ -149,6 +136,13 @@ namespace ccf
commit_evidence = commit_evidence_;
};
// It is possible that the signature following the snapshot evidence is
// scheduled by another thread while the below snapshot evidence
// transaction is committed. To allow for such scenario, the evidence
// seqno is recorded via `record_snapshot_evidence_idx()` on a hook rather
// than here.
pending_snapshots[snapshot_version] = {};
auto rc =
tx.commit(cd, false, nullptr, capture_ws_digest_and_commit_evidence);
if (rc != kv::CommitResult::SUCCESS)
@ -160,26 +154,19 @@ namespace ccf
return;
}
pending_snapshots[snapshot_version].commit_evidence = commit_evidence;
pending_snapshots[snapshot_version].write_set_digest = ws_digest;
pending_snapshots[snapshot_version].snapshot_digest = cd.value();
auto evidence_version = tx.commit_version();
record_snapshot(snapshot_version, evidence_version, serialised_snapshot);
consensus::Index snapshot_idx =
static_cast<consensus::Index>(snapshot_version);
consensus::Index snapshot_evidence_idx =
static_cast<consensus::Index>(evidence_version);
pending_snapshots.emplace_back(
snapshot_idx,
snapshot_evidence_idx,
ws_digest,
commit_evidence,
cd.value());
LOG_DEBUG_FMT(
"Snapshot successfully generated for seqno {}, with evidence seqno "
"{}: "
"Snapshot successfully generated for seqno {}, with evidence seqno {}: "
"{}, ws digest: {}",
snapshot_idx,
snapshot_evidence_idx,
snapshot_version,
evidence_version,
cd.value(),
ws_digest);
}
@ -194,21 +181,25 @@ namespace ccf
for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
{
if (idx > it->evidence_idx)
auto& snapshot_idx = it->first;
auto& snapshot_info = it->second;
if (
snapshot_info.evidence_idx.has_value() &&
idx > snapshot_info.evidence_idx.value())
{
auto serialised_receipt = build_and_serialise_receipt(
it->sig.value(),
it->tree.value(),
it->node_id.value(),
it->node_cert.value(),
it->evidence_idx,
it->write_set_digest,
it->commit_evidence,
std::move(it->snapshot_digest));
commit_snapshot(it->idx, serialised_receipt);
auto it_ = it;
++it;
pending_snapshots.erase(it_);
snapshot_info.sig.value(),
snapshot_info.tree.value(),
snapshot_info.node_id.value(),
snapshot_info.node_cert.value(),
snapshot_info.evidence_idx.value(),
snapshot_info.write_set_digest,
snapshot_info.commit_evidence,
std::move(snapshot_info.snapshot_digest));
commit_snapshot(snapshot_idx, serialised_receipt);
it = pending_snapshots.erase(it);
}
else
{
@ -312,12 +303,19 @@ namespace ccf
{
std::lock_guard<ccf::pal::Mutex> guard(lock);
for (auto& pending_snapshot : pending_snapshots)
for (auto& [snapshot_idx, pending_snapshot] : pending_snapshots)
{
if (
pending_snapshot.evidence_idx < idx &&
pending_snapshot.evidence_idx.has_value() &&
idx > pending_snapshot.evidence_idx.value() &&
!pending_snapshot.sig.has_value())
{
LOG_TRACE_FMT(
"Recording signature at {} for snapshot {} with evidence at {}",
idx,
snapshot_idx,
pending_snapshot.evidence_idx.value());
pending_snapshot.node_id = node_id;
pending_snapshot.node_cert = node_cert;
pending_snapshot.sig = sig;
@ -330,19 +328,42 @@ namespace ccf
{
std::lock_guard<ccf::pal::Mutex> guard(lock);
for (auto& pending_snapshot : pending_snapshots)
for (auto& [snapshot_idx, pending_snapshot] : pending_snapshots)
{
if (
pending_snapshot.evidence_idx < idx &&
pending_snapshot.evidence_idx.has_value() &&
idx > pending_snapshot.evidence_idx.value() &&
!pending_snapshot.tree.has_value())
{
LOG_TRACE_FMT(
"Recording serialised tree at {} for snapshot {} with evidence at "
"{}",
idx,
snapshot_idx,
pending_snapshot.evidence_idx.value());
pending_snapshot.tree = tree;
}
}
}
// Caller of this function needs to lock map related locks using
// kv::ScopedStoreMapsLock
// Records the seqno at which the evidence for a pending snapshot was
// written.
//
// idx:      seqno of the snapshot evidence transaction.
// snapshot: evidence entry; its `version` field identifies the snapshot the
//           evidence refers to.
//
// If no pending snapshot is tracked for `snapshot.version`, the call is a
// no-op.
void record_snapshot_evidence_idx(
  consensus::Index idx, const SnapshotHash& snapshot)
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);

  // pending_snapshots is keyed by snapshot idx, so a direct lookup replaces
  // a scan over every pending entry (keys are unique: at most one match).
  const auto it = pending_snapshots.find(snapshot.version);
  if (it == pending_snapshots.end())
  {
    return;
  }

  LOG_TRACE_FMT(
    "Recording evidence idx at {} for snapshot {}", idx, it->first);
  it->second.evidence_idx = idx;
}
void schedule_snapshot(consensus::Index idx)
{
auto msg = std::make_unique<threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
@ -422,10 +443,17 @@ namespace ccf
"Rolled back snapshotter: last snapshottable idx is now {}",
next_snapshot_indices.front().idx);
while (!pending_snapshots.empty() &&
(pending_snapshots.back().evidence_idx > idx))
while (!pending_snapshots.empty())
{
pending_snapshots.pop_back();
const auto& last_snapshot = std::prev(pending_snapshots.end());
if (
last_snapshot->second.evidence_idx.has_value() &&
idx >= last_snapshot->second.evidence_idx.value())
{
break;
}
pending_snapshots.erase(last_snapshot);
}
}
};

Просмотреть файл

@ -117,8 +117,19 @@ bool record_signature(
return requires_snapshot;
}
// Test helper: tells the snapshotter that the evidence for the snapshot at
// `snapshot_idx` was recorded at `evidence_idx`. Only the `version` field of
// the evidence entry is populated.
void record_snapshot_evidence(
  const std::shared_ptr<ccf::Snapshotter>& snapshotter,
  size_t snapshot_idx,
  size_t evidence_idx)
{
  const ccf::SnapshotHash evidence{.version = snapshot_idx};
  snapshotter->record_snapshot_evidence_idx(evidence_idx, evidence);
}
TEST_CASE("Regular snapshotting")
{
logger::config::default_init();
ccf::NetworkState network;
auto consensus = std::make_shared<kv::test::StubConsensus>();
@ -145,6 +156,7 @@ TEST_CASE("Regular snapshotting")
size_t commit_idx = 0;
size_t snapshot_idx = snapshot_tx_interval;
size_t snapshot_evidence_idx = snapshot_idx + 1;
INFO("Generate snapshot before interval has no effect");
{
@ -176,8 +188,9 @@ TEST_CASE("Regular snapshotting")
INFO("Commit first snapshot");
{
issue_transactions(network, 1);
record_snapshot_evidence(snapshotter, snapshot_idx, snapshot_evidence_idx);
// Signature after evidence is recorded
commit_idx = snapshot_tx_interval + 2;
commit_idx = snapshot_idx + 2;
REQUIRE_FALSE(record_signature(history, snapshotter, commit_idx));
snapshotter->commit(commit_idx, true);
REQUIRE(
@ -198,6 +211,7 @@ TEST_CASE("Regular snapshotting")
INFO("Generate second snapshot");
{
snapshot_idx = snapshot_tx_interval * 2;
snapshot_evidence_idx = snapshot_idx + 1;
REQUIRE(record_signature(history, snapshotter, snapshot_idx));
// Note: Commit exactly on snapshot idx
commit_idx = snapshot_idx;
@ -212,6 +226,7 @@ TEST_CASE("Regular snapshotting")
INFO("Commit second snapshot");
{
issue_transactions(network, 1);
record_snapshot_evidence(snapshotter, snapshot_idx, snapshot_evidence_idx);
// Signature after evidence is recorded
commit_idx = snapshot_tx_interval * 2 + 2;
REQUIRE_FALSE(record_signature(history, snapshotter, commit_idx));
@ -295,6 +310,7 @@ TEST_CASE("Rollback before snapshot is committed")
// Commit evidence
issue_transactions(network, 1);
commit_idx = snapshot_idx + 2;
record_snapshot_evidence(snapshotter, snapshot_idx, snapshot_idx + 1);
REQUIRE_FALSE(record_signature(history, snapshotter, commit_idx));
snapshotter->commit(commit_idx, true);
REQUIRE(
@ -323,6 +339,7 @@ TEST_CASE("Rollback before snapshot is committed")
// Commit evidence
issue_transactions(network, 1);
commit_idx = snapshot_idx + 2;
record_snapshot_evidence(snapshotter, snapshot_idx, snapshot_idx + 1);
REQUIRE_FALSE(record_signature(history, snapshotter, commit_idx));
snapshotter->commit(commit_idx, true);
REQUIRE(