From 8ffd6679e66f1730cfed71490bbf2eadc1a2e533 Mon Sep 17 00:00:00 2001 From: Mark Santaniello Date: Tue, 17 Feb 2015 09:05:23 -0800 Subject: [PATCH] Initial checkin. --- barrier.h | 71 ++++ histogram.h | 275 +++++++++++++++ incast.cpp | 961 ++++++++++++++++++++++++++++++++++++++++++++++++++++ incast.h | 152 +++++++++ make.cmd | 1 + tcpstats.h | 216 ++++++++++++ utils.h | 170 ++++++++++ 7 files changed, 1846 insertions(+) create mode 100644 barrier.h create mode 100644 histogram.h create mode 100644 incast.cpp create mode 100644 incast.h create mode 100644 make.cmd create mode 100644 tcpstats.h create mode 100644 utils.h diff --git a/barrier.h b/barrier.h new file mode 100644 index 0000000..543b12e --- /dev/null +++ b/barrier.h @@ -0,0 +1,71 @@ +// Incast +// +// Copyright (c) Microsoft Corporation +// +// All rights reserved. +// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. 
+ +#pragma once + +#ifndef __BARRIER_H__ +#define __BARRIER_H__ + +#include + +struct barrier +{ + barrier( long count ) + : threshold_(count), count_(count), generation_(0) + { + InitializeCriticalSection(&cs_); + InitializeConditionVariable(&cv_); + } + + CRITICAL_SECTION cs_; + CONDITION_VARIABLE cv_; + long threshold_; + long count_; + long generation_; + + void wait() + { + EnterCriticalSection(&cs_); + + long gen = generation_; + + if(--count_ == 0) + { + ++generation_; + count_ = threshold_; + WakeAllConditionVariable(&cv_); + } + else + { + while( gen == generation_ ) + SleepConditionVariableCS(&cv_,&cs_,INFINITE); + } + + LeaveCriticalSection(&cs_); + } +}; +#endif + diff --git a/histogram.h b/histogram.h new file mode 100644 index 0000000..e8a108e --- /dev/null +++ b/histogram.h @@ -0,0 +1,275 @@ +// Incast +// +// Copyright (c) Microsoft Corporation +// +// All rights reserved. +// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
// IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

#pragma once
#ifndef __HISTOGRAM_H_
#define __HISTOGRAM_H_

// NOTE(review): the original header names were lost in the source text; this
// list is exactly what the class below uses.
#include <cmath>
#include <limits>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>

// <windows.h> may define min/max as macros, which would break
// std::numeric_limits<T>::max() below; park them for the scope of this header.
#pragma push_macro("min")
#pragma push_macro("max")
#undef min
#undef max

// Frequency table of samples of type T with summary statistics
// (min/max/mean/percentiles/standard deviation) and CSV export.
//
// Each distinct value is one bucket holding its occurrence count.
template< typename T >
class Histogram
{
    private:

    unsigned samples_;   // total number of add()-ed samples

#define USE_HASH_TABLE
#ifdef USE_HASH_TABLE
    std::unordered_map<T, unsigned> data_;   // value -> occurrence count

    // Returns the buckets ordered by value (copies into an ordered map).
    std::map<T, unsigned> get_sorted_data() const
    {
        return std::map<T, unsigned>(data_.begin(), data_.end());
    }
#else
    std::map<T, unsigned> data_;             // value -> occurrence count

    std::map<T, unsigned> get_sorted_data() const
    {
        return data_;
    }
#endif
    public:

    Histogram()
        : samples_(0)
    {}

    // Removes all samples.
    void clear()
    {
        data_.clear();
        samples_ = 0;
    }

    // Records one occurrence of v.
    void add( T v )
    {
        data_[ v ]++;
        samples_++;
    }

    // Adds every sample of `other` to this histogram.
    void merge( const Histogram &other )
    {
        for( const auto &bucket : other.data_ )
        {
            data_[ bucket.first ] += bucket.second;
        }

        samples_ += other.samples_;
    }

    // Smallest recorded value; numeric_limits<T>::max() when empty.
    T get_min() const
    {
        T smallest( std::numeric_limits<T>::max() );

        for( const auto &bucket : data_ )
        {
            if( bucket.first < smallest )
            {
                smallest = bucket.first;
            }
        }

        return smallest;
    }

    // Largest recorded value; numeric_limits<T>::lowest() when empty.
    T get_max() const
    {
        // lowest(), not min(): for floating-point T, min() is the smallest
        // *positive* value and would hide all-negative data.
        T largest( std::numeric_limits<T>::lowest() );

        for( const auto &bucket : data_ )
        {
            if( bucket.first > largest )
            {
                largest = bucket.first;
            }
        }

        return largest;
    }

    // Number of samples recorded so far.
    unsigned get_sample_size() const
    {
        return samples_;
    }

    // Returns the smallest value whose cumulative count reaches p * samples.
    // p is a fraction in [0,1].
    // Throws std::invalid_argument when p is out of range and
    // std::runtime_error when the histogram is empty.
    T get_percentile( double p ) const
    {
        // ISSUE-REVIEW
        // What do the 0th and 100th percentile really mean?
        if( (p < 0) || (p > 1) )
        {
            throw std::invalid_argument("Percentile must be >= 0 and <= 1");
        }

        const double target = get_sample_size() * p;

        unsigned cur = 0;
        for( const auto &bucket : get_sorted_data() )
        {
            cur += bucket.second;
            if( cur >= target )
            {
                return bucket.first;
            }
        }

        throw std::runtime_error("Percentile is undefined");
    }

    // Convenience overload taking a whole-number percentage (0..100).
    T get_percentile( int p ) const
    {
        return get_percentile( static_cast<double>( p ) / 100 );
    }

    T get_median() const
    {
        return get_percentile( 0.5 );
    }


    double get_std_dev() const { return get_standard_deviation(); }
    double get_avg() const { return get_mean(); }

    // Arithmetic mean of all samples (0 when empty).
    // Throws std::overflow_error if the running sum stops being finite.
    double get_mean() const
    {
        double sum(0);
        const unsigned samples = get_sample_size();

        for( const auto &bucket : data_ )
        {
            // Divide per bucket so the partial sums stay near the mean
            // instead of growing with the sample count.
            const double bucket_val =
                static_cast<double>(bucket.first) * bucket.second / samples;

            // A double never wraps negative on overflow, so test for
            // non-finite results; negative data is perfectly legal.
            if( !std::isfinite(sum + bucket_val) )
            {
                throw std::overflow_error("while trying to accumulate sum");
            }

            sum += bucket_val;
        }

        return sum;
    }

