Fix ledger chunks discrepancy between primary and backups (#2110)

Julien Maffre 2021-01-26 17:05:07 +00:00 committed by GitHub
Parent 32cc0ad938
Commit 2d32dd1365
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 211 additions and 109 deletions

View file

@@ -1 +1 @@
Time is a flat circle.
Time is a flat circle..

View file

@@ -468,7 +468,7 @@ namespace aft
// Only if globally committable, a snapshot requires a new ledger
// chunk to be created
force_ledger_chunk = snapshotter->requires_snapshot(index);
force_ledger_chunk = snapshotter->record_committable(index);
}
state->last_idx = index;
@@ -1132,7 +1132,7 @@ namespace aft
bool force_ledger_chunk = false;
if (globally_committable)
{
force_ledger_chunk = snapshotter->requires_snapshot(i);
force_ledger_chunk = snapshotter->record_committable(i);
}
ledger->put_entry(entry, globally_committable, force_ledger_chunk);
@@ -2054,10 +2054,10 @@ namespace aft
LOG_DEBUG_FMT("Compacting...");
snapshotter->commit(idx);
if (replica_state == Leader && consensus_type == ConsensusType::CFT)
if (consensus_type == ConsensusType::CFT)
{
// Snapshots are not yet supported with BFT
snapshotter->snapshot(idx);
snapshotter->update(idx, replica_state == Leader);
}
store->compact(idx);
ledger->commit(idx);
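The core of the fix: the decision to force a new ledger chunk no longer comes from requires_snapshot(), which scheduled snapshots as a side effect on the primary, but from record_committable(), which every node (primary and backup) calls for the same globally committable indices. A minimal sketch of the resulting invariant, in Python rather than CCF's C++, with all names illustrative:

    # Whether an entry forces a new ledger chunk now depends only on the
    # committable indices and the snapshot interval, both of which are
    # identical on every replica.
    def chunk_decisions(committable_indices, snapshot_tx_interval):
        last_scheduled = 0  # mirrors next_snapshot_indices.back()
        decisions = []
        for idx in committable_indices:
            force = (idx - last_scheduled) >= snapshot_tx_interval
            if force:
                last_scheduled = idx
            decisions.append((idx, force))
        return decisions

    sigs = [5, 12, 20, 31, 40]
    # With an interval of 10, every replica forces chunks at 12 and 31.
    assert [i for i, force in chunk_decisions(sigs, 10) if force] == [12, 31]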

View file

@@ -251,13 +251,13 @@ namespace aft
class StubSnapshotter
{
public:
void snapshot(Index)
void update(Index, bool)
{
// For now, do not test snapshots in unit tests
return;
}
bool requires_snapshot(Index)
bool record_committable(Index)
{
// For now, do not test snapshots in unit tests
return false;

View file

@@ -163,7 +163,6 @@ namespace ccf
consensus::Index last_recovered_signed_idx = 1;
std::list<RecoveredLedgerSecret> recovery_ledger_secrets;
consensus::Index ledger_idx = 0;
size_t recovery_snapshot_tx_interval = Snapshotter::max_tx_interval;
struct StartupSnapshotInfo
{
@@ -243,8 +242,7 @@ namespace ccf
to_host(writer_factory.create_writer_to_outside()),
network(network),
rpcsessions(rpcsessions),
share_manager(share_manager),
snapshotter(std::make_shared<Snapshotter>(writer_factory, network))
share_manager(share_manager)
{}
//
@@ -308,13 +306,12 @@ namespace ccf
network.ledger_secrets = std::make_shared<LedgerSecrets>(self);
network.ledger_secrets->init();
setup_snapshotter(config.snapshot_tx_interval);
setup_encryptor();
setup_consensus();
setup_progress_tracker();
setup_history();
snapshotter->set_tx_interval(config.snapshot_tx_interval);
// Become the primary and force replication
consensus->force_become_primary();
@@ -351,6 +348,7 @@ namespace ccf
// deserialise the public domain when recovering the public ledger
network.ledger_secrets = std::make_shared<LedgerSecrets>();
setup_encryptor();
setup_snapshotter(config.snapshot_tx_interval);
initialise_startup_snapshot(config);
@@ -380,10 +378,7 @@ namespace ccf
// secrets.
setup_encryptor();
// Snapshot generation is disabled until private recovery is
// complete
recovery_snapshot_tx_interval = config.snapshot_tx_interval;
setup_snapshotter(config.snapshot_tx_interval);
bool from_snapshot = !config.startup_snapshot.empty();
setup_recovery_hook(from_snapshot);
@@ -479,6 +474,7 @@ namespace ccf
resp.network_info.consensus_type));
}
setup_snapshotter(config.snapshot_tx_interval);
setup_encryptor();
setup_consensus(resp.network_info.public_only);
setup_progress_tracker();
@@ -528,10 +524,6 @@ namespace ccf
// recovery store
startup_snapshot_info.reset();
}
else
{
recovery_snapshot_tx_interval = config.snapshot_tx_interval;
}
LOG_INFO_FMT(
"Joiner successfully resumed from snapshot at seqno {} and "
@@ -549,11 +541,12 @@ namespace ccf
last_recovered_signed_idx =
resp.network_info.last_recovered_signed_idx;
setup_recovery_hook(startup_snapshot_info != nullptr);
snapshotter->set_snapshot_generation(false);
sm.advance(State::partOfPublicNetwork);
}
else
{
snapshotter->set_tx_interval(config.snapshot_tx_interval);
reset_data(quote);
sm.advance(State::partOfNetwork);
}
@@ -729,9 +722,9 @@ namespace ccf
hook->call(consensus.get());
}
// If the ledger entry is a signature, it is safe to compact the store
if (result == kv::DeserialiseSuccess::PASS_SIGNATURE)
{
// If the ledger entry is a signature, it is safe to compact the store
network.tables->compact(ledger_idx);
auto tx = network.tables->create_tx();
GenesisGenerator g(network, tx);
@@ -769,6 +762,12 @@ namespace ccf
{
startup_snapshot_info->is_evidence_committed = true;
}
// Inform snapshotter of all signature entries so that this node can
// continue generating snapshots at the correct interval once the
// recovery is complete
snapshotter->record_committable(ledger_idx);
snapshotter->commit(ledger_idx);
}
else if (
result == kv::DeserialiseSuccess::PASS_SNAPSHOT_EVIDENCE &&
@@ -846,14 +845,16 @@ namespace ccf
// index and promote network secrets to this index
network.tables->rollback(last_recovered_signed_idx);
ledger_truncate(last_recovered_signed_idx);
snapshotter->rollback(last_recovered_signed_idx);
LOG_INFO_FMT(
"End of public ledger recovery - Truncating ledger to last signed "
"index: {}",
"seqno: {}",
last_recovered_signed_idx);
// KV term must be set before the first Tx is committed
auto new_term = view_history.size() + 2;
LOG_INFO_FMT("Setting term on public recovery KV to {}", new_term);
LOG_INFO_FMT("Setting term on public recovery store to {}", new_term);
network.tables->set_term(new_term);
auto tx = network.tables->create_tx();
@@ -867,11 +868,15 @@ namespace ccf
node_encrypt_kp->public_key_pem().raw(),
NodeStatus::PENDING}));
LOG_INFO_FMT("Deleted previous nodes and added self as {}", self);
network.ledger_secrets->init(last_recovered_signed_idx + 1);
network.ledger_secrets->set_node_id(self);
setup_encryptor();
LOG_INFO_FMT("Deleted previous nodes and added self as {}", self);
// Initialise snapshotter after public recovery
snapshotter->init_after_public_recovery();
snapshotter->set_snapshot_generation(false);
kv::Version index = 0;
kv::Term view = 0;
@@ -974,7 +979,7 @@ namespace ccf
if (recovery_v != recovery_store->current_version())
{
throw std::logic_error(fmt::format(
"Private recovery did not reach public ledger version: {}/{}",
"Private recovery did not reach public ledger seqno: {}/{}",
recovery_store->current_version(),
recovery_v));
}
@@ -996,7 +1001,7 @@ namespace ccf
consensus->enable_all_domains();
// Snapshots are only generated after recovery is complete
snapshotter->set_tx_interval(recovery_snapshot_tx_interval);
snapshotter->set_snapshot_generation(true);
// Open the service
if (consensus->is_primary())
@@ -1764,6 +1769,12 @@ namespace ccf
}
}
void setup_snapshotter(size_t snapshot_tx_interval)
{
snapshotter = std::make_shared<Snapshotter>(
writer_factory, network, snapshot_tx_interval);
}
void setup_tracker_store()
{
if (tracker_store == nullptr)
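Taken together, the node-state changes sequence the snapshotter through recovery: every signature entry replayed from the public ledger is recorded as committable, the snapshotter is rolled back and re-initialised at the end of public recovery, and snapshot generation stays disabled until private recovery completes. A hedged sketch of that call order, using a Python stand-in for the C++ snapshotter:

    class StubSnapshotter:
        # Stand-in exposing the method names introduced by this commit.
        def record_committable(self, seqno): pass
        def commit(self, seqno): pass
        def rollback(self, seqno): pass
        def init_after_public_recovery(self): pass
        def set_snapshot_generation(self, enabled): pass

    def recover_public_ledger(snapshotter, signature_seqnos, last_signed_seqno):
        for seqno in signature_seqnos:
            # Inform the snapshotter of all signature entries so that this
            # node can resume snapshotting at the correct interval.
            snapshotter.record_committable(seqno)
            snapshotter.commit(seqno)
        # Truncate to the last signed seqno, then fix up snapshotter state.
        snapshotter.rollback(last_signed_seqno)
        snapshotter.init_after_public_recovery()
        snapshotter.set_snapshot_generation(False)  # re-enabled after private recovery

    recover_public_ledger(StubSnapshotter(), [10, 20, 30], 30)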

View file

@@ -50,6 +50,9 @@ namespace ccf
// Index at which the latest snapshot was generated
consensus::Index last_snapshot_idx = 0;
// Used to suspend snapshot generation during public recovery
bool snapshot_generation_enabled = true;
// Indices at which a snapshot will be next generated
std::deque<consensus::Index> next_snapshot_indices;
@@ -127,24 +130,36 @@ namespace ccf
public:
Snapshotter(
ringbuffer::AbstractWriterFactory& writer_factory,
NetworkState& network_) :
NetworkState& network_,
size_t snapshot_tx_interval_) :
to_host(writer_factory.create_writer_to_outside()),
network(network_)
network(network_),
snapshot_tx_interval(snapshot_tx_interval_)
{
next_snapshot_indices.push_back(last_snapshot_idx);
}
void set_tx_interval(size_t snapshot_tx_interval_)
void init_after_public_recovery()
{
// After public recovery, the first node should have restored all
// snapshot indices in next_snapshot_indices so that snapshot
// generation can continue at the correct interval
std::lock_guard<SpinLock> guard(lock);
last_snapshot_idx = next_snapshot_indices.back();
}
void set_snapshot_generation(bool enabled)
{
std::lock_guard<SpinLock> guard(lock);
snapshot_tx_interval = snapshot_tx_interval_;
snapshot_generation_enabled = enabled;
}
void set_last_snapshot_idx(consensus::Index idx)
{
// Should only be called once, after a snapshot has been applied
std::lock_guard<SpinLock> guard(lock);
// Should only be called once, after a snapshot has been applied
if (last_snapshot_idx != 0)
{
throw std::logic_error(
@@ -158,8 +173,12 @@ namespace ccf
next_snapshot_indices.push_back(last_snapshot_idx);
}
void snapshot(consensus::Index idx)
void update(consensus::Index idx, bool generate_snapshot)
{
// If generate_snapshot is true, takes a snapshot of the key-value store
// at idx and schedules snapshot serialisation on another thread
// (round-robin). Otherwise, only records that a snapshot was
// generated at idx.
std::lock_guard<SpinLock> guard(lock);
if (idx < last_snapshot_idx)
@@ -173,34 +192,48 @@ namespace ccf
if (idx - last_snapshot_idx >= snapshot_tx_interval)
{
auto msg = std::make_unique<threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
msg->data.self = shared_from_this();
msg->data.snapshot = network.tables->snapshot(idx);
last_snapshot_idx = idx;
static uint32_t generation_count = 0;
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::get_execution_thread(generation_count++),
std::move(msg));
if (generate_snapshot && snapshot_generation_enabled)
{
auto msg =
std::make_unique<threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
msg->data.self = shared_from_this();
msg->data.snapshot = network.tables->snapshot(idx);
static uint32_t generation_count = 0;
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::get_execution_thread(
generation_count++),
std::move(msg));
}
}
}
bool record_committable(consensus::Index idx)
{
// Returns true if the committable idx will require the generation of a
// snapshot, and thus a new ledger chunk
std::lock_guard<SpinLock> guard(lock);
if ((idx - next_snapshot_indices.back()) >= snapshot_tx_interval)
{
next_snapshot_indices.push_back(idx);
return true;
}
return false;
}
void commit(consensus::Index idx)
{
std::lock_guard<SpinLock> guard(lock);
while (!next_snapshot_indices.empty() &&
while ((next_snapshot_indices.size() > 1) &&
(next_snapshot_indices.front() < idx))
{
next_snapshot_indices.pop_front();
}
if (next_snapshot_indices.empty())
{
next_snapshot_indices.push_back(last_snapshot_idx);
}
for (auto it = snapshot_evidence_indices.begin();
it != snapshot_evidence_indices.end();)
{
@@ -223,19 +256,6 @@ namespace ccf
}
}
bool requires_snapshot(consensus::Index idx)
{
std::lock_guard<SpinLock> guard(lock);
// Returns true if the idx will require the generation of a snapshot
if ((idx - next_snapshot_indices.back()) >= snapshot_tx_interval)
{
next_snapshot_indices.push_back(idx);
return true;
}
return false;
}
void rollback(consensus::Index idx)
{
std::lock_guard<SpinLock> guard(lock);
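The reworked Snapshotter keeps a deque of indices at which snapshots fall due, so that scheduling (record_committable), generation (update) and pruning (commit) stay consistent across replicas. A simplified, single-threaded Python model of that logic (assumptions: no locking, no evidence tracking, no ring buffer, and the "idx < last_snapshot_idx" error path is omitted):

    from collections import deque

    class SnapshotterModel:
        def __init__(self, snapshot_tx_interval):
            self.snapshot_tx_interval = snapshot_tx_interval
            self.last_snapshot_idx = 0
            self.generation_enabled = True
            # Indices at which a snapshot will next be generated
            self.next_snapshot_indices = deque([self.last_snapshot_idx])

        def record_committable(self, idx):
            # True if this committable idx requires a snapshot, and
            # therefore a new ledger chunk
            if idx - self.next_snapshot_indices[-1] >= self.snapshot_tx_interval:
                self.next_snapshot_indices.append(idx)
                return True
            return False

        def commit(self, idx):
            # Keep at least one scheduled index so the interval stays anchored
            while (len(self.next_snapshot_indices) > 1
                   and self.next_snapshot_indices[0] < idx):
                self.next_snapshot_indices.popleft()

        def update(self, idx, generate_snapshot):
            # Backups pass generate_snapshot=False: they record the index,
            # but only the primary serialises a snapshot.
            if idx - self.last_snapshot_idx >= self.snapshot_tx_interval:
                self.last_snapshot_idx = idx
                if generate_snapshot and self.generation_enabled:
                    print(f"snapshot generated at {idx}")

    s = SnapshotterModel(10)
    assert not s.record_committable(9)
    assert s.record_committable(10)
    s.update(10, generate_snapshot=True)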

View file

@@ -70,23 +70,22 @@ TEST_CASE("Regular snapshotting")
issue_transactions(network, snapshot_tx_interval * interval_count);
auto snapshotter =
std::make_shared<ccf::Snapshotter>(*writer_factory, network);
snapshotter->set_tx_interval(snapshot_tx_interval);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
REQUIRE_FALSE(snapshotter->requires_snapshot(snapshot_tx_interval - 1));
REQUIRE(snapshotter->requires_snapshot(snapshot_tx_interval));
REQUIRE_FALSE(snapshotter->record_committable(snapshot_tx_interval - 1));
REQUIRE(snapshotter->record_committable(snapshot_tx_interval));
INFO("Generate snapshots at regular intervals");
{
for (size_t i = 1; i <= interval_count; i++)
{
// No snapshot generated if < interval
snapshotter->snapshot(i * (snapshot_tx_interval - 1));
snapshotter->update(i * (snapshot_tx_interval - 1), true);
threading::ThreadMessaging::thread_messaging.run_one();
REQUIRE(read_ringbuffer_out(eio) == std::nullopt);
snapshotter->snapshot(i * snapshot_tx_interval);
snapshotter->update(i * snapshot_tx_interval, true);
threading::ThreadMessaging::thread_messaging.run_one();
REQUIRE(
read_ringbuffer_out(eio) ==
@@ -97,7 +96,7 @@ TEST_CASE("Regular snapshotting")
INFO("Cannot snapshot before latest snapshot");
{
REQUIRE_THROWS_AS(
snapshotter->snapshot(snapshot_tx_interval - 1), std::logic_error);
snapshotter->update(snapshot_tx_interval - 1, true), std::logic_error);
}
}
@@ -115,13 +114,12 @@ TEST_CASE("Commit snapshot evidence")
size_t snapshot_tx_interval = 10;
issue_transactions(network, snapshot_tx_interval);
auto snapshotter =
std::make_shared<ccf::Snapshotter>(*writer_factory, network);
snapshotter->set_tx_interval(snapshot_tx_interval);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
INFO("Generate snapshot");
{
snapshotter->snapshot(snapshot_tx_interval);
snapshotter->update(snapshot_tx_interval, true);
threading::ThreadMessaging::thread_messaging.run_one();
REQUIRE(
read_ringbuffer_out(eio) ==
@@ -162,13 +160,12 @@ TEST_CASE("Rollback before evidence is committed")
size_t snapshot_tx_interval = 10;
issue_transactions(network, snapshot_tx_interval);
auto snapshotter =
std::make_shared<ccf::Snapshotter>(*writer_factory, network);
snapshotter->set_tx_interval(snapshot_tx_interval);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
INFO("Generate snapshot");
{
snapshotter->snapshot(snapshot_tx_interval);
snapshotter->update(snapshot_tx_interval, true);
threading::ThreadMessaging::thread_messaging.run_one();
REQUIRE(
read_ringbuffer_out(eio) ==
@@ -193,7 +190,7 @@ TEST_CASE("Rollback before evidence is committed")
issue_transactions(network, snapshot_tx_interval);
size_t snapshot_idx = network.tables->current_version();
snapshotter->snapshot(snapshot_idx);
snapshotter->update(snapshot_idx, true);
threading::ThreadMessaging::thread_messaging.run_one();
REQUIRE(
read_ringbuffer_out(eio) == rb_msg({consensus::snapshot, snapshot_idx}));

View file

@@ -120,7 +120,6 @@ def run(args):
# If the network was changed (e.g. recovery test), stop the previous network
# and use the new network from now on
if new_network != network:
network.stop_all_nodes()
network = new_network
LOG.debug(f"Test {s.test_name(test)} took {test_elapsed:.2f} secs")

View file

@@ -485,12 +485,59 @@ class Network:
def stop_all_nodes(self):
fatal_error_found = False
longest_ledger_seqno = 0
most_up_to_date_node = None
committed_ledger_dirs = {}
for node in self.nodes:
_, fatal_errors = node.stop()
if fatal_errors:
fatal_error_found = True
LOG.info("All nodes stopped...")
# Find stopped node with longest ledger
_, committed_ledger_dir = node.get_ledger(include_read_only_dirs=True)
ledger_end_seqno = 0
for ledger_file in os.listdir(committed_ledger_dir):
end_seqno = infra.node.get_committed_ledger_end_seqno(ledger_file)
if end_seqno > ledger_end_seqno:
ledger_end_seqno = end_seqno
if ledger_end_seqno > longest_ledger_seqno:
longest_ledger_seqno = ledger_end_seqno
most_up_to_date_node = node
committed_ledger_dirs[node.node_id] = [
committed_ledger_dir,
ledger_end_seqno,
]
LOG.info("All nodes stopped")
# Verify that all ledger files on stopped nodes exist on most up-to-date node
# and are identical
if most_up_to_date_node:
longest_ledger_dir, _ = committed_ledger_dirs[most_up_to_date_node.node_id]
for node_id, (committed_ledger_dir, _) in (
l
for l in committed_ledger_dirs.items()
if not l[0] == most_up_to_date_node.node_id
):
for ledger_file in os.listdir(committed_ledger_dir):
if ledger_file not in os.listdir(longest_ledger_dir):
raise Exception(
f"Ledger file on node {node_id} does not exist on most up-to-date node {most_up_to_date_node.node_id}: {ledger_file}"
)
if infra.path.compute_file_checksum(
os.path.join(longest_ledger_dir, ledger_file)
) != infra.path.compute_file_checksum(
os.path.join(committed_ledger_dir, ledger_file)
):
raise Exception(
f"Ledger file checksums between node {node_id} and most up-to-date node {most_up_to_date_node.node_id} did not match: {ledger_file}"
)
LOG.success(
f"Verified ledger files consistency on all {len(self.nodes)} stopped nodes"
)
if fatal_error_found:
if self.ignoring_shutdown_errors:
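The verification loop above boils down to: pick the stopped node with the longest committed ledger, then require every committed chunk on every other node to exist on it with an identical SHA-256 checksum. Distilled into a hedged standalone sketch (the real code also walks read-only ledger directories):

    import os

    def verify_ledger_consistency(dirs_by_node, checksum):
        # dirs_by_node: node_id -> (committed_ledger_dir, end_seqno)
        longest = max(dirs_by_node, key=lambda n: dirs_by_node[n][1])
        ref_dir, _ = dirs_by_node[longest]
        ref_files = set(os.listdir(ref_dir))
        for node_id, (d, _) in dirs_by_node.items():
            if node_id == longest:
                continue
            for f in os.listdir(d):
                assert f in ref_files, f"{f} missing on node {longest}"
                assert checksum(os.path.join(d, f)) == checksum(
                    os.path.join(ref_dir, f)
                ), f"checksum mismatch for {f}"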

View file

@@ -39,9 +39,20 @@ def is_file_committed(file_name):
return ".committed" in file_name
def is_ledger_file(file_name):
return file_name.startswith("ledger_")
def get_committed_ledger_end_seqno(file_name):
if not is_ledger_file(file_name) or not is_file_committed(file_name):
raise ValueError(f"{file_name} ledger file is not a committed ledger file")
return int(re.findall(r"\d+", file_name)[1])
def get_snapshot_seqnos(file_name):
# Returns the tuple (snapshot_seqno, evidence_seqno)
return int(re.findall(r"\d+", file_name)[0]), int(re.findall(r"\d+", file_name)[1])
seqnos = re.findall(r"\d+", file_name)
return int(seqnos[0]), int(seqnos[1])
class Node:
@@ -270,11 +281,13 @@ class Node:
except ccf.clients.CCFConnectionException as e:
raise TimeoutError(f"Node {self.node_id} failed to join the network") from e
def get_ledger(self, **kwargs):
def get_ledger(self, include_read_only_dirs=False):
"""
Triage committed and un-committed (i.e. current) ledger files
"""
main_ledger_dir, read_only_ledger_dirs = self.remote.get_ledger(**kwargs)
main_ledger_dir, read_only_ledger_dirs = self.remote.get_ledger(
f"{self.node_id}.ledger", include_read_only_dirs
)
current_ledger_dir = os.path.join(
self.common_dir, f"{self.node_id}.ledger.current"
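get_committed_ledger_end_seqno() simply takes the second integer in the file name. For illustration, assuming committed ledger files are named ledger_<start>-<end>.committed and snapshot files carry <seqno>_<evidence_seqno> (both names below are hypothetical):

    # Hypothetical file names, shown only to illustrate the parsing
    assert get_committed_ledger_end_seqno("ledger_1-100.committed") == 100
    assert get_snapshot_seqnos("snapshot_100_105") == (100, 105)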

View file

@@ -3,6 +3,7 @@
import os
from contextlib import contextmanager
from shutil import copy2, rmtree
import hashlib
from loguru import logger as LOG
@@ -86,6 +87,14 @@ def copy_dir(src_path, dst_path):
copy2(src_path, dst_path)
def compute_file_checksum(file_name):
h = hashlib.sha256()
with open(file_name, "rb") as f:
for b in iter(lambda: f.read(4096), b""):
h.update(b)
return h.hexdigest()
@contextmanager
def working_dir(path):
cwd = os.getcwd()
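compute_file_checksum() streams the file through SHA-256 in 4096-byte blocks, keeping memory use flat even for large ledger chunks. A quick self-check of the helper (throwaway file, hypothetical content):

    import hashlib, os, tempfile

    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(b"ledger bytes")
        name = f.name
    assert compute_file_checksum(name) == hashlib.sha256(b"ledger bytes").hexdigest()
    os.remove(name)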

View file

@@ -420,14 +420,7 @@ class LocalRemote(CmdMixin):
def _cp(self, src_path, dst_path):
if os.path.isdir(src_path):
assert (
self._rc(
"rm -rf {}".format(
os.path.join(dst_path, os.path.basename(src_path))
)
)
== 0
)
assert self._rc("rm -rf {}".format(os.path.join(dst_path))) == 0
assert self._rc("cp -r {} {}".format(src_path, dst_path)) == 0
else:
assert self._rc("cp {} {}".format(src_path, dst_path)) == 0
@@ -461,10 +454,8 @@ class LocalRemote(CmdMixin):
raise ValueError(path)
if not pre_condition_func(path, os.listdir):
raise RuntimeError("Pre-condition for getting remote files failed")
if target_name is not None:
self._cp(path, os.path.join(dst_path, target_name))
else:
self._cp(path, dst_path)
target_name = target_name or os.path.basename(src_path)
self._cp(path, os.path.join(dst_path, target_name))
def list_files(self):
return os.listdir(self.root)
@@ -794,18 +785,29 @@ class CCFRemote(object):
# For now, it makes sense to default include_read_only_dirs to False
# but when nodes started from snapshots are fully supported in the test
# suite, this argument will probably default to True (or be deleted entirely)
def get_ledger(self, include_read_only_dirs=False):
self.remote.get(self.ledger_dir_name, self.common_dir)
def get_ledger(self, ledger_dir_name, include_read_only_dirs=False):
self.remote.get(
self.ledger_dir_name,
self.common_dir,
target_name=ledger_dir_name,
)
read_only_ledger_dirs = []
if include_read_only_dirs and self.read_only_ledger_dir is not None:
read_only_ledger_dir_name = (
f"{ledger_dir_name}.ro"
if ledger_dir_name
else self.read_only_ledger_dir
)
self.remote.get(
os.path.basename(self.read_only_ledger_dir), self.common_dir
os.path.basename(self.read_only_ledger_dir),
self.common_dir,
target_name=read_only_ledger_dir_name,
)
read_only_ledger_dirs.append(
os.path.join(self.common_dir, self.read_only_ledger_dir)
os.path.join(self.common_dir, read_only_ledger_dir_name)
)
return (
os.path.join(self.common_dir, self.ledger_dir_name),
os.path.join(self.common_dir, ledger_dir_name),
read_only_ledger_dirs,
)

View file

@@ -36,15 +36,16 @@ def save_committed_ledger_files(network, args):
def run(args):
with infra.network.network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
pdb=args.pdb,
) as network:
with tempfile.TemporaryDirectory() as tmp_dir:
with infra.network.network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
pdb=args.pdb,
) as network:
with tempfile.TemporaryDirectory() as tmp_dir:
args.common_read_only_ledger_dir = tmp_dir
network.start_and_join(args)
@@ -55,7 +56,7 @@ if __name__ == "__main__":
args = infra.e2e_args.cli_args()
args.package = "liblogging"
args.nodes = infra.e2e_args.max_nodes(args, f=0)
args.nodes = infra.e2e_args.min_nodes(args, f=0)
args.initial_user_count = 1
args.ledger_chunk_bytes = "1" # Chunk ledger at every signature transaction
run(args)

View file

@@ -21,6 +21,8 @@ def test(network, args, from_snapshot=False):
include_read_only_dirs=True
)
network.stop_all_nodes()
recovered_network = infra.network.Network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network
)
@@ -46,6 +48,8 @@ def test_share_resilience(network, args, from_snapshot=False):
include_read_only_dirs=True
)
network.stop_all_nodes()
recovered_network = infra.network.Network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network
)
@@ -119,7 +123,6 @@ def run(args):
)
else:
recovered_network = test(network, args, from_snapshot=False)
network.stop_all_nodes()
network = recovered_network
LOG.success("Recovery complete on all nodes")