From e4ddcb37c1326e8eef7abbcc84effb016c7a3336 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Mon, 12 Sep 2016 10:38:55 -0700 Subject: [PATCH] libcontainerd: use healthcheck to track containerd conn Signed-off-by: Alexander Morozov --- Dockerfile | 2 +- Dockerfile.aarch64 | 2 +- Dockerfile.armhf | 2 +- Dockerfile.ppc64le | 2 +- Dockerfile.s390x | 2 +- Dockerfile.simple | 2 +- hack/vendor.sh | 2 +- libcontainerd/remote_linux.go | 70 +++---- .../grpc/health/grpc_health_v1/health.pb.go | 172 ++++++++++++++++++ .../grpc/health/grpc_health_v1/health.proto | 20 ++ 10 files changed, 237 insertions(+), 39 deletions(-) create mode 100644 vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.pb.go create mode 100644 vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.proto diff --git a/Dockerfile b/Dockerfile index 65b35be024..1a96e4dae8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -243,7 +243,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/Dockerfile.aarch64 b/Dockerfile.aarch64 index 212259bf02..78b1bf08b8 100644 --- a/Dockerfile.aarch64 +++ b/Dockerfile.aarch64 @@ -186,7 +186,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/Dockerfile.armhf b/Dockerfile.armhf index 9722381b89..2c11144c5b 100644 --- a/Dockerfile.armhf +++ b/Dockerfile.armhf @@ -184,7 +184,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/Dockerfile.ppc64le b/Dockerfile.ppc64le index e0e25c00d8..28ebb8a1ed 100644 --- a/Dockerfile.ppc64le +++ b/Dockerfile.ppc64le @@ -204,7 +204,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/Dockerfile.s390x b/Dockerfile.s390x index 1d57f797c1..cf024a69bc 100644 --- a/Dockerfile.s390x +++ b/Dockerfile.s390x @@ -196,7 +196,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/Dockerfile.simple b/Dockerfile.simple index d24a41041d..623bf43750 100644 --- a/Dockerfile.simple +++ b/Dockerfile.simple @@ -68,7 +68,7 @@ RUN set -x \ && rm -rf "$GOPATH" # Install containerd -ENV CONTAINERD_COMMIT 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +ENV CONTAINERD_COMMIT 4c21ad662f71af56c0e6b29c0afef72df441d1ff RUN set -x \ && export GOPATH="$(mktemp -d)" \ && git clone https://github.com/docker/containerd.git "$GOPATH/src/github.com/docker/containerd" \ diff --git a/hack/vendor.sh b/hack/vendor.sh index 17b32ce577..5efd4a444b 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -141,7 +141,7 @@ clone git google.golang.org/cloud dae7e3d993bc3812a2185af60552bb6b847e52a0 https clone git github.com/docker/docker-credential-helpers v0.3.0 # containerd -clone git github.com/docker/containerd 35a736c471ccd3ebfc7b80ceeb0ee303129acd61 +clone git github.com/docker/containerd 4c21ad662f71af56c0e6b29c0afef72df441d1ff # cluster clone git github.com/docker/swarmkit 27fbaef4ceed648bb575969ccc9083a6e104a719 diff --git a/libcontainerd/remote_linux.go b/libcontainerd/remote_linux.go index 7afe0da399..e7fcb257b6 100644 --- a/libcontainerd/remote_linux.go +++ b/libcontainerd/remote_linux.go @@ -25,18 +25,20 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/transport" ) const ( - maxConnectionRetryCount = 3 - connectionRetryDelay = 3 * time.Second - containerdShutdownTimeout = 15 * time.Second - containerdBinary = "docker-containerd" - containerdPidFilename = "docker-containerd.pid" - containerdSockFilename = "docker-containerd.sock" - containerdStateDir = "containerd" - eventTimestampFilename = "event.ts" + maxConnectionRetryCount = 3 + connectionRetryDelay = 3 * time.Second + containerdHealthCheckTimeout = 3 * time.Second + containerdShutdownTimeout = 15 * time.Second + containerdBinary = "docker-containerd" + containerdPidFilename = "docker-containerd.pid" + containerdSockFilename = "docker-containerd.sock" + containerdStateDir = "containerd" + eventTimestampFilename = "event.ts" ) type remote struct { @@ -134,37 +136,41 @@ func (r *remote) UpdateOptions(options ...RemoteOption) error { func (r *remote) handleConnectionChange() { var transientFailureCount = 0 - state := grpc.Idle + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + healthClient := grpc_health_v1.NewHealthClient(r.rpcConn) + for { - s, err := r.rpcConn.WaitForStateChange(context.Background(), state) - if err != nil { - break + <-ticker.C + ctx, cancel := context.WithTimeout(context.Background(), containerdHealthCheckTimeout) + _, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + cancel() + if err == nil { + continue } - state = s - logrus.Debugf("libcontainerd: containerd connection state change: %v", s) + + logrus.Debugf("libcontainerd: containerd health check returned error: %v", err) if r.daemonPid != -1 { - switch state { - case grpc.TransientFailure: - // Reset state to be notified of next failure - transientFailureCount++ - if transientFailureCount >= maxConnectionRetryCount { - transientFailureCount = 0 - if utils.IsProcessAlive(r.daemonPid) { - utils.KillProcess(r.daemonPid) - } - <-r.daemonWaitCh - if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error - logrus.Errorf("libcontainerd: error restarting containerd: %v", err) - } - } else { - state = grpc.Idle - time.Sleep(connectionRetryDelay) - } - case grpc.Shutdown: + if strings.Contains(err.Error(), "is closing") { // Well, we asked for it to stop, just return return } + // all other errors are transient + // Reset state to be notified of next failure + transientFailureCount++ + if transientFailureCount >= maxConnectionRetryCount { + transientFailureCount = 0 + if utils.IsProcessAlive(r.daemonPid) { + utils.KillProcess(r.daemonPid) + } + <-r.daemonWaitCh + if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error + logrus.Errorf("libcontainerd: error restarting containerd: %v", err) + } + continue + } } } } diff --git a/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.pb.go new file mode 100644 index 0000000000..d9550c728b --- /dev/null +++ b/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -0,0 +1,172 @@ +// Code generated by protoc-gen-go. +// source: health/grpc_health_v1/health.proto +// DO NOT EDIT! + +/* +Package grpc_health_v1 is a generated protocol buffer package. + +It is generated from these files: + health/grpc_health_v1/health.proto + +It has these top-level messages: + HealthCheckRequest + HealthCheckResponse +*/ +package grpc_health_v1 + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +const _ = proto.ProtoPackageIsVersion1 + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +var HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", +} +var HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x)) +} +func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor0, []int{1, 0} +} + +type HealthCheckRequest struct { + Service string `protobuf:"bytes,1,opt,name=service" json:"service,omitempty"` +} + +func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } +func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } +func (*HealthCheckRequest) ProtoMessage() {} +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type HealthCheckResponse struct { + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } +func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } +func (*HealthCheckResponse) ProtoMessage() {} +func (*HealthCheckResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func init() { + proto.RegisterType((*HealthCheckRequest)(nil), "grpc.health.v1.HealthCheckRequest") + proto.RegisterType((*HealthCheckResponse)(nil), "grpc.health.v1.HealthCheckResponse") + proto.RegisterEnum("grpc.health.v1.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion2 + +// Client API for Health service + +type HealthClient interface { + Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) +} + +type healthClient struct { + cc *grpc.ClientConn +} + +func NewHealthClient(cc *grpc.ClientConn) HealthClient { + return &healthClient{cc} +} + +func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := grpc.Invoke(ctx, "/grpc.health.v1.Health/Check", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Health service + +type HealthServer interface { + Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) +} + +func RegisterHealthServer(s *grpc.Server, srv HealthServer) { + s.RegisterService(&_Health_serviceDesc, srv) +} + +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HealthCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.health.v1.Health/Check", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Health_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.health.v1.Health", + HandlerType: (*HealthServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _Health_Check_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, +} + +var fileDescriptor0 = []byte{ + // 209 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x52, 0xca, 0x48, 0x4d, 0xcc, + 0x29, 0xc9, 0xd0, 0x4f, 0x2f, 0x2a, 0x48, 0x8e, 0x87, 0xb0, 0xe3, 0xcb, 0x0c, 0xf5, 0x21, 0x2c, + 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x3e, 0x90, 0xa4, 0x1e, 0x54, 0xa8, 0xcc, 0x50, 0x49, + 0x95, 0x4b, 0xc8, 0x03, 0xcc, 0x71, 0xce, 0x48, 0x4d, 0xce, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, + 0x2e, 0x11, 0xe2, 0xe7, 0x62, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x95, 0x60, 0x54, 0x60, + 0xd4, 0xe0, 0x54, 0x9a, 0xc2, 0xc8, 0x25, 0x8c, 0xa2, 0xae, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, + 0xc8, 0x91, 0x8b, 0xad, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x18, 0xac, 0x8e, 0xcf, 0xc8, 0x50, 0x0f, + 0xd5, 0x7c, 0x3d, 0x2c, 0x9a, 0xf4, 0x82, 0x41, 0x46, 0xe7, 0xa5, 0x07, 0x83, 0x35, 0x2a, 0x59, + 0x71, 0xf1, 0xa2, 0x08, 0x08, 0x71, 0x73, 0xb1, 0x87, 0xfa, 0x79, 0xfb, 0xf9, 0x87, 0xfb, 0x09, + 0x30, 0x80, 0x38, 0xc1, 0xae, 0x41, 0x61, 0x9e, 0x7e, 0xee, 0x02, 0x8c, 0x40, 0x67, 0x71, 0xfb, + 0xf9, 0x87, 0xc4, 0xc3, 0x04, 0x98, 0x8c, 0xa2, 0xb8, 0xd8, 0x20, 0x16, 0x08, 0x05, 0x70, 0xb1, + 0x82, 0x2d, 0x11, 0x52, 0xc2, 0xeb, 0x02, 0xb0, 0xf7, 0xa4, 0x94, 0x89, 0x70, 0x65, 0x12, 0x1b, + 0x38, 0xc0, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa9, 0xf7, 0x2e, 0x1f, 0x56, 0x01, 0x00, + 0x00, +} diff --git a/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.proto b/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.proto new file mode 100644 index 0000000000..e2dc088925 --- /dev/null +++ b/vendor/src/google.golang.org/grpc/health/grpc_health_v1/health.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service Health{ + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +}