From c2dfc69d3750683e6b00f2c297934d3f92c3844a Mon Sep 17 00:00:00 2001 From: Ryan Newton Date: Mon, 10 Dec 2018 19:05:36 -0800 Subject: [PATCH] Update wire protocol doc (still unfinished) + tweak how runAmbrosia script traps errors --- .../AMBROSIA_client_network_protocol.md | 67 +++++++++++++------ Clients/C/Makefile | 12 +++- Clients/C/include/ambrosia/client.h | 18 +++-- Clients/C/include/ambrosia/internal/bits.h | 18 ++++- Clients/C/src/ambrosia_client.c | 37 +--------- InternalImmortals/NativeService/Makefile | 6 +- InternalImmortals/NativeService/example.sh | 25 ------- .../NativeService/run_test_in_one_machine.sh | 41 ++++++++---- InternalImmortals/NativeService/service.c | 25 +++---- Scripts/runAmbrosiaService.sh | 23 ++++--- 10 files changed, 139 insertions(+), 133 deletions(-) delete mode 100755 InternalImmortals/NativeService/example.sh diff --git a/CONTRIBUTING/AMBROSIA_client_network_protocol.md b/CONTRIBUTING/AMBROSIA_client_network_protocol.md index b26f073..e28f041 100644 --- a/CONTRIBUTING/AMBROSIA_client_network_protocol.md +++ b/CONTRIBUTING/AMBROSIA_client_network_protocol.md @@ -3,36 +3,53 @@ Client Protocol for AMBROSIA network participants ================================================= This document covers how a network endpoint should communicate with -the AMBROSIA reliability coordinator assigned to it (typically located -within the same physical machine). +the AMBROSIA reliability coordinator assigned to it. The coordinator +is located within the same physical machine/container and assumed to +survive or fail with the application process. + +Indeed, the coordinator could have been running inside the same +process as the application, but it is designed to instead run in a +separate process (and communicate via TCP/IP over a local socket) to +be more language-agnostic. Overview and Terminology ------------------------ -FINISHME +Below we use the following terminology: - * Commit ID - * Sequence ID + * Committer ID - an arbitrary (32 bit) identifier for a communication + endpoint (a service) in the network of running "immortals". + + * Sequence ID - the (monotonically increasing) number of a log entry. Note that + each logical immortal has its own log - * "Async/await" RPC - a classic notion of a *future*. These RPCs return a value back to the caller. Because AMBROSIA ensures reliability, they are semantically identical to function calls, without introducing new failure modes such as timeouts or disconnections. - - * "Fire and Forget" RPC - launch a remote computation, but provide no information back to the caller. Note that even an async/await -RPC with "void" return value exposes more than this, because the caller can ascertain when the remote RPC has been completely processed. + * "Async/await" RPCs - are *futures*; they return a value back to the + caller. Because AMBROSIA ensures reliability, they are semantically + identical to function calls, without introducing new failure modes such as + timeouts or disconnections. + * "Fire and Forget" RPCs - launch a remote computation, but provide no + information back to the caller. Note that even an async/await RPC with + "void" return value indicates more to the caller (namely, that the remote + computation has completed). Required Helper Functions ------------------------- -FINISHME: +In order to build the binary message formats described below, we assume that the +new client software can access TCP sockets and additionally implements the +following serialized datatypes. -Assumes TCP + + * ZigZagInt - a zig-zag encoded variable size 32 bit signed integer + * ZigZagLong - a zig-zag encoded variable size 64 bit signed integer + * IntFixed - a 32 bit little endian number + * LongFixed - a 64 bit little endian number - * WriteZigZagInt - * WriteFixedInt - * WriteZigZagLong - * WriteFixedLong + * CheckSum - FINISHME + +These variable-length integers are in the same format used by, e.g., +[Protobufs](https://developers.google.com/protocol-buffers/docs/encoding). - * CheckSum Message Formats --------------- @@ -43,7 +60,7 @@ Message Formats All information received from the reliability coordinator is in the form of a sequence of log records. Each log record has a 24 byte header, followed by the actual record contents. The header is as follows: - * Bytes [0-3]: The commit ID for the service, this should be constant for all records for the lifetime of the service, format IntFixed. + * Bytes [0-3]: The committer ID for the service, this should be constant for all records for the lifetime of the service, format IntFixed. * Bytes [4-7]: The size of the whole log record, in bytes, including the header. The format is IntFixed * Bytes [8-15]: The check bytes to check the integrity of the log record. The format is LongFixed. * Bytes [16-23]: The log record sequence ID. Excluding records labeled with sequence ID “-1”, these should be in order. The format is LongFixed @@ -58,8 +75,8 @@ The rest of the record is a sequence of messages, packed tightly, each with the All information sent to the reliability coordinator is in the form of a sequence of messages with the format specified above. Message types and associated data which may be sent to or received by services: - * 14 - TrimTo (RRN: INTERNAL!??!) - * 13 - CountReplayableRPCBatchByte (RRN: INTERNAL!??!) + * 14 - TrimTo (FINISHME - INTERNAL!??!) + * 13 - CountReplayableRPCBatchByte (FINISHME - INTERNAL!??!) * 12 – `UpgradeService` (Received): No data @@ -69,7 +86,12 @@ Message types and associated data which may be sent to or received by services: * 9 – `InitialMessage` (Sent/Received): Data is a complete (incoming rpc) message which is given back to the service as the very first RPC message it ever receives. Used to bootstrap service start behavior. - * 8 – `Checkpoint` (Sent/Received): Data are the bytes corresponding to the serialized state of the service. + * 8 – `Checkpoint` (Sent/Received): The payload is a single 64 bit number. + That payload in turn is the size in bytes of a checkpoint itself, which is a + binary blob that follows this message immediately (no additional header). + The reason that checkpoints are not sent in the message payload directly is + so that they can have a 64-bit instead of 32-bit length, in order to support + large checkpoints. * 5 – `RPCBatch` (Sent/Received): Data are a count of the number of RPC messages in the batch, followed by the corresponding number of RPC messages. Note that the count is in variable sized WriteInt format @@ -135,6 +157,7 @@ If performing an upgrade what-if test: When a TakeCheckpoint message is received, no further messages may be processed until the state is serialized and sent in a checkpoint message. Note that the serialized state must include any unsent output messages which resulted from previous incoming calls. Those serialized unsent messages must follow the checkpoint message. -(RRN: What are the rules for SENDING!!) +### Attach-before-send protocol + +FINISHME... -(RRN: What is the ATTACH protocol??) diff --git a/Clients/C/Makefile b/Clients/C/Makefile index 7849266..f5d933a 100644 --- a/Clients/C/Makefile +++ b/Clients/C/Makefile @@ -1,9 +1,12 @@ -# Put your -D variables here, e.g. -DDEBUG -DEFINES= -DIPV4 +# Put your -D variables here: +DEFINES ?= + +EXTRA_DEFINES = -DIPV4 # ^ TODO build everything twice for IPV6 vs IPV4. # TODOTODO fix it so that one compile can work for both. +ALL_DEFINES = $(DEFINES) $(EXTRA_DEFINES) GNULIBS= -lpthread GNUOPTS= -pthread -O3 @@ -16,13 +19,16 @@ OBJS1= $(patsubst src/%.c,bin/static/%.o, $(SRCS) ) OBJS2= $(patsubst src/%.c,bin/shared/%.o, $(SRCS) ) -COMP= gcc $(DEFINES) -I include/ $(GNUOPTS) +COMP= gcc $(ALL_DEFINES) -I include/ $(GNUOPTS) LINK= gcc LIBNAME=libambrosia all: bin/$(LIBNAME).a bin/$(LIBNAME).so +debug: + $(MAKE) DEFINES="-DAMBCLIENT_DEBUG" clean publish + bin/$(LIBNAME).a: $(OBJS1) ar rcs $@ $(OBJS1) diff --git a/Clients/C/include/ambrosia/client.h b/Clients/C/include/ambrosia/client.h index 3f908b0..cad261e 100644 --- a/Clients/C/include/ambrosia/client.h +++ b/Clients/C/include/ambrosia/client.h @@ -125,8 +125,18 @@ int zigzag_int_size(int32_t value); //------------------------------------------------------------------------------ #ifdef AMBCLIENT_DEBUG +extern volatile int64_t amb_debug_lock; -extern volatile int64_t debug_lock; +// Helper used only below +static inline void amb_sleep_seconds(double n) { +#ifdef _WIN32 + Sleep((int)(n * 1000)); +#else + int64_t nanos = (int64_t)(10e9 * n); + const struct timespec ts = {0, nanos}; + nanosleep(&ts, NULL); +#endif +} static inline void amb_debug_log(const char *format, ...) { @@ -134,14 +144,14 @@ static inline void amb_debug_log(const char *format, ...) va_start(args, format); amb_sleep_seconds((double)(rand()%1000) * 0.00001); // .01 - 10 ms #ifdef _WIN32 - while ( 1 == InterlockedCompareExchange64(&debug_lock, 1, 0) ) { } + while ( 1 == InterlockedCompareExchange64(&amb_debug_lock, 1, 0) ) { } #else - while ( 1 == __sync_val_compare_and_swap(&debug_lock, 1, 0) ) { } + while ( 1 == __sync_val_compare_and_swap(&amb_debug_lock, 1, 0) ) { } #endif fprintf(amb_dbg_fd," [AMBCLIENT] "); vfprintf(amb_dbg_fd,format, args); fflush(amb_dbg_fd); - debug_lock = 0; + amb_debug_lock = 0; va_end(args); } #else diff --git a/Clients/C/include/ambrosia/internal/bits.h b/Clients/C/include/ambrosia/internal/bits.h index a55833f..4270b55 100644 --- a/Clients/C/include/ambrosia/internal/bits.h +++ b/Clients/C/include/ambrosia/internal/bits.h @@ -2,7 +2,8 @@ // Internal helper: try repeatedly on a socket until all bytes are sent. -static inline void socket_send_all(int sock, const void* buf, size_t len, int flags) { +static inline +void socket_send_all(int sock, const void* buf, size_t len, int flags) { char* cur = (char*)buf; int remaining = len; while (remaining > 0) { @@ -21,3 +22,18 @@ static inline void socket_send_all(int sock, const void* buf, size_t len, int fl #endif } } + + +static inline +void print_hex_bytes(FILE* fd, char* ptr, int len) { + const int limit = 100; // Only print this many: + fprintf(fd,"0x"); + int j; + for (j=0; j < len && j < limit; j++) { + fprintf(fd,"%02hhx", (unsigned char)ptr[j]); + if (j % 2 == 1) + fprintf(fd," "); + } + if (j /tmp/coord.log & -dotnet "/ambrosia/ImmortalCoordinator/bin/Release/netcoreapp2.0/linux-x64/ImmortalCoordinator.dll" rrnjob 1500 & -PID1=$! -sleep 1 -# dotnet "/ambrosia/ImmortalCoordinator/bin/Release/netcoreapp2.0/linux-x64/ImmortalCoordinator.dll" rrnserver 2500 & -# PID2=$! - -# Lame, racy: -sleep 7 - -echo "Proceeding on the assumption you saw \"Ready...\" above this line..." -./service_v4.exe - -wait $PID1 -# wait $PID2 diff --git a/InternalImmortals/NativeService/run_test_in_one_machine.sh b/InternalImmortals/NativeService/run_test_in_one_machine.sh index 76dc44f..d2c0c6f 100755 --- a/InternalImmortals/NativeService/run_test_in_one_machine.sh +++ b/InternalImmortals/NativeService/run_test_in_one_machine.sh @@ -11,10 +11,17 @@ set -euo pipefail # source `dirname $0`/default_var_settings.sh -PORT1=49001 -PORT2=49002 -PORT3=49003 -PORT4=49004 + +# A number to add to all ports to avoid colliding or reusing recently +# used ports. +if ! [ ${PORTOFFSET:+defined} ]; then + PORTOFFSET=0 +fi + +PORT1=$((49001 + PORTOFFSET)) +PORT2=$((49002 + PORTOFFSET)) +PORT3=$((49003 + PORTOFFSET)) +PORT4=$((49004 + PORTOFFSET)) INSTANCE_PREFIX="" if [ $# -ne 0 ]; @@ -23,26 +30,32 @@ fi CLIENTNAME=${INSTANCE_PREFIX}nativeSend SERVERNAME=${INSTANCE_PREFIX}nativeRecv +_cleanup() { + kill -TERM "$pid_server" 2>/dev/null || true +} +trap _cleanup TERM INT QUIT HUP + echo echo "--------------------------------------------------------------------------------" echo "Running NativeService with 4 processes all in this machine/container" echo " Instance: names $CLIENTNAME, $SERVERNAME" + echo "--------------------------------------------------------------------------------" echo -set -x -time Ambrosia RegisterInstance -i $CLIENTNAME --rp $PORT1 --sp $PORT2 -l "./ambrosia_logs/" -time Ambrosia RegisterInstance -i $SERVERNAME --rp $PORT3 --sp $PORT4 -l "./ambrosia_logs/" -set +x - -# AMBROSIA_IMMORTALCOORDINATOR_PORT=2500 runAmbrosiaService.sh ./service_v4.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20 +if ! [ ${SKIP_REGISTER:+defined} ]; then + set -x + time Ambrosia RegisterInstance -i $CLIENTNAME --rp $PORT1 --sp $PORT2 -l "./ambrosia_logs/" + time Ambrosia RegisterInstance -i $SERVERNAME --rp $PORT3 --sp $PORT4 -l "./ambrosia_logs/" + set +x +fi echo echo "NativeService: Launching Receiver" set -x -AMBROSIA_INSTANCE_NAME=$SERVERNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=1500 \ +AMBROSIA_INSTANCE_NAME=$SERVERNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=$((1600 + PORTOFFSET)) \ COORDTAG=CoordRecv AMBROSIA_IMMORTALCOORDINATOR_LOG=./recvr.log \ - runAmbrosiaService.sh ./service_v4.exe 1 $CLIENTNAME $PORT3 $PORT4 24 1 20 & + runAmbrosiaService.sh ./service.exe 1 $CLIENTNAME $PORT3 $PORT4 24 1 20 & pid_server=$! set +x @@ -56,9 +69,9 @@ fi echo echo "NativeService: Launching Sender now:" set -x -AMBROSIA_INSTANCE_NAME=$CLIENTNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=2500 \ +AMBROSIA_INSTANCE_NAME=$CLIENTNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=$((1601 + PORTOFFSET)) \ COORDTAG=CoordSend AMBROSIA_IMMORTALCOORDINATOR_LOG=./sender.log \ - runAmbrosiaService.sh ./service_v4.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20 + runAmbrosiaService.sh ./service.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20 set +x echo diff --git a/InternalImmortals/NativeService/service.c b/InternalImmortals/NativeService/service.c index 5838001..af9e556 100644 --- a/InternalImmortals/NativeService/service.c +++ b/InternalImmortals/NativeService/service.c @@ -37,8 +37,9 @@ // TODO: remove internal dependency: #include "ambrosia/internal/spsc_rring.h" #include "ambrosia/client.h" -#include "ambrosia/internal/bits.h" +// Extra utilities (print_hex_bytes, socket_send_all): +#include "ambrosia/internal/bits.h" // Library-level global variables: // -------------------------------------------------- @@ -100,21 +101,6 @@ int destLen; // Initialized below.. // General helper functions // -------------------------------------------------------------------------------- -#ifdef AMBCLIENT_DEBUG -// DUPLICATED with ambrosia_client. -void print_hex_bytes(FILE* fd, char* ptr, int len) { - const int limit = 100; // Only print this many: - fprintf(fd,"0x"); - int j; - for (j=0; j < len && j < limit; j++) { - fprintf(fd,"%02hhx", (unsigned char)ptr[j]); - if (j % 2 == 1) - fprintf(fd," "); - } - if (j/dev/null || true - kill -TERM "$tail_pid" 2>/dev/null || true - kill -TERM "$app_pid" 2>/dev/null || true +_normal_cleanup() { + kill -TERM "$coord_pid" 2>/dev/null || true + kill -TERM "$tail_pid" 2>/dev/null || true + kill -TERM "$app_pid" 2>/dev/null || true + rm -f "$keep_monitoring" 2>/dev/null || true +} + +_unexpected_cleanup() { + trap '' EXIT # some shells will call EXIT after the INT handler + echo "$0: Exiting script abnormally! Cleaning up. ($1)" + _normal_cleanup echo "$0: Done with cleanup." } -trap _cleanup EXIT -# subsumed by exit? TERM INT HUP -# what about QUIT? - +trap _normal_cleanup EXIT +trap _unexpected_cleanup TERM INT QUIT function tag_stdin() { local MSG=$1