This commit is contained in:
Garrett Gutierrez 2020-06-26 12:04:47 -07:00 коммит произвёл GitHub
Родитель 4241954407
Коммит 506b773066
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
44 изменённых файлов: 426 добавлений и 287 удалений

Просмотреть файл

@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/resolver"
)
var logger = grpclog.Component("balancer")
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
@ -91,8 +93,8 @@ func (b *baseBalancer) ResolverError(err error) {
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
if logger.V(2) {
logger.Info("base.baseBalancer: got new ClientConn state: ", s)
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
@ -104,7 +106,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
@ -168,13 +170,13 @@ func (b *baseBalancer) regeneratePicker() {
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
}
oldS, ok := b.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
}
return
}

Просмотреть файл

@ -50,6 +50,7 @@ const (
)
var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
var logger = grpclog.Component("grpclb")
func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
@ -150,11 +151,11 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
if opt.CredsBundle != nil {
lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
if err != nil {
grpclog.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
}
lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
if err != nil {
grpclog.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
}
}
@ -310,16 +311,16 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
}
lb.mu.Lock()
defer lb.mu.Unlock()
oldS, ok := lb.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
}
return
}
@ -393,8 +394,8 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
if lb.usePickFirst == newUsePickFirst {
return
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
if logger.V(2) {
logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
}
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
@ -405,8 +406,8 @@ func (lb *lbBalancer) ResolverError(error) {
}
func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
if logger.V(2) {
logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

Просмотреть файл

@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
@ -44,8 +43,8 @@ import (
// processServerList updates balancer's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
if logger.V(2) {
logger.Infof("lbBalancer: processing server list: %+v", l)
}
lb.mu.Lock()
defer lb.mu.Unlock()
@ -56,8 +55,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
// If the new server list == old server list, do nothing.
if cmp.Equal(lb.fullServerList, l.Servers, cmp.Comparer(proto.Equal)) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
if logger.V(2) {
logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
}
return
}
@ -81,8 +80,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
if logger.V(2) {
logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
i, ipStr, s.Port, s.LoadBalanceToken)
}
backendAddrs = append(backendAddrs, addr)
@ -150,7 +149,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
return
}
sc.Connect()
@ -173,7 +172,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
@ -245,7 +244,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
// receive ServerName as authority.
cc, err := grpc.DialContext(context.Background(), lb.manualResolver.Scheme()+":///grpclb.subClientConn", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
logger.Fatalf("failed to dial: %v", err)
}
ccw := &remoteBalancerCCWrapper{
cc: cc,
@ -373,9 +372,9 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
default:
if err != nil {
if err == errServerTerminatedConnection {
grpclog.Info(err)
logger.Info(err)
} else {
grpclog.Warning(err)
logger.Warning(err)
}
}
}

Просмотреть файл

