This commit is contained in:
Julien Maffre 2020-06-15 15:58:45 +01:00 committed by GitHub
Parent e1b405d594
Commit 5b5c2842b5
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
25 changed files: 1615 additions and 333 deletions

View file

@ -17,7 +17,7 @@ steps:
-b ${{ parameters.install_prefix }}/bin \
-g $(pwd)/../../src/runtime_config/gov.lua \
-v
cp ./workspace/start_network_0/0.ledger .
cp -r ./workspace/start_network_0/0.ledger .
cp ./workspace/start_network_0/network_enc_pubk.pem .
timeout --signal=SIGINT --kill-after=30s --preserve-status 30s \
python ${{ parameters.install_prefix }}/bin/start_network.py \
@ -25,7 +25,7 @@ steps:
-b ${{ parameters.install_prefix }}/bin \
-v \
--recover \
--ledger 0.ledger \
--ledger-dir 0.ledger \
--network-enc-pubk network_enc_pubk.pem \
--common-dir ./workspace/start_network_common/
displayName: Test installed CCF

View file

@ -313,7 +313,8 @@ if(BUILD_TESTS)
target_link_libraries(encryptor_test PRIVATE secp256k1.host)
add_unit_test(
historical_queries_test ${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/historical_queries.cpp
historical_queries_test
${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/historical_queries.cpp
)
target_link_libraries(historical_queries_test PRIVATE secp256k1.host)
@ -461,9 +462,7 @@ if(BUILD_TESTS)
kv_bench SRCS src/kv/test/kv_bench.cpp src/crypto/symmetric_key.cpp
src/enclave/thread_local.cpp
)
add_picobench(
hash_bench SRCS src/ds/test/hash_bench.cpp
)
add_picobench(hash_bench SRCS src/ds/test/hash_bench.cpp)
# Storing signed governance operations
add_e2e_test(
@ -483,6 +482,10 @@ if(BUILD_TESTS)
# purpose to check that a recovery network is robust to a view change.
--raft-election-timeout
4000
# Smaller ledger chunks than default to make sure that a ledger made of
# multiple chunks can be recovered
--ledger-chunk-max-bytes
20KB
)
add_e2e_test(

View file

@ -3,7 +3,7 @@ Ledger
The ledger is the persistent distributed append-only record of the transactions that have been executed by the network. It is written by the primary when a transaction is committed and replicated to all backups, which maintain their own duplicated copy.
A node writes its ledger to a file as specified by the ``--ledger-file`` command line argument.
A node writes its ledger to a directory as specified by the ``--ledger-dir`` command line argument.
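Within that directory, chunk files are named ``ledger_<start idx>`` while being written and renamed to ``ledger_<start idx>-<last idx>.committed`` once committed (see ``src/host/ledger.h`` below). As a rough illustration of the framed on-disk layout this change introduces, here is a minimal C++ sketch that walks the entries of one chunk file; it is illustrative only, not shipped tooling, and assumes a file produced by this version:

#include <cstdint>
#include <cstdio>
#include <sys/types.h>
#include <vector>

// Layout (from host/ledger.h): [size_t positions-table offset]
// [uint32_t size][entry bytes] ... [positions table, if completed]
std::vector<std::vector<uint8_t>> read_chunk_entries(const char* path)
{
  std::vector<std::vector<uint8_t>> entries;
  FILE* f = fopen(path, "rb");
  if (!f)
    return entries;

  fseeko(f, 0, SEEK_END);
  off_t end = ftello(f);
  fseeko(f, 0, SEEK_SET);

  size_t table_offset = 0;
  if (fread(&table_offset, sizeof(table_offset), 1, f) == 1)
  {
    if (table_offset != 0)
    {
      // Completed chunk: entries stop where the positions table starts
      end = (off_t)table_offset;
    }
    uint32_t frame = 0;
    while (ftello(f) < end && fread(&frame, sizeof(frame), 1, f) == 1)
    {
      std::vector<uint8_t> entry(frame);
      if (frame > 0 && fread(entry.data(), frame, 1, f) != 1)
        break;
      entries.push_back(std::move(entry));
    }
  }
  fclose(f);
  return entries;
}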
Ledger Encryption
-----------------

View file

@ -26,12 +26,12 @@ To initiate the first phase of the recovery procedure, one or several nodes shou
--rpc-address <ccf-node-address>
--public-rpc-address <ccf-node-public-address>
[--domain domain]
--ledger-file /path/to/ledger/to/recover
--ledger-dir /path/to/ledger/dir/to/recover
--node-cert-file /path/to/node_certificate
recover
--network-cert-file /path/to/network_certificate
Each node will then immediately restore the public entries of its ledger (``--ledger-file``). Because deserialising the public entries present in the ledger may take some time, operators can query the progress of the public recovery by calling ``getSignedIndex`` which returns the version of the last signed recovered ledger entry. Once the public ledger is fully recovered, the recovered node automatically becomes part of the public network, allowing other nodes to join the network.
Each node will then immediately restore the public entries of its ledger (``--ledger-dir``). Because deserialising the public entries present in the ledger may take some time, operators can query the progress of the public recovery by calling ``getSignedIndex`` which returns the version of the last signed recovered ledger entry. Once the public ledger is fully recovered, the recovered node automatically becomes part of the public network, allowing other nodes to join the network.
.. note:: If more than one node were started in ``recover`` mode, the node with the highest signed index (as per the response to the ``getSignedIndex`` RPC) should be preferred to start the new network. Other nodes should be shutdown and new nodes restarted with the ``join`` option.
@ -44,7 +44,7 @@ Similarly to the normal join protocol (see :ref:`operators/start_network:Adding
participant Node 2
participant Node 3
Operators->>+Node 2: cchost --rpc-address=ip2:port2 --ledger-file=ledger0 recover
Operators->>+Node 2: cchost --rpc-address=ip2:port2 --ledger-dir=./ledger recover
Node 2-->>Operators: Network Certificate
Note over Node 2: Reading Public Ledger...

View file

@ -19,7 +19,7 @@ To create a new CCF network, the first node of the network should be invoked wit
--node-address <ccf-node-to-node-address>
--public-rpc-address <ccf-node-public-address>
[--domain domain]
--ledger-file /path/to/ledger
--ledger-dir /path/to/ledger/dir
--node-cert-file /path/to/node_certificate
[--sig-max-tx number_of_transactions]
[--sig-max-ms number_of_milliseconds]
@ -81,7 +81,7 @@ To add a new node to an existing opening network, other nodes should be started
--rpc-address <ccf-node-address>
--node-address <ccf-node-to-node-address>
--public-rpc-address <ccf-node-public-address>
--ledger-file /path/to/ledger
--ledger-dir /path/to/ledger/dir
--node-cert-file /path/to/node_certificate
join
--network-cert-file /path/to/existing/network_certificate

View file

@ -41,9 +41,9 @@ The ``start_test_network.sh`` script can also be used to automatically recover a
.. code-block:: bash
$ cd CCF/build
$ cp ./workspace/test_network_0/0.ledger .
$ cp -r ./workspace/test_network_0/0.ledger .
$ cp ./workspace/test_network_0/network_enc_pubk.pem .
$ ../start_test_network.sh -p liblogging.enclave.so.signed --recover --ledger 0.ledger --network-enc-pubk network_enc_pubk.pem --common-dir ./workspace/test_network_common/
$ ../start_test_network.sh -p liblogging.enclave.so.signed --recover --ledger-dir 0.ledger --network-enc-pubk network_enc_pubk.pem --common-dir ./workspace/test_network_common/
[2020-05-14 14:50:19.746] Starting 3 CCF nodes...
[2020-05-14 14:50:19.746] Recovering network from:
[2020-05-14 14:50:19.746] - Ledger: 0.ledger

View file

@ -28,11 +28,11 @@ namespace consensus
* Put a single entry to be written to the ledger, when primary.
*
* @param entry Serialised entry
* @param globally_committable True if entry is a signature transaction
*/
void put_entry(const std::vector<uint8_t>& entry)
void put_entry(const std::vector<uint8_t>& entry, bool globally_committable)
{
// write the message
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_append, to_host, entry);
put_entry(entry.data(), entry.size(), globally_committable);
}
/**
@ -40,33 +40,14 @@ namespace consensus
*
* @param data Serialised entry start
* @param size Serialised entry size
* @param globally_committable True if entry is a signature transaction
*/
void put_entry(const uint8_t* data, size_t size)
void put_entry(const uint8_t* data, size_t size, bool globally_committable)
{
serializer::ByteRange byte_range = {data, size};
// write the message
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_append, to_host, byte_range);
}
/**
* Record a single entry to the ledger, when backup.
*
* @param data Serialised entries
* @param size Size of overall serialised entries
*
* @return Pair of boolean status (false if rejected), raw data as a vector
*/
std::pair<std::vector<uint8_t>, bool> record_entry(
const uint8_t*& data, size_t& size)
{
auto entry_len = serialized::read<uint32_t>(data, size);
std::vector<uint8_t> entry(data, data + entry_len);
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_append, to_host, entry);
serialized::skip(data, size, entry_len);
return std::make_pair(std::move(entry), true);
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_append, to_host, globally_committable, byte_range);
}
/**
@ -83,15 +64,20 @@ namespace consensus
serialized::skip(data, size, entry_len);
}
std::pair<std::vector<uint8_t>, bool> get_entry(
const uint8_t*& data, size_t& size)
/**
* Retrieve a single entry, advancing offset to the next entry.
*
* @param data Serialised entries
* @param size Size of overall serialised entries
*
* @return Raw entry as a vector
*/
std::vector<uint8_t> get_entry(const uint8_t*& data, size_t& size)
{
auto entry_len = serialized::read<uint32_t>(data, size);
std::vector<uint8_t> entry(data, data + entry_len);
serialized::skip(data, size, entry_len);
return std::make_pair(std::move(entry), true);
return entry;
}
/**
@ -103,5 +89,15 @@ namespace consensus
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_truncate, to_host, idx);
}
/**
* Commit the ledger at a given index.
*
* @param idx Index to commit at
*/
void commit(Index idx)
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_commit, to_host, idx);
}
};
}
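For reference, a minimal sketch of how the host side decodes the ledger_append payload written above: the committable flag is serialised ahead of the entry bytes, matching the handler registered in src/host/ledger.h further down. The function name is illustrative:

// Sketch: host-side decoding of a consensus::ledger_append message.
// `data`/`size` span the payload as delivered by the ringbuffer dispatcher.
void on_ledger_append(const uint8_t* data, size_t size)
{
  // The committable flag comes first...
  auto committable = serialized::read<bool>(data, size);
  // ...followed by the raw entry bytes.
  std::vector<uint8_t> entry(data, data + size);
  // e.g. ledger.write_entry(entry.data(), entry.size(), committable);
}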

View file

@ -30,6 +30,7 @@ namespace consensus
/// Modify the local ledger. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(ledger_append),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit),
///@}
};
}
@ -46,6 +47,7 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::Index,
consensus::LedgerRequestPurpose);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_append, std::vector<uint8_t>);
consensus::ledger_append, bool /* committable */, std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_truncate, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_commit, consensus::Index);

View file

@ -2,12 +2,12 @@
// Licensed under the MIT license.
#pragma once
#include "consensus/pbft/libbyz/ledger.h"
#include "consensus/pbft/pbft_new_views.h"
#include "consensus/pbft/pbft_pre_prepares.h"
#include "consensus/pbft/pbft_requests.h"
#include "consensus/pbft/pbft_types.h"
#include "kv/kv_types.h"
#include "ledger.h"
#include "new_view.h"
#include "node/signatures.h"
#include "prepared_cert.h"

View file

@ -8,7 +8,6 @@
#include "consensus/pbft/pbft_tables.h"
#include "consensus/pbft/pbft_types.h"
#include "ds/ccf_exception.h"
#include "host/ledger.h"
#include "kv/store.h"
#include "kv/test/stub_consensus.h"
#include "message.h"

View file

@ -16,7 +16,6 @@
#include "ds/logger.h"
#include "enclave/rpc_map.h"
#include "enclave/rpc_sessions.h"
#include "host/ledger.h"
#include "kv/kv_types.h"
#include "node/nodetypes.h"
@ -624,26 +623,11 @@ namespace pbft
return client_proxy->get_statistics();
}
template <typename T>
size_t write_to_ledger(const T& data)
{
ledger->put_entry(data->data(), data->size());
return data->size();
}
template <>
size_t write_to_ledger<std::vector<uint8_t>>(
const std::vector<uint8_t>& data)
{
ledger->put_entry(data);
return data.size();
}
bool replicate(const kv::BatchVector& entries) override
{
for (auto& [index, data, globally_committable] : entries)
{
write_to_ledger(data);
ledger->put_entry(*data, globally_committable);
}
return true;
}
@ -833,24 +817,26 @@ namespace pbft
}
LOG_TRACE_FMT("Applying append entry for index {}", i);
auto ret = ledger->get_entry(data, size);
if (!ret.second)
std::vector<uint8_t> entry;
try
{
// NB: This will currently never be triggered.
// This should only fail if there is malformed data. Truncate
// the log and reply false.
entry = ledger->get_entry(data, size);
}
catch (const std::logic_error& e)
{
// This should only fail if there is malformed data.
LOG_FAIL_FMT(
"Recv append entries to {} from {} but the data is malformed",
"Recv append entries to {} from {} but the data is malformed: "
"{}",
local_id,
r.from_node);
ledger->truncate(r.prev_idx);
r.from_node,
e.what());
return;
}
kv::Tx tx;
auto deserialise_success =
store->deserialise_views(ret.first, public_only, nullptr, &tx);
store->deserialise_views(entry, public_only, nullptr, &tx);
switch (deserialise_success)
{

View file

@ -325,21 +325,6 @@ namespace raft
return configurations.back().nodes;
}
template <typename T>
size_t write_to_ledger(const T& data)
{
ledger->put_entry(data->data(), data->size());
return data->size();
}
template <>
size_t write_to_ledger<std::vector<uint8_t>>(
const std::vector<uint8_t>& data)
{
ledger->put_entry(data);
return data.size();
}
template <typename T>
bool replicate(const std::vector<std::tuple<Index, T, bool>>& entries)
{
@ -370,8 +355,8 @@ namespace raft
committable_indices.push_back(index);
last_idx = index;
auto s = write_to_ledger(*data);
entry_size_not_limited += s;
ledger->put_entry(*data, globally_committable);
entry_size_not_limited += data->size();
entry_count++;
term_history.update(index, current_term);
@ -673,27 +658,31 @@ namespace raft
last_idx = i;
is_first_entry = false;
auto ret = ledger->record_entry(data, size);
std::vector<uint8_t> entry;
if (!ret.second)
try
{
// NB: This will currently never be triggered.
// This should only fail if there is malformed data. Truncate
// the log and reply false.
entry = ledger->get_entry(data, size);
}
catch (const std::logic_error& e)
{
// This should only fail if there is malformed data.
LOG_FAIL_FMT(
"Recv append entries to {} from {} but the data is malformed",
"Recv append entries to {} from {} but the data is malformed: {}",
local_id,
r.from_node);
r.from_node,
e.what());
last_idx = r.prev_idx;
ledger->truncate(r.prev_idx);
send_append_entries_response(r.from_node, false);
return;
}
Term sig_term = 0;
auto deserialise_success =
store->deserialise(ret.first, public_only, &sig_term);
store->deserialise(entry, public_only, &sig_term);
ledger->put_entry(
entry, deserialise_success == kv::DeserialiseSuccess::PASS_SIGNATURE);
switch (deserialise_success)
{
@ -1178,6 +1167,7 @@ namespace raft
LOG_DEBUG_FMT("Compacting...");
store->compact(idx);
ledger->commit(idx);
LOG_DEBUG_FMT("Commit on {}: {}", local_id, idx);
// Examine all configurations that are followed by a globally committed
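Condensing the Raft changes above into one place (a sketch, not the verbatim code): on a backup, an entry is only marked committable in the ledger when it deserialises as a signature, and compaction is what drives ledger commit on the host:

// Backup append-entries path (sketch)
auto entry = ledger->get_entry(data, size); // throws std::logic_error if malformed
auto deserialise_success = store->deserialise(entry, public_only, &sig_term);
ledger->put_entry(
  entry, deserialise_success == kv::DeserialiseSuccess::PASS_SIGNATURE);

// Later, on global commit (sketch)
store->compact(idx);
ledger->commit(idx); // the host can now finalise ledger chunks up to idx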

View file

@ -20,8 +20,9 @@ TEST_CASE("Enclave put")
auto enclave = LedgerEnclave(*writer_factory);
bool globally_committable = false;
const std::vector<uint8_t> tx = {'a', 'b', 'c'};
enclave.put_entry(tx);
enclave.put_entry(tx, globally_committable);
size_t num_msgs = 0;
eio.read_from_inside().read(
-1, [&](ringbuffer::Message m, const uint8_t* data, size_t size) {
@ -30,6 +31,7 @@ TEST_CASE("Enclave put")
case consensus::ledger_append:
{
REQUIRE(num_msgs == 0);
REQUIRE(serialized::read<bool>(data, size) == globally_committable);
auto entry = std::vector<uint8_t>(data, data + size);
REQUIRE(entry == tx);
}
@ -55,8 +57,9 @@ TEST_CASE("Enclave record")
auto leader_ledger_enclave = LedgerEnclave(*writer_factory_leader);
auto follower_ledger_enclave = LedgerEnclave(*writer_factory_follower);
bool globally_committable = false;
const std::vector<uint8_t> tx = {'a', 'b', 'c'};
leader_ledger_enclave.put_entry(tx);
leader_ledger_enclave.put_entry(tx, globally_committable);
size_t num_msgs = 0;
std::vector<uint8_t> record;
eio_leader.read_from_inside().read(
@ -66,6 +69,7 @@ TEST_CASE("Enclave record")
case consensus::ledger_append:
{
REQUIRE(num_msgs == 0);
REQUIRE(serialized::read<bool>(data, size) == globally_committable);
copy(data, data + size, back_inserter(record));
}
break;
@ -86,10 +90,9 @@ TEST_CASE("Enclave record")
auto size_ = msg.size();
num_msgs = 0;
auto r = follower_ledger_enclave.record_entry(data__, size_);
REQUIRE(r.second);
REQUIRE(r.first == tx);
auto r = follower_ledger_enclave.get_entry(data__, size_);
REQUIRE(r == tx);
follower_ledger_enclave.put_entry(r, globally_committable);
eio_follower.read_from_inside().read(
-1, [&](ringbuffer::Message m, const uint8_t* data, size_t size) {
switch (m)
@ -97,6 +100,7 @@ TEST_CASE("Enclave record")
case consensus::ledger_append:
{
REQUIRE(num_msgs == 0);
REQUIRE(serialized::read<bool>(data, size) == globally_committable);
auto entry = std::vector<uint8_t>(data, data + size);
REQUIRE(entry == tx);
}

View file

@ -21,7 +21,7 @@ namespace raft
LedgerStubProxy(NodeId id) : _id(id) {}
void put_entry(const std::vector<uint8_t>& data)
void put_entry(const std::vector<uint8_t>& data, bool globally_committable)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>Ledger" << _id
@ -37,24 +37,16 @@ namespace raft
ledger.push_back(buffer);
}
std::pair<std::vector<uint8_t>, bool> record_entry(
const uint8_t*& data, size_t& size)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>Ledger" << _id
<< ": record s: " << size << std::endl;
#endif
auto buffer = std::make_shared<std::vector<uint8_t>>(data, data + size);
ledger.push_back(buffer);
return std::make_pair(*buffer, true);
}
void skip_entry(const uint8_t*& data, size_t& size)
{
skip_count++;
}
std::vector<uint8_t> get_entry(const uint8_t*& data, size_t& size)
{
return {data, data + size};
}
void truncate(Index idx)
{
ledger.resize(idx);
@ -68,6 +60,8 @@ namespace raft
{
skip_count = 0;
}
void commit(Index idx) {}
};
class ChannelStubProxy

View file

@ -204,9 +204,6 @@ namespace enclave
bp, AdminMessage::stop, [&bp, this](const uint8_t*, size_t) {
bp.set_finished();
threading::ThreadMessaging::thread_messaging.set_finished();
auto w = writer_factory.create_writer_to_outside();
LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, w);
});
DISPATCHER_SET_MESSAGE_HANDLER(
@ -354,6 +351,11 @@ namespace enclave
CCF_PAUSE();
}
});
auto w = writer_factory.create_writer_to_outside();
LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, w);
return true;
}
#ifndef VIRTUAL_ENCLAVE

View file

@ -8,186 +8,782 @@
#include <cstdint>
#include <cstdio>
#include <errno.h>
#include <filesystem>
#include <limits>
#include <linux/limits.h>
#include <list>
#include <map>
#include <string>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
namespace fs = std::filesystem;
namespace asynchost
{
class Ledger
static constexpr size_t max_chunk_threshold_size =
std::numeric_limits<uint32_t>::max(); // 4GB
static constexpr size_t max_read_cache_files_default = 5;
static constexpr auto ledger_committed_suffix = "committed";
static constexpr auto ledger_start_idx_delimiter = "_";
static constexpr auto ledger_last_idx_delimiter = "-";
static inline bool is_ledger_file_committed(const std::string& file_name)
{
auto pos = file_name.find(".");
if (pos == std::string::npos)
{
return false;
}
return file_name.substr(pos + 1) == ledger_committed_suffix;
}
static inline size_t get_start_idx_from_file_name(
const std::string& file_name)
{
auto pos = file_name.find(ledger_start_idx_delimiter);
if (pos == std::string::npos)
{
throw std::logic_error(fmt::format(
"Ledger file name {} does not contain a start idx", file_name));
}
return std::stol(file_name.substr(pos + 1));
}
static inline std::optional<size_t> get_last_idx_from_file_name(
const std::string& file_name)
{
auto pos = file_name.find(ledger_last_idx_delimiter);
if (pos == std::string::npos)
{
// Non-committed file names do not contain a last idx
return std::nullopt;
}
return std::stol(file_name.substr(pos + 1));
}
class LedgerFile
{
private:
using positions_offset_header_t = size_t;
static constexpr auto file_name_prefix = "ledger";
static constexpr size_t frame_header_size = sizeof(uint32_t);
const std::string dir;
size_t start_idx = 1;
size_t total_len = 0;
std::vector<uint32_t> positions;
// This uses C stdio instead of fstream because an fstream
// cannot be truncated.
FILE* file;
std::vector<size_t> positions;
size_t total_len;
ringbuffer::WriterPtr to_enclave;
bool completed = false;
bool committed = false;
public:
Ledger(
const std::string& filename,
ringbuffer::AbstractWriterFactory& writer_factory) :
LedgerFile(const std::string& dir, size_t start_idx) :
dir(dir),
file(NULL),
to_enclave(writer_factory.create_writer_to_inside())
start_idx(start_idx)
{
file = fopen(filename.c_str(), "r+b");
const auto filename = fmt::format("{}_{}", file_name_prefix, start_idx);
const auto file_path = fs::path(dir) / fs::path(filename);
file = fopen(file_path.c_str(), "w+b");
if (!file)
file = fopen(filename.c_str(), "w+b");
if (!file)
throw std::logic_error("Unable to open or create ledger file");
fseeko(file, 0, SEEK_END);
auto len = ftello(file);
if (len == -1)
{
std::stringstream ss;
ss << "Failed to tell file size: " << strerror(errno);
throw std::logic_error(ss.str());
}
fseeko(file, 0, SEEK_SET);
size_t pos = 0;
uint32_t size = 0;
while (len >= frame_header_size)
{
if (fread(&size, frame_header_size, 1, file) != 1)
throw std::logic_error("Failed to read from file");
len -= frame_header_size;
if (len < size)
throw std::logic_error("Malformed ledger file");
fseeko(file, size, SEEK_CUR);
len -= size;
positions.push_back(pos);
pos += (size + frame_header_size);
throw std::logic_error(fmt::format(
"Unable to open ledger file {}: {}", file_path, strerror(errno)));
}
total_len = pos;
if (len != 0)
throw std::logic_error("Malformed ledger file");
// Header reserved for the offset to the position table
fseeko(file, sizeof(positions_offset_header_t), SEEK_SET);
total_len = sizeof(positions_offset_header_t);
}
Ledger(const Ledger& that) = delete;
// Used when recovering an existing ledger file
LedgerFile(const std::string& dir, const std::string& file_name) :
dir(dir),
file(NULL)
{
auto full_path = (fs::path(dir) / fs::path(file_name));
file = fopen(full_path.c_str(), "r+b");
if (!file)
{
throw std::logic_error(fmt::format(
"Unable to open ledger file {}: {}", full_path, strerror(errno)));
}
~Ledger()
committed = is_ledger_file_committed(file_name);
start_idx = get_start_idx_from_file_name(file_name);
// First, get full size of file
fseeko(file, 0, SEEK_END);
size_t total_file_size = ftello(file);
// Second, read offset to header table
fseeko(file, 0, SEEK_SET);
positions_offset_header_t table_offset;
if (fread(&table_offset, sizeof(positions_offset_header_t), 1, file) != 1)
{
throw std::logic_error(fmt::format(
"Failed to read positions offset from ledger file {}", full_path));
}
if (table_offset != 0)
{
// If the chunk was completed, read positions table from file directly
total_len = table_offset;
fseeko(file, table_offset, SEEK_SET);
positions.resize(
(total_file_size - table_offset) / sizeof(positions.at(0)));
if (
fread(
positions.data(),
sizeof(positions.at(0)),
positions.size(),
file) != positions.size())
{
throw std::logic_error(fmt::format(
"Failed to read positions table from ledger file {}", full_path));
}
completed = true;
}
else
{
// If the chunk was not completed, read all entries to reconstruct
// positions table
total_len = total_file_size;
auto len = total_len - sizeof(positions_offset_header_t);
size_t pos = sizeof(positions_offset_header_t);
uint32_t entry_size = 0;
while (len >= frame_header_size)
{
if (fread(&entry_size, frame_header_size, 1, file) != 1)
{
throw std::logic_error(fmt::format(
"Failed to read frame from ledger file {}", full_path));
}
len -= frame_header_size;
if (len < entry_size)
{
throw std::logic_error(
fmt::format("Malformed ledger file {}", full_path));
}
fseeko(file, entry_size, SEEK_CUR);
len -= entry_size;
positions.push_back(pos);
pos += (entry_size + frame_header_size);
}
completed = false;
}
}
~LedgerFile()
{
if (file)
{
fflush(file);
fclose(file);
}
}
size_t get_last_idx()
std::string get_file_name() const
{
return positions.size();
int fd = fileno(file);
auto path = fmt::format("/proc/self/fd/{}", fd);
char result[PATH_MAX];
::memset(result, 0, sizeof(result));
if (readlink(path.c_str(), result, sizeof(result) - 1) < 0)
{
throw std::logic_error("Could not read ledger file name");
}
return fs::path(result).filename();
}
const std::vector<uint8_t> read_entry(size_t idx)
size_t get_start_idx() const
{
if ((idx == 0) || (idx > positions.size()))
return {};
return start_idx;
}
size_t get_last_idx() const
{
return start_idx + positions.size() - 1;
}
size_t get_current_size() const
{
return total_len;
}
bool is_committed() const
{
return committed;
}
bool is_complete() const
{
return completed;
}
size_t write_entry(const uint8_t* data, size_t size, bool committable)
{
fseeko(file, total_len, SEEK_SET);
positions.push_back(total_len);
size_t new_idx = get_last_idx();
uint32_t frame = (uint32_t)size;
if (fwrite(&frame, frame_header_size, 1, file) != 1)
{
throw std::logic_error("Failed to write entry header to ledger");
}
if (fwrite(data, size, 1, file) != 1)
{
throw std::logic_error("Failed to write entry to ledger");
}
// Committable entries get flushed straight away
if (committable && fflush(file) != 0)
{
throw std::logic_error(
fmt::format("Failed to flush entry to ledger: {}", strerror(errno)));
}
total_len += (size + frame_header_size);
return new_idx;
}
size_t framed_entries_size(size_t from, size_t to) const
{
if ((from < start_idx) || (to < from) || (to > get_last_idx()))
{
return 0;
}
if (to == get_last_idx())
{
return total_len - positions.at(from - start_idx);
}
else
{
return positions.at(to - start_idx + 1) -
positions.at(from - start_idx);
}
}
size_t entry_size(size_t idx) const
{
auto framed_size = framed_entries_size(idx, idx);
return (framed_size != 0) ? framed_size - frame_header_size : 0;
}
std::optional<std::vector<uint8_t>> read_entry(size_t idx) const
{
if ((idx < start_idx) || (idx > get_last_idx()))
{
return std::nullopt;
}
auto len = entry_size(idx);
std::vector<uint8_t> entry(len);
fseeko(file, positions.at(idx - 1) + frame_header_size, SEEK_SET);
fseeko(file, positions.at(idx - start_idx) + frame_header_size, SEEK_SET);
if (fread(entry.data(), len, 1, file) != 1)
throw std::logic_error("Failed to read from file");
{
throw std::logic_error(
fmt::format("Failed to read entry {} from file", idx));
}
return entry;
}
const std::vector<uint8_t> read_framed_entries(size_t from, size_t to)
std::optional<std::vector<uint8_t>> read_framed_entries(
size_t from, size_t to) const
{
if ((from < start_idx) || (to > get_last_idx()) || (to < from))
{
LOG_FAIL_FMT("Unknown entries range: {} - {}", from, to);
return std::nullopt;
}
auto framed_size = framed_entries_size(from, to);
std::vector<uint8_t> framed_entries(framed_size);
if (framed_size == 0)
return framed_entries;
fseeko(file, positions.at(from - 1), SEEK_SET);
fseeko(file, positions.at(from - start_idx), SEEK_SET);
if (fread(framed_entries.data(), framed_size, 1, file) != 1)
throw std::logic_error("Failed to read from file");
{
throw std::logic_error(fmt::format(
"Failed to read entry range {} - {} from file", from, to));
}
return framed_entries;
}
size_t framed_entries_size(size_t from, size_t to)
bool truncate(size_t idx)
{
if ((from == 0) || (to < from) || (to > positions.size()))
return 0;
if (to == positions.size())
if (committed || (idx < start_idx - 1) || (idx >= get_last_idx()))
{
return total_len - positions.at(from - 1);
return false;
}
else
if (idx == start_idx - 1)
{
return positions.at(to) - positions.at(from - 1);
// Truncating everything triggers file deletion
if (!fs::remove(fs::path(dir) / fs::path(get_file_name())))
{
throw std::logic_error(
fmt::format("Could not remove file {}", get_file_name()));
}
return true;
}
}
size_t entry_size(size_t idx)
{
auto framed_size = framed_entries_size(idx, idx);
// Reset positions offset header
fseeko(file, 0, SEEK_SET);
positions_offset_header_t table_offset = 0;
if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
{
throw std::logic_error("Failed to reset positions table offset");
}
return framed_size ? framed_size - frame_header_size : 0;
}
void write_entry(const uint8_t* data, size_t size)
{
fseeko(file, total_len, SEEK_SET);
positions.push_back(total_len);
LOG_DEBUG_FMT("Ledger write {}: {} bytes", positions.size(), size);
total_len += (size + frame_header_size);
uint32_t frame = (uint32_t)size;
if (fwrite(&frame, frame_header_size, 1, file) != 1)
throw std::logic_error("Failed to write to file");
if (fwrite(data, size, 1, file) != 1)
throw std::logic_error("Failed to write to file");
}
void truncate(size_t last_idx)
{
LOG_DEBUG_FMT("Ledger truncate: {}/{}", last_idx, positions.size());
// positions[last_idx - 1] is the position of the specified
// final index. Truncate the ledger at position[last_idx].
if (last_idx >= positions.size())
return;
total_len = positions.at(last_idx);
positions.resize(last_idx);
completed = false;
total_len = positions.at(idx - start_idx + 1);
positions.resize(idx - start_idx + 1);
if (fflush(file) != 0)
{
std::stringstream ss;
ss << "Failed to flush file: " << strerror(errno);
throw std::logic_error(ss.str());
throw std::logic_error(
fmt::format("Failed to flush ledger file: {}", strerror(errno)));
}
if (ftruncate(fileno(file), total_len))
throw std::logic_error("Failed to truncate file");
{
throw std::logic_error(
fmt::format("Failed to truncate ledger: {}", strerror(errno)));
}
fseeko(file, total_len, SEEK_SET);
return false;
}
void complete()
{
if (completed)
{
return;
}
fseeko(file, total_len, SEEK_SET);
size_t table_offset = ftello(file);
if (
fwrite(
reinterpret_cast<uint8_t*>(positions.data()),
sizeof(positions.at(0)),
positions.size(),
file) != positions.size())
{
throw std::logic_error("Failed to write positions table to ledger");
}
// Write positions table offset at start of file
if (fseeko(file, 0, SEEK_SET) != 0)
{
throw std::logic_error("Failed to set file offset to 0");
}
if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
{
throw std::logic_error("Failed to write positions table to ledger");
}
if (fflush(file) != 0)
{
throw std::logic_error(
fmt::format("Failed to flush ledger file: {}", strerror(errno)));
}
completed = true;
}
bool commit(size_t idx)
{
if (!completed || committed || (idx != get_last_idx()))
{
// No effect if commit idx is not last idx
return false;
}
if (fflush(file) != 0)
{
throw std::logic_error(
fmt::format("Failed to flush ledger file: {}", strerror(errno)));
}
const auto committed_file_name = fmt::format(
"{}_{}-{}.{}",
file_name_prefix,
start_idx,
get_last_idx(),
ledger_committed_suffix);
fs::rename(
fs::path(dir) / fs::path(get_file_name()),
fs::path(dir) / fs::path(committed_file_name));
committed = true;
return true;
}
};
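A minimal usage sketch of the LedgerFile lifecycle defined above; the directory name is illustrative and is assumed to already exist:

// Sketch: lifecycle of a single chunk in an existing "my_ledger" directory
asynchost::LedgerFile chunk("my_ledger", 1); // creates my_ledger/ledger_1
std::vector<uint8_t> e = {0xde, 0xad};
size_t idx = chunk.write_entry(e.data(), e.size(), true /* committable */);
chunk.complete(); // appends the positions table, records its offset in the header
chunk.commit(idx); // renames the file to my_ledger/ledger_1-1.committed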
class Ledger
{
private:
ringbuffer::WriterPtr to_enclave;
// Ledger directory
const std::string ledger_dir;
// Keeps track of all ledger files for writing.
// Current ledger file is always the last one
std::list<std::shared_ptr<LedgerFile>> files;
// Cache of ledger files for reading
size_t max_read_cache_files;
std::list<std::shared_ptr<LedgerFile>> files_read_cache;
const size_t chunk_threshold;
size_t last_idx = 0;
size_t committed_idx = 0;
// True if a new file should be created when writing an entry
bool require_new_file;
auto get_it_contains_idx(size_t idx) const
{
if (idx == 0)
{
return files.end();
}
auto f = std::upper_bound(
files.begin(),
files.end(),
idx,
[](size_t idx, const std::shared_ptr<LedgerFile>& f) {
return (idx <= f->get_last_idx());
});
return f;
}
std::shared_ptr<LedgerFile> get_file_from_cache(size_t idx)
{
if (idx == 0)
{
return nullptr;
}
// First, try to find file from read cache
for (auto const& f : files_read_cache)
{
if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
{
return f;
}
}
// If the file is not in the cache, find the file from the ledger
// directory
std::optional<std::string> match = std::nullopt;
for (auto const& f : fs::directory_iterator(ledger_dir))
{
// Check whether a file, based on its name, contains idx. Only committed
// files (i.e. those with a last idx in their name) are considered here.
auto last_idx = get_last_idx_from_file_name(f.path().filename());
if (
last_idx.has_value() && idx <= last_idx.value() &&
idx >= get_start_idx_from_file_name(f.path().filename()))
{
match = f.path().filename();
break;
}
}
if (!match.has_value())
{
return nullptr;
}
// Emplace file in the max-sized read cache
auto match_file = std::make_shared<LedgerFile>(ledger_dir, match.value());
if (files_read_cache.size() >= max_read_cache_files)
{
files_read_cache.erase(files_read_cache.begin());
}
files_read_cache.emplace_back(match_file);
return match_file;
}
std::shared_ptr<LedgerFile> get_file_from_idx(size_t idx)
{
if (idx == 0)
{
return nullptr;
}
// First, check if the file is in the list of files open for writing
auto f = std::upper_bound(
files.rbegin(),
files.rend(),
idx,
[](size_t idx, const std::shared_ptr<LedgerFile>& f) {
return idx >= f->get_start_idx();
});
if (f != files.rend())
{
return *f;
}
// Otherwise, return file from read cache
return get_file_from_cache(idx);
}
std::shared_ptr<LedgerFile> get_latest_file() const
{
if (files.empty())
{
return nullptr;
}
return files.back();
}
public:
Ledger(
const std::string& ledger_dir,
ringbuffer::AbstractWriterFactory& writer_factory,
size_t chunk_threshold,
size_t max_read_cache_files = max_read_cache_files_default) :
ledger_dir(ledger_dir),
chunk_threshold(chunk_threshold),
to_enclave(writer_factory.create_writer_to_inside()),
max_read_cache_files(max_read_cache_files)
{
if (chunk_threshold == 0 || chunk_threshold > max_chunk_threshold_size)
{
throw std::logic_error(fmt::format(
"Error: Ledger chunk threshold should be between 1-{}",
max_chunk_threshold_size));
}
if (fs::is_directory(ledger_dir))
{
// If the ledger directory exists, recover ledger files from it
for (auto const& f : fs::directory_iterator(ledger_dir))
{
files.push_back(
std::make_shared<LedgerFile>(ledger_dir, f.path().filename()));
}
files.sort([](
const std::shared_ptr<LedgerFile>& a,
const std::shared_ptr<LedgerFile>& b) {
return a->get_last_idx() < b->get_last_idx();
});
last_idx = get_latest_file()->get_last_idx();
for (auto f = files.begin(); f != files.end();)
{
if ((*f)->is_committed())
{
committed_idx = (*f)->get_last_idx();
auto f_ = f;
f++;
files.erase(f_);
}
else
{
f++;
}
}
// Continue writing at the end of last file only if that file is not
// complete
if (files.size() > 0 && !files.back()->is_complete())
{
require_new_file = false;
}
else
{
require_new_file = true;
}
}
else
{
if (!fs::create_directory(ledger_dir))
{
throw std::logic_error(fmt::format(
"Error: Could not create ledger directory: {}", ledger_dir));
}
require_new_file = true;
}
}
Ledger(const Ledger& that) = delete;
std::optional<std::vector<uint8_t>> read_entry(size_t idx)
{
auto f = get_file_from_idx(idx);
if (f == nullptr)
{
return std::nullopt;
}
return f->read_entry(idx);
}
std::optional<std::vector<uint8_t>> read_framed_entries(
size_t from, size_t to)
{
if ((from <= 0) || (to > last_idx) || (to < from))
{
return std::nullopt;
}
std::vector<uint8_t> entries;
size_t idx = from;
while (idx <= to)
{
auto f_from = get_file_from_idx(idx);
if (f_from == nullptr)
{
return std::nullopt;
}
auto to_ = std::min(f_from->get_last_idx(), to);
auto v = f_from->read_framed_entries(idx, to_);
if (!v.has_value())
{
return std::nullopt;
}
entries.insert(
entries.end(),
std::make_move_iterator(v->begin()),
std::make_move_iterator(v->end()));
idx = to_ + 1;
}
return entries;
}
size_t write_entry(const uint8_t* data, size_t size, bool committable)
{
if (require_new_file)
{
files.push_back(std::make_shared<LedgerFile>(ledger_dir, last_idx + 1));
require_new_file = false;
}
auto f = get_latest_file();
last_idx = f->write_entry(data, size, committable);
LOG_DEBUG_FMT(
"Wrote entry at {} [committable: {}]", last_idx, committable);
if (committable && f->get_current_size() >= chunk_threshold)
{
f->complete();
require_new_file = true;
LOG_TRACE_FMT("New ledger chunk will start at {}", last_idx + 1);
}
return last_idx;
}
void truncate(size_t idx)
{
LOG_DEBUG_FMT("Ledger truncate: {}/{}", idx, last_idx);
if (idx >= last_idx || idx < committed_idx)
{
return;
}
require_new_file = true;
auto f_from = get_it_contains_idx(idx + 1);
auto f_to = get_it_contains_idx(last_idx);
auto f_end = std::next(f_to);
for (auto it = f_from; it != f_end;)
{
// Truncate the first file to the truncation index while the more recent
// files are deleted entirely
auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
if ((*it)->truncate(truncate_idx))
{
auto it_ = it;
it++;
files.erase(it_);
}
else
{
// A new file will not be required on the next written entry if the
// file is _not_ deleted entirely
require_new_file = false;
it++;
}
}
last_idx = idx;
}
void commit(size_t idx)
{
LOG_DEBUG_FMT("Ledger commit: {}/{}", idx, last_idx);
if (idx <= committed_idx)
{
return;
}
auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
get_it_contains_idx(committed_idx);
auto f_to = get_it_contains_idx(idx);
auto f_end = std::next(f_to);
for (auto it = f_from; it != f_end;)
{
// Commit all previous files to their latest index while the latest
// file is committed to the committed index
auto commit_idx = (it == f_to) ? idx : (*it)->get_last_idx();
if (
(*it)->commit(commit_idx) &&
(it != f_to || (idx == (*it)->get_last_idx())))
{
auto it_ = it;
it++;
files.erase(it_);
}
else
{
it++;
}
}
committed_idx = idx;
}
void register_message_handlers(
@ -196,7 +792,10 @@ namespace asynchost
DISPATCHER_SET_MESSAGE_HANDLER(
disp,
consensus::ledger_append,
[this](const uint8_t* data, size_t size) { write_entry(data, size); });
[this](const uint8_t* data, size_t size) {
auto committable = serialized::read<bool>(data, size);
write_entry(data, size, committable);
});
DISPATCHER_SET_MESSAGE_HANDLER(
disp,
@ -207,17 +806,24 @@ namespace asynchost
});
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::ledger_get, [this](const uint8_t* data, size_t size) {
// The enclave has asked for a ledger entry.
disp,
consensus::ledger_commit,
[this](const uint8_t* data, size_t size) {
auto idx = serialized::read<consensus::Index>(data, size);
commit(idx);
});
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::ledger_get, [&](const uint8_t* data, size_t size) {
auto [idx, purpose] =
ringbuffer::read_message<consensus::ledger_get>(data, size);
auto& entry = read_entry(idx);
auto entry = read_entry(idx);
if (entry.size() > 0)
if (entry.has_value())
{
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_entry, to_enclave, idx, purpose, entry);
consensus::ledger_entry, to_enclave, idx, purpose, entry.value());
}
else
{
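And a corresponding usage sketch for the Ledger wrapper above, assuming a ringbuffer::AbstractWriterFactory wf; the directory name and chunk threshold are illustrative:

asynchost::Ledger ledger("my_ledger", wf, 20'000 /* chunk threshold, bytes */);
std::vector<uint8_t> e = {1, 2, 3};
auto idx = ledger.write_entry(e.data(), e.size(), true /* committable */);
// A committable entry that takes the current chunk past the threshold
// completes it; the next write then starts a new ledger file.
ledger.commit(idx); // completed chunks up to idx are renamed *.committed
auto entry = ledger.read_entry(idx); // std::optional, served via the read cache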

View file

@ -117,10 +117,19 @@ int main(int argc, char** argv)
"Address to advertise publicly to clients (defaults to same as "
"--rpc-address)");
std::string ledger_file("ccf.ledger");
app.add_option("--ledger-file", ledger_file, "Ledger file")
std::string ledger_dir("ledger");
app.add_option("--ledger-dir", ledger_dir, "Ledger directory")
->capture_default_str();
size_t ledger_chunk_threshold = 5'000'000;
app
.add_option(
"--ledger-chunk-max-bytes",
ledger_chunk_threshold,
"Minimum size (bytes) at which a new ledger chunk is created.")
->capture_default_str()
->transform(CLI::AsSizeValue(true)); // 1000 is kb
logger::Level host_log_level{logger::Level::INFO};
std::vector<std::pair<std::string, logger::Level>> level_map;
for (int i = logger::TRACE; i < logger::MAX_LOG_LEVEL; i++)
@ -247,8 +256,7 @@ int main(int argc, char** argv)
max_fragment_size,
"Determines maximum size of individual ringbuffer message fragments. "
"Messages larger than this will be split into multiple fragments. Value "
"is "
"used as a shift factor, ie - given N, the limit is (1 << N)")
"is used as a shift factor, ie - given N, the limit is (1 << N)")
->capture_default_str();
size_t tick_period_ms = 10;
@ -402,15 +410,15 @@ int main(int argc, char** argv)
rpc_address.hostname));
}
if ((*start || *join) && files::exists(ledger_file))
if ((*start || *join) && files::exists(ledger_dir))
{
throw std::logic_error(fmt::format(
"On start/join, ledger file should not exist ({})", ledger_file));
"On start/join, ledger directory should not exist ({})", ledger_dir));
}
else if (*recover && !files::exists(ledger_file))
else if (*recover && !files::exists(ledger_dir))
{
throw std::logic_error(fmt::format(
"On recovery, ledger file should exist ({}) ", ledger_file));
"On recovery, ledger directory should exist ({}) ", ledger_dir));
}
if (*start)
@ -503,7 +511,7 @@ int main(int argc, char** argv)
asynchost::Sigterm sigterm(writer_factory);
// write to a ledger
asynchost::Ledger ledger(ledger_file, writer_factory);
asynchost::Ledger ledger(ledger_dir, writer_factory, ledger_chunk_threshold);
ledger.register_message_handlers(bp.get_dispatcher());
// Begin listening for node-to-node and RPC messages.
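As a worked example of the size transform configured above (CLI11's AsSizeValue, with kb_is_1000 set to true; the 20KB value mirrors the e2e test configuration earlier in this change):

CLI::App app;
size_t chunk_bytes = 5'000'000;
app.add_option("--ledger-chunk-max-bytes", chunk_bytes)
  ->transform(CLI::AsSizeValue(true)); // "KB" means 1000 bytes here
// Parsing "--ledger-chunk-max-bytes 20KB" then yields chunk_bytes == 20'000.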

View file

@ -247,20 +247,33 @@ namespace asynchost
serialized::overlay<consensus::AppendEntriesIndex>(p, psize);
// Find the total frame size, and write it along with the header.
auto count = ae.idx - ae.prev_idx;
uint32_t frame = (uint32_t)(
size_to_send +
ledger.framed_entries_size(ae.prev_idx + 1, ae.idx));
uint32_t frame = (uint32_t)size_to_send;
std::optional<std::vector<uint8_t>> framed_entries = std::nullopt;
framed_entries =
ledger.read_framed_entries(ae.prev_idx + 1, ae.idx);
if (framed_entries.has_value())
{
frame += (uint32_t)framed_entries->size();
node.value()->write(sizeof(uint32_t), (uint8_t*)&frame);
node.value()->write(size_to_send, data_to_send);
frame = (uint32_t)framed_entries->size();
node.value()->write(frame, framed_entries->data());
}
else
{
// Header-only AE
node.value()->write(sizeof(uint32_t), (uint8_t*)&frame);
node.value()->write(size_to_send, data_to_send);
}
LOG_DEBUG_FMT(
"send AE to {} [{}]: {}, {}", to, frame, ae.idx, ae.prev_idx);
node.value()->write(sizeof(uint32_t), (uint8_t*)&frame);
node.value()->write(size_to_send, data_to_send);
auto framed_entries =
ledger.read_framed_entries(ae.prev_idx + 1, ae.idx);
frame = (uint32_t)framed_entries.size();
node.value()->write(frame, framed_entries.data());
"send AE to node {} [{}]: {}, {}",
to,
frame,
ae.idx,
ae.prev_idx);
}
else
{

View file

@ -3,59 +3,689 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include "../ledger.h"
#include "../ds/serialized.h"
#include <doctest/doctest.h>
#include <string>
TEST_CASE("Read/Write test")
{
ringbuffer::Circuit eio(1024);
auto wf = ringbuffer::WriterFactory(eio);
// Used throughout
using frame_header_type = uint32_t;
static constexpr size_t frame_header_size = sizeof(frame_header_type);
static constexpr auto ledger_dir = "ledger_dir";
ringbuffer::Circuit eio(1024);
auto wf = ringbuffer::WriterFactory(eio);
const std::vector<uint8_t> e1 = {1, 2, 3};
const std::vector<uint8_t> e2 = {5, 5, 6, 7};
// Ledger entry type
template <typename T>
struct LedgerEntry
{
T value_ = 0;
uint8_t* data()
{
asynchost::Ledger l("testlog", wf);
l.truncate(0);
REQUIRE(l.get_last_idx() == 0);
l.write_entry(e1.data(), e1.size());
l.write_entry(e2.data(), e2.size());
return reinterpret_cast<uint8_t*>(&value_);
}
asynchost::Ledger l("testlog", wf);
REQUIRE(l.get_last_idx() == 2);
auto r1 = l.read_entry(1);
REQUIRE(e1 == r1);
auto r2 = l.read_entry(2);
REQUIRE(e2 == r2);
auto value() const
{
return value_;
}
auto set_value(T v)
{
value_ = v;
}
LedgerEntry() = default;
LedgerEntry(T v) : value_(v) {}
LedgerEntry(const std::vector<uint8_t>& raw)
{
const uint8_t* data = raw.data();
size_t size = raw.size();
value_ = serialized::read<T>(data, size);
}
};
using TestLedgerEntry = LedgerEntry<uint32_t>;
size_t number_of_files_in_ledger_dir()
{
size_t file_count = 0;
for (auto const& f : fs::directory_iterator(ledger_dir))
{
file_count++;
}
return file_count;
}
TEST_CASE("Entry sizes")
size_t number_of_committed_files_in_ledger_dir()
{
ringbuffer::Circuit eio(2);
auto wf = ringbuffer::WriterFactory(eio);
size_t committed_file_count = 0;
for (auto const& f : fs::directory_iterator(ledger_dir))
{
if (asynchost::is_ledger_file_committed(f.path().string()))
{
committed_file_count++;
}
}
const std::vector<uint8_t> e1 = {1, 2, 3};
const std::vector<uint8_t> e2 = {5, 5, 6, 7};
return committed_file_count;
}
asynchost::Ledger l("testlog", wf);
l.truncate(0);
REQUIRE(l.get_last_idx() == 0);
l.write_entry(e1.data(), e1.size());
l.write_entry(e2.data(), e2.size());
void verify_framed_entries_range(
const std::vector<uint8_t>& framed_entries, size_t from, size_t to)
{
size_t idx = from;
for (int i = 0; i < framed_entries.size();)
{
const uint8_t* data = &framed_entries[i];
size_t size = framed_entries.size() - i;
REQUIRE(l.entry_size(1) == e1.size());
REQUIRE(l.entry_size(2) == e2.size());
REQUIRE(l.entry_size(0) == 0);
REQUIRE(l.entry_size(3) == 0);
auto frame = serialized::read<frame_header_type>(data, size);
auto entry = serialized::read(data, size, frame);
REQUIRE(TestLedgerEntry(entry).value() == idx);
i += frame_header_size + frame;
idx++;
}
REQUIRE(l.framed_entries_size(1, 1) == (e1.size() + sizeof(uint32_t)));
REQUIRE(
l.framed_entries_size(1, 2) ==
(e1.size() + sizeof(uint32_t) + e2.size() + sizeof(uint32_t)));
REQUIRE(idx == to + 1);
}
/*
auto e = l.read_framed_entries(1, 1);
for (auto c : e)
std::cout << std::hex << (int)c;
std::cout << std::endl;*/
void read_entry_from_ledger(asynchost::Ledger& ledger, size_t idx)
{
REQUIRE(TestLedgerEntry(ledger.read_entry(idx).value()).value() == idx);
}
void read_entries_range_from_ledger(
asynchost::Ledger& ledger, size_t from, size_t to)
{
verify_framed_entries_range(
ledger.read_framed_entries(from, to).value(), from, to);
}
// Keeps track of ledger entries written to the ledger.
// An entry submitted at index i has value i, so that it is easy to verify
// that the ledger entry read from the ledger at a specific index is right.
class TestEntrySubmitter
{
private:
asynchost::Ledger& ledger;
size_t last_idx;
public:
TestEntrySubmitter(asynchost::Ledger& ledger, size_t initial_last_idx = 0) :
ledger(ledger),
last_idx(initial_last_idx)
{}
size_t get_last_idx()
{
return last_idx;
}
void write(bool is_committable)
{
auto e = TestLedgerEntry(++last_idx);
REQUIRE(
ledger.write_entry(e.data(), sizeof(TestLedgerEntry), is_committable) ==
last_idx);
}
void truncate(size_t idx)
{
ledger.truncate(idx);
// Check that we can read until truncated entry but cannot read after it
if (idx > 0)
{
read_entries_range_from_ledger(ledger, 1, idx);
}
REQUIRE_FALSE(ledger.read_framed_entries(1, idx + 1).has_value());
if (idx < last_idx)
{
last_idx = idx;
}
}
};
size_t get_entries_per_chunk(size_t chunk_threshold)
{
// The number of entries per chunk is a function of the threshold (minus the
// size of the fixed space reserved for the positions-table offset at the
// start of each file) and the size of each _framed_ entry
return ceil(
(static_cast<float>(chunk_threshold - sizeof(size_t))) /
(frame_header_size + sizeof(TestLedgerEntry)));
}
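For the values used throughout these tests, this works out as follows (a worked example, assuming a 64-bit host):

// chunk_threshold = 30, offset header = sizeof(size_t) = 8,
// frame_header_size = 4, sizeof(TestLedgerEntry) = sizeof(uint32_t) = 4:
// entries per chunk = ceil((30 - 8) / (4 + 4)) = ceil(2.75) = 3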
// Assumes that no entries have been written yet
size_t initialise_ledger(
TestEntrySubmitter& entry_submitter,
size_t chunk_threshold,
size_t chunk_count)
{
size_t end_of_first_chunk_idx = 0;
bool is_committable = true;
size_t entries_per_chunk = get_entries_per_chunk(chunk_threshold);
for (int i = 0; i < entries_per_chunk * chunk_count; i++)
{
entry_submitter.write(is_committable);
}
REQUIRE(number_of_files_in_ledger_dir() == chunk_count);
return entries_per_chunk;
}
TEST_CASE("Regular chunking")
{
fs::remove_all(ledger_dir);
INFO("Cannot create a ledger with a chunk threshold of 0");
{
size_t chunk_threshold = 0;
REQUIRE_THROWS(asynchost::Ledger(ledger_dir, wf, chunk_threshold));
}
size_t chunk_threshold = 30;
size_t entries_per_chunk = get_entries_per_chunk(chunk_threshold);
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
size_t end_of_first_chunk_idx = 0;
bool is_committable = true;
INFO("Not quite enough entries before chunk threshold");
{
is_committable = true;
for (int i = 0; i < entries_per_chunk - 1; i++)
{
entry_submitter.write(is_committable);
}
// Writing committable entries without reaching the chunk threshold
// does not create new ledger files
REQUIRE(number_of_files_in_ledger_dir() == 1);
}
INFO("Additional non-committable entries do not trigger chunking");
{
is_committable = false;
entry_submitter.write(is_committable);
entry_submitter.write(is_committable);
REQUIRE(number_of_files_in_ledger_dir() == 1);
}
INFO("Additional committable entry triggers chunking");
{
is_committable = true;
entry_submitter.write(is_committable);
REQUIRE(number_of_files_in_ledger_dir() == 1);
// Threshold is passed, a new ledger file should be created
entry_submitter.write(false);
end_of_first_chunk_idx = entry_submitter.get_last_idx() - 1;
REQUIRE(number_of_files_in_ledger_dir() == 2);
}
INFO(
"Submitting more committable entries trigger chunking at regular interval");
{
size_t chunk_count = 10;
size_t number_of_files_before = number_of_files_in_ledger_dir();
for (int i = 0; i < entries_per_chunk * chunk_count; i++)
{
entry_submitter.write(is_committable);
}
REQUIRE(
number_of_files_in_ledger_dir() == chunk_count + number_of_files_before);
}
INFO("Reading entries across all chunks");
{
is_committable = false;
entry_submitter.write(is_committable);
auto last_idx = entry_submitter.get_last_idx();
// Reading the last entry succeeds
read_entry_from_ledger(ledger, last_idx);
// Reading in the future fails
REQUIRE_FALSE(ledger.read_entry(last_idx + 1).has_value());
// Reading at 0 fails
REQUIRE_FALSE(ledger.read_entry(0).has_value());
// Reading in the past succeeds
read_entry_from_ledger(ledger, 1);
read_entry_from_ledger(ledger, end_of_first_chunk_idx);
read_entry_from_ledger(ledger, end_of_first_chunk_idx + 1);
read_entry_from_ledger(ledger, last_idx);
}
INFO("Reading range of entries across all chunks");
{
// Note: only testing write cache as no chunk has yet been committed
auto last_idx = entry_submitter.get_last_idx();
// Reading from 0 fails
REQUIRE_FALSE(
ledger.read_framed_entries(0, end_of_first_chunk_idx).has_value());
// Reading in the future fails
REQUIRE_FALSE(ledger.read_framed_entries(1, last_idx + 1).has_value());
REQUIRE_FALSE(
ledger.read_framed_entries(last_idx, last_idx + 1).has_value());
// Reading from the start to any valid index succeeds
read_entries_range_from_ledger(ledger, 1, 1);
read_entries_range_from_ledger(
ledger, end_of_first_chunk_idx - 1, end_of_first_chunk_idx);
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx);
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx + 1);
read_entries_range_from_ledger(ledger, 1, last_idx - 1);
read_entries_range_from_ledger(ledger, 1, last_idx);
// Reading from just before/after a chunk succeeds
read_entries_range_from_ledger(
ledger, end_of_first_chunk_idx, end_of_first_chunk_idx + 1);
read_entries_range_from_ledger(
ledger, end_of_first_chunk_idx, last_idx - 1);
read_entries_range_from_ledger(ledger, end_of_first_chunk_idx, last_idx);
read_entries_range_from_ledger(
ledger, end_of_first_chunk_idx + 1, last_idx);
read_entries_range_from_ledger(
ledger, end_of_first_chunk_idx + 1, last_idx - 1);
}
}
TEST_CASE("Truncation")
{
fs::remove_all(ledger_dir);
size_t chunk_threshold = 30;
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
size_t chunk_count = 3;
size_t end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
// Write another entry to create a new chunk
entry_submitter.write(true);
size_t chunks_so_far = number_of_files_in_ledger_dir();
auto last_idx = entry_submitter.get_last_idx();
INFO("Truncating latest index has no effect");
{
entry_submitter.truncate(last_idx);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far);
}
INFO("Truncating last entry in penultimate chunk closes latest file");
{
entry_submitter.truncate(last_idx - 1);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far - 1);
// New file gets opened when one more entry is submitted
entry_submitter.write(true);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far);
entry_submitter.write(true);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far);
}
INFO("Truncating any entry in penultimate chunk closes latest file");
{
entry_submitter.truncate(last_idx - 2);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far - 1);
// New file gets opened when two more entries are submitted
entry_submitter.write(true);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far - 1);
entry_submitter.write(true);
REQUIRE(number_of_files_in_ledger_dir() == chunks_so_far);
}
INFO("Truncating entry at the start of second chunk");
{
entry_submitter.truncate(end_of_first_chunk_idx + 1);
REQUIRE(number_of_files_in_ledger_dir() == 2);
}
INFO("Truncating entry at the end of first chunk");
{
entry_submitter.truncate(end_of_first_chunk_idx);
REQUIRE(number_of_files_in_ledger_dir() == 1);
entry_submitter.write(true);
}
INFO("Truncating very first entry");
{
entry_submitter.truncate(1);
REQUIRE(number_of_files_in_ledger_dir() == 1);
}
INFO("Truncating all the things");
{
entry_submitter.truncate(0);
REQUIRE(number_of_files_in_ledger_dir() == 0);
entry_submitter.write(true);
}
}
TEST_CASE("Commit")
{
fs::remove_all(ledger_dir);
size_t chunk_threshold = 30;
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
size_t chunk_count = 3;
size_t end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
entry_submitter.write(true);
size_t last_idx = entry_submitter.get_last_idx();
REQUIRE(number_of_committed_files_in_ledger_dir() == 0);
INFO("Comitting end of first chunk");
{
ledger.commit(end_of_first_chunk_idx);
REQUIRE(number_of_committed_files_in_ledger_dir() == 1);
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx + 1);
}
INFO("Comitting in the middle on complete chunk");
{
ledger.commit(end_of_first_chunk_idx + 1);
REQUIRE(number_of_committed_files_in_ledger_dir() == 1); // No effect
ledger.commit(2 * end_of_first_chunk_idx - 1); // No effect
REQUIRE(number_of_committed_files_in_ledger_dir() == 1);
}
INFO("Comitting at the end of a complete chunk");
{
ledger.commit(2 * end_of_first_chunk_idx);
REQUIRE(number_of_committed_files_in_ledger_dir() == 2);
read_entries_range_from_ledger(ledger, 1, 2 * end_of_first_chunk_idx + 1);
}
INFO("Comitting at the end of last complete chunk");
{
ledger.commit(last_idx - 1);
REQUIRE(number_of_committed_files_in_ledger_dir() == 3);
read_entries_range_from_ledger(ledger, 1, last_idx);
}
INFO("Comitting incomplete chunk");
{
ledger.commit(last_idx); // No effect
REQUIRE(number_of_committed_files_in_ledger_dir() == 3);
}
INFO("Complete latest chunk and commit");
{
entry_submitter.write(true);
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
ledger.commit(last_idx);
REQUIRE(number_of_committed_files_in_ledger_dir() == 4);
read_entries_range_from_ledger(ledger, 1, last_idx);
}
INFO("Ledger cannot be truncated earlier than commit");
{
ledger.truncate(1); // No effect
read_entries_range_from_ledger(ledger, 1, last_idx);
ledger.truncate(2 * end_of_first_chunk_idx); // No effect
read_entries_range_from_ledger(ledger, 1, last_idx);
// Write and truncate a new entry past commit
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
ledger.truncate(last_idx - 1); // Deletes entry at last_idx
read_entries_range_from_ledger(ledger, 1, last_idx - 1);
REQUIRE_FALSE(ledger.read_framed_entries(1, last_idx).has_value());
}
}
TEST_CASE("Restore existing ledger")
{
fs::remove_all(ledger_dir);
size_t chunk_threshold = 30;
size_t last_idx = 0;
size_t end_of_first_chunk_idx = 0;
size_t chunk_count = 3;
size_t number_of_ledger_files = 0;
SUBCASE("Restoring uncommitted chunks")
{
INFO("Initialise first ledger with complete chunks");
{
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
number_of_ledger_files = number_of_files_in_ledger_dir();
last_idx = chunk_count * end_of_first_chunk_idx;
}
asynchost::Ledger ledger2(ledger_dir, wf, chunk_threshold);
read_entries_range_from_ledger(ledger2, 1, last_idx);
// Restored ledger can be written to
TestEntrySubmitter entry_submitter(ledger2, last_idx);
entry_submitter.write(true);
// On restore, we write a new file as all restored chunks were complete
REQUIRE(number_of_files_in_ledger_dir() == number_of_ledger_files + 1);
entry_submitter.write(true);
entry_submitter.write(true);
// Restored ledger can be truncated
entry_submitter.truncate(end_of_first_chunk_idx + 1);
entry_submitter.truncate(end_of_first_chunk_idx);
entry_submitter.truncate(1);
}
SUBCASE("Restoring truncated ledger")
{
INFO("Initialise first ledger with truncation");
{
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
entry_submitter.truncate(end_of_first_chunk_idx + 1);
last_idx = entry_submitter.get_last_idx();
number_of_ledger_files = number_of_files_in_ledger_dir();
}
asynchost::Ledger ledger2(ledger_dir, wf, chunk_threshold);
read_entries_range_from_ledger(ledger2, 1, last_idx);
TestEntrySubmitter entry_submitter(ledger2, last_idx);
entry_submitter.write(true);
// On restore, we write at the end of the last file if that file is not
// complete
REQUIRE(number_of_files_in_ledger_dir() == number_of_ledger_files);
}
SUBCASE("Restoring some committed chunks")
{
// This is the scenario on recovery
size_t committed_idx = 0;
INFO("Initialise first ledger with committed chunks");
{
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
committed_idx = 2 * end_of_first_chunk_idx + 1;
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
ledger.commit(committed_idx);
}
asynchost::Ledger ledger2(ledger_dir, wf, chunk_threshold);
read_entries_range_from_ledger(ledger2, 1, last_idx);
// Restored ledger cannot be truncated before the last index of the last
// committed chunk
TestEntrySubmitter entry_submitter(ledger2, last_idx);
entry_submitter.truncate(committed_idx - 1); // Successful
ledger2.truncate(committed_idx - 2); // Unsuccessful
read_entries_range_from_ledger(ledger2, 1, end_of_first_chunk_idx);
}
SUBCASE("Restoring ledger with different chunking threshold")
{
INFO("Initialise first ledger with committed chunks");
{
asynchost::Ledger ledger(ledger_dir, wf, chunk_threshold);
TestEntrySubmitter entry_submitter(ledger);
end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
}
INFO("Restore new ledger with twice the chunking threshold");
{
asynchost::Ledger ledger2(ledger_dir, wf, 2 * chunk_threshold);
read_entries_range_from_ledger(ledger2, 1, last_idx);
TestEntrySubmitter entry_submitter(ledger2, last_idx);
size_t orig_number_files = number_of_files_in_ledger_dir();
while (number_of_files_in_ledger_dir() == orig_number_files)
{
entry_submitter.write(true);
}
last_idx = entry_submitter.get_last_idx();
}
INFO("Restore new ledger with half the chunking threshold");
{
asynchost::Ledger ledger2(ledger_dir, wf, chunk_threshold / 2);
read_entries_range_from_ledger(ledger2, 1, last_idx);
TestEntrySubmitter entry_submitter(ledger2, last_idx);
size_t orig_number_files = number_of_files_in_ledger_dir();
while (number_of_files_in_ledger_dir() == orig_number_files)
{
entry_submitter.write(true);
}
}
}
}
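// Counts the file descriptors currently open by this process by listing the
// entries of /proc/self/fd (Linux-specific)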
size_t number_open_fd()
{
size_t fd_count = 0;
for (auto const& f : fs::directory_iterator("/proc/self/fd"))
{
fd_count++;
}
return fd_count;
}
TEST_CASE("Limit number of open files")
{
fs::remove_all(ledger_dir);
size_t chunk_threshold = 30;
size_t chunk_count = 5;
size_t max_read_cache_size = 2;
asynchost::Ledger ledger(
ledger_dir, wf, chunk_threshold, max_read_cache_size);
TestEntrySubmitter entry_submitter(ledger);
size_t initial_number_fd = number_open_fd();
size_t last_idx = 0;
size_t end_of_first_chunk_idx =
initialise_ledger(entry_submitter, chunk_threshold, chunk_count);
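// Each uncommitted chunk file stays open for writing, hence one fd per chunk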
REQUIRE(number_open_fd() == initial_number_fd + chunk_count);
INFO("Writing a new chunk opens a new file");
{
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
REQUIRE(number_open_fd() == initial_number_fd + chunk_count + 1);
}
INFO("Commit closes files and reading committed chunks opens those");
{
ledger.commit(1); // No file committed
REQUIRE(number_open_fd() == initial_number_fd + chunk_count + 1);
ledger.commit(end_of_first_chunk_idx); // One file now committed
REQUIRE(number_open_fd() == initial_number_fd + chunk_count);
read_entry_from_ledger(ledger, 1);
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx);
// Committed file is open in read cache
REQUIRE(number_open_fd() == initial_number_fd + chunk_count + 1);
ledger.commit(2 * end_of_first_chunk_idx); // Two files now committed
REQUIRE(number_open_fd() == initial_number_fd + chunk_count);
read_entries_range_from_ledger(ledger, 1, 2 * end_of_first_chunk_idx);
// Two committed files open in read cache
REQUIRE(number_open_fd() == initial_number_fd + chunk_count + 1);
ledger.commit(last_idx); // All but one file committed
// One file open for write, two files open for read
REQUIRE(number_open_fd() == initial_number_fd + 3);
read_entries_range_from_ledger(ledger, 1, last_idx);
// Number of open files is capped by size of read cache
REQUIRE(number_open_fd() == initial_number_fd + 1 + max_read_cache_size);
// Reading out of order succeeds
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx);
read_entries_range_from_ledger(
ledger, 2 * end_of_first_chunk_idx, 3 * end_of_first_chunk_idx);
read_entries_range_from_ledger(ledger, 1, last_idx);
read_entries_range_from_ledger(
ledger, 3 * end_of_first_chunk_idx, last_idx - 1);
read_entries_range_from_ledger(ledger, 1, end_of_first_chunk_idx);
}
INFO("Close and commit latest file");
{
entry_submitter.write(true);
entry_submitter.write(true);
last_idx = entry_submitter.get_last_idx();
ledger.commit(last_idx);
read_entries_range_from_ledger(ledger, 1, last_idx);
REQUIRE(number_open_fd() == initial_number_fd + max_read_cache_size);
}
INFO("Still possible to recover a new ledger");
{
initial_number_fd = number_open_fd();
asynchost::Ledger ledger2(
ledger_dir, wf, chunk_threshold, max_read_cache_size);
// Committed files are not open for write
REQUIRE(number_open_fd() == initial_number_fd);
read_entries_range_from_ledger(ledger2, 1, last_idx);
REQUIRE(number_open_fd() == initial_number_fd + max_read_cache_size);
}
}

View file

@@ -8,7 +8,7 @@ import infra.crypto
import infra.ledger
from infra.proposal import ProposalState
import http
import os
from loguru import logger as LOG
@@ -61,7 +61,11 @@ def run(args):
network.start_and_join(args)
primary, _ = network.find_primary()
ledger_filename = network.find_primary()[0].remote.ledger_path()
ledger_directory = network.find_primary()[0].remote.ledger_path()
# For now, this test only works with one ledger file
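# (assumes chunk files are suffixed with the index of their first
# transaction, so the first chunk's name ends with "_1")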
for l in os.listdir(ledger_directory):
if l.endswith("_1"):
ledger_filename = os.path.join(ledger_directory, l)
ledger = infra.ledger.Ledger(ledger_filename)
(
original_proposals,

View file

@@ -72,6 +72,7 @@ class Network:
"gov_script",
"join_timer",
"worker_threads",
"ledger_chunk_max_bytes",
"domain",
]
@@ -183,7 +184,7 @@ class Network:
raise
node.network_state = infra.node.NodeNetworkState.joined
def _start_all_nodes(self, args, recovery=False, ledger_file=None):
def _start_all_nodes(self, args, recovery=False, ledger_dir=None):
hosts = self.hosts
if not args.package:
@@ -211,7 +212,7 @@
else:
node.recover(
lib_name=args.package,
ledger_file=ledger_file,
ledger_dir=ledger_dir,
workspace=args.workspace,
label=args.label,
common_dir=self.common_dir,
@@ -313,19 +314,19 @@
LOG.success("***** Network is now open *****")
def start_in_recovery(
self, args, ledger_file, common_dir=None,
self, args, ledger_dir, common_dir=None,
):
"""
Starts a CCF network in recovery mode.
:param args: command line arguments to configure the CCF nodes.
:param ledger_file: ledger file to recover from.
:param ledger_dir: ledger directory to recover from.
:param common_dir: common directory containing member and user keys and certs.
"""
self.common_dir = common_dir or get_common_folder_name(
args.workspace, args.label
)
primary = self._start_all_nodes(args, recovery=True, ledger_file=ledger_file)
primary = self._start_all_nodes(args, recovery=True, ledger_dir=ledger_dir)
# If a common directory was passed in, initialise the consortium from it
if common_dir is not None:

View file

@@ -151,6 +151,11 @@ def cli_args(add=lambda x: None, parser=None, accept_unknown=False):
type=int,
default=10,
)
parser.add_argument(
"--ledger-chunk-max-bytes",
help="Minimum size (bytes) at which a new ledger chunk is created.",
default="100MB",
)
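# Note: size strings such as "100MB" are assumed to be parsed into bytes
# by the node's command line parser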
add(parser)

View file

@@ -10,6 +10,7 @@ GCM_SIZE_TAG = 16
GCM_SIZE_IV = 12
LEDGER_TRANSACTION_SIZE = 4
LEDGER_DOMAIN_SIZE = 8
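# Assumption: chunked ledger files now start with an 8-byte header, so
# transaction parsing begins at LEDGER_HEADER_SIZE rather than at offset 0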
LEDGER_HEADER_SIZE = 8
UNPACK_ARGS = {"raw": True, "strict_map_key": False}
@@ -116,7 +117,7 @@ class Transaction:
_file = None
_total_size = 0
_public_domain_size = 0
_next_offset = 0
_next_offset = LEDGER_HEADER_SIZE
_public_domain = None
_file_size = 0
gcm_header = None

View file

@@ -11,6 +11,8 @@ import uuid
import ctypes
import signal
import re
import stat
import shutil
from collections import deque
from loguru import logger as LOG
@@ -178,8 +180,13 @@ class SSHRemote(CmdMixin):
session.chmod(tgt_path, stat.st_mode)
for path in self.data_files:
tgt_path = os.path.join(self.root, os.path.basename(path))
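# sftp put does not copy directories recursively, so a ledger directory
# (assumed to be a flat set of chunk files) is uploaded entry by entry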
if os.path.isdir(path):
session.mkdir(tgt_path)
for f in os.listdir(path):
session.put(os.path.join(path, f), os.path.join(tgt_path, f))
else:
session.put(path, tgt_path)
LOG.info("[{}] copy {} from {}".format(self.hostname, tgt_path, path))
session.put(path, tgt_path)
session.close()
def get(self, file_name, dst_path, timeout=FILE_TIMEOUT, target_name=None):
@@ -199,10 +206,22 @@
while time.time() < end_time:
try:
target_name = target_name or file_name
session.get(
os.path.join(self.root, file_name),
os.path.join(dst_path, target_name),
)
fileattr = session.lstat(os.path.join(self.root, file_name))
if stat.S_ISDIR(fileattr.st_mode):
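# The remote path is a directory (e.g. a ledger dir): replace any stale
# local copy and fetch each file it contains individually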
src_dir = os.path.join(self.root, file_name)
dst_dir = os.path.join(dst_path, file_name)
if os.path.exists(dst_dir):
shutil.rmtree(dst_dir)
os.makedirs(dst_dir)
for f in session.listdir(src_dir):
session.get(
os.path.join(src_dir, f), os.path.join(dst_dir, f),
)
else:
session.get(
os.path.join(self.root, file_name),
os.path.join(dst_path, target_name),
)
LOG.debug(
"[{}] found {} after {}s".format(
self.hostname, file_name, int(time.time() - start_time)
@@ -391,6 +410,20 @@ class LocalRemote(CmdMixin):
LOG.info("[{}] {}".format(self.hostname, cmd))
return subprocess.call(cmd, shell=True)
def _cp(self, src_path, dst_path):
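# Copies directories recursively, first removing any stale copy at the
# destination so that repeated runs start from a clean state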
if os.path.isdir(src_path):
assert (
self._rc(
"rm -rf {}".format(
os.path.join(dst_path, os.path.basename(src_path))
)
)
== 0
)
assert self._rc("cp -r {} {}".format(src_path, dst_path)) == 0
else:
assert self._rc("cp {} {}".format(src_path, dst_path)) == 0
def _setup_files(self):
assert self._rc("rm -rf {}".format(self.root)) == 0
assert self._rc("mkdir -p {}".format(self.root)) == 0
@@ -400,7 +433,7 @@
assert self._rc("ln -s {} {}".format(src_path, dst_path)) == 0
for path in self.data_files:
dst_path = self.root
assert self._rc("cp {} {}".format(path, dst_path)) == 0
self._cp(path, dst_path)
def get(self, file_name, dst_path, timeout=FILE_TIMEOUT, target_name=None):
path = os.path.join(self.root, file_name)
@@ -412,9 +445,7 @@
else:
raise ValueError(path)
target_name = target_name or file_name
assert (
self._rc("cp {} {}".format(path, os.path.join(dst_path, target_name))) == 0
)
self._cp(path, dst_path)
def list_files(self):
return os.listdir(self.root)
@@ -526,9 +557,10 @@ class CCFRemote(object):
memory_reserve_startup=0,
notify_server=None,
gov_script=None,
ledger_file=None,
ledger_dir=None,
log_format_json=None,
binary_dir=".",
ledger_chunk_max_bytes=(5 * 1024 * 1024),
domain=None,
):
"""
@@ -542,14 +574,15 @@
self.BIN = infra.path.build_bin_path(
self.BIN, enclave_type, binary_dir=binary_dir
)
self.ledger_file = ledger_file
self.ledger_file_name = (
os.path.basename(ledger_file) if ledger_file else f"{local_node_id}.ledger"
self.ledger_dir = ledger_dir
self.ledger_dir_name = (
os.path.basename(ledger_dir) if ledger_dir else f"{local_node_id}.ledger"
)
self.common_dir = common_dir
exe_files = [self.BIN, lib_path] + self.DEPS
data_files = [self.ledger_file] if self.ledger_file else []
data_files = [self.ledger_dir] if self.ledger_dir else []
# exe_files may be relative or absolute. The remote implementation should
# copy (or symlink) to the target workspace, and then node will be able
@@ -572,7 +605,7 @@
f"--rpc-address={make_address(host, rpc_port)}",
f"--rpc-address-file={self.rpc_address_path}",
f"--public-rpc-address={make_address(pubhost, rpc_port)}",
f"--ledger-file={self.ledger_file_name}",
f"--ledger-dir={self.ledger_dir_name}",
f"--node-cert-file={self.pem}",
f"--host-log-level={host_log_level}",
election_timeout_arg,
@@ -592,6 +625,9 @@
if memory_reserve_startup:
cmd += [f"--memory-reserve-startup={memory_reserve_startup}"]
if ledger_chunk_max_bytes:
cmd += [f"--ledger-chunk-max-bytes={ledger_chunk_max_bytes}"]
if notify_server:
notify_server_host, *notify_server_port = notify_server.split(":")
@@ -707,11 +743,11 @@
self.remote.set_perf()
def get_ledger(self):
self.remote.get(self.ledger_file_name, self.common_dir)
return os.path.join(self.common_dir, self.ledger_file_name)
self.remote.get(self.ledger_dir_name, self.common_dir)
return os.path.join(self.common_dir, self.ledger_dir_name)
def ledger_path(self):
return os.path.join(self.remote.root, self.ledger_file_name)
return os.path.join(self.remote.root, self.ledger_dir_name)
@contextmanager

View file

@@ -29,12 +29,12 @@ def run(args):
if args.recover:
args.label = args.label + "_recover"
LOG.info("Recovering network from:")
LOG.info(f" - Ledger: {args.ledger}")
LOG.info(f" - Ledger: {args.ledger_dir}")
LOG.info(
f" - Defunct network public encryption key: {args.network_enc_pubk}"
)
LOG.info(f" - Common directory: {args.common_dir}")
network.start_in_recovery(args, args.ledger, args.common_dir)
network.start_in_recovery(args, args.ledger_dir, args.common_dir)
network.recover(args, args.network_enc_pubk)
else:
network.start_and_join(args)
@@ -99,7 +99,7 @@ if __name__ == "__main__":
default=False,
)
parser.add_argument(
"--ledger", help="Ledger to recover from",
"--ledger-dir", help="Ledger directory to recover from",
)
parser.add_argument(
"--network-enc-pubk",
@@ -112,7 +112,9 @@
args = infra.e2e_args.cli_args(add)
if args.recover and (
args.ledger is None or args.common_dir is None or args.network_enc_pubk is None
args.ledger_dir is None
or args.common_dir is None
or args.network_enc_pubk is None
):
print(
"Error: --recover requires --ledger, --network-enc-pubk and --common-dir arguments."