Update wire protocol doc (still unfinished) + tweak how runAmbrosia script traps errors

This commit is contained in:
Ryan Newton 2018-12-10 19:05:36 -08:00
Родитель 638d70ce67
Коммит c2dfc69d37
10 изменённых файлов: 139 добавлений и 133 удалений

Просмотреть файл

@ -3,36 +3,53 @@ Client Protocol for AMBROSIA network participants
================================================= =================================================
This document covers how a network endpoint should communicate with This document covers how a network endpoint should communicate with
the AMBROSIA reliability coordinator assigned to it (typically located the AMBROSIA reliability coordinator assigned to it. The coordinator
within the same physical machine). is located within the same physical machine/container and assumed to
survive or fail with the application process.
Indeed, the coordinator could have been running inside the same
process as the application, but it is designed to instead run in a
separate process (and communicate via TCP/IP over a local socket) to
be more language-agnostic.
Overview and Terminology Overview and Terminology
------------------------ ------------------------
FINISHME Below we use the following terminology:
* Commit ID * Committer ID - an arbitrary (32 bit) identifier for a communication
* Sequence ID endpoint (a service) in the network of running "immortals".
* Sequence ID - the (monotonically increasing) number of a log entry. Note that
each logical immortal has its own log
* "Async/await" RPC - a classic notion of a *future*. These RPCs return a value back to the caller. Because AMBROSIA ensures reliability, they are semantically identical to function calls, without introducing new failure modes such as timeouts or disconnections. * "Async/await" RPCs - are *futures*; they return a value back to the
caller. Because AMBROSIA ensures reliability, they are semantically
* "Fire and Forget" RPC - launch a remote computation, but provide no information back to the caller. Note that even an async/await identical to function calls, without introducing new failure modes such as
RPC with "void" return value exposes more than this, because the caller can ascertain when the remote RPC has been completely processed. timeouts or disconnections.
* "Fire and Forget" RPCs - launch a remote computation, but provide no
information back to the caller. Note that even an async/await RPC with
"void" return value indicates more to the caller (namely, that the remote
computation has completed).
Required Helper Functions Required Helper Functions
------------------------- -------------------------
FINISHME: In order to build the binary message formats described below, we assume that the
new client software can access TCP sockets and additionally implements the
following serialized datatypes.
Assumes TCP + * ZigZagInt - a zig-zag encoded variable size 32 bit signed integer
* ZigZagLong - a zig-zag encoded variable size 64 bit signed integer
* IntFixed - a 32 bit little endian number
* LongFixed - a 64 bit little endian number
* WriteZigZagInt * CheckSum - FINISHME
* WriteFixedInt
* WriteZigZagLong These variable-length integers are in the same format used by, e.g.,
* WriteFixedLong [Protobufs](https://developers.google.com/protocol-buffers/docs/encoding).
* CheckSum
Message Formats Message Formats
--------------- ---------------
@ -43,7 +60,7 @@ Message Formats
All information received from the reliability coordinator is in the form of a sequence of log records. All information received from the reliability coordinator is in the form of a sequence of log records.
Each log record has a 24 byte header, followed by the actual record contents. The header is as follows: Each log record has a 24 byte header, followed by the actual record contents. The header is as follows:
* Bytes [0-3]: The commit ID for the service, this should be constant for all records for the lifetime of the service, format IntFixed. * Bytes [0-3]: The committer ID for the service, this should be constant for all records for the lifetime of the service, format IntFixed.
* Bytes [4-7]: The size of the whole log record, in bytes, including the header. The format is IntFixed * Bytes [4-7]: The size of the whole log record, in bytes, including the header. The format is IntFixed
* Bytes [8-15]: The check bytes to check the integrity of the log record. The format is LongFixed. * Bytes [8-15]: The check bytes to check the integrity of the log record. The format is LongFixed.
* Bytes [16-23]: The log record sequence ID. Excluding records labeled with sequence ID “-1”, these should be in order. The format is LongFixed * Bytes [16-23]: The log record sequence ID. Excluding records labeled with sequence ID “-1”, these should be in order. The format is LongFixed
@ -58,8 +75,8 @@ The rest of the record is a sequence of messages, packed tightly, each with the
All information sent to the reliability coordinator is in the form of a sequence of messages with the format specified above. All information sent to the reliability coordinator is in the form of a sequence of messages with the format specified above.
Message types and associated data which may be sent to or received by services: Message types and associated data which may be sent to or received by services:
* 14 - TrimTo (RRN: INTERNAL!??!) * 14 - TrimTo (FINISHME - INTERNAL!??!)
* 13 - CountReplayableRPCBatchByte (RRN: INTERNAL!??!) * 13 - CountReplayableRPCBatchByte (FINISHME - INTERNAL!??!)
* 12 – `UpgradeService` (Received): No data * 12 – `UpgradeService` (Received): No data
@ -69,7 +86,12 @@ Message types and associated data which may be sent to or received by services:
* 9 – `InitialMessage` (Sent/Received): Data is a complete (incoming rpc) message which is given back to the service as the very first RPC message it ever receives. Used to bootstrap service start behavior. * 9 – `InitialMessage` (Sent/Received): Data is a complete (incoming rpc) message which is given back to the service as the very first RPC message it ever receives. Used to bootstrap service start behavior.
* 8 – `Checkpoint` (Sent/Received): Data are the bytes corresponding to the serialized state of the service. * 8 – `Checkpoint` (Sent/Received): The payload is a single 64 bit number.
That payload in turn is the size in bytes of a checkpoint itself, which is a
binary blob that follows this message immediately (no additional header).
The reason that checkpoints are not sent in the message payload directly is
so that they can have a 64-bit instead of 32-bit length, in order to support
large checkpoints.
* 5 – `RPCBatch` (Sent/Received): Data are a count of the number of RPC messages in the batch, followed by the corresponding number of RPC messages. Note that the count is in variable sized WriteInt format * 5 – `RPCBatch` (Sent/Received): Data are a count of the number of RPC messages in the batch, followed by the corresponding number of RPC messages. Note that the count is in variable sized WriteInt format
@ -135,6 +157,7 @@ If performing an upgrade what-if test:
When a TakeCheckpoint message is received, no further messages may be processed until the state is serialized and sent in a checkpoint message. Note that the serialized state must include any unsent output messages which resulted from previous incoming calls. Those serialized unsent messages must follow the checkpoint message. When a TakeCheckpoint message is received, no further messages may be processed until the state is serialized and sent in a checkpoint message. Note that the serialized state must include any unsent output messages which resulted from previous incoming calls. Those serialized unsent messages must follow the checkpoint message.
(RRN: What are the rules for SENDING!!) ### Attach-before-send protocol
FINISHME...
(RRN: What is the ATTACH protocol??)

Просмотреть файл

@ -1,9 +1,12 @@
# Put your -D variables here, e.g. -DDEBUG # Put your -D variables here:
DEFINES= -DIPV4 DEFINES ?=
EXTRA_DEFINES = -DIPV4
# ^ TODO build everything twice for IPV6 vs IPV4. # ^ TODO build everything twice for IPV6 vs IPV4.
# TODOTODO fix it so that one compile can work for both. # TODOTODO fix it so that one compile can work for both.
ALL_DEFINES = $(DEFINES) $(EXTRA_DEFINES)
GNULIBS= -lpthread GNULIBS= -lpthread
GNUOPTS= -pthread -O3 GNUOPTS= -pthread -O3
@ -16,13 +19,16 @@ OBJS1= $(patsubst src/%.c,bin/static/%.o, $(SRCS) )
OBJS2= $(patsubst src/%.c,bin/shared/%.o, $(SRCS) ) OBJS2= $(patsubst src/%.c,bin/shared/%.o, $(SRCS) )
COMP= gcc $(DEFINES) -I include/ $(GNUOPTS) COMP= gcc $(ALL_DEFINES) -I include/ $(GNUOPTS)
LINK= gcc LINK= gcc
LIBNAME=libambrosia LIBNAME=libambrosia
all: bin/$(LIBNAME).a bin/$(LIBNAME).so all: bin/$(LIBNAME).a bin/$(LIBNAME).so
debug:
$(MAKE) DEFINES="-DAMBCLIENT_DEBUG" clean publish
bin/$(LIBNAME).a: $(OBJS1) bin/$(LIBNAME).a: $(OBJS1)
ar rcs $@ $(OBJS1) ar rcs $@ $(OBJS1)

Просмотреть файл

@ -125,8 +125,18 @@ int zigzag_int_size(int32_t value);
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
#ifdef AMBCLIENT_DEBUG #ifdef AMBCLIENT_DEBUG
extern volatile int64_t amb_debug_lock;
extern volatile int64_t debug_lock; // Helper used only below
static inline void amb_sleep_seconds(double n) {
#ifdef _WIN32
Sleep((int)(n * 1000));
#else
int64_t nanos = (int64_t)(10e9 * n);
const struct timespec ts = {0, nanos};
nanosleep(&ts, NULL);
#endif
}
static inline void amb_debug_log(const char *format, ...) static inline void amb_debug_log(const char *format, ...)
{ {
@ -134,14 +144,14 @@ static inline void amb_debug_log(const char *format, ...)
va_start(args, format); va_start(args, format);
amb_sleep_seconds((double)(rand()%1000) * 0.00001); // .01 - 10 ms amb_sleep_seconds((double)(rand()%1000) * 0.00001); // .01 - 10 ms
#ifdef _WIN32 #ifdef _WIN32
while ( 1 == InterlockedCompareExchange64(&debug_lock, 1, 0) ) { } while ( 1 == InterlockedCompareExchange64(&amb_debug_lock, 1, 0) ) { }
#else #else
while ( 1 == __sync_val_compare_and_swap(&debug_lock, 1, 0) ) { } while ( 1 == __sync_val_compare_and_swap(&amb_debug_lock, 1, 0) ) { }
#endif #endif
fprintf(amb_dbg_fd," [AMBCLIENT] "); fprintf(amb_dbg_fd," [AMBCLIENT] ");
vfprintf(amb_dbg_fd,format, args); vfprintf(amb_dbg_fd,format, args);
fflush(amb_dbg_fd); fflush(amb_dbg_fd);
debug_lock = 0; amb_debug_lock = 0;
va_end(args); va_end(args);
} }
#else #else

Просмотреть файл

@ -2,7 +2,8 @@
// Internal helper: try repeatedly on a socket until all bytes are sent. // Internal helper: try repeatedly on a socket until all bytes are sent.
static inline void socket_send_all(int sock, const void* buf, size_t len, int flags) { static inline
void socket_send_all(int sock, const void* buf, size_t len, int flags) {
char* cur = (char*)buf; char* cur = (char*)buf;
int remaining = len; int remaining = len;
while (remaining > 0) { while (remaining > 0) {
@ -21,3 +22,18 @@ static inline void socket_send_all(int sock, const void* buf, size_t len, int fl
#endif #endif
} }
} }
static inline
void print_hex_bytes(FILE* fd, char* ptr, int len) {
const int limit = 100; // Only print this many:
fprintf(fd,"0x");
int j;
for (j=0; j < len && j < limit; j++) {
fprintf(fd,"%02hhx", (unsigned char)ptr[j]);
if (j % 2 == 1)
fprintf(fd," ");
}
if (j<len) fprintf(fd,"...");
}

Просмотреть файл

@ -50,7 +50,7 @@ const char* coordinator_host = "::1";
#endif #endif
#ifdef AMBCLIENT_DEBUG #ifdef AMBCLIENT_DEBUG
// volatile int64_t debug_lock = 0; volatile int64_t amb_debug_lock = 0;
#endif #endif
@ -71,18 +71,6 @@ char* amb_get_error_string() {
return strerror(errno); return strerror(errno);
#endif #endif
} }
void print_hex_bytes(FILE* fd, char* ptr, int len) {
const int limit = 100; // Only print this many:
fprintf(fd,"0x");
int j;
for (j=0; j < len && j < limit; j++) {
fprintf(fd,"%02hhx", (unsigned char)ptr[j]);
if (j % 2 == 1)
fprintf(fd," ");
}
if (j<len) fprintf(fd,"...");
}
void print_decimal_bytes(char* ptr, int len) { void print_decimal_bytes(char* ptr, int len) {
const int limit = 100; // Only print this many: const int limit = 100; // Only print this many:
@ -291,29 +279,6 @@ void attach_if_needed(char* dest, int destLen) {
} }
} }
/*
// INEFFICIENT version that makes an extra copy:
void send_message(char* buf, int len) {
attach_if_needed(destName, ??); // Hard-coded global dest name.
// FIXME - LAME COPY to PREPEND header bytes!
char* sendbuf = (char*)malloc(1 + 5 + destLen + 1 + 5 + 1 + len);
char* newpos = amb_write_outgoing_rpc(sendbuf, destName, destLen, 0, TPUT_MSG_ID, 1, buf, len);
// FIXME: one system call per message!
socket_send_all(g_to_immortal_coord, sendbuf, newpos-sendbuf, 0);
#ifdef AMBCLIENT_DEBUG
amb_debug_log("Sent %d byte message up to coordinator, argsLen %d...\n Hex: ", newpos-sendbuf, len);
print_hex_bytes(amb_dbg_fd, sendbuf, newpos-sendbuf);
fprintf(amb_dbg_fd,"\n Decimal: ");
print_decimal_bytes(sendbuf, newpos-sendbuf); printf("\n");
#endif
free(sendbuf);
}
*/
// Begin connect_sockets: // Begin connect_sockets:
// -------------------------------------------------- // --------------------------------------------------