@ -34,6 +34,7 @@ var (
// For overriding in tests.
newRLSClientFunc = newRLSClient
logger = grpclog.Component("rls")
)
// rlsBalancer implements the RLS LB policy.
@ -75,18 +76,18 @@ func (lb *rlsBalancer) run() {
// channel accordingly.
// TODO(easwars): Handle updates to other fields in the service config.
func (lb *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
grpclog.Infof("rls: service config: %+v", ccs.BalancerConfig)
logger.Infof("rls: service config: %+v", ccs.BalancerConfig)
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.done.HasFired() {
grpclog.Warning("rls: received service config after balancer close")
logger.Warning("rls: received service config after balancer close")
return
}
newCfg := ccs.BalancerConfig.(*lbConfig)
if lb.lbCfg.Equal(newCfg) {
grpclog.Info("rls: new service config matches existing config")
logger.Info("rls: new service config matches existing config")
return
}
@ -109,12 +110,12 @@ func (lb *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
func (lb *rlsBalancer) ResolverError(error) {
// ResolverError is called by gRPC when the name resolver reports an error.
// TODO(easwars): How do we handle this?
grpclog.Fatal("rls: ResolverError is not yet unimplemented")
logger.Fatal("rls: ResolverError is not yet unimplemented")
}
// UpdateSubConnState implements balancer.V2Balancer interface.
func (lb *rlsBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {
grpclog.Fatal("rls: UpdateSubConnState is not yet implemented")
logger.Fatal("rls: UpdateSubConnState is not yet implemented")
}
// Cleans up the resources allocated by the LB policy including the clientConn
@ -162,7 +163,7 @@ func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
cc, err := grpc.Dial(newCfg.lookupService, dopts...)
if err != nil {
grpclog.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err)
logger.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err)
// An error from a non-blocking dial indicates something serious. We
// should continue to use the old control channel if one exists, and
// return so that the rest of the config updates can be processes.
@ -185,14 +186,14 @@ func dialCreds(opts balancer.BuildOptions) grpc.DialOption {
switch {
case opts.DialCreds != nil:
if err := opts.DialCreds.OverrideServerName(server); err != nil {
grpclog.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err)
logger.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err)
return grpc.WithInsecure()
}
return grpc.WithTransportCredentials(opts.DialCreds)
case opts.CredsBundle != nil:
return grpc.WithTransportCredentials(opts.CredsBundle.TransportCredentials())
default:
grpclog.Warning("rls: no credentials available, using Insecure")
logger.Warning("rls: no credentials available, using Insecure")
return grpc.WithInsecure()
}
}

6
balancer/rls/internal/cache/cache.go поставляемый
Просмотреть файл

@ -30,6 +30,8 @@ import (
"google.golang.org/grpc/internal/backoff"
)
var logger = grpclog.Component("rls")
// Key represents the cache key used to uniquely identify a cache entry.
type Key struct {
// Path is the full path of the incoming RPC request.
@ -175,7 +177,7 @@ func (lru *LRU) removeToFit(newSize int64) {
if elem == nil {
// This is a corner case where the cache is empty, but the new entry
// to be added is bigger than maxSize.
grpclog.Info("rls: newly added cache entry exceeds cache maxSize")
logger.Info("rls: newly added cache entry exceeds cache maxSize")
return
}
@ -184,7 +186,7 @@ func (lru *LRU) removeToFit(newSize int64) {
// When the oldest entry is too new (it hasn't even spent a default
// minimum amount of time in the cache), we abort and allow the
// cache to grow bigger than the configured maxSize.
grpclog.Info("rls: LRU eviction finds oldest entry to be too new. Allowing cache to exceed maxSize momentarily")
logger.Info("rls: LRU eviction finds oldest entry to be too new. Allowing cache to exceed maxSize momentarily")
return
}
lru.removeElement(elem)

Просмотреть файл

@ -32,7 +32,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/keys"
rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
@ -238,11 +237,11 @@ func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in service config {%+v}", string(c))
}
if staleAge >= maxAge {
grpclog.Info("rls: stale_age {%v} is greater than max_age {%v}, ignoring it", staleAge, maxAge)
logger.Info("rls: stale_age {%v} is greater than max_age {%v}, ignoring it", staleAge, maxAge)
staleAge = 0
}
if maxAge == 0 || maxAge > maxMaxAge {
grpclog.Infof("rls: max_age in service config is %v, using %v", maxAge, maxMaxAge)
logger.Infof("rls: max_age in service config is %v, using %v", maxAge, maxMaxAge)
maxAge = maxMaxAge
}

Просмотреть файл

@ -33,6 +33,8 @@ import (
// Name is the name of round_robin balancer.
const Name = "round_robin"
var logger = grpclog.Component("roundrobin")
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
@ -45,7 +47,7 @@ func init() {
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info)
logger.Infof("roundrobinPicker: newPicker called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}

Просмотреть файл

@ -220,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
ac, err := cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
acbw.ac = ac

Просмотреть файл

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/status"
)
@ -39,6 +40,8 @@ func init() {
channelz.TurnOn()
}
var logger = grpclog.Component("channelz")
// RegisterChannelzServiceToServer registers the channelz service to the given server.
func RegisterChannelzServiceToServer(s *grpc.Server) {
channelzgrpc.RegisterChannelzServer(s, newCZServer())

Просмотреть файл

@ -468,12 +468,12 @@ func (s) TestGetChannel(t *testing.T) {
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
channelz.AddTraceEvent(ids[0], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
})
ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
channelz.AddTraceEvent(ids[1], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -483,7 +483,7 @@ func (s) TestGetChannel(t *testing.T) {
})
ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
channelz.AddTraceEvent(ids[2], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[2], 0, &channelz.TraceEventDesc{
Desc: "SubChannel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -492,7 +492,7 @@ func (s) TestGetChannel(t *testing.T) {
},
})
ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
channelz.AddTraceEvent(ids[3], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[3], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -500,11 +500,11 @@ func (s) TestGetChannel(t *testing.T) {
Severity: channelz.CtINFO,
},
})
channelz.AddTraceEvent(ids[0], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready),
Severity: channelz.CtINFO,
})
channelz.AddTraceEvent(ids[0], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Resolver returns an empty address list",
Severity: channelz.CtWarning,
})
@ -571,12 +571,12 @@ func (s) TestGetSubChannel(t *testing.T) {
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
channelz.AddTraceEvent(ids[0], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
})
ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
channelz.AddTraceEvent(ids[1], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subchanCreated,
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -586,11 +586,11 @@ func (s) TestGetSubChannel(t *testing.T) {
})
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
channelz.AddTraceEvent(ids[1], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subchanConnectivityChange,
Severity: channelz.CtINFO,
})
channelz.AddTraceEvent(ids[1], 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subChanPickNewAddress,
Severity: channelz.CtINFO,
})

Просмотреть файл

@ -149,7 +149,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -159,7 +159,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.Info(cc.channelzID, "Channel Created")
channelz.Info(logger, cc.channelzID, "Channel Created")
}
cc.csMgr.channelzID = cc.channelzID
}
@ -245,13 +245,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target)
channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
@ -422,7 +422,7 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
close(csm.notifyChan)
@ -675,9 +675,9 @@ func (cc *ClientConn) switchBalancer(name string) {
return
}
channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if cc.balancerWrapper != nil {
@ -686,11 +686,11 @@ func (cc *ClientConn) switchBalancer(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
}
cc.curBalancerName = builder.Name()
@ -731,7 +731,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
}
if channelz.IsOn() {
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
@ -829,7 +829,7 @@ func (ac *addrConn) connect() error {
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
ac.mu.Lock()
defer ac.mu.Unlock()
channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
@ -849,7 +849,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
break
}
}
channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
}
@ -1020,7 +1020,7 @@ func (cc *ClientConn) Close() error {
Severity: channelz.CtINFO,
}
}
channelz.AddTraceEvent(cc.channelzID, 0, ted)
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(cc.channelzID)
@ -1064,7 +1064,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
return
}
ac.state = s
channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
@ -1201,7 +1201,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
}
ac.mu.Unlock()
channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
if err == nil {
@ -1276,7 +1276,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
return nil, nil, err
}
@ -1284,7 +1284,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
@ -1331,7 +1331,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
return
}
@ -1361,9 +1361,9 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
}
}
}()
@ -1428,7 +1428,7 @@ func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
}
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Deleted",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{

