зеркало из https://github.com/microsoft/CCF.git
Join shouldn't be allowed from stale snapshots (#2422)
This commit is contained in:
Родитель
34ee8ab286
Коммит
f16909a046
|
@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
|||
|
||||
### Added
|
||||
|
||||
- `/node/state` endpoint also returns the `seqno` at which a node was started (i.e. `seqno` of the snapshot a node started from or `0` otherwise) (#2422).
|
||||
- The service certificate is now returned as part of the `/node/network/` endpoint response.
|
||||
|
||||
### Removed
|
||||
|
|
|
@ -203,6 +203,9 @@
|
|||
"recovery_target_seqno": {
|
||||
"$ref": "#/components/schemas/uint64"
|
||||
},
|
||||
"startup_seqno": {
|
||||
"$ref": "#/components/schemas/uint64"
|
||||
},
|
||||
"state": {
|
||||
"$ref": "#/components/schemas/ccf__State"
|
||||
}
|
||||
|
@ -210,7 +213,8 @@
|
|||
"required": [
|
||||
"node_id",
|
||||
"state",
|
||||
"last_signed_seqno"
|
||||
"last_signed_seqno",
|
||||
"startup_seqno"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
|
|
|
@ -773,9 +773,10 @@ int main(int argc, char** argv)
|
|||
snapshot));
|
||||
}
|
||||
|
||||
ccf_config.startup_snapshot = files::slurp(snapshot);
|
||||
ccf_config.startup_snapshot = snapshots.read_snapshot(snapshot);
|
||||
ccf_config.startup_snapshot_evidence_seqno =
|
||||
snapshot_evidence_idx->first;
|
||||
|
||||
LOG_INFO_FMT(
|
||||
"Found latest snapshot file: {} (size: {}, evidence seqno: {})",
|
||||
snapshot,
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "consensus/ledger_enclave_types.h"
|
||||
#include "ds/files.h"
|
||||
#include "host/ledger.h"
|
||||
|
||||
#include <charconv>
|
||||
|
@ -123,6 +124,11 @@ namespace asynchost
|
|||
}
|
||||
}
|
||||
|
||||
std::vector<uint8_t> read_snapshot(const std::string& file_name)
|
||||
{
|
||||
return files::slurp(fs::path(snapshot_dir) / fs::path(file_name));
|
||||
}
|
||||
|
||||
void write_snapshot(
|
||||
consensus::Index idx,
|
||||
consensus::Index evidence_idx,
|
||||
|
@ -235,7 +241,7 @@ namespace asynchost
|
|||
size_t snapshot_idx = std::stol(file_name.substr(pos + 1));
|
||||
if (snapshot_idx > latest_idx)
|
||||
{
|
||||
snapshot_file = f.path().string();
|
||||
snapshot_file = file_name;
|
||||
latest_idx = snapshot_idx;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1061,7 +1061,11 @@ TEST_CASE("Find latest snapshot with corresponding ledger chunk")
|
|||
snapshot_idx, snapshot_evidence_idx, snapshot_evidence_commit_idx);
|
||||
|
||||
REQUIRE(
|
||||
snapshots.find_latest_committed_snapshot().value() == snapshot_file_name);
|
||||
fmt::format(
|
||||
"{}/{}",
|
||||
snapshot_dir,
|
||||
snapshots.find_latest_committed_snapshot().value()) ==
|
||||
snapshot_file_name);
|
||||
|
||||
fs::remove(snapshot_file_name);
|
||||
}
|
||||
|
@ -1091,7 +1095,10 @@ TEST_CASE("Find latest snapshot with corresponding ledger chunk")
|
|||
|
||||
// Snapshot is now valid
|
||||
REQUIRE(
|
||||
snapshots.find_latest_committed_snapshot().value() ==
|
||||
fmt::format(
|
||||
"{}/{}",
|
||||
snapshot_dir,
|
||||
snapshots.find_latest_committed_snapshot().value()) ==
|
||||
get_snapshot_file_name(
|
||||
snapshot_idx, snapshot_evidence_idx, snapshot_evidence_commit_idx));
|
||||
}
|
||||
|
|
|
@ -212,6 +212,9 @@ namespace ccf
|
|||
}
|
||||
};
|
||||
std::unique_ptr<StartupSnapshotInfo> startup_snapshot_info = nullptr;
|
||||
// Set to the snapshot seqno when a node starts from one and remembered for
|
||||
// the lifetime of the node
|
||||
std::optional<kv::Version> startup_seqno = std::nullopt;
|
||||
|
||||
std::shared_ptr<kv::AbstractTxEncryptor> make_encryptor()
|
||||
{
|
||||
|
@ -263,6 +266,8 @@ namespace ccf
|
|||
"Public snapshot deserialised at seqno {}",
|
||||
snapshot_store->current_version());
|
||||
|
||||
startup_seqno = snapshot_store->current_version();
|
||||
|
||||
ledger_idx = snapshot_store->current_version();
|
||||
last_recovered_signed_idx = ledger_idx;
|
||||
|
||||
|
@ -654,6 +659,7 @@ namespace ccf
|
|||
node_encrypt_kp->public_key_pem().raw();
|
||||
join_params.quote_info = quote_info;
|
||||
join_params.consensus_type = network.consensus_type;
|
||||
join_params.startup_seqno = startup_seqno;
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
"Sending join request to {}:{}",
|
||||
|
@ -745,12 +751,12 @@ namespace ccf
|
|||
!sm.check(State::verifyingSnapshot))
|
||||
{
|
||||
throw std::logic_error(fmt::format(
|
||||
"Node should be in state {} or {} to recover public ledger entry",
|
||||
"Node should be in state {} or {} to start reading ledger",
|
||||
State::readingPublicLedger,
|
||||
State::verifyingSnapshot));
|
||||
}
|
||||
|
||||
LOG_INFO_FMT("Starting public recovery");
|
||||
LOG_INFO_FMT("Starting to read public ledger");
|
||||
read_ledger_idx(++ledger_idx);
|
||||
}
|
||||
|
||||
|
@ -857,6 +863,7 @@ namespace ccf
|
|||
|
||||
if (ledger_idx == startup_snapshot_info->evidence_seqno)
|
||||
{
|
||||
LOG_FAIL_FMT("here");
|
||||
auto evidence = snapshot_evidence->get(0);
|
||||
if (!evidence.has_value())
|
||||
{
|
||||
|
@ -896,10 +903,12 @@ namespace ccf
|
|||
if (!startup_snapshot_info->is_snapshot_verified())
|
||||
{
|
||||
// Node should shutdown if the startup snapshot cannot be verified
|
||||
throw std::logic_error(fmt::format(
|
||||
"Snapshot evidence at {} was not committed in ledger ending at {}",
|
||||
LOG_FAIL_FMT(
|
||||
"Snapshot evidence at {} was not committed in ledger ending at {}. "
|
||||
"Node should be shutdown by operator.",
|
||||
startup_snapshot_info->evidence_seqno,
|
||||
ledger_idx));
|
||||
ledger_idx);
|
||||
return;
|
||||
}
|
||||
|
||||
ledger_truncate(startup_snapshot_info->seqno);
|
||||
|
@ -1482,6 +1491,12 @@ namespace ccf
|
|||
return self;
|
||||
}
|
||||
|
||||
std::optional<kv::Version> get_startup_snapshot_seqno() override
|
||||
{
|
||||
std::lock_guard<SpinLock> guard(lock);
|
||||
return startup_seqno;
|
||||
}
|
||||
|
||||
private:
|
||||
crypto::SubjectAltName get_subject_alt_name()
|
||||
{
|
||||
|
|
|
@ -74,6 +74,7 @@ namespace ccf
|
|||
ERROR(InvalidQuote)
|
||||
ERROR(InvalidNodeState)
|
||||
ERROR(NodeAlreadyExists)
|
||||
ERROR(StartupSnapshotIsOld)
|
||||
|
||||
#undef ERROR
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ namespace ccf
|
|||
ccf::NodeId node_id;
|
||||
ccf::State state;
|
||||
kv::Version last_signed_seqno;
|
||||
kv::Version startup_seqno;
|
||||
|
||||
// Only on recovery
|
||||
std::optional<kv::Version> recovery_target_seqno;
|
||||
|
@ -68,6 +69,7 @@ namespace ccf
|
|||
QuoteInfo quote_info;
|
||||
crypto::Pem public_encryption_key;
|
||||
ConsensusType consensus_type = ConsensusType::CFT;
|
||||
std::optional<kv::Version> startup_seqno = std::nullopt;
|
||||
};
|
||||
|
||||
struct Out
|
||||
|
|
|
@ -222,6 +222,24 @@ namespace ccf
|
|||
this->network.consensus_type));
|
||||
}
|
||||
|
||||
// If the joiner and this node both started from a snapshot, make sure
|
||||
// that the joiner's snapshot is at least as recent as this node's snapshot
|
||||
auto this_startup_seqno =
|
||||
this->context.get_node_state().get_startup_snapshot_seqno();
|
||||
if (
|
||||
this_startup_seqno.has_value() && in.startup_seqno.has_value() &&
|
||||
this_startup_seqno.value() > in.startup_seqno.value())
|
||||
{
|
||||
return make_error(
|
||||
HTTP_STATUS_BAD_REQUEST,
|
||||
ccf::errors::StartupSnapshotIsOld,
|
||||
fmt::format(
|
||||
"Node requested to join from snapshot at seqno {} which is older "
|
||||
"than this node startup seqno {}",
|
||||
in.startup_seqno.value(),
|
||||
this_startup_seqno.value()));
|
||||
}
|
||||
|
||||
auto nodes = args.tx.rw(this->network.nodes);
|
||||
auto service = args.tx.rw(this->network.service);
|
||||
|
||||
|
@ -325,6 +343,9 @@ namespace ccf
|
|||
result.state = s;
|
||||
result.recovery_target_seqno = rts;
|
||||
result.last_recovered_seqno = lrs;
|
||||
result.startup_seqno =
|
||||
this->context.get_node_state().get_startup_snapshot_seqno().value_or(
|
||||
0);
|
||||
|
||||
auto signatures = args.tx.template ro<Signatures>(Tables::SIGNATURES);
|
||||
auto sig = signatures->get(0);
|
||||
|
|
|
@ -42,5 +42,6 @@ namespace ccf
|
|||
kv::ReadOnlyTx& tx,
|
||||
const QuoteInfo& quote_info,
|
||||
const std::vector<uint8_t>& expected_node_public_key_der) = 0;
|
||||
virtual std::optional<kv::Version> get_startup_snapshot_seqno() = 0;
|
||||
};
|
||||
}
|
|
@ -20,7 +20,8 @@ namespace ccf
|
|||
{ccf::State::readingPrivateLedger, "ReadingPrivateLedger"},
|
||||
{ccf::State::verifyingSnapshot, "VerifyingSnapshot"}})
|
||||
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(GetState::Out)
|
||||
DECLARE_JSON_REQUIRED_FIELDS(GetState::Out, node_id, state, last_signed_seqno)
|
||||
DECLARE_JSON_REQUIRED_FIELDS(
|
||||
GetState::Out, node_id, state, last_signed_seqno, startup_seqno)
|
||||
DECLARE_JSON_OPTIONAL_FIELDS(
|
||||
GetState::Out, recovery_target_seqno, last_recovered_seqno)
|
||||
|
||||
|
@ -30,7 +31,8 @@ namespace ccf
|
|||
node_info_network,
|
||||
quote_info,
|
||||
public_encryption_key,
|
||||
consensus_type)
|
||||
consensus_type,
|
||||
startup_seqno)
|
||||
|
||||
DECLARE_JSON_TYPE(NetworkIdentity)
|
||||
DECLARE_JSON_REQUIRED_FIELDS(NetworkIdentity, cert, priv_key)
|
||||
|
|
|
@ -94,6 +94,11 @@ namespace ccf
|
|||
{
|
||||
return QuoteVerificationResult::Verified;
|
||||
}
|
||||
|
||||
std::optional<kv::Version> get_startup_snapshot_seqno() override
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
|
||||
class StubNodeStateCache : public historical::AbstractStateCache
|
||||
|
|
|
@ -55,6 +55,10 @@ class CodeIdNotFound(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class StartupSnapshotIsOld(Exception):
    """Raised when a node fails to join and its errors mention StartupSnapshotIsOld."""
|
||||
|
||||
|
||||
class NodeShutdownError(Exception):
    """Node shutdown error (presumably raised on unclean node shutdown; usage not visible here)."""
|
||||
|
||||
|
@ -176,7 +180,7 @@ class Network:
|
|||
target_node=None,
|
||||
recovery=False,
|
||||
ledger_dir=None,
|
||||
copy_ledger_read_only=False,
|
||||
copy_ledger_read_only=True,
|
||||
read_only_ledger_dir=None,
|
||||
from_snapshot=True,
|
||||
snapshot_dir=None,
|
||||
|
@ -591,8 +595,6 @@ class Network:
|
|||
),
|
||||
)
|
||||
except TimeoutError as e:
|
||||
# The node can be safely discarded since it has not been
|
||||
# attributed a unique node_id by CCF
|
||||
LOG.error(f"New pending node {new_node.node_id} failed to join the network")
|
||||
errors, _ = new_node.stop()
|
||||
self.nodes.remove(new_node)
|
||||
|
@ -601,6 +603,8 @@ class Network:
|
|||
for error in errors:
|
||||
if "Quote does not contain known enclave measurement" in error:
|
||||
raise CodeIdNotFound from e
|
||||
if "StartupSnapshotIsOld" in error:
|
||||
raise StartupSnapshotIsOld from e
|
||||
raise
|
||||
|
||||
return new_node
|
||||
|
|
|
@ -7,6 +7,9 @@ import infra.logging_app as app
|
|||
from ccf.tx_id import TxID
|
||||
import suite.test_requirements as reqs
|
||||
import time
|
||||
import tempfile
|
||||
from shutil import copy
|
||||
import os
|
||||
|
||||
from loguru import logger as LOG
|
||||
|
||||
|
@ -59,6 +62,9 @@ def test_add_node(network, args):
|
|||
with new_node.client() as c:
|
||||
s = c.get("/node/state")
|
||||
assert s.body.json()["node_id"] == new_node.node_id
|
||||
assert (
|
||||
s.body.json()["startup_seqno"] == 0
|
||||
), "Node started without snapshot but reports startup seqno != 0"
|
||||
assert new_node
|
||||
return network
|
||||
|
||||
|
@ -78,10 +84,19 @@ def test_add_node_from_backup(network, args):
|
|||
|
||||
@reqs.description("Adding a valid node from snapshot")
|
||||
@reqs.at_least_n_nodes(2)
|
||||
@reqs.add_from_snapshot()
|
||||
def test_add_node_from_snapshot(
|
||||
network, args, copy_ledger_read_only=True, from_backup=False
|
||||
):
|
||||
# Before adding the node from a snapshot, override at least one app entry
|
||||
# and wait for a new committed snapshot covering that entry, so that there
|
||||
# is at least one historical entry to verify.
|
||||
network.txs.issue(network, number_txs=1)
|
||||
for _ in range(1, args.snapshot_tx_interval):
|
||||
network.txs.issue(network, number_txs=1, repeat=True)
|
||||
last_tx = network.txs.get_last_tx(priv=True)
|
||||
if network.wait_for_snapshot_committed_for(seqno=last_tx[1]["seqno"]):
|
||||
break
|
||||
|
||||
target_node = None
|
||||
snapshot_dir = None
|
||||
if from_backup:
|
||||
|
@ -99,6 +114,17 @@ def test_add_node_from_snapshot(
|
|||
snapshot_dir=snapshot_dir,
|
||||
)
|
||||
assert new_node
|
||||
|
||||
if copy_ledger_read_only:
|
||||
with new_node.client() as c:
|
||||
r = c.get("/node/state")
|
||||
assert (
|
||||
r.body.json()["startup_seqno"] != 0
|
||||
), "Node started from snapshot but reports startup seqno of 0"
|
||||
|
||||
# Finally, verify all app entries on the new node, including historical ones
|
||||
network.txs.verify(node=new_node)
|
||||
|
||||
return network
|
||||
|
||||
|
||||
|
@ -215,10 +241,70 @@ def run(args):
|
|||
test_node_filter(network, args)
|
||||
|
||||
|
||||
def run_join_old_snapshot(args):
    """Check that a node cannot join the service from a stale snapshot.

    Starts a single-node network, saves a copy of one committed snapshot,
    produces more transactions, adds two nodes from snapshot, stops the
    original primary, and finally expects a join attempt that uses the
    saved snapshot to fail with ``infra.network.StartupSnapshotIsOld``.
    """
    logging_txs = app.LoggingTxs()
    hosts = ["local://localhost"]

    with tempfile.TemporaryDirectory() as saved_snapshot_dir, infra.network.network(
        hosts,
        args.binary_dir,
        args.debug_nodes,
        args.perf_nodes,
        pdb=args.pdb,
        txs=logging_txs,
    ) as network:
        network.start_and_join(args)
        primary, _ = network.find_primary()

        # First, retrieve and save one committed snapshot
        logging_txs.issue(network, number_txs=args.snapshot_tx_interval)
        committed_dir = network.get_committed_snapshots(primary)
        first_snapshot = os.listdir(committed_dir)[0]
        copy(os.path.join(committed_dir, first_snapshot), saved_snapshot_dir)

        # Then generate another newer snapshot, and add two more nodes from it
        logging_txs.issue(network, number_txs=args.snapshot_tx_interval)

        for _ in range(2):
            network.create_and_trust_node(
                args.package,
                "local://localhost",
                args,
                from_snapshot=True,
            )

        # Kill primary and wait for a new one: new primary is
        # guaranteed to have started from the new snapshot
        primary.stop()
        network.wait_for_new_primary(primary.node_id)

        # Start new node from the old snapshot: the join must be rejected
        try:
            network.create_and_trust_node(
                args.package,
                "local://localhost",
                args,
                from_snapshot=True,
                snapshot_dir=saved_snapshot_dir,
                timeout=3,
            )
        except infra.network.StartupSnapshotIsOld:
            pass
        else:
            assert False, "Node should not be able to join from old snapshot"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
