Stage 3: Channelz server implementation (#1919)

This commit is contained in:
lyuxuan 2018-04-30 11:14:29 -07:00 коммит произвёл GitHub
Родитель 7a8c989507
Коммит 8b85126464
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 3384 добавлений и 0 удалений

265
channelz/service/service.go Normal file
Просмотреть файл

@ -0,0 +1,265 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//go:generate protoc -I ../service_proto --go_out=plugins=grpc:../service_proto ../service_proto/service.proto
// Package service provides an implementation for channelz service server.
package service
import (
"net"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz"
pb "google.golang.org/grpc/channelz/service_proto"
"google.golang.org/grpc/connectivity"
)
// RegisterChannelzServiceToServer registers the channelz service to the given server.
func RegisterChannelzServiceToServer(s *grpc.Server) {
pb.RegisterChannelzServer(s, &serverImpl{})
}
func newCZServer() pb.ChannelzServer {
return &serverImpl{}
}
type serverImpl struct{}
func connectivityStateToProto(s connectivity.State) *pb.ChannelConnectivityState {
switch s {
case connectivity.Idle:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_IDLE}
case connectivity.Connecting:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_CONNECTING}
case connectivity.Ready:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_READY}
case connectivity.TransientFailure:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_TRANSIENT_FAILURE}
case connectivity.Shutdown:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_SHUTDOWN}
default:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_UNKNOWN}
}
}
func channelMetricToProto(cm *channelz.ChannelMetric) *pb.Channel {
c := &pb.Channel{}
c.Ref = &pb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
c.Data = &pb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
c.Data.LastCallStartedTimestamp = ts
}
nestedChans := make([]*pb.ChannelRef, 0, len(cm.NestedChans))
for id, ref := range cm.NestedChans {
nestedChans = append(nestedChans, &pb.ChannelRef{ChannelId: id, Name: ref})
}
c.ChannelRef = nestedChans
subChans := make([]*pb.SubchannelRef, 0, len(cm.SubChans))
for id, ref := range cm.SubChans {
subChans = append(subChans, &pb.SubchannelRef{SubchannelId: id, Name: ref})
}
c.SubchannelRef = subChans
sockets := make([]*pb.SocketRef, 0, len(cm.Sockets))
for id, ref := range cm.Sockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
c.SocketRef = sockets
return c
}
func subChannelMetricToProto(cm *channelz.SubChannelMetric) *pb.Subchannel {
sc := &pb.Subchannel{}
sc.Ref = &pb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
sc.Data = &pb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
sc.Data.LastCallStartedTimestamp = ts
}
nestedChans := make([]*pb.ChannelRef, 0, len(cm.NestedChans))
for id, ref := range cm.NestedChans {
nestedChans = append(nestedChans, &pb.ChannelRef{ChannelId: id, Name: ref})
}
sc.ChannelRef = nestedChans
subChans := make([]*pb.SubchannelRef, 0, len(cm.SubChans))
for id, ref := range cm.SubChans {
subChans = append(subChans, &pb.SubchannelRef{SubchannelId: id, Name: ref})
}
sc.SubchannelRef = subChans
sockets := make([]*pb.SocketRef, 0, len(cm.Sockets))
for id, ref := range cm.Sockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
sc.SocketRef = sockets
return sc
}
func addrToProto(a net.Addr) *pb.Address {
switch a.Network() {
case "udp":
// TODO: Address_OtherAddress{}. Need proto def for Value.
case "ip":
// Note zone info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
case "ip+net":
// Note mask info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
case "tcp":
// Note zone info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
case "unix", "unixgram", "unixpacket":
return &pb.Address{Address: &pb.Address_UdsAddress_{UdsAddress: &pb.Address_UdsAddress{Filename: a.String()}}}
default:
}
return &pb.Address{}
}
func socketMetricToProto(sm *channelz.SocketMetric) *pb.Socket {
s := &pb.Socket{}
s.Ref = &pb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
s.Data = &pb.SocketData{
StreamsStarted: sm.SocketData.StreamsStarted,
StreamsSucceeded: sm.SocketData.StreamsSucceeded,
StreamsFailed: sm.SocketData.StreamsFailed,
MessagesSent: sm.SocketData.MessagesSent,
MessagesReceived: sm.SocketData.MessagesReceived,
KeepAlivesSent: sm.SocketData.KeepAlivesSent,
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
s.Data.LastLocalStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
s.Data.LastRemoteStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
s.Data.LastMessageSentTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
s.Data.LastMessageReceivedTimestamp = ts
}
s.Data.LocalFlowControlWindow = &wrappers.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
s.Data.RemoteFlowControlWindow = &wrappers.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
if sm.SocketData.LocalAddr != nil {
s.Local = addrToProto(sm.SocketData.LocalAddr)
}
if sm.SocketData.RemoteAddr != nil {
s.Remote = addrToProto(sm.SocketData.RemoteAddr)
}
s.RemoteName = sm.SocketData.RemoteName
return s
}
func (s *serverImpl) GetTopChannels(ctx context.Context, req *pb.GetTopChannelsRequest) (*pb.GetTopChannelsResponse, error) {
metrics, end := channelz.GetTopChannels(req.GetStartChannelId())
resp := &pb.GetTopChannelsResponse{}
for _, m := range metrics {
resp.Channel = append(resp.Channel, channelMetricToProto(m))
}
resp.End = end
return resp, nil
}
func serverMetricToProto(sm *channelz.ServerMetric) *pb.Server {
s := &pb.Server{}
s.Ref = &pb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
s.Data = &pb.ServerData{
CallsStarted: sm.ServerData.CallsStarted,
CallsSucceeded: sm.ServerData.CallsSucceeded,
CallsFailed: sm.ServerData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
s.Data.LastCallStartedTimestamp = ts
}
sockets := make([]*pb.SocketRef, 0, len(sm.ListenSockets))
for id, ref := range sm.ListenSockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
s.ListenSocket = sockets
return s
}
func (s *serverImpl) GetServers(ctx context.Context, req *pb.GetServersRequest) (*pb.GetServersResponse, error) {
metrics, end := channelz.GetServers(req.GetStartServerId())
resp := &pb.GetServersResponse{}
for _, m := range metrics {
resp.Server = append(resp.Server, serverMetricToProto(m))
}
resp.End = end
return resp, nil
}
func (s *serverImpl) GetServerSockets(ctx context.Context, req *pb.GetServerSocketsRequest) (*pb.GetServerSocketsResponse, error) {
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId())
resp := &pb.GetServerSocketsResponse{}
for _, m := range metrics {
resp.SocketRef = append(resp.SocketRef, &pb.SocketRef{SocketId: m.ID, Name: m.RefName})
}
resp.End = end
return resp, nil
}
func (s *serverImpl) GetChannel(ctx context.Context, req *pb.GetChannelRequest) (*pb.GetChannelResponse, error) {
var metric *channelz.ChannelMetric
if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
return &pb.GetChannelResponse{}, nil
}
resp := &pb.GetChannelResponse{Channel: channelMetricToProto(metric)}
return resp, nil
}
func (s *serverImpl) GetSubchannel(ctx context.Context, req *pb.GetSubchannelRequest) (*pb.GetSubchannelResponse, error) {
var metric *channelz.SubChannelMetric
if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
return &pb.GetSubchannelResponse{}, nil
}
resp := &pb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
return resp, nil
}
func (s *serverImpl) GetSocket(ctx context.Context, req *pb.GetSocketRequest) (*pb.GetSocketResponse, error) {
var metric *channelz.SocketMetric
if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
return &pb.GetSocketResponse{}, nil
}
resp := &pb.GetSocketResponse{Socket: socketMetricToProto(metric)}
return resp, nil
}

