Unit and end-to-end tests for moving committed ledger files out of main ledger dir (#2093)

This commit is contained in:
Julien Maffre 2021-01-20 15:42:18 +00:00 коммит произвёл GitHub
Родитель b89fc02b0d
Коммит 2feed9885c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 171 добавлений и 22 удалений

Просмотреть файл

@ -739,6 +739,12 @@ if(BUILD_TESTS)
CONSENSUS cft
)
add_e2e_test(
NAME ledger_operation
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/ledger_operation.py
CONSENSUS cft
)
if(NOT SAN)
# Writing new ledger files and generating new snapshots uses more file
# descriptors so disable those for this test

Просмотреть файл

@ -25,7 +25,7 @@ Ledger files containing only committed entries are named ``ledger_<start_seqno>-
Ledger files that still contain some uncommitted entries are named ``ledger_<start_seqno>-<end_seqno>`` or ``ledger_<start_seqno>`` for the most recent one. These files are typically held open by the ``cchost`` process, which may modify their content, or even erase them completely. Uncommitted ledger files may differ arbitrarily across nodes.
.. warning:: Removing files from the ``--ledger-dir`` ledger directory may cause a node to crash.
.. warning:: Removing `uncommitted` ledger files from the ``--ledger-dir`` ledger directory may cause a node to crash. It is however safe to move `committed` ledger files to another directory, accessible to a CCF node via the ``--read-only-ledger-dir`` command line argument.
It is important to note that while all entries stored in ledger files ending in ``.committed`` are committed, not all committed entries are stored in such a file at any given time. A number of them are typically in the in-progress files, waiting to be flushed to a ``.committed`` file once the size threshold (``--ledger-chunk-bytes``) is met.

Просмотреть файл

@ -583,11 +583,11 @@ namespace asynchost
// the read cache is full
auto match_file =
std::make_shared<LedgerFile>(ledger_dir_, match.value());
if (files_read_cache.size() >= max_read_cache_files)
files_read_cache.emplace_back(match_file);
if (files_read_cache.size() > max_read_cache_files)
{
files_read_cache.erase(files_read_cache.begin());
}
files_read_cache.emplace_back(match_file);
return match_file;
}
@ -652,8 +652,8 @@ namespace asynchost
LOG_DEBUG_FMT("Recovering read-only ledger directory \"{}\"", read_dir);
if (!fs::is_directory(read_dir))
{
throw std::logic_error(
fmt::format("\"{}\" is not a directory", read_dir));
throw std::logic_error(fmt::format(
"\"{}\" read-only ledger is not a directory", read_dir));
}
for (auto const& f : fs::directory_iterator(read_dir))

Просмотреть файл

@ -15,6 +15,7 @@ using namespace asynchost;
using frame_header_type = uint32_t;
static constexpr size_t frame_header_size = sizeof(frame_header_type);
static constexpr auto ledger_dir = "ledger_dir";
static constexpr auto ledger_dir_read_only = "ledger_dir_ro";
static constexpr auto snapshot_dir = "snapshot_dir";
static const auto dummy_snapshot = std::vector<uint8_t>(128, 42);
@ -26,6 +27,19 @@ ringbuffer::Circuit eio(in_buffer->bd, out_buffer->bd);
auto wf = ringbuffer::WriterFactory(eio);
void move_all_from_to(
const std::string& from, const std::string& to, const std::string& suffix)
{
for (auto const& f : fs::directory_iterator(from))
{
if (nonstd::ends_with(f.path().filename(), suffix))
{
fs::copy_file(f.path(), fs::path(to) / f.path().filename());
fs::remove(f.path());
}
}
}
std::string get_snapshot_file_name(
size_t idx, size_t evidence_idx, size_t evidence_commit_idx)
{
@ -119,8 +133,13 @@ void read_entry_from_ledger(Ledger& ledger, size_t idx)
void read_entries_range_from_ledger(Ledger& ledger, size_t from, size_t to)
{
verify_framed_entries_range(
ledger.read_framed_entries(from, to).value(), from, to);
auto entries = ledger.read_framed_entries(from, to);
if (!entries.has_value())
{
throw std::logic_error(
fmt::format("Failed to read ledger entries from {} to {}", from, to));
}
verify_framed_entries_range(entries.value(), from, to);
}
// Keeps track of ledger entries written to the ledger.
@ -934,6 +953,70 @@ TEST_CASE("Invalid ledger file resilience")
}
}
TEST_CASE("Delete committed file from main directory")
{
// Used to temporarily copy committed ledger files
static constexpr auto ledger_dir_tmp = "ledger_dir_tmp";
fs::remove_all(ledger_dir);
fs::remove_all(ledger_dir_read_only);
fs::remove_all(ledger_dir_tmp);
size_t chunk_threshold = 30;
size_t chunk_count = 5;
// Worst-case scenario: do not keep any committed file in cache
size_t max_read_cache_size = 0;
size_t entries_per_chunk = 0;
size_t last_idx = 0;
size_t last_committed_idx = 0;
fs::create_directory(ledger_dir_read_only);
fs::create_directory(ledger_dir_tmp);
Ledger ledger(
ledger_dir,
wf,
chunk_threshold,
max_read_cache_size,
{ledger_dir_read_only});
TestEntrySubmitter entry_submitter(ledger);
INFO("Write many entries on ledger");
{
entries_per_chunk =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
last_committed_idx = entry_submitter.get_last_idx();
ledger.commit(last_committed_idx);
entry_submitter.write(true);
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
// Read all entries from ledger, filling up read cache
read_entries_range_from_ledger(ledger, 1, last_idx);
}
// Move all committed files to temporary directory
move_all_from_to(ledger_dir, ledger_dir_tmp, ledger_committed_suffix);
INFO("Only non-committed entries can be read");
{
read_entries_range_from_ledger(ledger, last_idx - 1, last_idx);
REQUIRE_FALSE(
ledger.read_framed_entries(1, last_committed_idx).has_value());
}
INFO("Move committed files back to read-only ledger directory");
{
move_all_from_to(
ledger_dir_tmp, ledger_dir_read_only, ledger_committed_suffix);
read_entries_range_from_ledger(ledger, 1, last_idx);
}
}
TEST_CASE("Find latest snapshot with corresponding ledger chunk")
{
fs::remove_all(ledger_dir);

Просмотреть файл

@ -262,6 +262,12 @@ def cli_args(add=lambda x: None, parser=None, accept_unknown=False):
help="Disable session auth for members",
action="store_true",
)
parser.add_argument(
"--common-read-only-ledger-dir",
help="Location of read-only ledger directory available to all nodes",
type=str,
default=None,
)
add(parser)

