net: track bytesWritten in C++ land
Move tracking of `socket.bytesWritten` to C++ land. This makes it easier to provide this functionality for all `StreamBase` instances, and in particular should keep working when they have been 'consumed' in C++ in some way (e.g. for the network sockets that are underlying to TLS or HTTP2 streams). Also, this parallels `socket.bytesRead` a lot more now. PR-URL: https://github.com/nodejs/node/pull/19551 Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Родитель
abc87862ff
Коммит
923fb5cc18
|
@ -32,7 +32,7 @@ function makeSyncWrite(fd) {
|
|||
if (enc !== 'buffer')
|
||||
chunk = Buffer.from(chunk, enc);
|
||||
|
||||
this._bytesDispatched += chunk.length;
|
||||
this._handle.bytesWritten += chunk.length;
|
||||
|
||||
const ctx = {};
|
||||
writeBuffer(fd, chunk, 0, chunk.length, null, undefined, ctx);
|
||||
|
|
28
lib/net.js
28
lib/net.js
|
@ -206,7 +206,6 @@ function normalizeArgs(args) {
|
|||
// called when creating new Socket, or when re-using a closed Socket
|
||||
function initSocketHandle(self) {
|
||||
self._undestroy();
|
||||
self._bytesDispatched = 0;
|
||||
self._sockname = null;
|
||||
|
||||
// Handle creation may be deferred to bind() or connect() time.
|
||||
|
@ -222,7 +221,8 @@ function initSocketHandle(self) {
|
|||
}
|
||||
|
||||
|
||||
const BYTES_READ = Symbol('bytesRead');
|
||||
const kBytesRead = Symbol('kBytesRead');
|
||||
const kBytesWritten = Symbol('kBytesWritten');
|
||||
|
||||
|
||||
function Socket(options) {
|
||||
|
@ -278,6 +278,11 @@ function Socket(options) {
|
|||
|
||||
this._writev = null;
|
||||
this._write = makeSyncWrite(fd);
|
||||
// makeSyncWrite adjusts this value like the original handle would, so
|
||||
// we need to let it do that by turning it into a writable, own property.
|
||||
Object.defineProperty(this._handle, 'bytesWritten', {
|
||||
value: 0, writable: true
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// these will be set once there is a connection
|
||||
|
@ -316,7 +321,8 @@ function Socket(options) {
|
|||
this._server = null;
|
||||
|
||||
// Used after `.destroy()`
|
||||
this[BYTES_READ] = 0;
|
||||
this[kBytesRead] = 0;
|
||||
this[kBytesWritten] = 0;
|
||||
}
|
||||
util.inherits(Socket, stream.Duplex);
|
||||
|
||||
|
@ -588,8 +594,9 @@ Socket.prototype._destroy = function(exception, cb) {
|
|||
if (this !== process.stderr)
|
||||
debug('close handle');
|
||||
var isException = exception ? true : false;
|
||||
// `bytesRead` should be accessible after `.destroy()`
|
||||
this[BYTES_READ] = this._handle.bytesRead;
|
||||
// `bytesRead` and `kBytesWritten` should be accessible after `.destroy()`
|
||||
this[kBytesRead] = this._handle.bytesRead;
|
||||
this[kBytesWritten] = this._handle.bytesWritten;
|
||||
|
||||
this._handle.close(() => {
|
||||
debug('emit close');
|
||||
|
@ -689,7 +696,7 @@ function protoGetter(name, callback) {
|
|||
}
|
||||
|
||||
protoGetter('bytesRead', function bytesRead() {
|
||||
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
|
||||
return this._handle ? this._handle.bytesRead : this[kBytesRead];
|
||||
});
|
||||
|
||||
protoGetter('remoteAddress', function remoteAddress() {
|
||||
|
@ -761,8 +768,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
|
|||
// Bail out if handle.write* returned an error
|
||||
if (ret) return ret;
|
||||
|
||||
this._bytesDispatched += req.bytes;
|
||||
|
||||
if (!req.async) {
|
||||
cb();
|
||||
return;
|
||||
|
@ -782,6 +787,13 @@ Socket.prototype._write = function(data, encoding, cb) {
|
|||
this._writeGeneric(false, data, encoding, cb);
|
||||
};
|
||||
|
||||
|
||||
// Legacy alias. Having this is probably being overly cautious, but it doesn't
|
||||
// really hurt anyone either. This can probably be removed safely if desired.
|
||||
protoGetter('_bytesDispatched', function _bytesDispatched() {
|
||||
return this._handle ? this._handle.bytesWritten : this[kBytesWritten];
|
||||
});
|
||||
|
||||
protoGetter('bytesWritten', function bytesWritten() {
|
||||
var bytes = this._bytesDispatched;
|
||||
const state = this._writableState;
|
||||
|
|
|
@ -117,6 +117,7 @@ struct PackageConfig {
|
|||
V(bytes_string, "bytes") \
|
||||
V(bytes_parsed_string, "bytesParsed") \
|
||||
V(bytes_read_string, "bytesRead") \
|
||||
V(bytes_written_string, "bytesWritten") \
|
||||
V(cached_data_string, "cachedData") \
|
||||
V(cached_data_produced_string, "cachedDataProduced") \
|
||||
V(cached_data_rejected_string, "cachedDataRejected") \
|
||||
|
|
|
@ -193,6 +193,10 @@ inline StreamWriteResult StreamBase::Write(
|
|||
v8::Local<v8::Object> req_wrap_obj) {
|
||||
Environment* env = stream_env();
|
||||
int err;
|
||||
|
||||
for (size_t i = 0; i < count; ++i)
|
||||
bytes_written_ += bufs[i].len;
|
||||
|
||||
if (send_handle == nullptr) {
|
||||
err = DoTryWrite(&bufs, &count);
|
||||
if (err != 0 || count == 0) {
|
||||
|
@ -301,6 +305,12 @@ void StreamBase::AddMethods(Environment* env,
|
|||
env->as_external(),
|
||||
signature);
|
||||
|
||||
Local<FunctionTemplate> get_bytes_written_templ =
|
||||
FunctionTemplate::New(env->isolate(),
|
||||
GetBytesWritten<Base>,
|
||||
env->as_external(),
|
||||
signature);
|
||||
|
||||
t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),
|
||||
get_fd_templ,
|
||||
Local<FunctionTemplate>(),
|
||||
|
@ -316,6 +326,11 @@ void StreamBase::AddMethods(Environment* env,
|
|||
Local<FunctionTemplate>(),
|
||||
attributes);
|
||||
|
||||
t->PrototypeTemplate()->SetAccessorProperty(env->bytes_written_string(),
|
||||
get_bytes_written_templ,
|
||||
Local<FunctionTemplate>(),
|
||||
attributes);
|
||||
|
||||
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
|
||||
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
|
||||
if ((flags & kFlagNoShutdown) == 0)
|
||||
|
@ -357,7 +372,6 @@ void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {
|
|||
|
||||
template <class Base>
|
||||
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
|
||||
// The handle instance hasn't been set. So no bytes could have been read.
|
||||
Base* handle;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&handle,
|
||||
args.This(),
|
||||
|
@ -368,6 +382,18 @@ void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
|
|||
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
|
||||
}
|
||||
|
||||
template <class Base>
|
||||
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
|
||||
Base* handle;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&handle,
|
||||
args.This(),
|
||||
args.GetReturnValue().Set(0));
|
||||
|
||||
StreamBase* wrap = static_cast<StreamBase*>(handle);
|
||||
// uint64_t -> double. 53bits is enough for all real cases.
|
||||
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
|
||||
}
|
||||
|
||||
template <class Base>
|
||||
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
|
||||
Base* handle;
|
||||
|
|
|
@ -243,6 +243,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
|
|||
uv_buf_t* bufs = &buf;
|
||||
size_t count = 1;
|
||||
err = DoTryWrite(&bufs, &count);
|
||||
bytes_written_ += data_size;
|
||||
|
||||
// Immediate failure or success
|
||||
if (err != 0 || count == 0) {
|
||||
|
|
|
@ -247,6 +247,7 @@ class StreamResource {
|
|||
|
||||
StreamListener* listener_ = nullptr;
|
||||
uint64_t bytes_read_ = 0;
|
||||
uint64_t bytes_written_ = 0;
|
||||
|
||||
friend class StreamListener;
|
||||
};
|
||||
|
@ -324,6 +325,9 @@ class StreamBase : public StreamResource {
|
|||
template <class Base>
|
||||
static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
template <class Base>
|
||||
static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
template <class Base,
|
||||
int (StreamBase::*Method)(
|
||||
const v8::FunctionCallbackInfo<v8::Value>& args)>
|
||||
|
|
Загрузка…
Ссылка в новой задаче