Просмотреть файл

@ -0,0 +1,477 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package service
import (
"net"
"reflect"
"strconv"
"testing"
"time"
"github.com/golang/protobuf/ptypes"
"golang.org/x/net/context"
"google.golang.org/grpc/channelz"
pb "google.golang.org/grpc/channelz/service_proto"
"google.golang.org/grpc/connectivity"
)
func init() {
channelz.TurnOn()
}
// emptyTime is used for detecting unset value of time.Time type.
// For go1.7 and earlier, ptypes.Timestamp will fill in the loc field of time.Time
// with &utcLoc. However zero value of a time.Time type value loc field is nil.
// This behavior will make reflect.DeepEqual fail upon unset time.Time field,
// and cause false positive fatal error.
var emptyTime time.Time
type dummyChannel struct {
state connectivity.State
target string
callsStarted int64
callsSucceeded int64
callsFailed int64
lastCallStartedTimestamp time.Time
}
func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
State: d.state,
Target: d.target,
CallsStarted: d.callsStarted,
CallsSucceeded: d.callsSucceeded,
CallsFailed: d.callsFailed,
LastCallStartedTimestamp: d.lastCallStartedTimestamp,
}
}
type dummyServer struct {
callsStarted int64
callsSucceeded int64
callsFailed int64
lastCallStartedTimestamp time.Time
}
func (d *dummyServer) ChannelzMetric() *channelz.ServerInternalMetric {
return &channelz.ServerInternalMetric{
CallsStarted: d.callsStarted,
CallsSucceeded: d.callsSucceeded,
CallsFailed: d.callsFailed,
LastCallStartedTimestamp: d.lastCallStartedTimestamp,
}
}
type dummySocket struct {
streamsStarted int64
streamsSucceeded int64
streamsFailed int64
messagesSent int64
messagesReceived int64
keepAlivesSent int64
lastLocalStreamCreatedTimestamp time.Time
lastRemoteStreamCreatedTimestamp time.Time
lastMessageSentTimestamp time.Time
lastMessageReceivedTimestamp time.Time
localFlowControlWindow int64
remoteFlowControlWindow int64
//socket options
localAddr net.Addr
remoteAddr net.Addr
// Security
remoteName string
}
func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{
StreamsStarted: d.streamsStarted,
StreamsSucceeded: d.streamsSucceeded,
StreamsFailed: d.streamsFailed,
MessagesSent: d.messagesSent,
MessagesReceived: d.messagesReceived,
KeepAlivesSent: d.keepAlivesSent,
LastLocalStreamCreatedTimestamp: d.lastLocalStreamCreatedTimestamp,
LastRemoteStreamCreatedTimestamp: d.lastRemoteStreamCreatedTimestamp,
LastMessageSentTimestamp: d.lastMessageSentTimestamp,
LastMessageReceivedTimestamp: d.lastMessageReceivedTimestamp,
LocalFlowControlWindow: d.localFlowControlWindow,
RemoteFlowControlWindow: d.remoteFlowControlWindow,
//socket options
LocalAddr: d.localAddr,
RemoteAddr: d.remoteAddr,
// Security
RemoteName: d.remoteName,
}
}
func channelProtoToStruct(c *pb.Channel) *dummyChannel {
dc := &dummyChannel{}
pdata := c.GetData()
switch pdata.GetState().GetState() {
case pb.ChannelConnectivityState_UNKNOWN:
// TODO: what should we set here?
case pb.ChannelConnectivityState_IDLE:
dc.state = connectivity.Idle
case pb.ChannelConnectivityState_CONNECTING:
dc.state = connectivity.Connecting
case pb.ChannelConnectivityState_READY:
dc.state = connectivity.Ready
case pb.ChannelConnectivityState_TRANSIENT_FAILURE:
dc.state = connectivity.TransientFailure
case pb.ChannelConnectivityState_SHUTDOWN:
dc.state = connectivity.Shutdown
}
dc.target = pdata.GetTarget()
dc.callsStarted = pdata.CallsStarted
dc.callsSucceeded = pdata.CallsSucceeded
dc.callsFailed = pdata.CallsFailed
if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
dc.lastCallStartedTimestamp = t
}
}
return dc
}
func serverProtoToStruct(s *pb.Server) *dummyServer {
ds := &dummyServer{}
pdata := s.GetData()
ds.callsStarted = pdata.CallsStarted
ds.callsSucceeded = pdata.CallsSucceeded
ds.callsFailed = pdata.CallsFailed
if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastCallStartedTimestamp = t
}
}
return ds
}
func protoToAddr(a *pb.Address) net.Addr {
switch v := a.Address.(type) {
case *pb.Address_TcpipAddress:
if port := v.TcpipAddress.GetPort(); port != 0 {
return &net.TCPAddr{IP: v.TcpipAddress.GetIpAddress(), Port: int(port)}
}
return &net.IPAddr{IP: v.TcpipAddress.GetIpAddress()}
case *pb.Address_UdsAddress_:
return &net.UnixAddr{Name: v.UdsAddress.GetFilename(), Net: "unix"}
case *pb.Address_OtherAddress_:
// TODO:
}
return nil
}
func socketProtoToStruct(s *pb.Socket) *dummySocket {
ds := &dummySocket{}
pdata := s.GetData()
ds.streamsStarted = pdata.GetStreamsStarted()
ds.streamsSucceeded = pdata.GetStreamsSucceeded()
ds.streamsFailed = pdata.GetStreamsFailed()
ds.messagesSent = pdata.GetMessagesSent()
ds.messagesReceived = pdata.GetMessagesReceived()
ds.keepAlivesSent = pdata.GetKeepAlivesSent()
if t, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastLocalStreamCreatedTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastRemoteStreamCreatedTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageSentTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageSentTimestamp = t
}
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageReceivedTimestamp = t
}
}
if v := pdata.GetLocalFlowControlWindow(); v != nil {
ds.localFlowControlWindow = v.Value
}
if v := pdata.GetRemoteFlowControlWindow(); v != nil {
ds.remoteFlowControlWindow = v.Value
}
if local := s.GetLocal(); local != nil {
ds.localAddr = protoToAddr(local)
}
if remote := s.GetRemote(); remote != nil {
ds.remoteAddr = protoToAddr(remote)
}
ds.remoteName = s.GetRemoteName()
return ds
}
func convertSocketRefSliceToMap(sktRefs []*pb.SocketRef) map[int64]string {
m := make(map[int64]string)
for _, sr := range sktRefs {
m[sr.SocketId] = sr.Name
}
return m
}
func TestGetTopChannels(t *testing.T) {
tcs := []*dummyChannel{
{
state: connectivity.Connecting,
target: "test.channelz:1234",
callsStarted: 6,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
state: connectivity.Connecting,
target: "test.channelz:1234",
callsStarted: 1,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
state: connectivity.Shutdown,
target: "test.channelz:8888",
callsStarted: 0,
callsSucceeded: 0,
callsFailed: 0,
},
{},
}
channelz.NewChannelzStorage()
for _, c := range tcs {
channelz.RegisterChannel(c, 0, "")
}
s := newCZServer()
resp, _ := s.GetTopChannels(context.Background(), &pb.GetTopChannelsRequest{StartChannelId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
for i, c := range resp.GetChannel() {
if !reflect.DeepEqual(channelProtoToStruct(c), tcs[i]) {
t.Fatalf("dummyChannel: %d, want: %#v, got: %#v", i, tcs[i], channelProtoToStruct(c))
}
}
for i := 0; i < 50; i++ {
channelz.RegisterChannel(tcs[0], 0, "")
}
resp, _ = s.GetTopChannels(context.Background(), &pb.GetTopChannelsRequest{StartChannelId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
func TestGetServers(t *testing.T) {
ss := []*dummyServer{
{
callsStarted: 6,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
callsStarted: 1,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
callsStarted: 1,
callsSucceeded: 0,
callsFailed: 0,
lastCallStartedTimestamp: time.Now().UTC(),
},
}
channelz.NewChannelzStorage()
for _, s := range ss {
channelz.RegisterServer(s, "")
}
svr := newCZServer()
resp, _ := svr.GetServers(context.Background(), &pb.GetServersRequest{StartServerId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
for i, s := range resp.GetServer() {
if !reflect.DeepEqual(serverProtoToStruct(s), ss[i]) {
t.Fatalf("dummyServer: %d, want: %#v, got: %#v", i, ss[i], serverProtoToStruct(s))
}
}
for i := 0; i < 50; i++ {
channelz.RegisterServer(ss[0], "")
}
resp, _ = svr.GetServers(context.Background(), &pb.GetServersRequest{StartServerId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
func TestGetServerSockets(t *testing.T) {
channelz.NewChannelzStorage()
svrID := channelz.RegisterServer(&dummyServer{}, "")
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
svr := newCZServer()
resp, _ := svr.GetServerSockets(context.Background(), &pb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
}
// GetServerSockets only return normal sockets.
want := map[int64]string{
ids[1]: refNames[1],
ids[2]: refNames[2],
}
if !reflect.DeepEqual(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
}
for i := 0; i < 50; i++ {
channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
}
resp, _ = svr.GetServerSockets(context.Background(), &pb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
func TestGetChannel(t *testing.T) {
channelz.NewChannelzStorage()
refNames := []string{"top channel 1", "nested channel 1", "nested channel 2", "nested channel 3"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
svr := newCZServer()
resp, _ := svr.GetChannel(context.Background(), &pb.GetChannelRequest{ChannelId: ids[0]})
metrics := resp.GetChannel()
subChans := metrics.GetSubchannelRef()
if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] {
t.Fatalf("GetSubChannelRef() want %#v, got %#v", []*pb.SubchannelRef{{ids[2], refNames[2]}}, subChans)
}
nestedChans := metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1] {
t.Fatalf("GetChannelRef() want %#v, got %#v", []*pb.ChannelRef{{ids[1], refNames[1]}}, nestedChans)
}
resp, _ = svr.GetChannel(context.Background(), &pb.GetChannelRequest{ChannelId: ids[1]})
metrics = resp.GetChannel()
nestedChans = metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] {
t.Fatalf("GetChannelRef() want %#v, got %#v", []*pb.ChannelRef{{ids[3], refNames[3]}}, nestedChans)
}
}
func TestGetSubChannel(t *testing.T) {
channelz.NewChannelzStorage()
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
svr := newCZServer()
resp, _ := svr.GetSubchannel(context.Background(), &pb.GetSubchannelRequest{SubchannelId: ids[1]})
metrics := resp.GetSubchannel()
want := map[int64]string{
ids[2]: refNames[2],
ids[3]: refNames[3],
}
if !reflect.DeepEqual(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
t.Fatalf("GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
}
}
func TestGetSocket(t *testing.T) {
channelz.NewChannelzStorage()
ss := []*dummySocket{
{
streamsStarted: 10,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastLocalStreamCreatedTimestamp: time.Now().UTC(),
lastRemoteStreamCreatedTimestamp: time.Now().UTC(),
lastMessageSentTimestamp: time.Now().UTC(),
lastMessageReceivedTimestamp: time.Now().UTC(),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 1024,
localAddr: &net.TCPAddr{IP: net.ParseIP("1.0.0.1"), Port: 10001},
remoteAddr: &net.TCPAddr{IP: net.ParseIP("12.0.0.1"), Port: 10002},
remoteName: "remote.remote",
},
{
streamsStarted: 10,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastRemoteStreamCreatedTimestamp: time.Now().UTC(),
lastMessageSentTimestamp: time.Now().UTC(),
lastMessageReceivedTimestamp: time.Now().UTC(),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 1024,
localAddr: &net.UnixAddr{Name: "file.path", Net: "unix"},
remoteAddr: &net.UnixAddr{Name: "another.path", Net: "unix"},
remoteName: "remote.remote",
},
{
streamsStarted: 5,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastLocalStreamCreatedTimestamp: time.Now().UTC(),
lastMessageSentTimestamp: time.Now().UTC(),
lastMessageReceivedTimestamp: time.Now().UTC(),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 10240,
localAddr: &net.IPAddr{IP: net.ParseIP("1.0.0.1")},
remoteAddr: &net.IPAddr{IP: net.ParseIP("9.0.0.1")},
remoteName: "",
},
{
localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 10001},
},
}
svr := newCZServer()
ids := make([]int64, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
}
for i, s := range ss {
resp, _ := svr.GetSocket(context.Background(), &pb.GetSocketRequest{SocketId: ids[i]})
metrics := resp.GetSocket()
if !reflect.DeepEqual(metrics.GetRef(), &pb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) {
t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &pb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics))
}
}
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,437 @@
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file defines an interface for exporting monitoring information
// out of gRPC servers. See the full design at
// https://github.com/grpc/proposal/blob/master/A14-channelz.md
syntax = "proto3";
package grpc.channelz;
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
option java_multiple_files = true;
option java_package = "io.grpc.channelz.v1";
option java_outer_classname = "ChannelzProto";
// Channel is a logical grouping of channels, subchannels, and sockets.
message Channel {
// The identifier for this channel.
ChannelRef ref = 1;
// Data specific to this channel.
ChannelData data = 2;
// At most one of 'channel_ref+subchannel_ref' and 'socket' is set.
// There are no ordering guarantees on the order of channel refs.
// There may not be cycles in the ref graph.
// A channel ref may be present in more than one channel or subchannel.
repeated ChannelRef channel_ref = 3;
// At most one of 'channel_ref+subchannel_ref' and 'socket' is set.
// There are no ordering guarantees on the order of subchannel refs.
// There may not be cycles in the ref graph.
// A sub channel ref may be present in more than one channel or subchannel.
repeated SubchannelRef subchannel_ref = 4;
// There are no ordering guarantees on the order of sockets.
repeated SocketRef socket_ref = 5;
}
// Subchannel is a logical grouping of channels, subchannels, and sockets.
// A subchannel is load balanced over by it's ancestor
message Subchannel {
// The identifier for this channel.
SubchannelRef ref = 1;
// Data specific to this channel.
ChannelData data = 2;
// At most one of 'channel_ref+subchannel_ref' and 'socket' is set.
// There are no ordering guarantees on the order of channel refs.
// There may not be cycles in the ref graph.
// A channel ref may be present in more than one channel or subchannel.
repeated ChannelRef channel_ref = 3;
// At most one of 'channel_ref+subchannel_ref' and 'socket' is set.
// There are no ordering guarantees on the order of subchannel refs.
// There may not be cycles in the ref graph.
// A sub channel ref may be present in more than one channel or subchannel.
repeated SubchannelRef subchannel_ref = 4;
// There are no ordering guarantees on the order of sockets.
repeated SocketRef socket_ref = 5;
}
// These come from the specified states in this document:
// https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
message ChannelConnectivityState {
enum State {
UNKNOWN = 0;
IDLE = 1;
CONNECTING = 2;
READY = 3;
TRANSIENT_FAILURE = 4;
SHUTDOWN = 5;
}
State state = 1;
}
message ChannelData {
ChannelConnectivityState state = 1;
// The target this channel originally tried to connect to. May be absent
string target = 2;
ChannelTrace trace = 3;
// The number of calls started on the channel
int64 calls_started = 4;
// The number of calls that have completed with an OK status
int64 calls_succeeded = 5;
// The number of calls that have a completed with a non-OK status
int64 calls_failed = 6;
// The last time a call was started on the channel.
google.protobuf.Timestamp last_call_started_timestamp = 7;
}
message ChannelTrace {
// see the proto in the gRFC for channel tracing:
// A3-channel-tracing.md
}
message ChannelRef {
// The globally unique id for this channel. Must be a positive number.
int64 channel_id = 1;
// An optional name associated with the channel.
string name = 2;
// Intentionally don't use field numbers from other refs.
reserved 3, 4, 5, 6;
}
message SubchannelRef {
// The globally unique id for this subchannel. Must be a positive number.
int64 subchannel_id = 7;
// An optional name associated with the subchannel.
string name = 8;
// Intentionally don't use field numbers from other refs.
reserved 1, 2, 3, 4, 5, 6;
}
message SocketRef {
int64 socket_id = 3;
// An optional name associated with the socket.
string name = 4;
// Intentionally don't use field numbers from other refs.
reserved 1, 2, 5, 6, 7, 8;
}
message ServerRef {
// A globally unique identifier for this server. Must be a positive number.
int64 server_id = 5;
// An optional name associated with the server.
string name = 6;
// Intentionally don't use field numbers from other refs.
reserved 1, 2, 3, 4, 7, 8;
}
message Server {
ServerRef ref = 1;
ServerData data = 2;
// The sockets that the server is listening on. There are no ordering
// guarantees.
repeated SocketRef listen_socket = 3;
}
message ServerData {
ChannelTrace trace = 1;
// The number of incoming calls started on the server
int64 calls_started = 2;
// The number of incoming calls that have completed with an OK status
int64 calls_succeeded = 3;
// The number of incoming calls that have a completed with a non-OK status
int64 calls_failed = 4;
// The last time a call was started on the server.
google.protobuf.Timestamp last_call_started_timestamp = 5;
}
// Information about an actual connection. Pronounced "sock-ay".
message Socket {
SocketRef ref = 1;
SocketData data = 2;
// The locally bound address.
Address local = 3;
// The remote bound address. May be absent.
Address remote = 4;
Security security = 5;
// Optional, represents the name of the remote endpoint, if different than
// the original target name.
string remote_name = 6;
}
message SocketData {
// The number of streams that have been started.
int64 streams_started = 1;
// The number of streams that have ended successfully:
// On client side, received frame with eos bit set;
// On server side, sent frame with eos bit set.
int64 streams_succeeded = 2;
// The number of streams that have ended unsuccessfully:
// On client side, ended without receiving frame with eos bit set;
// On server side, ended without sending frame with eos bit set.
int64 streams_failed = 3;
// The number of messages successfully sent on this socket.
int64 messages_sent = 4;
int64 messages_received = 5;
// The number of keep alives sent. This is typically implemented with HTTP/2
// ping messages.
int64 keep_alives_sent = 6;
// The last time a stream was created by this endpoint. Usually unset for
// servers.
google.protobuf.Timestamp last_local_stream_created_timestamp = 7;
// The last time a stream was created by the remote endpoint. Usually unset
// for clients.
google.protobuf.Timestamp last_remote_stream_created_timestamp = 8;
// The last time a message was sent by this endpoint.
google.protobuf.Timestamp last_message_sent_timestamp = 9;
// The last time a message was received by this endpoint.
google.protobuf.Timestamp last_message_received_timestamp = 10;
// The amount of window, granted to the local endpoint by the remote endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
google.protobuf.Int64Value local_flow_control_window = 11;
// The amount of window, granted to the remote endpoint by the local endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
google.protobuf.Int64Value remote_flow_control_window = 12;
repeated SocketOption option = 13;
}
message Address {
message TcpIpAddress {
// Either the IPv4 or IPv6 address in bytes. Will either be 4 bytes or 16
// bytes in length.
bytes ip_address = 1;
// 0-64k, or -1 if not appropriate.
int32 port = 2;
}
// A Unix Domain Socket address.
message UdsAddress {
string filename = 1;
}
// An address type not included above.
message OtherAddress {
// The human readable version of the value.
string name = 1;
// The actual address message.
google.protobuf.Any value = 2;
}
oneof address {
TcpIpAddress tcpip_address = 1;
UdsAddress uds_address = 2;
OtherAddress other_address = 3;
}
}
message Security {
message Tls {
oneof cipher_suite {
// The cipher suite name in the RFC 4346 format:
// https://tools.ietf.org/html/rfc4346#appendix-C
string standard_name = 1;
// Some other way to describe the cipher suite if
// the RFC 4346 name is not available.
string other_name = 2;
}
// the certificate used by this endpoint.
bytes local_certificate = 3;
// the certificate used by the remote endpoint.
bytes remote_certificate = 4;
}
message OtherSecurity {
// The human readable version of the value.
string name = 1;
// The actual security details message.
google.protobuf.Any value = 2;
}
oneof model {
Tls tls = 1;
OtherSecurity other = 2;
}
}
message SocketOption {
string name = 1;
// The human readable value of this socket option. At least one of value or
// additional will be set.
string value = 2;
// Additional data associated with the socket option. At least one of value
// or additional will be set.
google.protobuf.Any additional = 3;
}
// For use with SocketOption's additional field. This is primarily used for
// SO_RCVTIMEO and SO_SNDTIMEO
message SocketOptionTimeout {
google.protobuf.Duration duration = 1;
}
message SocketOptionLinger {
bool active = 1;
google.protobuf.Duration duration = 2;
}
// Tcp info for SOL_TCP, TCP_INFO
message SocketOptionTcpInfo {
uint32 tcpi_state = 1;
uint32 tcpi_ca_state = 2;
uint32 tcpi_retransmits = 3;
uint32 tcpi_probes = 4;
uint32 tcpi_backoff = 5;
uint32 tcpi_options = 6;
uint32 tcpi_snd_wscale = 7;
uint32 tcpi_rcv_wscale = 8;
uint32 tcpi_rto = 9;
uint32 tcpi_ato = 10;
uint32 tcpi_snd_mss = 11;
uint32 tcpi_rcv_mss = 12;
uint32 tcpi_unacked = 13;
uint32 tcpi_sacked = 14;
uint32 tcpi_lost = 15;
uint32 tcpi_retrans = 16;
uint32 tcpi_fackets = 17;
uint32 tcpi_last_data_sent = 18;
uint32 tcpi_last_ack_sent = 19;
uint32 tcpi_last_data_recv = 20;
uint32 tcpi_last_ack_recv = 21;
uint32 tcpi_pmtu = 22;
uint32 tcpi_rcv_ssthresh = 23;
uint32 tcpi_rtt = 24;
uint32 tcpi_rttvar = 25;
uint32 tcpi_snd_ssthresh = 26;
uint32 tcpi_snd_cwnd = 27;
uint32 tcpi_advmss = 28;
uint32 tcpi_reordering = 29;
}
service Channelz {
// Gets all root channels (e.g. channels the application has directly
// created). This does not include subchannels nor non-top level channels.
rpc GetTopChannels(GetTopChannelsRequest) returns (GetTopChannelsResponse);
// Gets all servers that exist in the process.
rpc GetServers(GetServersRequest) returns (GetServersResponse);
// Gets all server sockets that exist in the process.
rpc GetServerSockets(GetServerSocketsRequest) returns (GetServerSocketsResponse);
// Returns a single Channel, or else a NOT_FOUND code.
rpc GetChannel(GetChannelRequest) returns (GetChannelResponse);
// Returns a single Subchannel, or else a NOT_FOUND code.
rpc GetSubchannel(GetSubchannelRequest) returns (GetSubchannelResponse);
// Returns a single Socket or else a NOT_FOUND code.
rpc GetSocket(GetSocketRequest) returns (GetSocketResponse);
}
message GetServersRequest {
// start_server_id indicates that only servers at or above this id should be
// included in the results.
int64 start_server_id = 1;
}
message GetServersResponse {
// list of servers that the connection detail service knows about. Sorted in
// ascending server_id order.
repeated Server server = 1;
// If set, indicates that the list of servers is the final list. Requesting
// more servers will only return more if they are created after this RPC
// completes.
bool end = 2;
}
message GetServerSocketsRequest {
int64 server_id = 1;
// start_socket_id indicates that only sockets at or above this id should be
// included in the results.
int64 start_socket_id = 2;
}
message GetServerSocketsResponse {
// list of socket refs that the connection detail service knows about. Sorted in
// ascending socket_id order.
repeated SocketRef socket_ref = 1;
// If set, indicates that the list of sockets is the final list. Requesting
// more sockets will only return more if they are created after this RPC
// completes.
bool end = 2;
}
message GetTopChannelsRequest {
// start_channel_id indicates that only channels at or above this id should be
// included in the results.
int64 start_channel_id = 1;
}
message GetTopChannelsResponse {
// list of channels that the connection detail service knows about. Sorted in
// ascending channel_id order.
repeated Channel channel = 1;
// If set, indicates that the list of channels is the final list. Requesting
// more channels can only return more if they are created after this RPC
// completes.
bool end = 2;
}
message GetChannelRequest {
int64 channel_id = 1;
}
message GetChannelResponse {
Channel channel = 1;
}
message GetSubchannelRequest {
int64 subchannel_id = 1;
}
message GetSubchannelResponse {
Subchannel subchannel = 1;
}
message GetSocketRequest {
int64 socket_id = 1;
}
message GetSocketResponse {
Socket socket = 1;
}