Просмотреть файл

@ -20,7 +20,7 @@ LIBS= -L $(AMBROSIA_BINDIR) -l:libambrosia.a -lpthread
LINK= gcc LINK= gcc
all: service_v4.exe service_v6.exe all: service_v4.exe service_v6.exe
dbg: service_dbg_v4.exe debug: service_dbg_v4.exe
service_temp.exe: service.c service_temp.exe: service.c
$(COMP) service.c -o service.o $(COMP) service.c -o service.o
@ -29,10 +29,12 @@ service_temp.exe: service.c
service_v4.exe: $(HEADERS) $(SRCS) service.c service_v4.exe: $(HEADERS) $(SRCS) service.c
$(MAKE) DEFINES="-DIPV4" partclean service_temp.exe $(MAKE) DEFINES="-DIPV4" partclean service_temp.exe
mv service_temp.exe $@ mv service_temp.exe $@
ln -sf $@ service.exe
service_dbg_v4.exe: $(HEADERS) $(SRCS) service.c service_dbg_v4.exe: $(HEADERS) $(SRCS) service.c
$(MAKE) DEFINES="-DAMBCLIENT_DEBUG -DIPV4" partclean service_temp.exe $(MAKE) DEFINES="-DAMBCLIENT_DEBUG -DIPV4" partclean service_temp.exe
mv service_temp.exe $@ mv service_temp.exe $@
ln -sf $@ service.exe
service_v6.exe: $(HEADERS) $(SRCS) service.c service_v6.exe: $(HEADERS) $(SRCS) service.c
$(MAKE) DEFINES="-DIPV6" partclean service_temp.exe $(MAKE) DEFINES="-DIPV6" partclean service_temp.exe
@ -42,7 +44,7 @@ partclean:
rm -f service_temp.exe rm -f service_temp.exe
clean: clean:
rm -f service_winsockv4.exe service_winsockv6.exe service_v4.exe service_v6.exe rm -f service_winsockv4.exe service_winsockv6.exe service_v4.exe service_v6.exe service_dbg_v4.exe serice_temp.exe
rm -f \#* .\#* *~ rm -f \#* .\#* *~
.PHONY: lin clean partclean .PHONY: lin clean partclean

