Remove x-ccf-global-commit header (#2144)

This commit is contained in:
Eddy Ashton 2021-02-02 14:05:32 +00:00 коммит произвёл GitHub
Родитель c1969bfae0
Коммит 6d61f53682
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 84 добавлений и 141 удалений

Просмотреть файл

@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `TxView`s have been renamed to `MapHandle`s, to clearly distinguish them from consensus views. Calls to `tx.get_view` must be replaced with `tx.rw`.
- `tx.rw` does not support retrieving multiple views in a single call. Instead of `auto [view1, view2] = tx.get_view(map1, map2);`, you must write `auto handle1 = tx.rw(map1); auto handle2 = tx.rw(map2);`.
### Removed
- The `x-ccf-global-commit` header is no longer sent with responses (#1586, #2144). This was a hint of global commit progress, but was known to be imprecise and unrelated to the executed transaction. Instead, clients should call `/commit` to monitor commit progress or `/tx` for a specific transaction.
## [0.17.2]
### Fixed

Просмотреть файл

@ -46,8 +46,6 @@ def truncate(string: str, max_len: int = 128):
CCF_TX_SEQNO_HEADER = "x-ccf-tx-seqno"
CCF_TX_VIEW_HEADER = "x-ccf-tx-view"
# Deprecated, will be removed
CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit"
DEFAULT_CONNECTION_TIMEOUT_SEC = 3
DEFAULT_REQUEST_TIMEOUT_SEC = 10
@ -175,8 +173,6 @@ class Response:
seqno: Optional[int]
#: CCF consensus view
view: Optional[int]
#: CCF global commit sequence number (deprecated)
global_commit: Optional[int]
#: Response HTTP headers
headers: dict
@ -201,7 +197,6 @@ class Response:
body=RequestsResponseBody(rr),
seqno=int_or_none(rr.headers.get(CCF_TX_SEQNO_HEADER)),
view=int_or_none(rr.headers.get(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(rr.headers.get(CCF_GLOBAL_COMMIT_HEADER)),
headers=rr.headers,
)
@ -227,7 +222,6 @@ class Response:
body=RawResponseBody(raw_body),
seqno=int_or_none(response.getheader(CCF_TX_SEQNO_HEADER)),
view=int_or_none(response.getheader(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(response.getheader(CCF_GLOBAL_COMMIT_HEADER)),
headers=response.headers,
)
@ -566,10 +560,9 @@ class WSClient:
(status_code,) = struct.unpack("<h", out[:2])
seqno = unpack_seqno_or_view(out[2:10])
view = unpack_seqno_or_view(out[10:18])
global_commit = unpack_seqno_or_view(out[18:26])
payload = out[26:]
payload = out[18:]
body = RawResponseBody(payload)
return Response(status_code, body, seqno, view, global_commit, headers={})
return Response(status_code, body, seqno, view, headers={})
class CCFClient:

Просмотреть файл

@ -112,9 +112,6 @@ namespace aft
auto ctx = create_request_ctx(serialized_req.data(), serialized_req.size());
// Deprecated, this will be removed in future releases
ctx->ctx->set_global_commit(committed_seqno);
return std::make_unique<RequestMessage>(
std::move(serialized_req), args.rid, std::move(ctx), rep_cb);
}
@ -133,9 +130,6 @@ namespace aft
auto ctx = create_request_ctx(request);
// Deprecated, this will be removed in future releases
ctx->ctx->set_global_commit(committed_seqno);
auto request_message = RequestMessage::deserialize(
std::move(request.raw), request.rid, std::move(ctx), nullptr);

Просмотреть файл

@ -184,7 +184,6 @@ namespace enclave
virtual void set_seqno(kv::Version) = 0;
virtual void set_view(kv::Consensus::View) = 0;
virtual void set_global_commit(kv::Version) = 0;
virtual void set_response_header(
const std::string_view& name, const std::string_view& value) = 0;
@ -193,8 +192,6 @@ namespace enclave
set_response_header(name, fmt::format("{}", n));
}
virtual bool has_global_commit() = 0;
virtual void set_error(
http_status status, const std::string& code, std::string&& msg)
{

Просмотреть файл

@ -21,9 +21,6 @@ namespace http
static constexpr auto CCF_TX_SEQNO = "x-ccf-tx-seqno";
static constexpr auto CCF_TX_VIEW = "x-ccf-tx-view";
// Deprecated, will be removed in a later release
static constexpr auto CCF_GLOBAL_COMMIT = "x-ccf-global-commit";
}
namespace headervalues

Просмотреть файл

@ -203,12 +203,6 @@ namespace http
set_response_header(http::headers::CCF_TX_VIEW, fmt::format("{}", v));
}
virtual void set_global_commit(kv::Version gc) override
{
set_response_header(
http::headers::CCF_GLOBAL_COMMIT, fmt::format("{}", gc));
}
virtual const std::vector<uint8_t>& get_request_body() const override
{
return request_body;
@ -300,12 +294,6 @@ namespace http
response_headers[std::string(name)] = value;
}
virtual bool has_global_commit() override
{
return response_headers.find(http::headers::CCF_GLOBAL_COMMIT) !=
response_headers.end();
}
virtual void set_apply_writes(bool apply) override
{
explicit_apply_writes = apply;

Просмотреть файл

@ -66,7 +66,6 @@ namespace ws
size_t code,
kv::Version seqno,
kv::Consensus::View view,
kv::Version global_commit,
const std::vector<uint8_t>& body)
{
size_t out_frame_size = ws::OUT_CCF_HEADER_SIZE + body.size();
@ -78,7 +77,6 @@ namespace ws
serialized::write<uint16_t>(p, s, code);
serialized::write<size_t>(p, s, seqno);
serialized::write<size_t>(p, s, view);
serialized::write<size_t>(p, s, global_commit);
assert(s == body.size());
::memcpy(p, body.data(), s);
return frame;

Просмотреть файл

@ -9,7 +9,7 @@ namespace ws
static constexpr size_t INITIAL_READ = 2;
static constexpr size_t OUT_CCF_HEADER_SIZE =
sizeof(uint16_t) /* return code */ + sizeof(size_t) /* seqno */ +
sizeof(size_t) /* view */ + sizeof(size_t) /* global_commit */;
sizeof(size_t) /* view */;
enum Verb
{

Просмотреть файл

@ -115,16 +115,13 @@ namespace ws
auto status = serialized::read<uint16_t>(data, s);
auto seqno = serialized::read<size_t>(data, s);
auto view = serialized::read<size_t>(data, s);
auto global_commit = serialized::read<size_t>(data, s);
std::vector<uint8_t> body(data, data + s);
proc.handle_response(
(http_status)status,
{{http::headers::CCF_TX_SEQNO, fmt::format("{}", seqno)},
{http::headers::CCF_TX_VIEW, fmt::format("{}", view)},
{http::headers::CCF_GLOBAL_COMMIT,
fmt::format("{}", global_commit)}},
{http::headers::CCF_TX_VIEW, fmt::format("{}", view)}},
std::move(body));
state = INIT;
return INITIAL_READ;

Просмотреть файл

@ -15,10 +15,9 @@ namespace ws
size_t code,
const std::vector<uint8_t>& body,
kv::Version seqno = kv::NoVersion,
kv::Consensus::View view = ccf::VIEW_UNKNOWN,
kv::Version global_commit = kv::NoVersion)
kv::Consensus::View view = ccf::VIEW_UNKNOWN)
{
return make_out_frame(code, seqno, view, global_commit, body);
return make_out_frame(code, seqno, view, body);
};
inline std::vector<uint8_t> error(ccf::ErrorDetails&& error)
@ -62,7 +61,6 @@ namespace ws
size_t seqno = 0;
size_t view = 0;
size_t global_commit = 0;
public:
WsRpcContext(
@ -185,16 +183,6 @@ namespace ws
view = t;
}
virtual void set_global_commit(kv::Version gc) override
{
global_commit = gc;
}
virtual bool has_global_commit() override
{
return global_commit != 0;
}
virtual void set_apply_writes(bool apply) override
{
explicit_apply_writes = apply;
@ -213,8 +201,7 @@ namespace ws
virtual std::vector<uint8_t> serialise_response() const override
{
return serialise(
response_status, response_body, seqno, view, global_commit);
return serialise(response_status, response_body, seqno, view);
}
};
}

Просмотреть файл

@ -293,12 +293,6 @@ namespace ccf
ctx->set_view(tx.commit_term());
}
// Deprecated, this will be removed in future releases
if (!ctx->has_global_commit())
{
ctx->set_global_commit(consensus->get_committed_seqno());
}
if (history != nullptr && consensus->is_primary())
{
history->try_emit_signature();

Просмотреть файл

@ -224,32 +224,37 @@ namespace client
if (response_times.is_timing_active() && reply.status == HTTP_STATUS_OK)
{
const auto commits = timing::parse_commit_ids(reply);
const auto tx_id = timing::extract_transaction_id(reply);
if (!tx_id.has_value())
{
throw std::logic_error("No transaction ID found in response headers");
}
// Record time of received responses
response_times.record_receive(reply.id, commits);
response_times.record_receive(reply.id, tx_id);
if (commits->view < last_response_commit.view)
if (tx_id->view < last_response_tx_id.view)
{
throw std::logic_error(fmt::format(
"View went backwards (expected {}, saw {})!",
last_response_commit.view,
commits->view));
last_response_tx_id.view,
tx_id->view));
}
else if (
commits->view > last_response_commit.view &&
commits->seqno <= last_response_commit.seqno)
tx_id->view > last_response_tx_id.view &&
tx_id->seqno <= last_response_tx_id.seqno)
{
throw std::logic_error(fmt::format(
"There has been an election and transactions have "
"been lost! (saw {}.{}, currently at {}.{})",
last_response_commit.view,
last_response_commit.seqno,
commits->view,
commits->seqno));
last_response_tx_id.view,
last_response_tx_id.seqno,
tx_id->view,
tx_id->seqno));
}
last_response_commit = {commits->view, commits->seqno};
last_response_tx_id = tx_id.value();
}
}
@ -280,7 +285,7 @@ namespace client
PreparedTxs prepared_txs;
timing::ResponseTimes response_times;
timing::CommitPoint last_response_commit = {0, 0};
timing::TransactionID last_response_tx_id = {0, 0};
std::chrono::high_resolution_clock::time_point last_write_time;
std::chrono::nanoseconds write_delay_ns = std::chrono::nanoseconds::zero();
@ -461,9 +466,9 @@ namespace client
// Create a new connection, because we need to do some GETs
// and when all you have is a WebSocket, everything looks like a POST!
auto c = create_connection(true, false);
wait_for_global_commit(last_response_commit);
wait_for_global_commit(last_response_tx_id);
}
const auto last_commit = last_response_commit.seqno;
const auto last_commit = last_response_tx_id.seqno;
auto timing_results = end_timing(last_commit);
LOG_INFO_FMT("Timing ended");
return timing_results;
@ -708,7 +713,7 @@ namespace client
}
}
void wait_for_global_commit(const timing::CommitPoint& target)
void wait_for_global_commit(const timing::TransactionID& target)
{
response_times.wait_for_global_commit(target);
}
@ -717,16 +722,14 @@ namespace client
{
check_response(response);
const auto response_commit_ids = timing::parse_commit_ids(response);
if (!response_commit_ids.has_value())
const auto tx_id = timing::extract_transaction_id(response);
if (!tx_id.has_value())
{
throw std::logic_error(
"Cannot wait for response to commit - it does not have a TxID");
}
const timing::CommitPoint cp{response_commit_ids->view,
response_commit_ids->seqno};
wait_for_global_commit(cp);
wait_for_global_commit(tx_id.value());
}
void begin_timing()

Просмотреть файл

@ -66,24 +66,18 @@ namespace timing
const bool expects_commit;
};
struct CommitIDs
struct TransactionID
{
size_t seqno;
size_t global;
size_t view;
size_t seqno;
};
struct ReceivedReply
{
const TimeDelta receive_time;
const size_t rpc_id;
const optional<CommitIDs> commit;
};
struct CommitPoint
{
size_t view;
size_t seqno;
TimeDelta receive_time;
size_t rpc_id;
optional<TransactionID> commit;
size_t global_seqno;
};
std::string timestamp()
@ -163,35 +157,26 @@ namespace timing
vector<PerRound> per_round;
};
static std::optional<CommitIDs> parse_commit_ids(
static std::optional<TransactionID> extract_transaction_id(
const RpcTlsClient::Response& response)
{
const auto& h = response.headers;
const auto local_commit_it = h.find(http::headers::CCF_TX_SEQNO);
if (local_commit_it == h.end())
{
return std::nullopt;
}
const auto global_commit_it = h.find(http::headers::CCF_GLOBAL_COMMIT);
if (global_commit_it == h.end())
{
return std::nullopt;
}
const auto view_it = h.find(http::headers::CCF_TX_VIEW);
if (view_it == h.end())
{
return std::nullopt;
}
const auto seqno =
std::strtoul(local_commit_it->second.c_str(), nullptr, 0);
const auto global =
std::strtoul(global_commit_it->second.c_str(), nullptr, 0);
const auto view = std::strtoul(view_it->second.c_str(), nullptr, 0);
const auto seqno_it = h.find(http::headers::CCF_TX_SEQNO);
if (seqno_it == h.end())
{
return std::nullopt;
}
return {{seqno, global, view}};
const auto view = std::strtoul(view_it->second.c_str(), nullptr, 0);
const auto seqno = std::strtoul(seqno_it->second.c_str(), nullptr, 0);
return {{view, seqno}};
}
class ResponseTimes
@ -240,16 +225,20 @@ namespace timing
{Clock::now() - start_time, method, rpc_id, expects_commit});
}
void record_receive(size_t rpc_id, const optional<CommitIDs>& commit)
void record_receive(
size_t rpc_id,
const optional<TransactionID>& tx_id,
size_t global_seqno = 0)
{
receives.push_back({Clock::now() - start_time, rpc_id, commit});
receives.push_back(
{Clock::now() - start_time, rpc_id, tx_id, global_seqno});
}
// Repeatedly calls GET /tx RPC until the target seqno has been
// committed (or will never be committed), returns first confirming
// response. Calls record_[send/response], if record is true.
// Throws on errors, or if target is rolled back
void wait_for_global_commit(const CommitPoint& target, bool record = true)
void wait_for_global_commit(const TransactionID& target, bool record = true)
{
auto params = nlohmann::json::object();
params["view"] = target.view;
@ -279,7 +268,7 @@ namespace timing
body.dump()));
}
const auto commit_ids = parse_commit_ids(response);
const auto tx_id = extract_transaction_id(response);
// NB: Eventual header re-org should be exposing API types so
// they can be consumed cleanly from C++ clients
@ -288,7 +277,7 @@ namespace timing
{
if (record)
{
record_receive(response.id, commit_ids);
record_receive(response.id, tx_id);
}
// Commit is pending, poll again
@ -298,27 +287,24 @@ namespace timing
else if (tx_status == "COMMITTED")
{
LOG_INFO_FMT("Found global commit {}.{}", target.view, target.seqno);
if (commit_ids.has_value())
if (tx_id.has_value())
{
LOG_INFO_FMT(
" (headers view: {}, seqno: {}, global: {})",
commit_ids->view,
commit_ids->seqno,
commit_ids->global);
" (headers view: {}, seqno: {})", tx_id->view, tx_id->seqno);
}
if (record)
{
if (commit_ids.has_value())
if (tx_id.has_value())
{
record_receive(response.id, commit_ids);
record_receive(response.id, tx_id, target.seqno);
}
else
{
// If this response didn't contain commit IDs in headers, we can
// still construct them from the body
record_receive(
response.id, {{target.seqno, target.seqno, target.view}});
response.id, {{target.view, target.seqno}}, target.seqno);
}
}
return;
@ -365,11 +351,11 @@ namespace timing
if (receive.commit.has_value())
{
if (receive.commit->global >= highest_local_commit)
if (receive.global_seqno >= highest_local_commit)
{
LOG_INFO_FMT(
"Global commit match {} for highest local commit {}",
receive.commit->global,
receive.global_seqno,
highest_local_commit);
auto was =
duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
@ -400,16 +386,16 @@ namespace timing
vector<PendingGlobalCommit> pending_global_commits;
auto complete_pending = [&](const ReceivedReply& receive) {
if (receive.commit.has_value())
if (receive.global_seqno > 0)
{
auto pending_it = pending_global_commits.begin();
while (pending_it != pending_global_commits.end())
{
if (receive.commit->global >= pending_it->target_commit)
if (receive.global_seqno >= pending_it->target_commit)
{
round_global_commit.push_back(
(receive.receive_time - pending_it->send_time).count());
pending_it = pending_global_commits.erase(pending_it);
++pending_it;
}
else
{
@ -419,6 +405,11 @@ namespace timing
break;
}
}
if (pending_it != pending_global_commits.begin())
{
pending_global_commits.erase(
pending_global_commits.begin(), pending_it);
}
}
};
@ -427,7 +418,7 @@ namespace timing
const auto& send = *send_it;
double tx_latency;
optional<CommitIDs> response_commit;
optional<ReceivedReply> matching_reply;
for (auto i = next_recv; i < receives.size(); ++i)
{
const auto& receive = receives[i];
@ -448,7 +439,7 @@ namespace timing
continue;
}
response_commit = receive.commit;
matching_reply = receive;
next_recv = i + 1;
break;
}
@ -456,24 +447,24 @@ namespace timing
if (send.expects_commit)
{
if (response_commit.has_value())
if (matching_reply.has_value())
{
// Successful write - measure local tx time AND try to find global
// commit time
round_local_commit.push_back(tx_latency);
if (response_commit->global >= response_commit->seqno)
if (matching_reply->global_seqno >= matching_reply->commit->seqno)
{
// Global commit already already
round_global_commit.push_back(tx_latency);
}
else
{
if (response_commit->seqno <= highest_local_commit)
if (matching_reply->commit->seqno <= highest_local_commit)
{
// Store expected global commit to find later
pending_global_commits.push_back(
{send.send_time, response_commit->seqno});
{send.send_time, matching_reply->commit->seqno});
}
else
{
@ -481,7 +472,7 @@ namespace timing
"Ignoring request with ID {} because it committed too late "
"({} > {})",
send.rpc_id,
response_commit->seqno,
matching_reply->commit->seqno,
highest_local_commit);
}
}
@ -597,14 +588,14 @@ namespace timing
{
recv_csv << "," << reply.commit->seqno;
recv_csv << "," << reply.commit->view;
recv_csv << "," << reply.commit->global;
}
else
{
recv_csv << "," << 0;
recv_csv << "," << 0;
recv_csv << "," << 0;
}
recv_csv << "," << reply.global_seqno;
recv_csv << endl;
}
LOG_INFO_FMT("Wrote {} entries to {}", receives.size(), recv_path);