Add support for failover server to the submitter (#5501)

This commit is contained in:
Amaury Chamayou 2023-08-03 18:19:57 +02:00 коммит произвёл GitHub
Родитель 280a2d945d
Коммит 958d070d31
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 81 добавлений и 40 удалений

Просмотреть файл

@ -178,10 +178,22 @@ def create_and_fill_key_space(size: int, primary: infra.node.Node) -> List[str]:
def run(args):
hosts = args.nodes or ["local://localhost"]
if args.stop_primary_after_s:
assert (
len(hosts) > 1
), "Can only stop primary if there is at least one other node to fail over to"
LOG.info("Starting nodes on {}".format(hosts))
with infra.network.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
# Manipulate election timeouts to produce a deterministic successor to the primary
# when it is stopped, allowing the submitters to be configured to fail over accordingly
if args.stop_primary_after_s:
for i in range(len(hosts)):
if i != 1:
network.per_node_args_override[i] = {"election_timeout_ms": 15000}
network.start_and_open(args)
primary, backups = network.find_nodes()
@ -226,29 +238,35 @@ def run(args):
remote_client = configure_remote_client(
args, client_idx, "localhost", network.common_dir
)
cmd = [
args.client,
"--cert",
"user0_cert.pem",
"--key",
"user0_privk.pem",
"--cacert",
os.path.basename(network.cert_path),
f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}",
"--max-writes-ahead",
str(args.max_writes_ahead),
"--send-filepath",
"pi_requests.parquet",
"--response-filepath",
"pi_response.parquet",
"--generator-filepath",
os.path.abspath(path_to_requests_file),
"--pid-file-path",
"cmd.pid",
]
# All clients talking to the primary are configured to fail over to the first backup,
# which is the only node whose election timeout has not been raised, to guarantee its
# election as the old primary becomes unavailable.
if args.stop_primary_after_s:
cmd.append(
f"--failover-server-address={backups[0].get_public_rpc_host()}:{backups[0].get_public_rpc_port()}"
)
remote_client.setcmd(cmd)
remote_client.description = f"{gen} x {iterations} to {target} ({node.get_public_rpc_address()})"
remote_client.setcmd(
[
args.client,
"--cert",
"user0_cert.pem",
"--key",
"user0_privk.pem",
"--cacert",
os.path.basename(network.cert_path),
f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}",
"--max-writes-ahead",
str(args.max_writes_ahead),
"--send-filepath",
"pi_requests.parquet",
"--response-filepath",
"pi_response.parquet",
"--generator-filepath",
os.path.abspath(path_to_requests_file),
"--pid-file-path",
"cmd.pid",
]
)
clients.append(remote_client)
client_idx += 1
@ -280,8 +298,17 @@ def run(args):
raise TimeoutError(
f"Client still running after {args.client_timeout_s}s"
)
if (
args.stop_primary_after_s
and time.time() > start_time + args.stop_primary_after_s
and not primary.is_stopped()
):
LOG.info(
f"Stopping primary after {args.stop_primary_after_s} seconds"
)
primary.stop()
time.sleep(5)
time.sleep(1)
for remote_client in clients:
remote_client.stop()
@ -467,6 +494,9 @@ def cli_args():
action="append",
required=True,
)
parser.add_argument(
"--stop-primary-after-s", help="Stop primary after this many seconds", type=int
)
return infra.e2e_args.cli_args(
parser=parser, accept_unknown=False, ledger_chunk_bytes_override="5MB"
)

Просмотреть файл

@ -197,6 +197,10 @@ class Network:
"tick_ms",
"max_msg_size_bytes",
]
# Map of node id to dict of node arg to override value
# for example, to set the election timeout to 2s for node 3:
# per_node_args_override = {3: {"election_timeout_ms": 2000}}
per_node_args_override = {}
# Maximum delay (seconds) for updates to propagate from the primary to backups
replication_delay = 30
@ -415,6 +419,8 @@ class Network:
}
for i, node in enumerate(self.nodes):
forwarded_args_with_overrides = forwarded_args.copy()
forwarded_args_with_overrides.update(self.per_node_args_override.get(i, {}))
try:
if i == 0:
if not recovery:
@ -424,7 +430,7 @@ class Network:
label=args.label,
common_dir=self.common_dir,
members_info=self.consortium.get_members_info(),
**forwarded_args,
**forwarded_args_with_overrides,
**kwargs,
)
else:
@ -436,7 +442,7 @@ class Network:
ledger_dir=ledger_dir,
read_only_ledger_dirs=read_only_ledger_dirs,
snapshots_dir=snapshots_dir,
**forwarded_args,
**forwarded_args_with_overrides,
**kwargs,
)
self.wait_for_state(
@ -455,7 +461,7 @@ class Network:
from_snapshot=snapshots_dir is not None,
read_only_ledger_dirs=read_only_ledger_dirs,
snapshots_dir=snapshots_dir,
**forwarded_args,
**forwarded_args_with_overrides,
**kwargs,
)
except Exception:

Просмотреть файл

@ -17,7 +17,8 @@ public:
std::string cert;
std::string key;
std::string rootCa;
std::string server_address = "127.0.0.1:8000";
std::string server_address;
std::string failover_server_address = "";
std::string send_filepath;
std::string response_filepath;
std::string generator_filepath;
@ -54,26 +55,32 @@ public:
"-a,--server-address",
server_address,
"Specify the address to submit requests.")
->required(true);
app
.add_option(
"--failover-server-address",
failover_server_address,
"Specify failover address, in case connection to the main server address is lost.")
->capture_default_str();
app
.add_option(
"-s,--send-filepath",
send_filepath,
"Path to parquet file to store the submitted requests.")
->required();
->required(true);
app
.add_option(
"-r,--response-filepath",
response_filepath,
"Path to parquet file to store the responses from the submitted "
"requests.")
->required();
->required(true);
app
.add_option(
"-g,--generator-filepath",
generator_filepath,
"Path to parquet file with the generated requests to be submitted.")
->required();
->required(true);
app
.add_option(
"-m,--max-writes-ahead",

Просмотреть файл

@ -258,18 +258,15 @@ int main(int argc, char** argv)
read_parquet_file(args.generator_filepath, data_handler);
std::string server_address = args.server_address;
std::string failover_server_address = args.failover_server_address;
if (failover_server_address.empty())
{
failover_server_address = server_address;
}
// Write PID to disk
files::dump(fmt::format("{}", ::getpid()), args.pid_file_path);
// Keep only the host and port removing any https:// characters
std::string separator = "//";
auto exists_index = server_address.find(separator);
if (exists_index != std::string::npos)
{
server_address = server_address.substr(exists_index + separator.length());
}
auto requests_size = data_handler.ids.size();
std::vector<timespec> start(requests_size);
@ -284,6 +281,7 @@ int main(int argc, char** argv)
size_t retry_count = 0;
size_t read_reqs = 0;
LOG_INFO_FMT("Connecting to {}", server_address);
auto connection = create_connection(certificates, server_address);
connection->set_tcp_nodelay(true);
@ -322,8 +320,8 @@ int main(int argc, char** argv)
catch (std::logic_error& e)
{
LOG_FAIL_FMT(
"Sending interrupted: {}, attempting reconnection", e.what());
connection = create_connection(certificates, server_address);
"Sending interrupted: {}, attempting reconnection to {}", e.what(), failover_server_address);
connection = create_connection(certificates, failover_server_address);
connection->set_tcp_nodelay(true);
retry_count++;
}