    // Population standard deviation of all samples.
    double get_standard_deviation() const
    {
        // Accumulate in double: using T here would truncate both the mean
        // and the squared deviations for integral T.
        const double mean = get_mean();
        double ssd = 0;

        for( const auto &bucket : data_ )
        {
            const double dev = static_cast<double>(bucket.first) - mean;
            ssd += bucket.second * (dev * dev);
        }

        return std::sqrt( ssd / get_sample_size() );
    }

    // CSV of BINS equal-width bins spanning [get_min(), get_max()].
    std::string get_histogram_csv( const unsigned BINS ) const
    {
        return get_histogram_csv( BINS, get_min(), get_max() );
    }

    // CSV with one "upper_limit,count,cumulative" row per bin for BINS
    // equal-width bins spanning [LOW, HIGH].
    std::string get_histogram_csv( const unsigned BINS, const T LOW, const T HIGH ) const
    {
        // ISSUE-REVIEW
        // Currently bins are defined as strictly less-than
        // their upper limit, with the exception of the last
        // bin. Otherwise where would I put the max value?

        // Computed in double so that an integral T does not truncate the
        // width (to 0 whenever the range is smaller than BINS).
        const double BIN_SIZE =
            (static_cast<double>(HIGH) - static_cast<double>(LOW)) / BINS;
        double limit = static_cast<double>(LOW);

        std::ostringstream os;
        // NOTE(review): the template argument was lost in the source text;
        // double is assumed because `limit` is a double - confirm.
        os.precision(std::numeric_limits<double>::digits10);

        const std::map<T, unsigned> sorted_data = get_sorted_data();

        auto pos = sorted_data.begin();

        unsigned cumulative = 0;

        for( unsigned bin = 1; bin <= BINS; ++bin )
        {
            unsigned count = 0;
            limit += BIN_SIZE;

            // The last bin swallows everything that is left, including
            // values equal to HIGH.
            while( pos != sorted_data.end() &&
                   ( pos->first < limit || bin == BINS ) )
            {
                count += pos->second;
                ++pos;
            }

            cumulative += count;

            os << limit << "," << count << "," << cumulative << '\n';
        }

        return os.str();
    }

    // CSV with one "value,count" row per bucket, ordered by value.
    std::string get_raw_csv() const
    {
        std::ostringstream os;
        // NOTE(review): template argument lost in the source text; T is
        // assumed because the values printed are of type T - confirm.
        os.precision(std::numeric_limits<T>::digits10);

        for( const auto &bucket : get_sorted_data() )
        {
            os << bucket.first << "," << bucket.second << '\n';
        }

        return os.str();
    }

    // One "count value" row per bucket, ordered by value.
    std::string get_raw() const
    {
        std::ostringstream os;

        for( const auto &bucket : get_sorted_data() )
        {
            os << bucket.second << " " << bucket.first << '\n';
        }

        return os.str();
    }
};

#pragma pop_macro("min")
#pragma pop_macro("max")

#endif