Просмотреть файл

@ -79,6 +79,7 @@ class Network:
"domain",
"snapshot_tx_interval",
"jwt_key_refresh_interval_s",
"common_read_only_ledger_dir",
]
# Maximum delay (seconds) for updates to propagate from the primary to backups

Просмотреть файл

@ -383,20 +383,6 @@ class SSHRemote(CmdMixin):
client.close()
@contextmanager
def ssh_remote(*args, **kwargs):
"""
Context Manager wrapper for SSHRemote
"""
remote = SSHRemote(*args, **kwargs)
try:
remote.setup()
remote.start()
yield remote
finally:
remote.stop()
class LocalRemote(CmdMixin):
def __init__(
self,
@ -592,7 +578,8 @@ class CCFRemote(object):
memory_reserve_startup=0,
gov_script=None,
ledger_dir=None,
read_only_ledger_dir=None,
read_only_ledger_dir=None, # Read-only ledger dir to copy to node director
common_read_only_ledger_dir=None, # Read-only ledger dir for all nodes
log_format_json=None,
binary_dir=".",
ledger_chunk_bytes=(5 * 1000 * 1000),
@ -619,7 +606,9 @@ class CCFRemote(object):
if self.ledger_dir
else f"{local_node_id}.ledger"
)
self.read_only_ledger_dir = read_only_ledger_dir
self.common_read_only_ledger_dir = common_read_only_ledger_dir
self.snapshot_dir = os.path.normpath(snapshot_dir) if snapshot_dir else None
self.snapshot_dir_name = (
@ -692,6 +681,9 @@ class CCFRemote(object):
]
data_files += [os.path.join(self.common_dir, self.read_only_ledger_dir)]
if self.common_read_only_ledger_dir is not None:
cmd += [f"--read-only-ledger-dir={self.common_read_only_ledger_dir}"]
if start_type == StartType.new:
cmd += [
"start",

61
tests/ledger_operation.py Normal file
Просмотреть файл

@ -0,0 +1,61 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import tempfile
import os
import shutil
import infra.logging_app as app
import infra.e2e_args
import infra.network
import suite.test_requirements as reqs
from loguru import logger as LOG
@reqs.description("Move committed ledger files to read-only directory")
def save_committed_ledger_files(network, args):
txs = app.LoggingTxs()
# Issue txs in a loop to force a signature and a new ledger chunk
# each time. Record log messages at the same key (repeat=True) so
# that CCF makes use of historical queries when verifying messages
for _ in range(1, 5):
txs.issue(network, 1, repeat=True)
LOG.info(f"Moving committed ledger files to {args.common_read_only_ledger_dir}")
primary, _ = network.find_primary()
for l in os.listdir(primary.remote.ledger_path()):
if infra.node.is_file_committed(l):
shutil.move(
os.path.join(primary.remote.ledger_path(), l),
os.path.join(args.common_read_only_ledger_dir, l),
)
txs.verify(network)
return network
def run(args):
with infra.network.network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
pdb=args.pdb,
) as network:
with tempfile.TemporaryDirectory() as tmp_dir:
args.common_read_only_ledger_dir = tmp_dir
network.start_and_join(args)
save_committed_ledger_files(network, args)
if __name__ == "__main__":
args = infra.e2e_args.cli_args()
args.package = "liblogging"
args.nodes = infra.e2e_args.max_nodes(args, f=0)
args.initial_user_count = 1
args.ledger_chunk_bytes = "1" # Chunk ledger at every signature transaction
run(args)