Просмотреть файл

@ -1,25 +0,0 @@
#!/bin/bash
set -xeuo pipefail
# Run both processes together in one script. This is intended to be
# used within the Docker container built by Dockerfile.
# TODO: set up your credentials before calling this script:
# export AZURE_STORAGE_CONN_STRING="..."
echo "Launching CRA worker"
# strace dotnet "/ambrosia/ImmortalCoordinator/bin/Release/netcoreapp2.0/linux-x64/ImmortalCoordinator.dll" rrnjob 1500 2> /tmp/coord.log &
dotnet "/ambrosia/ImmortalCoordinator/bin/Release/netcoreapp2.0/linux-x64/ImmortalCoordinator.dll" rrnjob 1500 &
PID1=$!
sleep 1
# dotnet "/ambrosia/ImmortalCoordinator/bin/Release/netcoreapp2.0/linux-x64/ImmortalCoordinator.dll" rrnserver 2500 &
# PID2=$!
# Lame, racy:
sleep 7
echo "Proceeding on the assumption you saw \"Ready...\" above this line..."
./service_v4.exe
wait $PID1
# wait $PID2

Просмотреть файл

@ -11,10 +11,17 @@ set -euo pipefail
# source `dirname $0`/default_var_settings.sh # source `dirname $0`/default_var_settings.sh
PORT1=49001
PORT2=49002 # A number to add to all ports to avoid colliding or reusing recently
PORT3=49003 # used ports.
PORT4=49004 if ! [ ${PORTOFFSET:+defined} ]; then
PORTOFFSET=0
fi
PORT1=$((49001 + PORTOFFSET))
PORT2=$((49002 + PORTOFFSET))
PORT3=$((49003 + PORTOFFSET))
PORT4=$((49004 + PORTOFFSET))
INSTANCE_PREFIX="" INSTANCE_PREFIX=""
if [ $# -ne 0 ]; if [ $# -ne 0 ];
@ -23,26 +30,32 @@ fi
CLIENTNAME=${INSTANCE_PREFIX}nativeSend CLIENTNAME=${INSTANCE_PREFIX}nativeSend
SERVERNAME=${INSTANCE_PREFIX}nativeRecv SERVERNAME=${INSTANCE_PREFIX}nativeRecv
_cleanup() {
kill -TERM "$pid_server" 2>/dev/null || true
}
trap _cleanup TERM INT QUIT HUP
echo echo
echo "--------------------------------------------------------------------------------" echo "--------------------------------------------------------------------------------"
echo "Running NativeService with 4 processes all in this machine/container" echo "Running NativeService with 4 processes all in this machine/container"
echo " Instance: names $CLIENTNAME, $SERVERNAME" echo " Instance: names $CLIENTNAME, $SERVERNAME"
echo "--------------------------------------------------------------------------------" echo "--------------------------------------------------------------------------------"
echo echo
set -x if ! [ ${SKIP_REGISTER:+defined} ]; then
time Ambrosia RegisterInstance -i $CLIENTNAME --rp $PORT1 --sp $PORT2 -l "./ambrosia_logs/" set -x
time Ambrosia RegisterInstance -i $SERVERNAME --rp $PORT3 --sp $PORT4 -l "./ambrosia_logs/" time Ambrosia RegisterInstance -i $CLIENTNAME --rp $PORT1 --sp $PORT2 -l "./ambrosia_logs/"
set +x time Ambrosia RegisterInstance -i $SERVERNAME --rp $PORT3 --sp $PORT4 -l "./ambrosia_logs/"
set +x
# AMBROSIA_IMMORTALCOORDINATOR_PORT=2500 runAmbrosiaService.sh ./service_v4.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20 fi
echo echo
echo "NativeService: Launching Receiver" echo "NativeService: Launching Receiver"
set -x set -x
AMBROSIA_INSTANCE_NAME=$SERVERNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=1500 \ AMBROSIA_INSTANCE_NAME=$SERVERNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=$((1600 + PORTOFFSET)) \
COORDTAG=CoordRecv AMBROSIA_IMMORTALCOORDINATOR_LOG=./recvr.log \ COORDTAG=CoordRecv AMBROSIA_IMMORTALCOORDINATOR_LOG=./recvr.log \
runAmbrosiaService.sh ./service_v4.exe 1 $CLIENTNAME $PORT3 $PORT4 24 1 20 & runAmbrosiaService.sh ./service.exe 1 $CLIENTNAME $PORT3 $PORT4 24 1 20 &
pid_server=$! pid_server=$!
set +x set +x
@ -56,9 +69,9 @@ fi
echo echo
echo "NativeService: Launching Sender now:" echo "NativeService: Launching Sender now:"
set -x set -x
AMBROSIA_INSTANCE_NAME=$CLIENTNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=2500 \ AMBROSIA_INSTANCE_NAME=$CLIENTNAME AMBROSIA_IMMORTALCOORDINATOR_PORT=$((1601 + PORTOFFSET)) \
COORDTAG=CoordSend AMBROSIA_IMMORTALCOORDINATOR_LOG=./sender.log \ COORDTAG=CoordSend AMBROSIA_IMMORTALCOORDINATOR_LOG=./sender.log \
runAmbrosiaService.sh ./service_v4.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20 runAmbrosiaService.sh ./service.exe 0 $SERVERNAME $PORT1 $PORT2 24 1 20
set +x set +x
echo echo

Просмотреть файл

@ -37,8 +37,9 @@
// TODO: remove internal dependency: // TODO: remove internal dependency:
#include "ambrosia/internal/spsc_rring.h" #include "ambrosia/internal/spsc_rring.h"
#include "ambrosia/client.h" #include "ambrosia/client.h"
#include "ambrosia/internal/bits.h"
// Extra utilities (print_hex_bytes, socket_send_all):
#include "ambrosia/internal/bits.h"
// Library-level global variables: // Library-level global variables:
// -------------------------------------------------- // --------------------------------------------------
@ -100,21 +101,6 @@ int destLen; // Initialized below..
// General helper functions // General helper functions
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
#ifdef AMBCLIENT_DEBUG
// DUPLICATED with ambrosia_client.
void print_hex_bytes(FILE* fd, char* ptr, int len) {
const int limit = 100; // Only print this many:
fprintf(fd,"0x");
int j;
for (j=0; j < len && j < limit; j++) {
fprintf(fd,"%02hhx", (unsigned char)ptr[j]);
if (j % 2 == 1)
fprintf(fd," ");
}
if (j<len) fprintf(fd,"...");
}
#endif
// Hacky busy-wait by thread-yielding for now: // Hacky busy-wait by thread-yielding for now:
static inline void yield_thread() { static inline void yield_thread() {
#ifdef _WIN32 #ifdef _WIN32
@ -587,7 +573,7 @@ void* network_progress_thread( void* lpParam )
} else if ( spin_tries == 0) { } else if ( spin_tries == 0) {
spin_tries = hot_spin_amount; spin_tries = hot_spin_amount;
// amb_debug_log(" network thread: yielding to wait...\n"); // amb_debug_log(" network thread: yielding to wait...\n");
#ifdef AMBCLIENT_DEBUG #ifdef AMBCLIENT_DEBUG
sleep_seconds(0.5); sleep_seconds(0.5);
sleep_seconds(0.05); sleep_seconds(0.05);
#endif #endif
@ -681,6 +667,11 @@ int main(int argc, char** argv)
fprintf(stderr, "\nERROR: Bytes-per-round should be bigger than max message size.\n"); fprintf(stderr, "\nERROR: Bytes-per-round should be bigger than max message size.\n");
abort(); abort();
} }
if ( g_is_sender || destLen == 0)
printf("We are running the SENDER\n");
else
printf("We are running the RECEIVER\n");
printf("Connecting to my coordinator on ports: %d (up), %d (down)\n", upport, downport); printf("Connecting to my coordinator on ports: %d (up), %d (down)\n", upport, downport);
printf("The 'up' port we connect, and the 'down' one the coordinator connects to us.\n"); printf("The 'up' port we connect, and the 'down' one the coordinator connects to us.\n");

Просмотреть файл

@ -83,19 +83,24 @@ fi
tail_pid="" tail_pid=""
coord_pid="" coord_pid=""
app_pid="" app_pid=""
keep_monitoring=""
_cleanup() { _normal_cleanup() {
echo "$0: Exiting script (or caught signal)! Cleaning up." kill -TERM "$coord_pid" 2>/dev/null || true
kill -TERM "$coord_pid" 2>/dev/null || true kill -TERM "$tail_pid" 2>/dev/null || true
kill -TERM "$tail_pid" 2>/dev/null || true kill -TERM "$app_pid" 2>/dev/null || true
kill -TERM "$app_pid" 2>/dev/null || true rm -f "$keep_monitoring" 2>/dev/null || true
}
_unexpected_cleanup() {
trap '' EXIT # some shells will call EXIT after the INT handler
echo "$0: Exiting script abnormally! Cleaning up. ($1)"
_normal_cleanup
echo "$0: Done with cleanup." echo "$0: Done with cleanup."
} }
trap _cleanup EXIT trap _normal_cleanup EXIT
# subsumed by exit? TERM INT HUP trap _unexpected_cleanup TERM INT QUIT
# what about QUIT?
function tag_stdin() { function tag_stdin() {
local MSG=$1 local MSG=$1