args = infra.e2e_args.cli_args()
|
||||
args.package = "liblogging"
|
||||
args.nodes = infra.e2e_args.max_nodes(args, f=0)
|
||||
args.initial_user_count = 1
|
||||
|
||||
run(args)
|
||||
run_join_old_snapshot(args)
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the Apache 2.0 License.
|
||||
|
||||
import infra.network
|
||||
import functools
|
||||
|
||||
import infra.network
|
||||
from loguru import logger as LOG
|
||||
from math import ceil
|
||||
|
||||
|
@ -154,36 +154,3 @@ def recover(number_txs=5):
|
|||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def add_from_snapshot():
    """Decorator factory for tests that add a node from a snapshot.

    The wrapped test is called as ``func(network, args, ...)`` and must
    return the network.

    Before the test runs, and only if ``snapshot_tx_interval`` is set on the
    test arguments, at least one app entry is overridden and a committed
    snapshot covering that entry is waited for. After the test returns, all
    entries (including historical ones) are verified on the most recently
    joined node.
    """

    def issue_historical_queries_with_snapshot(network, snapshot_tx_interval):
        # Issue one entry, then keep overriding it until a committed snapshot
        # covers the latest private transaction (or the interval is exhausted)
        network.txs.issue(network, number_txs=1)
        for _ in range(1, snapshot_tx_interval):
            network.txs.issue(network, number_txs=1, repeat=True)
            last_tx = network.txs.get_last_tx(priv=True)
            if network.wait_for_snapshot_committed_for(seqno=last_tx[1]["seqno"]):
                break

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            network = args[0]
            # Keep the test arguments in a local variable: assigning them to
            # `infra.e2e_args` would overwrite that module attribute with a
            # plain dict for the rest of the process
            test_args = vars(args[1])
            snapshot_tx_interval = test_args.get("snapshot_tx_interval")
            if snapshot_tx_interval is not None:
                issue_historical_queries_with_snapshot(
                    network, int(snapshot_tx_interval)
                )
            network = func(*args, **kwargs)
            # Only verify entries on node just added
            network.txs.verify(node=network.get_joined_nodes()[-1])

            return network

        return wrapper

    return decorator
|
||||
|
|
Загрузка…
Ссылка в новой задаче