Merge pull request #49 from danielli90/connect

change Connect() to use async connect call and honor receiveTimeoutMs…
This commit is contained in:
Soumyajit Sahu 2016-04-22 17:35:50 -07:00
Родитель bdb85152dd 4b743bc0a9
Коммит d4c2dda4c3
1 изменённых файлов: 46 добавлений и 18 удалений

Просмотреть файл

@ -191,7 +191,8 @@ namespace Kafka.Client
private void Connect()
{
if (socket != null)
var watch = Stopwatch.StartNew();
if (this.socket != null)
{
try
{
@ -199,13 +200,12 @@ namespace Kafka.Client
}
catch (Exception e)
{
Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(e));
Logger.Error(string.Format("KafkaConnectio.Connect() exception in CloseConnection, duration={0}ms", watch.ElapsedMilliseconds), e);
}
}
this.socket = null;
IPAddress targetAddress;
if (IPAddress.TryParse(server, out targetAddress))
{
@ -220,12 +220,23 @@ namespace Kafka.Client
ReceiveBufferSize = bufferSize
};
newSocket.Connect(targetAddress, port);
socket = newSocket;
var result = newSocket.BeginConnect(targetAddress, port, null, null);
// use receiveTimeoutMs as connectionTimeoutMs
result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true);
result.AsyncWaitHandle.Close();
if (newSocket.Connected)
{
this.socket = newSocket;
}
else
{
newSocket.Close();
}
}
catch (Exception ex)
{
Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(ex));
Logger.Error(string.Format("KafkaConnectio.Connect() failed, duration={0}ms,this={1},targetAddress={2}", watch.ElapsedMilliseconds, this, targetAddress), ex);
throw new UnableToConnectToHostException(targetAddress.ToString(), port, ex);
}
}
@ -243,20 +254,31 @@ namespace Kafka.Client
try
{
var newSocket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
{
NoDelay = true,
ReceiveTimeout = this.receiveTimeoutMs,
SendTimeout = this.sendTimeoutMs,
SendBufferSize = bufferSize,
ReceiveBufferSize = bufferSize
};
{
NoDelay = true,
ReceiveTimeout = this.receiveTimeoutMs,
SendTimeout = this.sendTimeoutMs,
SendBufferSize = bufferSize,
ReceiveBufferSize = bufferSize
};
newSocket.Connect(address, port);
socket = newSocket;
var result = newSocket.BeginConnect(address, port, null, null);
// use receiveTimeoutMs as connectionTimeoutMs
result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true);
result.AsyncWaitHandle.Close();
if (!newSocket.Connected)
{
newSocket.Close();
continue;
}
this.socket = newSocket;
break;
}
catch (Exception e)
{
Logger.Error(string.Format("ErrorConnectingToAddress, duration={0}ms,address={1},server={2},port={3}", watch.ElapsedMilliseconds, address, server, port), e);
throw new UnableToConnectToHostException(server, port, e);
}
}
@ -264,12 +286,18 @@ namespace Kafka.Client
if (socket == null)
{
Logger.ErrorFormat("UnableToConnectToHostException, duration={0}ms,server={1},port={2}", watch.ElapsedMilliseconds, server, port);
throw new UnableToConnectToHostException(server, port);
}
this.stream = new NetworkStream(socket, true);
this.stream.ReadTimeout = networkStreamReadTimeoutMs;
this.stream.WriteTimeout = networkStreamWriteTimeoutMs;
Logger.DebugFormat("KafkaConnection.Connect() succeeded, duration={0}ms,server={1},port={2}",
watch.ElapsedMilliseconds, server, port);
this.stream = new NetworkStream(socket, true)
{
ReadTimeout = this.networkStreamReadTimeoutMs,
WriteTimeout = this.networkStreamWriteTimeoutMs
};
this.reader = new KafkaBinaryReader(stream);
Connected = true;
this.lastActiveTimeMs = Environment.TickCount;