Просмотреть файл

@ -67,6 +67,7 @@ var (
// ServerHandshake is running on a platform where the trustworthiness of
// the handshaker service is not guaranteed.
ErrUntrustedPlatform = errors.New("ALTS: untrusted platform. ALTS is only supported on GCP")
logger = grpclog.Component("alts")
)
// AuthInfo exposes security information from the ALTS handshake to the
@ -307,7 +308,7 @@ func compareRPCVersions(v1, v2 *altspb.RpcProtocolVersions_Version) int {
// agreed on.
func checkRPCVersions(local, peer *altspb.RpcProtocolVersions) (bool, *altspb.RpcProtocolVersions_Version) {
if local == nil || peer == nil {
grpclog.Error("invalid checkRPCVersions argument, either local or peer is nil.")
logger.Error("invalid checkRPCVersions argument, either local or peer is nil.")
return false, nil
}

Просмотреть файл

@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
@ -423,7 +422,7 @@ func WithUserAgent(s string) DialOption {
// for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
if kp.Time < internal.KeepaliveMinPingTime {
grpclog.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime)
kp.Time = internal.KeepaliveMinPingTime
}
return newFuncDialOption(func(o *dialOptions) {

117
grpclog/component.go Normal file
Просмотреть файл

@ -0,0 +1,117 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclog
import (
"fmt"
"google.golang.org/grpc/internal/grpclog"
)
// componentData records the settings for a component.
type componentData struct {
name string
}
var cache = map[string]*componentData{}
func (c *componentData) InfoDepth(depth int, args ...interface{}) {
args = append([]interface{}{"[" + string(c.name) + "]"}, args...)
grpclog.InfoDepth(depth+1, args...)
}
func (c *componentData) WarningDepth(depth int, args ...interface{}) {
args = append([]interface{}{"[" + string(c.name) + "]"}, args...)
grpclog.WarningDepth(depth+1, args...)
}
func (c *componentData) ErrorDepth(depth int, args ...interface{}) {
args = append([]interface{}{"[" + string(c.name) + "]"}, args...)
grpclog.ErrorDepth(depth+1, args...)
}
func (c *componentData) FatalDepth(depth int, args ...interface{}) {
args = append([]interface{}{"[" + string(c.name) + "]"}, args...)
grpclog.FatalDepth(depth+1, args...)
}
func (c *componentData) Info(args ...interface{}) {
c.InfoDepth(1, args...)
}
func (c *componentData) Warning(args ...interface{}) {
c.WarningDepth(1, args...)
}
func (c *componentData) Error(args ...interface{}) {
c.ErrorDepth(1, args...)
}
func (c *componentData) Fatal(args ...interface{}) {
c.FatalDepth(1, args...)
}
func (c *componentData) Infof(format string, args ...interface{}) {
c.InfoDepth(1, fmt.Sprintf(format, args...))
}
func (c *componentData) Warningf(format string, args ...interface{}) {
c.WarningDepth(1, fmt.Sprintf(format, args...))
}
func (c *componentData) Errorf(format string, args ...interface{}) {
c.ErrorDepth(1, fmt.Sprintf(format, args...))
}
func (c *componentData) Fatalf(format string, args ...interface{}) {
c.FatalDepth(1, fmt.Sprintf(format, args...))
}
func (c *componentData) Infoln(args ...interface{}) {
c.InfoDepth(1, args...)
}
func (c *componentData) Warningln(args ...interface{}) {
c.WarningDepth(1, args...)
}
func (c *componentData) Errorln(args ...interface{}) {
c.ErrorDepth(1, args...)
}
func (c *componentData) Fatalln(args ...interface{}) {
c.FatalDepth(1, args...)
}
func (c *componentData) V(l int) bool {
return grpclog.Logger.V(l)
}
// Component creates a new component and returns it for logging. If a component
// with the name already exists, nothing will be created and it will be
// returned. SetLoggerV2 will panic if it is called with a logger created by
// Component.
func Component(componentName string) DepthLoggerV2 {
if cData, ok := cache[componentName]; ok {
return cData
}
c := &componentData{componentName}
cache[componentName] = c
return c
}

Просмотреть файл

@ -27,6 +27,8 @@ import (
"google.golang.org/grpc/grpclog"
)
const d = 2
func init() {
grpclog.SetLoggerV2(&glogger{})
}
@ -34,67 +36,67 @@ func init() {
type glogger struct{}
func (g *glogger) Info(args ...interface{}) {
glog.InfoDepth(2, args...)
glog.InfoDepth(d, args...)
}
func (g *glogger) Infoln(args ...interface{}) {
glog.InfoDepth(2, fmt.Sprintln(args...))
glog.InfoDepth(d, fmt.Sprintln(args...))
}
func (g *glogger) Infof(format string, args ...interface{}) {
glog.InfoDepth(2, fmt.Sprintf(format, args...))
glog.InfoDepth(d, fmt.Sprintf(format, args...))
}
func (g *glogger) InfoDepth(depth int, args ...interface{}) {
glog.InfoDepth(depth+2, args...)
glog.InfoDepth(depth+d, args...)
}
func (g *glogger) Warning(args ...interface{}) {
glog.WarningDepth(2, args...)
glog.WarningDepth(d, args...)
}
func (g *glogger) Warningln(args ...interface{}) {
glog.WarningDepth(2, fmt.Sprintln(args...))
glog.WarningDepth(d, fmt.Sprintln(args...))
}
func (g *glogger) Warningf(format string, args ...interface{}) {
glog.WarningDepth(2, fmt.Sprintf(format, args...))
glog.WarningDepth(d, fmt.Sprintf(format, args...))
}
func (g *glogger) WarningDepth(depth int, args ...interface{}) {
glog.WarningDepth(depth+2, args...)
glog.WarningDepth(depth+d, args...)
}
func (g *glogger) Error(args ...interface{}) {
glog.ErrorDepth(2, args...)
glog.ErrorDepth(d, args...)
}
func (g *glogger) Errorln(args ...interface{}) {
glog.ErrorDepth(2, fmt.Sprintln(args...))
glog.ErrorDepth(d, fmt.Sprintln(args...))
}
func (g *glogger) Errorf(format string, args ...interface{}) {
glog.ErrorDepth(2, fmt.Sprintf(format, args...))
glog.ErrorDepth(d, fmt.Sprintf(format, args...))
}
func (g *glogger) ErrorDepth(depth int, args ...interface{}) {
glog.ErrorDepth(depth+2, args...)
glog.ErrorDepth(depth+d, args...)
}
func (g *glogger) Fatal(args ...interface{}) {
glog.FatalDepth(2, args...)
glog.FatalDepth(d, args...)
}
func (g *glogger) Fatalln(args ...interface{}) {
glog.FatalDepth(2, fmt.Sprintln(args...))
glog.FatalDepth(d, fmt.Sprintln(args...))
}
func (g *glogger) Fatalf(format string, args ...interface{}) {
glog.FatalDepth(2, fmt.Sprintf(format, args...))
glog.FatalDepth(d, fmt.Sprintf(format, args...))
}
func (g *glogger) FatalDepth(depth int, args ...interface{}) {
glog.FatalDepth(depth+2, args...)
glog.FatalDepth(depth+d, args...)
}
func (g *glogger) V(l int) bool {

Просмотреть файл

@ -67,6 +67,9 @@ type LoggerV2 interface {
// SetLoggerV2 sets logger that is used in grpc to a V2 logger.
// Not mutex-protected, should be called before any gRPC functions.
func SetLoggerV2(l LoggerV2) {
if _, ok := l.(*componentData); ok {
panic("cannot use component logger as grpclog logger")
}
grpclog.Logger = l
grpclog.DepthLogger, _ = l.(grpclog.DepthLoggerV2)
}
@ -203,6 +206,7 @@ func (g *loggerT) V(l int) bool {
//
// This API is EXPERIMENTAL.
type DepthLoggerV2 interface {
LoggerV2
// InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Print.
InfoDepth(depth int, args ...interface{})
// WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Print.

Просмотреть файл

@ -40,6 +40,8 @@ type Logger interface {
// It is used to get a methodLogger for each individual method.
var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog")
// SetLogger sets the binarg logger.
//
// Only call this at init time.
@ -149,7 +151,7 @@ func (l *logger) setBlacklist(method string) error {
func (l *logger) getMethodLogger(methodName string) *MethodLogger {
s, m, err := grpcutil.ParseMethod(methodName)
if err != nil {
grpclog.Infof("binarylogging: failed to parse %q: %v", methodName, err)
grpclogLogger.Infof("binarylogging: failed to parse %q: %v", methodName, err)
return nil
}
if ml, ok := l.methods[s+"/"+m]; ok {

Просмотреть файл

@ -24,8 +24,6 @@ import (
"regexp"
"strconv"
"strings"
"google.golang.org/grpc/grpclog"
)
// NewLoggerFromConfigString reads the string and build a logger. It can be used
@ -52,7 +50,7 @@ func NewLoggerFromConfigString(s string) Logger {
methods := strings.Split(s, ",")
for _, method := range methods {
if err := l.fillMethodLoggerWithConfigString(method); err != nil {
grpclog.Warningf("failed to parse binary log config: %v", err)
grpclogLogger.Warningf("failed to parse binary log config: %v", err)
return nil
}
}

Просмотреть файл

@ -27,7 +27,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@ -219,12 +218,12 @@ func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
if m, ok := c.Message.(proto.Message); ok {
data, err = proto.Marshal(m)
if err != nil {
grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
}
} else if b, ok := c.Message.([]byte); ok {
data = b
} else {
grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
@ -259,12 +258,12 @@ func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
if m, ok := c.Message.(proto.Message); ok {
data, err = proto.Marshal(m)
if err != nil {
grpclog.Infof("binarylogging: failed to marshal proto message: %v", err)
grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
}
} else if b, ok := c.Message.([]byte); ok {
data = b
} else {
grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte")
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
@ -315,7 +314,7 @@ type ServerTrailer struct {
func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
st, ok := status.FromError(c.Err)
if !ok {
grpclog.Info("binarylogging: error in trailer is not a status error")
grpclogLogger.Info("binarylogging: error in trailer is not a status error")
}
var (
detailsBytes []byte
@ -325,7 +324,7 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
if stProto != nil && len(stProto.Details) != 0 {
detailsBytes, err = proto.Marshal(stProto)
if err != nil {
grpclog.Infof("binarylogging: failed to marshal status proto: %v", err)
grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
}
}
ret := &pb.GrpcLogEntry{

Просмотреть файл

@ -29,7 +29,6 @@ import (
"github.com/golang/protobuf/proto"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/grpclog"
)
var (
@ -78,7 +77,7 @@ type writerSink struct {
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
grpclogLogger.Infof("binary logging: failed to marshal proto message: %v", err)
}
hdr := make([]byte, 4)
binary.BigEndian.PutUint32(hdr, uint32(len(b)))

Просмотреть файл

@ -30,7 +30,7 @@ import (
"sync/atomic"
"time"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/grpclog"
)
const (
@ -216,7 +216,7 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
if pid == 0 {
grpclog.ErrorDepth(0, "a SubChannel's parent id cannot be 0")
logger.Error("a SubChannel's parent id cannot be 0")
return 0
}
id := idGen.genID()
@ -253,7 +253,7 @@ func RegisterServer(s Server, ref string) int64 {
// this listen socket.
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
grpclog.ErrorDepth(0, "a ListenSocket's parent id cannot be 0")
logger.Error("a ListenSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
@ -268,7 +268,7 @@ func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
// this normal socket.
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
grpclog.ErrorDepth(0, "a NormalSocket's parent id cannot be 0")
logger.Error("a NormalSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
@ -294,17 +294,17 @@ type TraceEventDesc struct {
}
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
func AddTraceEvent(id int64, depth int, desc *TraceEventDesc) {
func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) {
for d := desc; d != nil; d = d.Parent {
switch d.Severity {
case CtUNKNOWN:
grpclog.InfoDepth(depth+1, d.Desc)
l.InfoDepth(depth+1, d.Desc)
case CtINFO:
grpclog.InfoDepth(depth+1, d.Desc)
l.InfoDepth(depth+1, d.Desc)
case CtWarning:
grpclog.WarningDepth(depth+1, d.Desc)
l.WarningDepth(depth+1, d.Desc)
case CtError:
grpclog.ErrorDepth(depth+1, d.Desc)
l.ErrorDepth(depth+1, d.Desc)
}
}
if getMaxTraceEntry() == 0 {

Просмотреть файл

@ -21,80 +21,82 @@ package channelz
import (
"fmt"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/grpclog"
)
// Info logs through grpclog.Info and adds a trace event if channelz is on.
func Info(id int64, args ...interface{}) {
var logger = grpclog.Component("channelz")
// Info logs and adds a trace event if channelz is on.
func Info(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtINFO,
})
} else {
grpclog.InfoDepth(1, args...)
l.InfoDepth(1, args...)
}
}
// Infof logs through grpclog.Infof and adds a trace event if channelz is on.
func Infof(id int64, format string, args ...interface{}) {
// Infof logs and adds a trace event if channelz is on.
func Infof(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtINFO,
})
} else {
grpclog.InfoDepth(1, msg)
l.InfoDepth(1, msg)
}
}
// Warning logs through grpclog.Warning and adds a trace event if channelz is on.
func Warning(id int64, args ...interface{}) {
// Warning logs and adds a trace event if channelz is on.
func Warning(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
})
} else {
grpclog.WarningDepth(1, args...)
l.WarningDepth(1, args...)
}
}
// Warningf logs through grpclog.Warningf and adds a trace event if channelz is on.
func Warningf(id int64, format string, args ...interface{}) {
// Warningf logs and adds a trace event if channelz is on.
func Warningf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtWarning,
})
} else {
grpclog.WarningDepth(1, msg)
l.WarningDepth(1, msg)
}
}
// Error logs through grpclog.Error and adds a trace event if channelz is on.
func Error(id int64, args ...interface{}) {
// Error logs and adds a trace event if channelz is on.
func Error(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtError,
})
} else {
grpclog.ErrorDepth(1, args...)
l.ErrorDepth(1, args...)
}
}
// Errorf logs through grpclog.Errorf and adds a trace event if channelz is on.
func Errorf(id int64, format string, args ...interface{}) {
// Errorf logs and adds a trace event if channelz is on.
func Errorf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(id, 1, &TraceEventDesc{
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtError,
})
} else {
grpclog.ErrorDepth(1, msg)
l.ErrorDepth(1, msg)
}
}

Просмотреть файл

@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
// entry represents a node in the channelz database.
@ -60,17 +59,17 @@ func (d *dummyEntry) addChild(id int64, e entry) {
// the addrConn will create a new transport. And when registering the new transport in
// channelz, its parent addrConn could have already been torn down and deleted
// from channelz tracking, and thus reach the code here.
grpclog.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
}
func (d *dummyEntry) deleteChild(id int64) {
// It is possible for a normal program to reach here under race condition.
// Refer to the example described in addChild().
grpclog.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
}
func (d *dummyEntry) triggerDelete() {
grpclog.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
}
func (*dummyEntry) deleteSelfIfReady() {
@ -215,7 +214,7 @@ func (c *channel) addChild(id int64, e entry) {
case *channel:
c.nestedChans[id] = v.refName
default:
grpclog.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
}
}
@ -326,7 +325,7 @@ func (sc *subChannel) addChild(id int64, e entry) {
if v, ok := e.(*normalSocket); ok {
sc.sockets[id] = v.refName
} else {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
}
}
@ -493,11 +492,11 @@ type listenSocket struct {
}
func (ls *listenSocket) addChild(id int64, e entry) {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
}
func (ls *listenSocket) deleteChild(id int64) {
grpclog.Errorf("cannot delete a child (id = %d) from a listen socket", id)
logger.Errorf("cannot delete a child (id = %d) from a listen socket", id)
}
func (ls *listenSocket) triggerDelete() {
@ -506,7 +505,7 @@ func (ls *listenSocket) triggerDelete() {
}
func (ls *listenSocket) deleteSelfIfReady() {
grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket")
logger.Errorf("cannot call deleteSelfIfReady on a listen socket")
}
func (ls *listenSocket) getParentID() int64 {
@ -522,11 +521,11 @@ type normalSocket struct {
}
func (ns *normalSocket) addChild(id int64, e entry) {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e)
logger.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e)
}
func (ns *normalSocket) deleteChild(id int64) {
grpclog.Errorf("cannot delete a child (id = %d) from a normal socket", id)
logger.Errorf("cannot delete a child (id = %d) from a normal socket", id)
}
func (ns *normalSocket) triggerDelete() {
@ -535,7 +534,7 @@ func (ns *normalSocket) triggerDelete() {
}
func (ns *normalSocket) deleteSelfIfReady() {
grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket")
logger.Errorf("cannot call deleteSelfIfReady on a normal socket")
}
func (ns *normalSocket) getParentID() int64 {
@ -594,7 +593,7 @@ func (s *server) addChild(id int64, e entry) {
case *listenSocket:
s.listenSockets[id] = v.refName
default:
grpclog.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
}
}

Просмотреть файл

@ -22,8 +22,6 @@ package channelz
import (
"sync"
"google.golang.org/grpc/grpclog"
)
var once sync.Once
@ -39,6 +37,6 @@ type SocketOptionData struct {
// Windows OS doesn't support Socket Option
func (s *SocketOptionData) Getsockopt(fd uintptr) {
once.Do(func() {
grpclog.Warningln("Channelz: socket options are not supported on non-linux os and appengine.")
logger.Warning("Channelz: socket options are not supported on non-linux os and appengine.")
})
}

Просмотреть файл

@ -19,6 +19,10 @@
// Package grpclog (internal) defines depth logging for grpc.
package grpclog
import (
"os"
)
// Logger is the logger used for the non-depth log functions.
var Logger LoggerV2
@ -30,7 +34,7 @@ func InfoDepth(depth int, args ...interface{}) {
if DepthLogger != nil {
DepthLogger.InfoDepth(depth, args...)
} else {
Logger.Info(args...)
Logger.Infoln(args...)
}
}
@ -39,7 +43,7 @@ func WarningDepth(depth int, args ...interface{}) {
if DepthLogger != nil {
DepthLogger.WarningDepth(depth, args...)
} else {
Logger.Warning(args...)
Logger.Warningln(args...)
}
}
@ -48,7 +52,7 @@ func ErrorDepth(depth int, args ...interface{}) {
if DepthLogger != nil {
DepthLogger.ErrorDepth(depth, args...)
} else {
Logger.Error(args...)
Logger.Errorln(args...)
}
}
@ -57,8 +61,9 @@ func FatalDepth(depth int, args ...interface{}) {
if DepthLogger != nil {
DepthLogger.FatalDepth(depth, args...)
} else {
Logger.Fatal(args...)
Logger.Fatalln(args...)
}
os.Exit(1)
}
// LoggerV2 does underlying logging work for grpclog.

Просмотреть файл

@ -44,6 +44,8 @@ import (
// addresses from SRV records. Must not be changed after init time.
var EnableSRVLookups = false
var logger = grpclog.Component("dns")
func init() {
resolver.Register(NewBuilder())
}
@ -272,7 +274,7 @@ func handleDNSError(err error, lookupType string) error {
err = filterError(err)
if err != nil {
err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
grpclog.Infoln(err)
logger.Info(err)
}
return err
}
@ -295,7 +297,7 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
grpclog.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service config.
return nil
}
@ -421,12 +423,12 @@ func canaryingSC(js string) string {
var rcs []rawChoice
err := json.Unmarshal([]byte(js), &rcs)
if err != nil {
grpclog.Warningf("dns: error parsing service config json: %v", err)
logger.Warningf("dns: error parsing service config json: %v", err)
return ""
}
cliHostname, err := os.Hostname()
if err != nil {
grpclog.Warningf("dns: error getting client hostname: %v", err)
logger.Warningf("dns: error getting client hostname: %v", err)
return ""
}
var sc string

Просмотреть файл

@ -32,11 +32,13 @@ import (
"google.golang.org/grpc/grpclog"
)
var logger = grpclog.Component("core")
// GetCPUTime returns the how much CPU time has passed since the start of this process.
func GetCPUTime() int64 {
var ts unix.Timespec
if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
grpclog.Fatal(err)
logger.Fatal(err)
}
return ts.Nano()
}

Просмотреть файл

@ -31,10 +31,11 @@ import (
)
var once sync.Once
var logger = grpclog.Component("core")
func log() {
once.Do(func() {
grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.")
logger.Info("CPU time info is unavailable on non-linux or appengine environment.")
})
}

Просмотреть файл

@ -505,7 +505,9 @@ func (l *loopyWriter) run() (err error) {
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
infof("transport: loopyWriter.run returning. %v", err)
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter.run returning. %v", err)
}
err = nil
}
}()
@ -605,7 +607,9 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
str, ok := l.estdStreams[h.streamID]
if !ok {
warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
if logger.V(logLevel) {
logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
}
return nil
}
// Case 1.A: Server is responding back with headers.
@ -658,7 +662,9 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
l.hBuf.Reset()
for _, f := range hf {
if err := l.hEnc.WriteField(f); err != nil {
warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
if logger.V(logLevel) {
logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
}
}
}
var (

Просмотреть файл

@ -354,7 +354,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
errorf("transport: loopyWriter.run returning. Err: %v", err)
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
@ -1013,7 +1015,9 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
}
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
if logger.V(logLevel) {
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
}
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
@ -1095,7 +1099,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
return
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
if logger.V(logLevel) {
logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
}
}
id := f.LastStreamID
if id > 0 && id%2 != 1 {
@ -1325,7 +1331,9 @@ func (t *http2Client) reader() {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
if logger.V(logLevel) {
logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
}
}
}
}

Просмотреть файл

@ -37,7 +37,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@ -289,7 +288,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
errorf("transport: loopyWriter.run returning. Err: %v", err)
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
t.conn.Close()
close(t.writerDone)
@ -360,7 +361,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: true,
@ -391,7 +394,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
s.cancel()
return true
}
@ -454,7 +459,9 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
}
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
@ -474,7 +481,9 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
return
}
@ -497,7 +506,9 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
}
@ -719,7 +730,9 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
errorf("transport: Got too many pings from the client, closing the connection.")
if logger.V(logLevel) {
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
}
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
@ -752,7 +765,9 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
var sz int64
for _, f := range hdrFrame.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
if logger.V(logLevel) {
logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
}
return false
}
}
@ -849,7 +864,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
@ -980,7 +995,9 @@ func (t *http2Server) keepalive() {
select {
case <-ageTimer.C:
// Close the connection after grace period.
infof("transport: closing server transport due to maximum connection age.")
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to maximum connection age.")
}
t.Close()
case <-t.done:
}
@ -997,7 +1014,9 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
infof("transport: closing server transport due to idleness.")
if logger.V(logLevel) {
logger.Infof("transport: closing server transport due to idleness.")
}
t.Close()
return
}

Просмотреть файл

@ -37,6 +37,7 @@ import (
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)
@ -97,6 +98,7 @@ var (
// 504 Gateway timeout - UNAVAILABLE.
http.StatusGatewayTimeout: codes.Unavailable,
}
logger = grpclog.Component("transport")
)
type parsedHeaderData struct {
@ -412,7 +414,9 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
}
v, err := decodeMetadataHeader(f.Name, f.Value)
if err != nil {
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
if logger.V(logLevel) {
logger.Errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
}
return
}
d.addMetadata(f.Name, v)

Просмотреть файл

@ -1,44 +0,0 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// This file contains wrappers for grpclog functions.
// The transport package only logs to verbose level 2 by default.
package transport
import "google.golang.org/grpc/grpclog"
const logLevel = 2
func infof(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Infof(format, args...)
}
}
func warningf(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Warningf(format, args...)
}
}
func errorf(format string, args ...interface{}) {
if grpclog.V(logLevel) {
grpclog.Errorf(format, args...)
}
}

Просмотреть файл

@ -41,6 +41,8 @@ import (
"google.golang.org/grpc/tap"
)
const logLevel = 2
type bufferPool struct {
pool sync.Pool
}

Просмотреть файл

@ -25,7 +25,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
@ -145,7 +144,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
logger.Error("subconn returned from pick is not *acBalancerWrapper")
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
@ -159,7 +158,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
// DoneInfo with default value works.
pickResult.Done(balancer.DoneInfo{})
}
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
// A valid picker always returns READY subConn. This means the state of ac
// just changed, and picker will be updated shortly.

Просмотреть файл

@ -24,7 +24,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
)
// PickFirstBalancerName is the name of the pick_first balancer.
@ -58,8 +57,8 @@ func (b *pickfirstBalancer) ResolverError(err error) {
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: ResolverError called with error %v", err)
if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
}
}
@ -72,8 +71,8 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
var err error
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
if grpclog.V(2) {
grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
@ -92,12 +91,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
}
func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
}
if b.sc != sc {
if grpclog.V(2) {
grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
if logger.V(2) {
logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
}
return
}

Просмотреть файл

@ -140,7 +140,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
if ccr.done.HasFired() {
return
}
channelz.Infof(ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
@ -152,7 +152,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
if ccr.done.HasFired() {
return
}
channelz.Warningf(ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
}
@ -161,7 +161,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
if ccr.done.HasFired() {
return
}
channelz.Infof(ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
@ -175,14 +175,14 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
if ccr.done.HasFired() {
return
}
channelz.Infof(ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
if ccr.cc.dopts.disableServiceConfig {
channelz.Info(ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
return
}
scpr := parseServiceConfig(sc)
if scpr.Err != nil {
channelz.Warningf(ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
ccr.poll(balancer.ErrBadResolverState)
return
}
@ -215,7 +215,7 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
channelz.AddTraceEvent(ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
Severity: channelz.CtINFO,
})

Просмотреть файл

@ -59,6 +59,7 @@ const (
)
var statusOK = status.New(codes.OK, "")
var logger = grpclog.Component("core")
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
@ -223,7 +224,7 @@ func InitialConnWindowSize(s int32) ServerOption {
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
if kp.Time > 0 && kp.Time < time.Second {
grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
kp.Time = time.Second
}
@ -537,7 +538,7 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}
@ -547,10 +548,10 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
@ -756,7 +757,7 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
@ -803,7 +804,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
c.Close()
channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}
@ -957,12 +958,12 @@ func (s *Server) incrCallsFailed() {
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
return err
}
compData, err := compress(data, cp, comp)
if err != nil {
channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
return err
}
hdr, payload := msgHeader(data, compData)
@ -1136,7 +1137,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
}
}
return err
@ -1181,7 +1182,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
trInfo.tr.SetError()
}
if e := t.WriteStatus(stream, appStatus); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
if binlog != nil {
if h, _ := stream.Header(); h.Len() > 0 {
@ -1210,7 +1211,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if sts, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, sts); e != nil {
channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
}
} else {
switch st := err.(type) {
@ -1478,7 +1479,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@ -1519,7 +1520,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()

Просмотреть файл

@ -27,7 +27,6 @@ import (
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
@ -269,7 +268,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
sc := ServiceConfig{
@ -295,7 +294,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
}
d, err := parseDuration(m.Timeout)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
@ -304,7 +303,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
Timeout: d,
}
if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
if m.MaxRequestMessageBytes != nil {
@ -357,7 +356,7 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
*mb <= 0 ||
jrp.BackoffMultiplier <= 0 ||
len(jrp.RetryableStatusCodes) == 0 {
grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
return nil, nil
}

Просмотреть файл

@ -510,13 +510,13 @@ func (cs *clientStream) shouldRetry(err error) error {
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
channelz.Infof(cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return err
}
hasPushback = true
} else if len(sps) > 1 {
channelz.Warningf(cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return err
}

Просмотреть файл

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/grpclog"
)
var logger = grpclog.Component("xds")
var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed")
// handlePriorityChange handles priority after EDS adds/removes a
@ -134,13 +135,13 @@ func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType
defer edsImpl.priorityMu.Unlock()
if !edsImpl.priorityInUse.isSet() {
grpclog.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
logger.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
return false
}
if edsImpl.priorityInUse.higherThan(priority) {
// Lower priorities should all be closed, this is an unexpected update.
grpclog.Infof("eds: received picker update from priority lower then priorityInUse")
logger.Infof("eds: received picker update from priority lower then priorityInUse")
return false
}

Просмотреть файл

@ -36,6 +36,8 @@ import (
const negativeOneUInt64 = ^uint64(0)
var logger = grpclog.Component("xds")
// Store defines the interface for a load store. It keeps loads and can report
// them to a server when requested.
type Store interface {
@ -310,25 +312,25 @@ func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterNa
doBackoff = true
stream, err := c.StreamLoadStats(ctx)
if err != nil {
grpclog.Warningf("lrs: failed to create stream: %v", err)
logger.Warningf("lrs: failed to create stream: %v", err)
continue
}
grpclog.Infof("lrs: created LRS stream")
logger.Infof("lrs: created LRS stream")
req := &lrspb.LoadStatsRequest{Node: node}
grpclog.Infof("lrs: sending init LoadStatsRequest: %v", req)
logger.Infof("lrs: sending init LoadStatsRequest: %v", req)
if err := stream.Send(req); err != nil {
grpclog.Warningf("lrs: failed to send first request: %v", err)
logger.Warningf("lrs: failed to send first request: %v", err)
continue
}
first, err := stream.Recv()
if err != nil {
grpclog.Warningf("lrs: failed to receive first response: %v", err)
logger.Warningf("lrs: failed to receive first response: %v", err)
continue
}
grpclog.Infof("lrs: received first LoadStatsResponse: %+v", first)
logger.Infof("lrs: received first LoadStatsResponse: %+v", first)
interval, err := ptypes.Duration(first.LoadReportingInterval)
if err != nil {
grpclog.Warningf("lrs: failed to convert report interval: %v", err)
logger.Warningf("lrs: failed to convert report interval: %v", err)
continue
}
// The LRS client should join the clusters it knows with the cluster
@ -343,12 +345,12 @@ func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterNa
}
}
if !clusterFoundInResponse {
grpclog.Warningf("lrs: received clusters %v does not contain expected {%v}", first.Clusters, clusterName)
logger.Warningf("lrs: received clusters %v does not contain expected {%v}", first.Clusters, clusterName)
continue
}
if first.ReportEndpointGranularity {
// TODO: fixme to support per endpoint loads.
grpclog.Warningf("lrs: endpoint loads requested, but not supported by current implementation")
logger.Warningf("lrs: endpoint loads requested, but not supported by current implementation")
continue
}
@ -369,9 +371,9 @@ func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingS
return
}
req := &lrspb.LoadStatsRequest{ClusterStats: ls.buildStats(clusterName)}
grpclog.Infof("lrs: sending LRS loads: %+v", req)
logger.Infof("lrs: sending LRS loads: %+v", req)
if err := stream.Send(req); err != nil {
grpclog.Warningf("lrs: failed to send report: %v", err)
logger.Warningf("lrs: failed to send report: %v", err)
return
}
}

Просмотреть файл

@ -27,6 +27,8 @@ import (
const mdKey = "X-Endpoint-Load-Metrics-Bin"
var logger = grpclog.Component("xds")
// toBytes converts a orca load report into bytes.
func toBytes(r *orcapb.OrcaLoadReport) []byte {
if r == nil {
@ -35,7 +37,7 @@ func toBytes(r *orcapb.OrcaLoadReport) []byte {
b, err := proto.Marshal(r)
if err != nil {
grpclog.Warningf("orca: failed to marshal load report: %v", err)
logger.Warningf("orca: failed to marshal load report: %v", err)
return nil
}
return b
@ -54,7 +56,7 @@ func ToMetadata(r *orcapb.OrcaLoadReport) metadata.MD {
func fromBytes(b []byte) *orcapb.OrcaLoadReport {
ret := new(orcapb.OrcaLoadReport)
if err := proto.Unmarshal(b, ret); err != nil {
grpclog.Warningf("orca: failed to unmarshal load report: %v", err)
logger.Warningf("orca: failed to unmarshal load report: %v", err)
return nil
}
return ret

Просмотреть файл

@ -30,6 +30,8 @@ import (
const nodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME"
var logger = grpclog.Component("xds")
// ReportLoad sends the load of the given clusterName from loadStore to the
// given server. If the server is not an empty string, and is different from the
// xds server, a new ClientConn will be created.
@ -53,7 +55,7 @@ func (c *Client) ReportLoad(server string, clusterName string, loadStore lrs.Sto
ccNew, err := grpc.Dial(server, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
grpclog.Infof("xds: failed to dial load report server {%s}: %v", server, err)
logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
}
cc = ccNew