// diff --git a/incast.cpp b/incast.cpp new file mode 100644 index 0000000..83d0936 --- /dev/null +++ b/incast.cpp @@ -0,0 +1,961 @@
// Incast
//
// Copyright (c) Microsoft Corporation
//
// All rights reserved.
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
+// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#include "incast.h" +#include "utils.h" +#include "tcpstats.h" +#include "histogram.h" + +using namespace std; + +unsigned int __stdcall serverThread( void *p ) +{ + int client_num = (int) p; + SOCKET s = clientSockets[client_num]; + int bytes; + + if( (gtp.delay > 0) && (gtp.delay_method == RANDOM_JITTER) ) + { + // each thread needs a unique seed + unsigned junk = 0xBFBFBFB; + srand( (client_num+1) * junk ); + } + + // send global test parameters to client + if ((bytes = send(s, (char*) >p, sizeof(GlobalTestParameters), 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() global test parameters failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(GlobalTestParameters)); + + // send client-specific test parameters to client + ClientSpecificTestParameters cstp; + cstp.client_num = client_num; + + if ((bytes = send(s, (char*) &cstp, sizeof(ClientSpecificTestParameters), 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() client-specific test parameters failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(ClientSpecificTestParameters)); + + unique_ptr fobuf( new char[gtp.fo_msg_size] ); + unique_ptr fibuf( new char[gtp.fi_msg_size] ); + + if( client_num == 0 ) + { + printf( "\nWarming up..." 
); + } + + // warm-up + for( int i = 0; i < WARMUP_ITERS; ++i ) + { + // synchronize with the other serverThreads + pb->wait(); + + // send the fan-out + if ((bytes = send(s, fobuf.get(), gtp.fo_msg_size, 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() fan-out failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fo_msg_size); + + // expect the fan-in + if ((bytes = recv(s, fibuf.get(), gtp.fi_msg_size, MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() fan-in failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fi_msg_size); + } + + if( client_num == 0 ) + { + printf( "done!\nTesting..." ); + GetTcpStatistics(&tcpStatsBefore); + } + + Measurement m; + + __int64 qpcStartTime = qpc(); + + for( int i = 0; i < gtp.iters; ++i ) + { + // synchronize with the other serverThreads + pb->wait(); + + m.start = qpc(); + + if( gtp.delay > 0 ) + { + double target_delay = 0; + + if( gtp.delay_method == RANDOM_JITTER ) + { + target_delay = gtp.delay * ((double) rand()) / RAND_MAX; + } + else if( gtp.delay_method == UNIFORM_SCHED ) + { + target_delay = gtp.delay * ((double) client_num / gtp.clients); + } + else + { + HARD_ASSERT( UNREACHED ); + } + + m.actual_delay = mySleep( target_delay ); + } + + // send the fan-out + if ((bytes = send(s, fobuf.get(), gtp.fo_msg_size, 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() fan-out failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fo_msg_size); + + // expect the fan-in + if ((bytes = recv(s, fibuf.get(), gtp.fi_msg_size, MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() fan-in failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fi_msg_size); + + m.stop = qpc(); + + if( gtp.rate_limited ) + { + double expectedElapsedSeconds = ((double) i) / gtp.target_rate; + double expectedElapsedQpcTicks = expectedElapsedSeconds * freq; + + double expectedQpc = qpcStartTime + expectedElapsedQpcTicks; + + while( qpc() < 
expectedQpc ) + { + // slow down + + // ISSUE-REVIEW + // Sleep is 1 ms min. Is a spin wait warranted? + Sleep(0); + } + } + + clientResults[client_num].measurements.push_back(m); + } + + // expect client results + ClientResultData * crd = &clientResults[client_num].crd; + if ((bytes = recv(s, (char*) crd, sizeof(ClientResultData), MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() client results failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(ClientResultData)); + + return 0; +} + +void reportGlobalTestParameters() +{ + printf( "\n" ); + printf( "Test parameters:\n" ); + printf( "\tclients: %d\n", gtp.clients ); + printf( "\titerations: %d\n", gtp.iters ); + if( gtp.rate_limited ) + { + printf( "\trate limit: %d\n", gtp.target_rate ); + } + else + { + printf( "\trate limit: none\n" ); + } + printf( "\tfan-out msg bytes: %d\n", gtp.fo_msg_size ); + printf( "\tfan-in msg bytes: %d\n", gtp.fi_msg_size ); + + printf( "\tNagle's algorithm: %s\n", gtp.nagle ? "enabled" : "disabled" ); + + if( gtp.send_buffer >= 0 ) + { + printf( "\tsend buffer size: %d\n", gtp.send_buffer ); + } + else + { + printf( "\tsend buffer size: OS default\n" ); + } + + if( gtp.recv_buffer >= 0 ) + { + printf( "\treceive buffer size: %d\n", gtp.recv_buffer ); + } + else + { + printf( "\treceive buffer size: OS default\n" ); + } + + if( gtp.delay > 0 ) + { + printf( "\tdelay: %d\n", gtp.delay ); + printf( "\tdelay method: %s\n", + gtp.delay_method == RANDOM_JITTER ? 
"random jitter" : "uniform sched" ); + } + else + { + printf( "\tdelay: none\n" ); + } +} + +void reportLatencyThroughput() +{ + try { + + Histogram<__int64> hist; + __int64 globalFirstStart, globalLastStop; + + int clients = clientResults.size(); + + for( int i = 0; i < gtp.iters; ++i ) + { + __int64 firstStart = numeric_limits<__int64>::max(); + __int64 lastStop = numeric_limits<__int64>::min(); + + for( int c = 0; c < clients; ++c ) + { + Measurements &m = clientResults[c].measurements; + firstStart = min( firstStart, m[i].start ); + lastStop = max( lastStop, m[i].stop ); + } + + HARD_ASSERT(lastStop>firstStart); + + hist.add(lastStop-firstStart); + + if( i == 0 ) globalFirstStart = firstStart; + if( i == gtp.iters-1 ) globalLastStop = lastStop; + } + + // for delay, we also calculate the exclusive latency + // by subtracting the actual per-client, per-iteration + // delay, thus effectively pretending that all the + // tests began at exactly the same time + + Histogram<__int64> exclusive_hist; + +#ifdef REPORT_DELAY + Histogram<__int64> delay_hist; +#endif + + if( gtp.delay ) + { + for( int i = 0; i < gtp.iters; ++i ) + { + for( int c = 0; c < clients; ++c ) + { + Measurements &m = clientResults[c].measurements; +#ifdef REPORT_DELAY + delay_hist.add(m[i].actual_delay); +#endif + m[i].stop -= m[i].actual_delay; + + } + + __int64 firstStart = numeric_limits<__int64>::max(); + __int64 lastStop = numeric_limits<__int64>::min(); + + for( int c = 0; c < clients; ++c ) + { + Measurements &m = clientResults[c].measurements; + + firstStart = min( firstStart, m[i].start ); + lastStop = max( lastStop, m[i].stop ); + } + + HARD_ASSERT(lastStop>firstStart); + + exclusive_hist.add(lastStop-firstStart); + } + } + + if( gtp.histogram ) + { + histfile << freq << endl << endl; + if( gtp.delay ) + { +#ifdef REPORT_DELAY + histfile << "Delay" << endl; + histfile << delay_hist.get_histogram_csv( 10000 ) << endl; +#endif + histfile << "Exclusive" << endl; + histfile << 
exclusive_hist.get_histogram_csv( 10000 ); + histfile << endl << "Inclusive" << endl; + } + histfile << hist.get_histogram_csv( 10000 ); + histfile.close(); + } + + if( gtp.delay ) + printf( "\nLatency (inclusive):\n" ); + else + printf( "\nLatency:\n" ); + + double lmin = hist.get_min() * 1.0e6 / freq; + printf( "\tminimum usec/iter: %10.3f\n", lmin ); + + double lmax = hist.get_max() * 1.0e6 / freq; + printf( "\tmaximum usec/iter: %10.3f\n", lmax ); + + double avg = hist.get_avg() * 1.0e6 / freq; + printf( "\taverage usec/iter: %10.3f\n", avg ); + + double median = hist.get_median() * 1.0e6 / freq; + printf( "\tmedian usec/iter: %10.3f\n", median ); + + double p95 = hist.get_percentile(0.95) * 1.0e6 / freq; + printf( "\t95th %%ile usec/iter: %10.3f\n", p95 ); + + double p99 = hist.get_percentile(0.99) * 1.0e6 / freq; + printf( "\t99th %%ile usec/iter: %10.3f\n", p99 ); + + if( gtp.delay ) + { + printf( "\nLatency (exclusive):\n" ); + + double lmin = exclusive_hist.get_min() * 1.0e6 / freq; + printf( "\tminimum usec/iter: %10.3f\n", lmin ); + + double lmax = exclusive_hist.get_max() * 1.0e6 / freq; + printf( "\tmaximum usec/iter: %10.3f\n", lmax ); + + double avg = exclusive_hist.get_avg() * 1.0e6 / freq; + printf( "\taverage usec/iter: %10.3f\n", avg ); + + double median = exclusive_hist.get_median() * 1.0e6 / freq; + printf( "\tmedian usec/iter: %10.3f\n", median ); + + double p95 = exclusive_hist.get_percentile(0.95) * 1.0e6 / freq; + printf( "\t95th %%ile usec/iter: %10.3f\n", p95 ); + + double p99 = exclusive_hist.get_percentile(0.99) * 1.0e6 / freq; + printf( "\t99th %%ile usec/iter: %10.3f\n", p99 ); + + } + +#ifdef REPORT_DELAY + if( gtp.delay ) + { + printf( "\nJitter:\n" ); + + double lmin = delay_hist.get_min() * 1.0e6 / freq; + printf( "\tminimum usec/iter: %10.3f\n", lmin ); + + double lmax = delay_hist.get_max() * 1.0e6 / freq; + printf( "\tmaximum usec/iter: %10.3f\n", lmax ); + + double avg = delay_hist.get_avg() * 1.0e6 / freq; + printf( 
"\taverage usec/iter: %10.3f\n", avg ); + + double median = delay_hist.get_median() * 1.0e6 / freq; + printf( "\tmedian usec/iter: %10.3f\n", median ); + + double p95 = delay_hist.get_percentile(0.95) * 1.0e6 / freq; + printf( "\t95th %%ile usec/iter: %10.3f\n", p95 ); + + double p99 = delay_hist.get_percentile(0.99) * 1.0e6 / freq; + printf( "\t99th %%ile usec/iter: %10.3f\n", p99 ); + } +#endif + + printf( "\nThroughput:\n" ); + + double totalSeconds = ((double)(globalLastStop-globalFirstStart)) / freq; + double sendMBytes = gtp.fo_msg_size / 1.0e6 * clients * gtp.iters; + double recvMBytes = gtp.fi_msg_size / 1.0e6 * clients * gtp.iters; + double totalMBytes = sendMBytes + recvMBytes; + + double sendMbps = sendMBytes * 8 / totalSeconds; + printf( "\tmbit/sec send: %10.3f\n", sendMbps ); + + double recvMbps = recvMBytes * 8 / totalSeconds; + printf( "\tmbit/sec recv: %10.3f\n", recvMbps ); + + double totalMbps = totalMBytes * 8 / totalSeconds; + printf( "\tmbit/sec tot: %10.3f\n", totalMbps ); + + double ips = gtp.iters / totalSeconds; + printf( "\titer/sec: %10.3f\n", ips ); + + const double FUDGE_FACTOR = 0.95; + if( gtp.rate_limited && (ips < gtp.target_rate * FUDGE_FACTOR ) ) + { + printf( "\nWarning: missed target of %d iterations/sec\n", gtp.target_rate ); + } + + } + catch( exception& e ) + { + fprintf(stderr, "\nException caught: %s\n", e.what()); + } +} + +void serverMain() +{ + printf( "Server mode\n\n" ); + + SOCKET ls; + + if ((ls = socket(PF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) + { + fprintf(stderr, "socket() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + SOCKADDR_IN sin = {0}; + sin.sin_family = AF_INET; + sin.sin_port = htons(PORT); + sin.sin_addr.s_addr = INADDR_ANY; + + if (bind(ls, (SOCKADDR*) &sin, sizeof(SOCKADDR)) == SOCKET_ERROR) + { + fprintf(stderr, "bind() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + if (listen(ls, SOMAXCONN) == SOCKET_ERROR) + { + fprintf(stderr, "listen() failed: %d\n", WSAGetLastError()); + 
exit(-1); + } + + printf( "Start clients and then press any key to begin test.\n" ); + + while (!_kbhit()) + { + if (isConnectionPending(ls)) + { + SOCKET cs; + + int nlen = sizeof(SOCKADDR); + if ((cs = accept(ls, (SOCKADDR*) &sin, &nlen)) == INVALID_SOCKET) + { + fprintf(stderr, "accept() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + const int client_num = gtp.clients++; + + nlen = sizeof(sockaddr); + getpeername( cs, (struct sockaddr *)&sin, &nlen ); + + string ip( inet_ntoa(sin.sin_addr) ); + + clientAddressMap[ip].push_back(client_num); + + printf("\tClient %3d connected from %15.15s\n", client_num, ip.c_str()); + + clientSockets.push_back(cs); + + if( gtp.nagle == false ) + disableNagle(cs); + + if( gtp.send_buffer >= 0 ) + setSocketBufferSize(cs, SO_SNDBUF, gtp.send_buffer ); + + if( gtp.recv_buffer >= 0 ) + setSocketBufferSize(cs, SO_RCVBUF, gtp.recv_buffer ); + + if( gtp.clients_limited && (gtp.clients == gtp.client_limit) ) + { + printf( "Reached limit of %d clients.\n", gtp.client_limit ); + break; + } + } + } + + // stop listening for clients + closesocket(ls); + + if (gtp.clients <= 0) + { + printf("No clients, exiting...\n"); + exit(0); + } + + if( !gtp.clients_limited || (gtp.clients < gtp.client_limit) ) + { + printf( "%d clients connected.\n", gtp.clients ); + } + +#ifdef REPORT_ESTATS + bool estats = enableTcpEStats(); + if( !estats ) + { + printf( "\nCould not enable TCP EStats. 
Run server as admin?\n" ); + } +#endif + + barrier b(gtp.clients); + pb = &b; + + clientResults.resize(gtp.clients); + + for( int c = 0; c < gtp.clients; ++c ) + { + clientThreads.push_back( + (HANDLE) _beginthreadex( NULL, 0, serverThread, (void*) c, 0, NULL ) ); + } + + HARD_ASSERT( clientSockets.size() == gtp.clients ); + HARD_ASSERT( clientResults.size() == gtp.clients ); + HARD_ASSERT( clientThreads.size() == gtp.clients ); + + // wait for all serverThreads to complete the test and exit + WaitForMultipleObjectsEx( gtp.clients, &clientThreads[0], true, INFINITE, FALSE ); + + printf( "done!\n" ); + + GetTcpStatistics(&tcpStatsAfter); + + reportGlobalTestParameters(); + + reportLatencyThroughput(); + + reportTcpStats(); + +#ifdef REPORT_ESTATS + if( estats ) + { + reportTcpEStats(); + } +#endif + + for( int c = 0; c < gtp.clients; ++c ) + { + gracefulShutdown( clientSockets[c] ); + } +} + +void clientMain( char* server ) +{ + printf("Client mode\n"); + + ULONG addr = inet_addr( server ); + if (addr == INADDR_NONE) + { + PADDRINFOA pai; + if( getaddrinfo( server, NULL, NULL, &pai ) != 0 ) + { + fprintf(stderr, "getaddrinfo() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + for( PADDRINFOA p = pai; p != NULL; p=p->ai_next ) + { + if( p->ai_family == AF_INET ) + { + PSOCKADDR_IN sai = (PSOCKADDR_IN) p->ai_addr; + addr = *(ULONG*) &(sai->sin_addr); + break; + } + } + + freeaddrinfo( pai ); + } + + SOCKET s; + +beginTest: + printf("\nCTRL-C to quit.\n"); + + if ((s = socket(PF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) + { + fprintf(stderr, "socket() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + SOCKADDR_IN sin = {0}; + sin.sin_family = AF_INET; + sin.sin_port = htons(PORT); + sin.sin_addr.s_addr = addr; + + char *ip = inet_ntoa(sin.sin_addr); + + if (strcmp(server,ip) == 0) + printf("Connecting to %s port %d...", server, PORT); + else + printf("Connecting to %s (%s) port %d...", server, ip, PORT); + + while (true) + { + if (connect(s, (SOCKADDR*) &sin, 
sizeof(SOCKADDR)) != SOCKET_ERROR) + { + break; + } + + int err = WSAGetLastError(); + + if( (err == WSAETIMEDOUT) || (err == WSAECONNREFUSED) ) + { + //printf("."); + Sleep(100); + continue; + } + + fprintf(stderr, "Error: connect() failed: %d\n", WSAGetLastError()); + exit(-1); + } + + printf("connected!\n"); + + if( gtp.nagle == false ) + disableNagle(s); + + if( gtp.send_buffer >= 0 ) + setSocketBufferSize(s, SO_SNDBUF, gtp.send_buffer ); + + if( gtp.recv_buffer >= 0 ) + setSocketBufferSize(s, SO_RCVBUF, gtp.recv_buffer ); + + int bytes; + + // get global test parameters from server + if ((bytes = recv(s, (char*) >p, sizeof(GlobalTestParameters), MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() global test parameters failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(GlobalTestParameters)); + + // get client-specific test parameters from server + ClientSpecificTestParameters cstp; + if ((bytes = recv(s, (char*) &cstp, sizeof(ClientSpecificTestParameters), MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() client-specific test parameters failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(ClientSpecificTestParameters)); + + unique_ptr fobuf( new char[gtp.fo_msg_size] ); + unique_ptr fibuf( new char[gtp.fi_msg_size] ); + + printf( "\nWarming Up..." ); + + for( int i = 0; i < WARMUP_ITERS; ++i ) + { + // expect the fan-out + if ((bytes = recv(s, fobuf.get(), gtp.fo_msg_size, MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() fan-out failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fo_msg_size); + + // send the fan-in + if ((bytes = send(s, fibuf.get(), gtp.fi_msg_size, 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() fan-in failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fi_msg_size); + } + + printf( "done!\nTesting..." 
); + + MIB_TCPSTATS tcpStatsBefore, tcpStatsAfter; + GetTcpStatistics(&tcpStatsBefore); + + for( int i = 0; i < gtp.iters; ++i ) + { + // expect the fan-out + if ((bytes = recv(s, fobuf.get(), gtp.fo_msg_size, MSG_WAITALL)) == SOCKET_ERROR) + { + fprintf(stderr, "recv() fan-out failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fo_msg_size); + + // send the fan-in + if ((bytes = send(s, fibuf.get(), gtp.fi_msg_size, 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() fan-in failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == gtp.fi_msg_size); + + //printf( "." ); + } + + printf( "done!\n" ); + + GetTcpStatistics(&tcpStatsAfter); + + // ISSUE-REVIEW + // This is a system-wide statistic for all TCP connections. Can I get a + // per-connection equivalent with GetPerTcpConnectionEStats or another API? + ClientResultData crd; + crd.retransmits = tcpStatsAfter.dwRetransSegs - tcpStatsBefore.dwRetransSegs; + + // send client results + if ((bytes = send(s, (char*) &crd, sizeof(ClientResultData), 0)) == SOCKET_ERROR) + { + fprintf(stderr, "send() client results failed: %d\n", WSAGetLastError()); + exit(-1); + } + HARD_ASSERT(bytes == sizeof(ClientResultData)); + + gracefulShutdown(s); + + goto beginTest; +} + +void usage() +{ + fprintf(stderr, "\ +INCAST: Simulates the incast network traffic pattern.\n\ +\n\ +Copyright (c) Microsoft Corporation 2011\n\ +Mark Santaniello (marksan)\n\ +\n\ +Incast can be run in two modes, client or server. There is only one server,\n\ +but arbitrarily many clients. Clients launch a coordinated incast \"volley\"\n\ +at the server.\n\ +\n\ +Clients will connect to the server, run a test, and loop forever. 
Each server\n\ +invocation represents a new test.\n\ +\n\ +For client mode, the only argument is the server IP or name:\n\ + INCAST.EXE \n\ +\n\ +Test options are specified only on the server side:\n\ + INCAST.EXE \n\ +\n\ +Available and their default values:\n\ + -n ITERS Number of iterations (%d)\n\ + -r RATE Iteration rate limit (no limit)\n\ + -c NUM Number of clients limit (no limit)\n\ + -d Disable Nagle's algorithm (enabled)\n\ + -sb SIZE Socket send buffer size (OS default)\n\ + -rb SIZE Socket receive buffer size (OS default)\n\ + -o SIZE Fan-out message size (%d)\n\ + -i SIZE Fan-in message size (%d)\n\ + -f FILE Dump full histogram to file\n\ + -j MSEC Delay clients via random jitter (disabled)\n\ + -s MSEC Delay clients via uniform scheduling (disabled)\n", + DEFAULT_ITERS, DEFAULT_FO_MSG_SIZE, DEFAULT_FI_MSG_SIZE ); + + exit(-1); +} + +int __cdecl main( int argc, char** argv ) +{ + setHighPriority(); + + // this improves the accuracy of the Sleep calls in the jitter code + // ISSUE-REVIEW: what about ARM? 
+#ifndef _M_ARM + TIMECAPS tc; + HRESULT hr; + hr = timeGetDevCaps( &tc, sizeof(tc) ); + HARD_ASSERT( hr == MMSYSERR_NOERROR); + hr = timeBeginPeriod( tc.wPeriodMin ); + HARD_ASSERT( hr == TIMERR_NOERROR ); +#endif + + WSADATA WSAData; + + if (WSAStartup(MAKEWORD(2, 2), &WSAData) != 0) + { + fprintf(stderr, "WSAStartup() failed with error code %d", WSAGetLastError()); + exit(-1); + } + + // ISSUE-REVIEW: Switch to something standard like getopt + if ((argc == 2) && (argv[1][0] != '-') && (argv[1][0] != '/')) + { + clientMain( argv[1] ); + } + else + { + for( int a = 1; a < argc; ++a ) + { + if ((argv[a][0] != '-') && (argv[a][0] != '/')) + { + usage(); + } + + switch (argv[a][1]) + { + case '?': + case 'h': + usage(); + + case 'i': + a++; + gtp.fi_msg_size = atoi(argv[a]); + if( gtp.fi_msg_size <= 0 ) + { + fprintf(stderr, "-i parameter invalid\n"); + exit(-1); + } + break; + + case 'o': + a++; + gtp.fo_msg_size = atoi(argv[a]); + if( gtp.fo_msg_size <= 0 ) + { + fprintf(stderr, "-o parameter invalid\n"); + exit(-1); + } + break; + + case 'r': + { + if( argv[a][2] == NULL ) + { + a++; + gtp.rate_limited = true; + gtp.target_rate = atoi(argv[a]); + if( gtp.target_rate <= 0 ) + { + fprintf(stderr, "-r parameter invalid\n"); + exit(-1); + } + } + else if( argv[a][2] == 'b' ) + { + a++; + gtp.recv_buffer = atoi(argv[a]); + if( gtp.recv_buffer < 0 ) + { + fprintf(stderr, "-rb parameter invalid\n"); + exit(-1); + } + } + else + { + fprintf(stderr, "Unknown command line option\n\n"); + usage(); + } + } + break; + + case 's': + { + if( argv[a][2] == NULL ) + { + a++; + gtp.delay = atoi(argv[a]); + gtp.delay_method = UNIFORM_SCHED; + if( gtp.delay <= 0 ) + { + fprintf(stderr, "-s parameter invalid\n"); + exit(-1); + } + } + else if( argv[a][2] == 'b' ) + { + a++; + gtp.send_buffer = atoi(argv[a]); + if( gtp.send_buffer < 0 ) + { + fprintf(stderr, "-sb parameter invalid\n"); + exit(-1); + } + } + else + { + fprintf(stderr, "Unknown command line option\n\n"); + usage(); + } + 
} + break; + + case 'c': + a++; + gtp.clients_limited = true; + gtp.client_limit = atoi(argv[a]); + if( gtp.client_limit <= 0 ) + { + fprintf(stderr, "-c parameter invalid\n"); + exit(-1); + } + break; + + case 'n': + a++; + gtp.iters = atoi(argv[a]); + if( gtp.iters <= 0 ) + { + fprintf(stderr, "-n parameter invalid\n"); + exit(-1); + } + break; + + case 'd': + gtp.nagle = false; + break; + + case 'f': + a++; + gtp.histogram = true; + // ISSUE-REVIEW: Overwriting existing files? + histfile.open(argv[a]); + if( !histfile.good() ) + { + fprintf(stderr, "-f parameter invalid\n"); + exit(-1); + } + break; + + case 'j': + a++; + gtp.delay = atoi(argv[a]); + gtp.delay_method = RANDOM_JITTER; + if( gtp.delay <= 0 ) + { + fprintf(stderr, "-j parameter invalid\n"); + exit(-1); + } + break; + + default: + fprintf(stderr, "Unknown command line option\n\n"); + usage(); + } + } + + serverMain(); + } + +#ifndef _M_ARM + hr = timeEndPeriod( tc.wPeriodMin ); + HARD_ASSERT( hr == TIMERR_NOERROR ); +#endif + + WSACleanup(); +} diff --git a/incast.h b/incast.h new file mode 100644 index 0000000..da75346 --- /dev/null +++ b/incast.h @@ -0,0 +1,152 @@ +// Incast +// +// Copyright (c) Microsoft Corporation +// +// All rights reserved. +// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#ifndef _INCAST_H +#define _INCAST_H + +#include +#include +#define NOMINMAX +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef _M_ARM + #include +#endif + +#include "barrier.h" + +const unsigned PORT = 27779; +const int DEFAULT_ITERS = 10000; +const int WARMUP_ITERS = 10; +const int DEFAULT_FO_MSG_SIZE = 256; +const int DEFAULT_FI_MSG_SIZE = 4096; + +enum DelayMethod +{ + NONE, + RANDOM_JITTER, + UNIFORM_SCHED +}; + +struct GlobalTestParameters +{ + int clients; + int iters; + bool rate_limited; + int target_rate; + int fo_msg_size; + int fi_msg_size; + bool clients_limited; + int client_limit; + int delay; + DelayMethod delay_method; + + // ISSUE-REVIEW + // Should these be broken down into distinct + // client and server options? 
+ bool nagle; + int send_buffer; + int recv_buffer; + + bool histogram; + + GlobalTestParameters() + : clients(0) + , iters(DEFAULT_ITERS) + , rate_limited(false) + , target_rate(0) + , fo_msg_size(DEFAULT_FO_MSG_SIZE) + , fi_msg_size(DEFAULT_FI_MSG_SIZE) + , clients_limited(false) + , client_limit(0) + , delay(0) + , delay_method(NONE) + , nagle(true) + , send_buffer(-1) + , recv_buffer(-1) + , histogram(false) + {}; +} gtp; + +struct ClientSpecificTestParameters +{ + int client_num; + ClientSpecificTestParameters() + : client_num(-1) + {}; +}; + +struct ClientResultData +{ + int retransmits; + + ClientResultData() + : retransmits(0) + {}; +}; + +struct Measurement +{ + __int64 actual_delay; + __int64 start; + __int64 stop; +}; + +typedef std::vector Measurements; + +struct TestResult +{ + ClientResultData crd; + Measurements measurements; +}; + +std::vector clientResults; +std::vector clientThreads; +std::vector clientSockets; + +barrier *pb; + +std::ofstream histfile; + +MIB_TCPSTATS tcpStatsBefore, tcpStatsAfter; + +typedef std::map> ClientAddressMap; +ClientAddressMap clientAddressMap; + +#endif // _INCAST_H diff --git a/make.cmd b/make.cmd new file mode 100644 index 0000000..3c2d1e5 --- /dev/null +++ b/make.cmd @@ -0,0 +1 @@ +cl /EHsc /O2 incast.cpp ws2_32.lib iphlpapi.lib winmm.lib diff --git a/tcpstats.h b/tcpstats.h new file mode 100644 index 0000000..8362d10 --- /dev/null +++ b/tcpstats.h @@ -0,0 +1,216 @@ +// Incast +// +// Copyright (c) Microsoft Corporation +// +// All rights reserved. 
+// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#ifndef _INCAST_TCPSTATS_H +#define _INCAST_TCPSTATS_H +void reportTcpStats() +{ + using namespace std; + + // ISSUE-REVIEW + // This is a system-wide statistic for all TCP connections. Can I get a + // per-connection equivalent with GetPerTcpConnectionEStats or another API? + // + // ISSUE-REVIEW + // I aggregate this system-wide value per-ip. I am conflating ip with system. + // This is broken on multi-homed machines. 
+ + int serverRetransmits = + tcpStatsAfter.dwRetransSegs - tcpStatsBefore.dwRetransSegs; + + printf( "\n" ); + printf( "Retransmits (system-wide):\n" ); + printf( "\tserver: %3d\n", serverRetransmits ); + + int clientRetransmits = 0; + + ClientAddressMap::const_iterator i; + for( i = clientAddressMap.begin(); i != clientAddressMap.end(); ++i ) + { + int maxRetransmits = numeric_limits::min(); + int num_clients = i->second.size(); + + for( int j = 0; j < num_clients; ++j ) + { + int client_num = i->second[j]; + maxRetransmits = + max( maxRetransmits, clientResults[client_num].crd.retransmits ); + } + + clientRetransmits += maxRetransmits; + + //printf( "\tall %3d clients from %15.15s: %d\n", + // num_clients, + // i->first.c_str(), + // maxRetransmits ); + } + + printf( "\tclients: %3d\n", clientRetransmits ); + printf( "\ttotal: %3d\n", clientRetransmits + serverRetransmits ); +} + +bool enableTcpEStats() +{ + using namespace std; + + DWORD tcpTableSize = 0; + + DWORD r = GetTcpTable( NULL, &tcpTableSize, 0 ); + + HARD_ASSERT( r == ERROR_INSUFFICIENT_BUFFER ); + HARD_ASSERT( tcpTableSize > 0 ); + + unique_ptr tcpTable( (PMIB_TCPTABLE) malloc(tcpTableSize), free ); + + r = GetTcpTable( tcpTable.get(), &tcpTableSize, TRUE ); + + // ISSUE-REVIEW + // Could get a new TCP connection between GetTcpTable calls. 
+ HARD_ASSERT( r != ERROR_INSUFFICIENT_BUFFER ); + + for( unsigned i = 0; i < tcpTable->dwNumEntries; ++i ) + { + PMIB_TCPROW tr = &tcpTable->table[i]; + if( ntohs((u_short) tr->dwLocalPort) == PORT ) + { + TCP_ESTATS_SND_CONG_RW_v0 snd_rw; + snd_rw.EnableCollection = 1; + + r = SetPerTcpConnectionEStats( + tr, + TcpConnectionEstatsSndCong, + (PUCHAR) &snd_rw, + 0, + sizeof(snd_rw), + 0 ); + + if( r != NO_ERROR ) + return false; + } + } + + return true; +} + +void reportTcpEStats() +{ + using namespace std; + + DWORD tcpTableSize = 0; + + DWORD r = GetTcpTable( NULL, &tcpTableSize, 0 ); + + HARD_ASSERT( r == ERROR_INSUFFICIENT_BUFFER ); + HARD_ASSERT( tcpTableSize > 0 ); + + unique_ptr tcpTable( (PMIB_TCPTABLE) malloc(tcpTableSize), free ); + + r = GetTcpTable( tcpTable.get(), &tcpTableSize, TRUE ); + + // ISSUE-REVIEW + // Could get a new TCP connection between GetTcpTable calls. + HARD_ASSERT( r != ERROR_INSUFFICIENT_BUFFER ); + +#ifndef PRINT_PER_CLIENT_ESTATS + printf( "\nCongestion %%age:\n" ); + + ULONG totalRecvTime = 0; + ULONG totalNetTime = 0; + ULONG totalSendTime = 0; + + for( unsigned i = 0; i < tcpTable->dwNumEntries; ++i ) + { + TCP_ESTATS_SND_CONG_ROD_v0 snd_rod; + + PMIB_TCPROW tr = &tcpTable->table[i]; + if( ntohs((u_short) tr->dwLocalPort) == PORT ) + { + r = GetPerTcpConnectionEStats( + tr, + TcpConnectionEstatsSndCong, + NULL, 0, 0, + NULL, 0, 0, + (PUCHAR) &snd_rod, + 0, + sizeof(snd_rod) ); + + HARD_ASSERT( r == NO_ERROR ); + + totalRecvTime += snd_rod.SndLimTimeRwin; + totalNetTime += snd_rod.SndLimTimeCwnd; + totalSendTime += snd_rod.SndLimTimeSnd; + } + } + + ULONG totalTime = totalRecvTime + totalNetTime + totalSendTime; + + printf( "\treceive: %5.4f\n", totalRecvTime*100.0/totalTime ); + printf( "\tnetwork: %5.4f\n", totalNetTime*100.0/totalTime ); + printf( "\tsend: %5.4f\n", totalSendTime*100.0/totalTime ); + +#else + printf( "\nCongestion msec (recv/net/send):\n" ); + //printf( "\nCongestion %%age (recv/net/send):\n" ); + + for( 
unsigned i = 0; i < tcpTable->dwNumEntries; ++i ) + { + TCP_ESTATS_SND_CONG_ROD_v0 snd_rod; + + PMIB_TCPROW tr = &tcpTable->table[i]; + if( ntohs((u_short) tr->dwLocalPort) == PORT ) + { + r = GetPerTcpConnectionEStats( + tr, + TcpConnectionEstatsSndCong, + NULL, 0, 0, + NULL, 0, 0, + (PUCHAR) &snd_rod, + 0, + sizeof(snd_rod) ); + + HARD_ASSERT( r == NO_ERROR ); + + printf( "\tclient from %15.15s: ", inet_ntoa( *(IN_ADDR*) &(tr->dwRemoteAddr) ) ); + + printf( "\t%u / %u / %u\n", + snd_rod.SndLimTimeRwin, + snd_rod.SndLimTimeCwnd, + snd_rod.SndLimTimeSnd ); + +// ULONG totalTime = +// snd_rod.SndLimTimeRwin + +// snd_rod.SndLimTimeCwnd + +// snd_rod.SndLimTimeSnd; +// +// printf( "\t%5.2f / %5.2f / %5.2f\n", +// snd_rod.SndLimTimeRwin*100.0/totalTime, +// snd_rod.SndLimTimeCwnd*100.0/totalTime, +// snd_rod.SndLimTimeSnd*100.0/totalTime ); + } + } +#endif +} +#endif //_INCAST_TCPSTATS_H diff --git a/utils.h b/utils.h new file mode 100644 index 0000000..ecad2dc --- /dev/null +++ b/utils.h @@ -0,0 +1,170 @@ +// Incast +// +// Copyright (c) Microsoft Corporation +// +// All rights reserved. +// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
+// IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+#ifndef __INCAST_UTILS_H
+#define __INCAST_UTILS_H
+// NOTE(review): the header names on these four #include lines were lost when
+// the patch was extracted; they are reconstructed from what this file uses
+// (Winsock, Win32 timing/process calls, fprintf, abort/exit). Confirm against
+// the upstream repository.
+#include <winsock2.h>
+#include <windows.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+// an assertion that persists even in non-debug builds
+//
+// Prints "Assertion failed: <msg>" to stderr and aborts when cond is false.
+// msg is const because the macro below passes a string literal.
+__declspec(noinline)
+inline void HARD_ASSERT( bool cond, const char *msg )
+{
+    if( !cond )
+    {
+        fprintf( stderr, "Assertion failed: %s\n", msg );
+        abort();
+    }
+}
+// NOTE: the expansion carries its own ';' (kept for compatibility with
+// existing call sites). Do not use HARD_ASSERT as the unbraced body of an
+// `if` that has an `else`.
+#define HARD_ASSERT(x) (HARD_ASSERT(x,#x));
+#define UNREACHED false
+
+// Returns the performance-counter frequency in ticks per second.
+inline __int64 qpf()
+{
+    LARGE_INTEGER f;
+    QueryPerformanceFrequency(&f);
+    return f.QuadPart;
+}
+
+// Cached counter frequency, used by the conversions below.
+static __int64 freq = qpf();
+
+// Returns the current performance-counter value in ticks.
+inline __int64 qpc()
+{
+    LARGE_INTEGER t;
+    QueryPerformanceCounter(&t);
+    return t.QuadPart;
+}
+
+// Converts performance-counter ticks to milliseconds.
+inline double qpc_to_msec( __int64 x )
+{
+    return x * 1000.0 / freq;
+}
+
+// Converts milliseconds to performance-counter ticks (truncating).
+inline __int64 msec_to_qpc( double x )
+{
+    return (__int64) (x * freq / 1000);
+}
+
+// takes desired sleep in msec
+// returns actual sleep in qpc ticks
+//
+// Sleep() may return early, so the loop re-sleeps for the remainder until at
+// least target_msec has elapsed on the performance counter. A zero or
+// negative target yields the time slice once and returns.
+inline __int64 mySleep( const double target_msec )
+{
+    const __int64 start = qpc();
+    __int64 actual = 0;
+
+    double remaining_msec = target_msec;
+
+    for(;;)
+    {
+        // Round to the nearest millisecond, but never hand Sleep() a negative
+        // value: converted to DWORD it becomes a huge timeout, and -1 is
+        // INFINITE, which would block forever.
+        const DWORD sleep_msec =
+            (remaining_msec > 0.0) ? (DWORD) (remaining_msec + 0.5) : 0;
+
+        Sleep( sleep_msec );
+
+        actual = qpc() - start;
+
+        const double actual_msec = qpc_to_msec( actual );
+
+        if( actual_msec >= target_msec )
+            break;
+
+        remaining_msec = target_msec - actual_msec;
+    }
+
+    return actual;
+}
+
+// Raises the current process to HIGH_PRIORITY_CLASS. Best effort: a failure
+// is reported on stderr and otherwise ignored.
+inline void setHighPriority()
+{
+    if( !SetPriorityClass( GetCurrentProcess(), HIGH_PRIORITY_CLASS ) )
+    {
+        fprintf(stderr, "SetPriorityClass() failed: %lu\n", GetLastError());
+    }
+}
+
+// Closes s gracefully: shuts down the sending side, drains incoming data
+// until the peer closes (recv() returns 0), then closes the socket.
+// Any shutdown()/recv() error terminates the process with exit(-1).
+inline void gracefulShutdown( SOCKET s )
+{
+    char buf[256];
+    int rv;
+
+    if( shutdown(s, SD_SEND) == SOCKET_ERROR )
+    {
+        fprintf(stderr, "in gracefulShutdown, shutdown() failed: %d\n", WSAGetLastError());
+        exit(-1);
+    }
+
+    do
+    {
+        if( (rv = recv(s, buf, (int) sizeof(buf), 0 ) ) == SOCKET_ERROR )
+        {
+            fprintf(stderr, "in gracefulShutdown, recv() failed: %d\n", WSAGetLastError());
+            exit(-1);
+        }
+    } while( rv != 0 );
+
+    closesocket(s);
+}
+
+// Polls the socket ls without blocking (zero timeout) and returns true when
+// select() reports it readable -- for a listening socket, that a connection
+// is waiting to be accepted. A select() error terminates the process.
+inline bool isConnectionPending( SOCKET ls )
+{
+    fd_set fds;
+    TIMEVAL tv = {0, 0};
+
+    FD_ZERO(&fds);
+    FD_SET(ls, &fds);
+
+    // The first argument of select() is ignored by Winsock.
+    int rv = select( 0, &fds, NULL, NULL, &tv );
+
+    if (rv == SOCKET_ERROR)
+    {
+        fprintf(stderr, "select() failed: %d\n", WSAGetLastError());
+        exit(-1);
+    }
+
+    return rv != 0;
+}
+
+// Sets the SOL_SOCKET option optname of s to size and reads it back to verify
+// it took effect. optname is presumably SO_SNDBUF or SO_RCVBUF -- the callers
+// are not visible here. Problems are reported on stderr; nothing is returned.
+inline void setSocketBufferSize( SOCKET s, int optname, int size )
+{
+    if (setsockopt(s, SOL_SOCKET, optname, (char*) &size, sizeof(size)) != 0)
+    {
+        fprintf(stderr, "setsockopt() failed: %d\n", WSAGetLastError());
+    }
+
+    int rb = 0;
+    int rbs = sizeof(rb);
+
+    // Without this check a failed read-back would compare an indeterminate
+    // value against size.
+    if (getsockopt(s, SOL_SOCKET, optname, (char*) &rb, &rbs) != 0)
+    {
+        fprintf(stderr, "getsockopt() failed: %d\n", WSAGetLastError());
+        return;
+    }
+
+    if (rb != size)
+    {
+        fprintf(stderr, "setsockopt() didn't take effect\n");
+    }
+}
+
+// Sets TCP_NODELAY on s, which disables the Nagle algorithm. A failure is
+// reported on stderr and otherwise ignored.
+inline void disableNagle( SOCKET s )
+{
+    int flag = 1;
+    if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(flag)) != 0)
+    {
+        fprintf(stderr, "setsockopt() failed: %d\n", WSAGetLastError());
+    }
+}
+#endif // __INCAST_UTILS_H