зеркало из https://github.com/microsoft/docker.git
Merge pull request #28599 from aluzzardi/revendor-swarmkit
[master] Revendor SwarmKit to 9bca23b0de42a9ebcc71622a30d646afa1e2b564
This commit is contained in:
Коммит
a0866bd679
|
@ -100,7 +100,7 @@ github.com/docker/containerd 03e5862ec0d8d3b3f750e19fca3ee367e13c090e
|
|||
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
|
||||
|
||||
# cluster
|
||||
github.com/docker/swarmkit 91c6e2db9c0c91c466a83529ed16649a1de7ccc4
|
||||
github.com/docker/swarmkit 9bca23b0de42a9ebcc71622a30d646afa1e2b564
|
||||
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
|
||||
github.com/gogo/protobuf v0.3
|
||||
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
|
||||
|
|
|
@ -406,37 +406,41 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
|
|||
}
|
||||
|
||||
// Publisher returns a LogPublisher for the given subscription
|
||||
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, error) {
|
||||
// as well as a cancel function that should be called when the log stream
|
||||
// is completed.
|
||||
func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogPublisher, func(), error) {
|
||||
// TODO(stevvooe): The level of coordination here is WAY too much for logs.
|
||||
// These should only be best effort and really just buffer until a session is
|
||||
// ready. Ideally, they would use a separate connection completely.
|
||||
|
||||
var (
|
||||
err error
|
||||
client api.LogBroker_PublishLogsClient
|
||||
err error
|
||||
publisher api.LogBroker_PublishLogsClient
|
||||
)
|
||||
|
||||
err = a.withSession(ctx, func(session *session) error {
|
||||
client, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
|
||||
publisher, err = api.NewLogBrokerClient(session.conn).PublishLogs(ctx)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return exec.LogPublisherFunc(func(ctx context.Context, message api.LogMessage) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
client.CloseSend()
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
publisher.CloseSend()
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
return client.Send(&api.PublishLogsMessage{
|
||||
SubscriptionID: subscriptionID,
|
||||
Messages: []api.LogMessage{message},
|
||||
})
|
||||
}), nil
|
||||
return publisher.Send(&api.PublishLogsMessage{
|
||||
SubscriptionID: subscriptionID,
|
||||
Messages: []api.LogMessage{message},
|
||||
})
|
||||
}), func() {
|
||||
publisher.CloseSend()
|
||||
}, nil
|
||||
}
|
||||
|
||||
// nodeDescriptionWithHostname retrieves node description, and overrides hostname if available
|
||||
|
|
|
@ -69,7 +69,7 @@ func (fn LogPublisherFunc) Publish(ctx context.Context, message api.LogMessage)
|
|||
|
||||
// LogPublisherProvider defines the protocol for receiving a log publisher
|
||||
type LogPublisherProvider interface {
|
||||
Publisher(ctx context.Context, subscriptionID string) (LogPublisher, error)
|
||||
Publisher(ctx context.Context, subscriptionID string) (LogPublisher, func(), error)
|
||||
}
|
||||
|
||||
// ContainerStatuser reports status of a container.
|
||||
|
|
|
@ -226,6 +226,16 @@ func (s *session) logSubscriptions(ctx context.Context) error {
|
|||
|
||||
client := api.NewLogBrokerClient(s.conn)
|
||||
subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
|
||||
if grpc.Code(err) == codes.Unimplemented {
|
||||
log.Warning("manager does not support log subscriptions")
|
||||
// Don't return, because returning would bounce the session
|
||||
select {
|
||||
case <-s.closed:
|
||||
return errSessionClosed
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -406,12 +406,12 @@ func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID strin
|
|||
func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMessage) error {
|
||||
log.G(ctx).Debugf("Received subscription %s (selector: %v)", subscription.ID, subscription.Selector)
|
||||
|
||||
publisher, err := w.publisherProvider.Publisher(ctx, subscription.ID)
|
||||
publisher, cancel, err := w.publisherProvider.Publisher(ctx, subscription.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Send a close once we're done
|
||||
defer publisher.Publish(ctx, api.LogMessage{})
|
||||
defer cancel()
|
||||
|
||||
match := func(t *api.Task) bool {
|
||||
// TODO(aluzzardi): Consider using maps to limit the iterations.
|
||||
|
@ -436,26 +436,49 @@ func (w *worker) Subscribe(ctx context.Context, subscription *api.SubscriptionMe
|
|||
return false
|
||||
}
|
||||
|
||||
ch, cancel := w.taskevents.Watch()
|
||||
defer cancel()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
w.mu.Lock()
|
||||
for _, tm := range w.taskManagers {
|
||||
if match(tm.task) {
|
||||
go tm.Logs(ctx, *subscription.Options, publisher)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
tm.Logs(ctx, *subscription.Options, publisher)
|
||||
}()
|
||||
}
|
||||
}
|
||||
w.mu.Unlock()
|
||||
|
||||
// If follow mode is disabled, wait for the current set of matched tasks
|
||||
// to finish publishing logs, then close the subscription by returning.
|
||||
if subscription.Options == nil || !subscription.Options.Follow {
|
||||
waitCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(waitCh)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-waitCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// In follow mode, watch for new tasks. Don't close the subscription
|
||||
// until it's cancelled.
|
||||
ch, cancel := w.taskevents.Watch()
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case v := <-ch:
|
||||
w.mu.Lock()
|
||||
task := v.(*api.Task)
|
||||
if match(task) {
|
||||
w.mu.Lock()
|
||||
go w.taskManagers[task.ID].Logs(ctx, *subscription.Options, publisher)
|
||||
w.mu.Unlock()
|
||||
}
|
||||
w.mu.Unlock()
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
|
|
@ -153,7 +153,7 @@ func (rca *RootCA) IssueAndSaveNewCertificates(kw KeyWriter, cn, ou, org string)
|
|||
|
||||
// RequestAndSaveNewCertificates gets new certificates issued, either by signing them locally if a signer is
|
||||
// available, or by requesting them from the remote server at remoteAddr.
|
||||
func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, token string, r remotes.Remotes, transport credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) (*tls.Certificate, error) {
|
||||
func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWriter, config CertificateRequestConfig) (*tls.Certificate, error) {
|
||||
// Create a new key/pair and CSR
|
||||
csr, key, err := GenerateNewCSR()
|
||||
if err != nil {
|
||||
|
@ -165,7 +165,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
|
|||
// responding properly (for example, it may have just been demoted).
|
||||
var signedCert []byte
|
||||
for i := 0; i != 5; i++ {
|
||||
signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo)
|
||||
signedCert, err = GetRemoteSignedCertificate(ctx, csr, rca.Pool, config)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
|
|||
|
||||
var kekUpdate *KEKData
|
||||
for i := 0; i < 5; i++ {
|
||||
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, r)
|
||||
kekUpdate, err = rca.getKEKUpdate(ctx, X509Cert, tlsKeyPair, config.Remotes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -545,18 +545,20 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
|
|||
|
||||
// GetRemoteSignedCertificate submits a CSR to a remote CA server address,
|
||||
// and that is part of a CA identified by a specific certificate pool.
|
||||
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
|
||||
func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x509.CertPool, config CertificateRequestConfig) ([]byte, error) {
|
||||
if rootCAPool == nil {
|
||||
return nil, errors.New("valid root CA pool required")
|
||||
}
|
||||
|
||||
creds := config.Credentials
|
||||
|
||||
if creds == nil {
|
||||
// This is our only non-MTLS request, and it happens when we are boostraping our TLS certs
|
||||
// We're using CARole as server name, so an external CA doesn't also have to have ManagerRole in the cert SANs
|
||||
creds = credentials.NewTLS(&tls.Config{ServerName: CARole, RootCAs: rootCAPool})
|
||||
}
|
||||
|
||||
conn, peer, err := getGRPCConnection(creds, r)
|
||||
conn, peer, err := getGRPCConnection(creds, config.Remotes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -566,18 +568,13 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
caClient := api.NewNodeCAClient(conn)
|
||||
|
||||
// Send the Request and retrieve the request token
|
||||
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: token}
|
||||
issueRequest := &api.IssueNodeCertificateRequest{CSR: csr, Token: config.Token}
|
||||
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest)
|
||||
if err != nil {
|
||||
r.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Send back the NodeID on the nodeInfo, so the caller can know what ID was assigned by the CA
|
||||
if nodeInfo != nil {
|
||||
nodeInfo <- *issueResponse
|
||||
}
|
||||
|
||||
statusRequest := &api.NodeCertificateStatusRequest{NodeID: issueResponse.NodeID}
|
||||
expBackoff := events.NewExponentialBackoff(events.ExponentialBackoffConfig{
|
||||
Base: time.Second,
|
||||
|
@ -592,7 +589,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
defer cancel()
|
||||
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
|
||||
if err != nil {
|
||||
r.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
config.Remotes.Observe(peer, -remotes.DefaultObservationWeight)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -608,7 +605,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r
|
|||
// retry until the certificate gets updated per our
|
||||
// current request.
|
||||
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
|
||||
r.Observe(peer, remotes.DefaultObservationWeight)
|
||||
config.Remotes.Observe(peer, remotes.DefaultObservationWeight)
|
||||
return statusResponse.Certificate.Certificate, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/docker/swarmkit/log"
|
||||
"github.com/docker/swarmkit/remotes"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -52,8 +53,13 @@ const (
|
|||
// SecurityConfig is used to represent a node's security configuration. It includes information about
|
||||
// the RootCA and ServerTLSCreds/ClientTLSCreds transport authenticators to be used for MTLS
|
||||
type SecurityConfig struct {
|
||||
// mu protects against concurrent access to fields inside the structure.
|
||||
mu sync.Mutex
|
||||
|
||||
// renewalMu makes sure only one certificate renewal attempt happens at
|
||||
// a time. It should never be locked after mu is already locked.
|
||||
renewalMu sync.Mutex
|
||||
|
||||
rootCA *RootCA
|
||||
externalCA *ExternalCA
|
||||
keyReadWriter *KeyReadWriter
|
||||
|
@ -234,89 +240,138 @@ func DownloadRootCA(ctx context.Context, paths CertPaths, token string, r remote
|
|||
return rootCA, nil
|
||||
}
|
||||
|
||||
// LoadOrCreateSecurityConfig encapsulates the security logic behind joining a cluster.
|
||||
// Every node requires at least a set of TLS certificates with which to join the cluster with.
|
||||
// In the case of a manager, these certificates will be used both for client and server credentials.
|
||||
func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, proposedRole string, remotes remotes.Remotes, nodeInfo chan<- api.IssueNodeCertificateResponse, krw *KeyReadWriter) (*SecurityConfig, error) {
|
||||
// LoadSecurityConfig loads TLS credentials from disk, or returns an error if
|
||||
// these credentials do not exist or are unusable.
|
||||
func LoadSecurityConfig(ctx context.Context, rootCA RootCA, krw *KeyReadWriter) (*SecurityConfig, error) {
|
||||
ctx = log.WithModule(ctx, "tls")
|
||||
|
||||
// At this point we've successfully loaded the CA details from disk, or
|
||||
// successfully downloaded them remotely. The next step is to try to
|
||||
// load our certificates.
|
||||
clientTLSCreds, serverTLSCreds, err := LoadTLSCreds(rootCA, krw)
|
||||
|
||||
// Read both the Cert and Key from disk
|
||||
cert, key, err := krw.Read()
|
||||
if err != nil {
|
||||
if _, ok := errors.Cause(err).(ErrInvalidKEK); ok {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
|
||||
// Create an x509 certificate out of the contents on disk
|
||||
certBlock, _ := pem.Decode([]byte(cert))
|
||||
if certBlock == nil {
|
||||
return nil, errors.New("failed to parse certificate PEM")
|
||||
}
|
||||
|
||||
var (
|
||||
tlsKeyPair *tls.Certificate
|
||||
err error
|
||||
)
|
||||
// Create an X509Cert so we can .Verify()
|
||||
X509Cert, err := x509.ParseCertificate(certBlock.Bytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if rootCA.CanSign() {
|
||||
// Create a new random ID for this certificate
|
||||
cn := identity.NewID()
|
||||
org := identity.NewID()
|
||||
// Include our root pool
|
||||
opts := x509.VerifyOptions{
|
||||
Roots: rootCA.Pool,
|
||||
}
|
||||
|
||||
if nodeInfo != nil {
|
||||
nodeInfo <- api.IssueNodeCertificateResponse{
|
||||
NodeID: cn,
|
||||
NodeMembership: api.NodeMembershipAccepted,
|
||||
}
|
||||
}
|
||||
tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org)
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": cn,
|
||||
"node.role": proposedRole,
|
||||
}).WithError(err).Errorf("failed to issue and save new certificate")
|
||||
return nil, err
|
||||
}
|
||||
// Check to see if this certificate was signed by our CA, and isn't expired
|
||||
if _, err := X509Cert.Verify(opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Now that we know this certificate is valid, create a TLS Certificate for our
|
||||
// credentials
|
||||
keyPair, err := tls.X509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Load the Certificates as server credentials
|
||||
serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Load the Certificates also as client credentials.
|
||||
// Both workers and managers always connect to remote managers,
|
||||
// so ServerName is always set to ManagerRole here.
|
||||
clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": clientTLSCreds.NodeID(),
|
||||
"node.role": clientTLSCreds.Role(),
|
||||
}).Debug("loaded node credentials")
|
||||
|
||||
return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil
|
||||
}
|
||||
|
||||
// CertificateRequestConfig contains the information needed to request a
|
||||
// certificate from a remote CA.
|
||||
type CertificateRequestConfig struct {
|
||||
// Token is the join token that authenticates us with the CA.
|
||||
Token string
|
||||
// Remotes is the set of remote CAs.
|
||||
Remotes remotes.Remotes
|
||||
// Credentials provides transport credentials for communicating with the
|
||||
// remote server.
|
||||
Credentials credentials.TransportCredentials
|
||||
}
|
||||
|
||||
// CreateSecurityConfig creates a new key and cert for this node, either locally
|
||||
// or via a remote CA.
|
||||
func (rootCA RootCA) CreateSecurityConfig(ctx context.Context, krw *KeyReadWriter, config CertificateRequestConfig) (*SecurityConfig, error) {
|
||||
ctx = log.WithModule(ctx, "tls")
|
||||
|
||||
var (
|
||||
tlsKeyPair *tls.Certificate
|
||||
err error
|
||||
)
|
||||
|
||||
if rootCA.CanSign() {
|
||||
// Create a new random ID for this certificate
|
||||
cn := identity.NewID()
|
||||
org := identity.NewID()
|
||||
|
||||
proposedRole := ManagerRole
|
||||
tlsKeyPair, err = rootCA.IssueAndSaveNewCertificates(krw, cn, proposedRole, org)
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": cn,
|
||||
"node.role": proposedRole,
|
||||
}).Debug("issued new TLS certificate")
|
||||
} else {
|
||||
// There was an error loading our Credentials, let's get a new certificate issued
|
||||
// Last argument is nil because at this point we don't have any valid TLS creds
|
||||
tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, token, remotes, nil, nodeInfo)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to request save new certificate")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Create the Server TLS Credentials for this node. These will not be used by workers.
|
||||
serverTLSCreds, err = rootCA.NewServerTLSCredentials(tlsKeyPair)
|
||||
if err != nil {
|
||||
}).WithError(err).Errorf("failed to issue and save new certificate")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create a TLSConfig to be used when this node connects as a client to another remote node.
|
||||
// We're using ManagerRole as remote serverName for TLS host verification
|
||||
clientTLSCreds, err = rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole)
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": cn,
|
||||
"node.role": proposedRole,
|
||||
}).Debug("issued new TLS certificate")
|
||||
} else {
|
||||
// Request certificate issuance from a remote CA.
|
||||
// Last argument is nil because at this point we don't have any valid TLS creds
|
||||
tlsKeyPair, err = rootCA.RequestAndSaveNewCertificates(ctx, krw, config)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to request save new certificate")
|
||||
return nil, err
|
||||
}
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": clientTLSCreds.NodeID(),
|
||||
"node.role": clientTLSCreds.Role(),
|
||||
}).Debugf("new node credentials generated: %s", krw.Target())
|
||||
} else {
|
||||
if nodeInfo != nil {
|
||||
nodeInfo <- api.IssueNodeCertificateResponse{
|
||||
NodeID: clientTLSCreds.NodeID(),
|
||||
NodeMembership: api.NodeMembershipAccepted,
|
||||
}
|
||||
}
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": clientTLSCreds.NodeID(),
|
||||
"node.role": clientTLSCreds.Role(),
|
||||
}).Debug("loaded node credentials")
|
||||
}
|
||||
// Create the Server TLS Credentials for this node. These will not be used by workers.
|
||||
serverTLSCreds, err := rootCA.NewServerTLSCredentials(tlsKeyPair)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create a TLSConfig to be used when this node connects as a client to another remote node.
|
||||
// We're using ManagerRole as remote serverName for TLS host verification
|
||||
clientTLSCreds, err := rootCA.NewClientTLSCredentials(tlsKeyPair, ManagerRole)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": clientTLSCreds.NodeID(),
|
||||
"node.role": clientTLSCreds.Role(),
|
||||
}).Debugf("new node credentials generated: %s", krw.Target())
|
||||
|
||||
return NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds), nil
|
||||
}
|
||||
|
@ -324,6 +379,9 @@ func LoadOrCreateSecurityConfig(ctx context.Context, rootCA RootCA, token, propo
|
|||
// RenewTLSConfigNow gets a new TLS cert and key, and updates the security config if provided. This is similar to
|
||||
// RenewTLSConfig, except while that monitors for expiry, and periodically renews, this renews once and is blocking
|
||||
func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes) error {
|
||||
s.renewalMu.Lock()
|
||||
defer s.renewalMu.Unlock()
|
||||
|
||||
ctx = log.WithModule(ctx, "tls")
|
||||
log := log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": s.ClientTLSCreds.NodeID(),
|
||||
|
@ -334,10 +392,10 @@ func RenewTLSConfigNow(ctx context.Context, s *SecurityConfig, r remotes.Remotes
|
|||
rootCA := s.RootCA()
|
||||
tlsKeyPair, err := rootCA.RequestAndSaveNewCertificates(ctx,
|
||||
s.KeyWriter(),
|
||||
"",
|
||||
r,
|
||||
s.ClientTLSCreds,
|
||||
nil)
|
||||
CertificateRequestConfig{
|
||||
Remotes: r,
|
||||
Credentials: s.ClientTLSCreds,
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to renew the certificate")
|
||||
return err
|
||||
|
@ -463,61 +521,6 @@ func calculateRandomExpiry(validFrom, validUntil time.Time) time.Duration {
|
|||
return expiry
|
||||
}
|
||||
|
||||
// LoadTLSCreds loads tls credentials from the specified path and verifies that
|
||||
// thay are valid for the RootCA.
|
||||
func LoadTLSCreds(rootCA RootCA, kr KeyReader) (*MutableTLSCreds, *MutableTLSCreds, error) {
|
||||
// Read both the Cert and Key from disk
|
||||
cert, key, err := kr.Read()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Create an x509 certificate out of the contents on disk
|
||||
certBlock, _ := pem.Decode([]byte(cert))
|
||||
if certBlock == nil {
|
||||
return nil, nil, errors.New("failed to parse certificate PEM")
|
||||
}
|
||||
|
||||
// Create an X509Cert so we can .Verify()
|
||||
X509Cert, err := x509.ParseCertificate(certBlock.Bytes)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Include our root pool
|
||||
opts := x509.VerifyOptions{
|
||||
Roots: rootCA.Pool,
|
||||
}
|
||||
|
||||
// Check to see if this certificate was signed by our CA, and isn't expired
|
||||
if _, err := X509Cert.Verify(opts); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Now that we know this certificate is valid, create a TLS Certificate for our
|
||||
// credentials
|
||||
keyPair, err := tls.X509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Load the Certificates as server credentials
|
||||
serverTLSCreds, err := rootCA.NewServerTLSCredentials(&keyPair)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Load the Certificates also as client credentials.
|
||||
// Both workers and managers always connect to remote managers,
|
||||
// so ServerName is always set to ManagerRole here.
|
||||
clientTLSCreds, err := rootCA.NewClientTLSCredentials(&keyPair, ManagerRole)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return clientTLSCreds, serverTLSCreds, nil
|
||||
}
|
||||
|
||||
// NewServerTLSConfig returns a tls.Config configured for a TLS Server, given a tls.Certificate
|
||||
// and the PEM-encoded root CA Certificate
|
||||
func NewServerTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool) (*tls.Config, error) {
|
||||
|
@ -554,8 +557,8 @@ func NewClientTLSConfig(cert *tls.Certificate, rootCAPool *x509.CertPool, server
|
|||
|
||||
// NewClientTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate
|
||||
// a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to.
|
||||
func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) {
|
||||
tlsConfig, err := NewClientTLSConfig(cert, rca.Pool, serverName)
|
||||
func (rootCA *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName string) (*MutableTLSCreds, error) {
|
||||
tlsConfig, err := NewClientTLSConfig(cert, rootCA.Pool, serverName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -567,8 +570,8 @@ func (rca *RootCA) NewClientTLSCredentials(cert *tls.Certificate, serverName str
|
|||
|
||||
// NewServerTLSCredentials returns GRPC credentials for a TLS GRPC client, given a tls.Certificate
|
||||
// a PEM-Encoded root CA Certificate, and the name of the remote server the client wants to connect to.
|
||||
func (rca *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) {
|
||||
tlsConfig, err := NewServerTLSConfig(cert, rca.Pool)
|
||||
func (rootCA *RootCA) NewServerTLSCredentials(cert *tls.Certificate) (*MutableTLSCreds, error) {
|
||||
tlsConfig, err := NewServerTLSConfig(cert, rootCA.Pool)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"github.com/cloudflare/cfssl/api"
|
||||
"github.com/cloudflare/cfssl/signer"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
)
|
||||
|
||||
// ErrNoExternalCAURLs is an error used it indicate that an ExternalCA is
|
||||
|
@ -65,7 +67,7 @@ func (eca *ExternalCA) UpdateURLs(urls ...string) {
|
|||
|
||||
// Sign signs a new certificate by proxying the given certificate signing
|
||||
// request to an external CFSSL API server.
|
||||
func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
|
||||
func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert []byte, err error) {
|
||||
// Get the current HTTP client and list of URLs in a small critical
|
||||
// section. We will use these to make certificate signing requests.
|
||||
eca.mu.Lock()
|
||||
|
@ -85,7 +87,7 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
|
|||
// Try each configured proxy URL. Return after the first success. If
|
||||
// all fail then the last error will be returned.
|
||||
for _, url := range urls {
|
||||
cert, err = makeExternalSignRequest(client, url, csrJSON)
|
||||
cert, err = makeExternalSignRequest(ctx, client, url, csrJSON)
|
||||
if err == nil {
|
||||
return eca.rootCA.AppendFirstRootPEM(cert)
|
||||
}
|
||||
|
@ -96,14 +98,31 @@ func (eca *ExternalCA) Sign(req signer.SignRequest) (cert []byte, err error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func makeExternalSignRequest(client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
|
||||
resp, err := client.Post(url, "application/json", bytes.NewReader(csrJSON))
|
||||
func makeExternalSignRequest(ctx context.Context, client *http.Client, url string, csrJSON []byte) (cert []byte, err error) {
|
||||
resp, err := ctxhttp.Post(ctx, client, url, "application/json", bytes.NewReader(csrJSON))
|
||||
if err != nil {
|
||||
return nil, recoverableErr{err: errors.Wrap(err, "unable to perform certificate signing request")}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
doneReading := make(chan struct{})
|
||||
bodyClosed := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-doneReading:
|
||||
}
|
||||
resp.Body.Close()
|
||||
close(bodyClosed)
|
||||
}()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
close(doneReading)
|
||||
<-bodyClosed
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return nil, recoverableErr{err: errors.Wrap(err, "unable to read CSR response body")}
|
||||
}
|
||||
|
|
|
@ -617,7 +617,7 @@ func (s *Server) signNodeCert(ctx context.Context, node *api.Node) error {
|
|||
)
|
||||
|
||||
// Try using the external CA first.
|
||||
cert, err := externalCA.Sign(PrepareCSR(rawCSR, cn, ou, org))
|
||||
cert, err := externalCA.Sign(ctx, PrepareCSR(rawCSR, cn, ou, org))
|
||||
if err == ErrNoExternalCAURLs {
|
||||
// No external CA servers configured. Try using the local CA.
|
||||
cert, err = rootCA.ParseValidateAndSignCSR(rawCSR, cn, ou, org)
|
||||
|
|
|
@ -2,6 +2,7 @@ package logbroker
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
|
@ -24,6 +25,12 @@ var (
|
|||
errNotRunning = errors.New("broker is not running")
|
||||
)
|
||||
|
||||
type logMessage struct {
|
||||
*api.PublishLogsMessage
|
||||
completed bool
|
||||
err error
|
||||
}
|
||||
|
||||
// LogBroker coordinates log subscriptions to services and tasks. Clients can
|
||||
// publish and subscribe to logs channels.
|
||||
//
|
||||
|
@ -35,6 +42,7 @@ type LogBroker struct {
|
|||
subscriptionQueue *watch.Queue
|
||||
|
||||
registeredSubscriptions map[string]*subscription
|
||||
connectedNodes map[string]struct{}
|
||||
|
||||
pctx context.Context
|
||||
cancelAll context.CancelFunc
|
||||
|
@ -62,6 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
|
|||
lb.logQueue = watch.NewQueue()
|
||||
lb.subscriptionQueue = watch.NewQueue()
|
||||
lb.registeredSubscriptions = make(map[string]*subscription)
|
||||
lb.connectedNodes = make(map[string]struct{})
|
||||
lb.mu.Unlock()
|
||||
|
||||
select {
|
||||
|
@ -112,12 +121,30 @@ func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.Log
|
|||
return subscription
|
||||
}
|
||||
|
||||
func (lb *LogBroker) getSubscription(id string) *subscription {
|
||||
lb.mu.RLock()
|
||||
defer lb.mu.RUnlock()
|
||||
|
||||
subscription, ok := lb.registeredSubscriptions[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return subscription
|
||||
}
|
||||
|
||||
func (lb *LogBroker) registerSubscription(subscription *subscription) {
|
||||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
lb.registeredSubscriptions[subscription.message.ID] = subscription
|
||||
lb.subscriptionQueue.Publish(subscription)
|
||||
|
||||
// Mark nodes that won't receive the message as done.
|
||||
for _, node := range subscription.Nodes() {
|
||||
if _, ok := lb.connectedNodes[node]; !ok {
|
||||
subscription.Done(node, fmt.Errorf("node %s is not available", node))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
|
||||
|
@ -160,7 +187,7 @@ func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
|
|||
defer lb.mu.RUnlock()
|
||||
|
||||
return lb.logQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
|
||||
publish := event.(*api.PublishLogsMessage)
|
||||
publish := event.(*logMessage)
|
||||
return publish.SubscriptionID == id
|
||||
}))
|
||||
}
|
||||
|
@ -169,7 +196,7 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
|
|||
lb.mu.RLock()
|
||||
defer lb.mu.RUnlock()
|
||||
|
||||
lb.logQueue.Publish(log)
|
||||
lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
|
||||
}
|
||||
|
||||
// SubscribeLogs creates a log subscription and streams back logs
|
||||
|
@ -190,7 +217,6 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
|
|||
"subscription.id": subscription.message.ID,
|
||||
},
|
||||
)
|
||||
|
||||
log.Debug("subscribed")
|
||||
|
||||
publishCh, publishCancel := lb.subscribe(subscription.message.ID)
|
||||
|
@ -199,23 +225,50 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
|
|||
lb.registerSubscription(subscription)
|
||||
defer lb.unregisterSubscription(subscription)
|
||||
|
||||
completed := subscription.Wait(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-lb.pctx.Done():
|
||||
return lb.pctx.Err()
|
||||
case event := <-publishCh:
|
||||
publish := event.(*api.PublishLogsMessage)
|
||||
publish := event.(*logMessage)
|
||||
if publish.completed {
|
||||
return publish.err
|
||||
}
|
||||
if err := stream.Send(&api.SubscribeLogsMessage{
|
||||
Messages: publish.Messages,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-lb.pctx.Done():
|
||||
return nil
|
||||
case <-completed:
|
||||
completed = nil
|
||||
lb.logQueue.Publish(&logMessage{
|
||||
PublishLogsMessage: &api.PublishLogsMessage{
|
||||
SubscriptionID: subscription.message.ID,
|
||||
},
|
||||
completed: true,
|
||||
err: subscription.Err(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LogBroker) nodeConnected(nodeID string) {
|
||||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
lb.connectedNodes[nodeID] = struct{}{}
|
||||
}
|
||||
|
||||
func (lb *LogBroker) nodeDisconnected(nodeID string) {
|
||||
lb.mu.Lock()
|
||||
defer lb.mu.Unlock()
|
||||
|
||||
delete(lb.connectedNodes, nodeID)
|
||||
}
|
||||
|
||||
// ListenSubscriptions returns a stream of matching subscriptions for the current node
|
||||
func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest, stream api.LogBroker_ListenSubscriptionsServer) error {
|
||||
remote, err := ca.RemoteNode(stream.Context())
|
||||
|
@ -223,6 +276,9 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
return err
|
||||
}
|
||||
|
||||
lb.nodeConnected(remote.NodeID)
|
||||
defer lb.nodeDisconnected(remote.NodeID)
|
||||
|
||||
log := log.G(stream.Context()).WithFields(
|
||||
logrus.Fields{
|
||||
"method": "(*LogBroker).ListenSubscriptions",
|
||||
|
@ -234,7 +290,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
|
||||
log.Debug("node registered")
|
||||
|
||||
activeSubscriptions := make(map[string]struct{})
|
||||
activeSubscriptions := make(map[string]*subscription)
|
||||
defer func() {
|
||||
// If the worker quits, mark all active subscriptions as finished.
|
||||
for _, subscription := range activeSubscriptions {
|
||||
subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
|
||||
}
|
||||
}()
|
||||
|
||||
// Start by sending down all active subscriptions.
|
||||
for _, subscription := range subscriptions {
|
||||
|
@ -250,7 +312,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
activeSubscriptions[subscription.message.ID] = struct{}{}
|
||||
activeSubscriptions[subscription.message.ID] = subscription
|
||||
}
|
||||
|
||||
// Send down new subscriptions.
|
||||
|
@ -261,12 +323,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
|
||||
if subscription.message.Close {
|
||||
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
|
||||
delete(activeSubscriptions, subscription.message.ID)
|
||||
} else {
|
||||
// Avoid sending down the same subscription multiple times
|
||||
if _, ok := activeSubscriptions[subscription.message.ID]; ok {
|
||||
continue
|
||||
}
|
||||
activeSubscriptions[subscription.message.ID] = struct{}{}
|
||||
activeSubscriptions[subscription.message.ID] = subscription
|
||||
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
|
||||
}
|
||||
if err := stream.Send(subscription.message); err != nil {
|
||||
|
@ -282,12 +345,19 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
|
|||
}
|
||||
|
||||
// PublishLogs publishes log messages for a given subscription
|
||||
func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
|
||||
func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err error) {
|
||||
remote, err := ca.RemoteNode(stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var currentSubscription *subscription
|
||||
defer func() {
|
||||
if currentSubscription != nil {
|
||||
currentSubscription.Done(remote.NodeID, err)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
log, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
|
@ -301,6 +371,17 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) error {
|
|||
return grpc.Errorf(codes.InvalidArgument, "missing subscription ID")
|
||||
}
|
||||
|
||||
if currentSubscription == nil {
|
||||
currentSubscription = lb.getSubscription(log.SubscriptionID)
|
||||
if currentSubscription == nil {
|
||||
return grpc.Errorf(codes.NotFound, "unknown subscription ID")
|
||||
}
|
||||
} else {
|
||||
if log.SubscriptionID != currentSubscription.message.ID {
|
||||
return grpc.Errorf(codes.InvalidArgument, "different subscription IDs in the same session")
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure logs are emitted using the right Node ID to avoid impersonation.
|
||||
for _, msg := range log.Messages {
|
||||
if msg.Context.NodeID != remote.NodeID {
|
||||
|
|
130
vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go
сгенерированный
поставляемый
130
vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go
сгенерированный
поставляемый
|
@ -2,6 +2,8 @@ package logbroker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
events "github.com/docker/go-events"
|
||||
|
@ -14,6 +16,7 @@ import (
|
|||
|
||||
type subscription struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
store *store.MemoryStore
|
||||
message *api.SubscriptionMessage
|
||||
|
@ -22,18 +25,25 @@ type subscription struct {
|
|||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
nodes map[string]struct{}
|
||||
errors []error
|
||||
nodes map[string]struct{}
|
||||
pendingTasks map[string]struct{}
|
||||
}
|
||||
|
||||
func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
|
||||
return &subscription{
|
||||
store: store,
|
||||
message: message,
|
||||
changed: changed,
|
||||
nodes: make(map[string]struct{}),
|
||||
store: store,
|
||||
message: message,
|
||||
changed: changed,
|
||||
nodes: make(map[string]struct{}),
|
||||
pendingTasks: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscription) follow() bool {
|
||||
return s.message.Options != nil && s.message.Options.Follow
|
||||
}
|
||||
|
||||
func (s *subscription) Contains(nodeID string) bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
@ -42,15 +52,28 @@ func (s *subscription) Contains(nodeID string) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
func (s *subscription) Nodes() []string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
nodes := make([]string, 0, len(s.nodes))
|
||||
for node := range s.nodes {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (s *subscription) Run(ctx context.Context) {
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
|
||||
wq := s.store.WatchQueue()
|
||||
ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
|
||||
go func() {
|
||||
defer cancel()
|
||||
s.watch(ch)
|
||||
}()
|
||||
if s.follow() {
|
||||
wq := s.store.WatchQueue()
|
||||
ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
|
||||
go func() {
|
||||
defer cancel()
|
||||
s.watch(ch)
|
||||
}()
|
||||
}
|
||||
|
||||
s.match()
|
||||
}
|
||||
|
@ -61,10 +84,74 @@ func (s *subscription) Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *subscription) Wait(ctx context.Context) <-chan struct{} {
|
||||
// Follow subscriptions never end
|
||||
if s.follow() {
|
||||
return nil
|
||||
}
|
||||
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
s.wg.Wait()
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (s *subscription) Done(nodeID string, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
s.errors = append(s.errors, err)
|
||||
}
|
||||
|
||||
if s.follow() {
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := s.nodes[nodeID]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(s.nodes, nodeID)
|
||||
s.wg.Done()
|
||||
}
|
||||
|
||||
func (s *subscription) Err() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if len(s.errors) == 0 && len(s.pendingTasks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
messages := make([]string, 0, len(s.errors))
|
||||
for _, err := range s.errors {
|
||||
messages = append(messages, err.Error())
|
||||
}
|
||||
for t := range s.pendingTasks {
|
||||
messages = append(messages, fmt.Sprintf("task %s has not been scheduled", t))
|
||||
}
|
||||
|
||||
return fmt.Errorf("warning: incomplete log stream. some logs could not be retrieved for the following reasons: %s", strings.Join(messages, ", "))
|
||||
}
|
||||
|
||||
func (s *subscription) match() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
add := func(t *api.Task) {
|
||||
if t.NodeID == "" {
|
||||
s.pendingTasks[t.ID] = struct{}{}
|
||||
return
|
||||
}
|
||||
if _, ok := s.nodes[t.NodeID]; !ok {
|
||||
s.nodes[t.NodeID] = struct{}{}
|
||||
s.wg.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
s.store.View(func(tx store.ReadTx) {
|
||||
for _, nid := range s.message.Selector.NodeIDs {
|
||||
s.nodes[nid] = struct{}{}
|
||||
|
@ -72,7 +159,7 @@ func (s *subscription) match() {
|
|||
|
||||
for _, tid := range s.message.Selector.TaskIDs {
|
||||
if task := store.GetTask(tx, tid); task != nil {
|
||||
s.nodes[task.NodeID] = struct{}{}
|
||||
add(task)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,7 +170,7 @@ func (s *subscription) match() {
|
|||
continue
|
||||
}
|
||||
for _, task := range tasks {
|
||||
s.nodes[task.NodeID] = struct{}{}
|
||||
add(task)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -100,12 +187,19 @@ func (s *subscription) watch(ch <-chan events.Event) error {
|
|||
matchServices[sid] = struct{}{}
|
||||
}
|
||||
|
||||
add := func(nodeID string) {
|
||||
add := func(t *api.Task) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if _, ok := s.nodes[nodeID]; !ok {
|
||||
s.nodes[nodeID] = struct{}{}
|
||||
// Un-allocated task.
|
||||
if t.NodeID == "" {
|
||||
s.pendingTasks[t.ID] = struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
delete(s.pendingTasks, t.ID)
|
||||
if _, ok := s.nodes[t.NodeID]; !ok {
|
||||
s.nodes[t.NodeID] = struct{}{}
|
||||
s.changed.Publish(s)
|
||||
}
|
||||
}
|
||||
|
@ -129,10 +223,10 @@ func (s *subscription) watch(ch <-chan events.Event) error {
|
|||
}
|
||||
|
||||
if _, ok := matchTasks[t.ID]; ok {
|
||||
add(t.NodeID)
|
||||
add(t)
|
||||
}
|
||||
if _, ok := matchServices[t.ServiceID]; ok {
|
||||
add(t.NodeID)
|
||||
add(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -385,14 +385,10 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
|
||||
close(m.started)
|
||||
|
||||
watchDone := make(chan struct{})
|
||||
watchCtx, watchCtxCancel := context.WithCancel(parent)
|
||||
go func() {
|
||||
err := m.raftNode.Run(ctx)
|
||||
watchCtxCancel()
|
||||
<-watchDone
|
||||
if err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
log.G(ctx).WithError(err).Error("raft node stopped")
|
||||
m.Stop(ctx)
|
||||
}
|
||||
}()
|
||||
|
@ -407,7 +403,7 @@ func (m *Manager) Run(parent context.Context) error {
|
|||
}
|
||||
raftConfig := c.Spec.Raft
|
||||
|
||||
if err := m.watchForKEKChanges(watchCtx, watchDone); err != nil {
|
||||
if err := m.watchForKEKChanges(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -544,8 +540,7 @@ func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) watchForKEKChanges(ctx context.Context, watchDone chan struct{}) error {
|
||||
defer close(watchDone)
|
||||
func (m *Manager) watchForKEKChanges(ctx context.Context) error {
|
||||
clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
|
||||
clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
|
||||
func(tx store.ReadTx) error {
|
||||
|
|
11
vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
сгенерированный
поставляемый
11
vendor/github.com/docker/swarmkit/manager/orchestrator/global/global.go
сгенерированный
поставляемый
|
@ -240,6 +240,8 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
|
|||
}
|
||||
})
|
||||
|
||||
updates := make(map[*api.Service][]orchestrator.Slot)
|
||||
|
||||
_, err := g.store.Batch(func(batch *store.Batch) error {
|
||||
var updateTasks []orchestrator.Slot
|
||||
for _, serviceID := range serviceIDs {
|
||||
|
@ -274,8 +276,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
|
|||
updateTasks = append(updateTasks, ntasks)
|
||||
}
|
||||
}
|
||||
|
||||
if len(updateTasks) > 0 {
|
||||
g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
|
||||
updates[service.Service] = updateTasks
|
||||
}
|
||||
|
||||
// Remove any tasks assigned to nodes not found in g.nodes.
|
||||
|
@ -287,9 +290,15 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
|
|||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
|
||||
}
|
||||
|
||||
for service, updateTasks := range updates {
|
||||
g.updater.Update(ctx, g.cluster, service, updateTasks)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// updateNode updates g.nodes based on the current node value
|
||||
|
|
|
@ -100,26 +100,24 @@ type Config struct {
|
|||
// cluster. Node handles workloads and may also run as a manager.
|
||||
type Node struct {
|
||||
sync.RWMutex
|
||||
config *Config
|
||||
remotes *persistentRemotes
|
||||
role string
|
||||
roleCond *sync.Cond
|
||||
conn *grpc.ClientConn
|
||||
connCond *sync.Cond
|
||||
nodeID string
|
||||
nodeMembership api.NodeSpec_Membership
|
||||
started chan struct{}
|
||||
startOnce sync.Once
|
||||
stopped chan struct{}
|
||||
stopOnce sync.Once
|
||||
ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
|
||||
certificateRequested chan struct{} // closed when certificate issue request has been sent by node
|
||||
closed chan struct{}
|
||||
err error
|
||||
agent *agent.Agent
|
||||
manager *manager.Manager
|
||||
notifyNodeChange chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion
|
||||
unlockKey []byte
|
||||
config *Config
|
||||
remotes *persistentRemotes
|
||||
role string
|
||||
roleCond *sync.Cond
|
||||
conn *grpc.ClientConn
|
||||
connCond *sync.Cond
|
||||
nodeID string
|
||||
started chan struct{}
|
||||
startOnce sync.Once
|
||||
stopped chan struct{}
|
||||
stopOnce sync.Once
|
||||
ready chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
|
||||
closed chan struct{}
|
||||
err error
|
||||
agent *agent.Agent
|
||||
manager *manager.Manager
|
||||
notifyNodeChange chan *api.Node // used to send role updates from the dispatcher api on promotion/demotion
|
||||
unlockKey []byte
|
||||
}
|
||||
|
||||
// RemoteAPIAddr returns address on which remote manager api listens.
|
||||
|
@ -155,16 +153,15 @@ func New(c *Config) (*Node, error) {
|
|||
}
|
||||
|
||||
n := &Node{
|
||||
remotes: newPersistentRemotes(stateFile, p...),
|
||||
role: ca.WorkerRole,
|
||||
config: c,
|
||||
started: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
ready: make(chan struct{}),
|
||||
certificateRequested: make(chan struct{}),
|
||||
notifyNodeChange: make(chan *api.Node, 1),
|
||||
unlockKey: c.UnlockKey,
|
||||
remotes: newPersistentRemotes(stateFile, p...),
|
||||
role: ca.WorkerRole,
|
||||
config: c,
|
||||
started: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
ready: make(chan struct{}),
|
||||
notifyNodeChange: make(chan *api.Node, 1),
|
||||
unlockKey: c.UnlockKey,
|
||||
}
|
||||
|
||||
if n.config.JoinAddr != "" || n.config.ForceNewCluster {
|
||||
|
@ -403,13 +400,6 @@ func (n *Node) Ready() <-chan struct{} {
|
|||
return n.ready
|
||||
}
|
||||
|
||||
// CertificateRequested returns a channel that is closed after node has
|
||||
// requested a certificate. After this call a caller can expect calls to
|
||||
// NodeID() and `NodeMembership()` to succeed.
|
||||
func (n *Node) CertificateRequested() <-chan struct{} {
|
||||
return n.certificateRequested
|
||||
}
|
||||
|
||||
func (n *Node) setControlSocket(conn *grpc.ClientConn) {
|
||||
n.Lock()
|
||||
if n.conn != nil {
|
||||
|
@ -461,13 +451,6 @@ func (n *Node) NodeID() string {
|
|||
return n.nodeID
|
||||
}
|
||||
|
||||
// NodeMembership returns current node's membership. May be empty if not set.
|
||||
func (n *Node) NodeMembership() api.NodeSpec_Membership {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.nodeMembership
|
||||
}
|
||||
|
||||
// Manager returns manager instance started by node. May be nil.
|
||||
func (n *Node) Manager() *manager.Manager {
|
||||
n.RLock()
|
||||
|
@ -507,18 +490,14 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
|
|||
return nil, err
|
||||
}
|
||||
if err == nil {
|
||||
clientTLSCreds, serverTLSCreds, err := ca.LoadTLSCreds(rootCA, krw)
|
||||
_, ok := errors.Cause(err).(ca.ErrInvalidKEK)
|
||||
switch {
|
||||
case err == nil:
|
||||
securityConfig = ca.NewSecurityConfig(&rootCA, krw, clientTLSCreds, serverTLSCreds)
|
||||
log.G(ctx).Debug("loaded CA and TLS certificates")
|
||||
case ok:
|
||||
return nil, ErrInvalidUnlockKey
|
||||
case os.IsNotExist(err):
|
||||
break
|
||||
default:
|
||||
return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
|
||||
securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw)
|
||||
if err != nil {
|
||||
_, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
|
||||
if isInvalidKEK {
|
||||
return nil, ErrInvalidUnlockKey
|
||||
} else if !os.IsNotExist(err) {
|
||||
return nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -544,44 +523,36 @@ func (n *Node) loadSecurityConfig(ctx context.Context) (*ca.SecurityConfig, erro
|
|||
}
|
||||
|
||||
// Obtain new certs and setup TLS certificates renewal for this node:
|
||||
// - We call LoadOrCreateSecurityConfig which blocks until a valid certificate has been issued
|
||||
// - We retrieve the nodeID from LoadOrCreateSecurityConfig through the info channel. This allows
|
||||
// us to display the ID before the certificate gets issued (for potential approval).
|
||||
// - We wait for LoadOrCreateSecurityConfig to finish since we need a certificate to operate.
|
||||
// - Given a valid certificate, spin a renewal go-routine that will ensure that certificates stay
|
||||
// up to date.
|
||||
issueResponseChan := make(chan api.IssueNodeCertificateResponse, 1)
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case resp := <-issueResponseChan:
|
||||
log.G(log.WithModule(ctx, "tls")).WithFields(logrus.Fields{
|
||||
"node.id": resp.NodeID,
|
||||
}).Debugf("loaded TLS certificate")
|
||||
n.Lock()
|
||||
n.nodeID = resp.NodeID
|
||||
n.nodeMembership = resp.NodeMembership
|
||||
n.Unlock()
|
||||
close(n.certificateRequested)
|
||||
}
|
||||
}()
|
||||
// - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
|
||||
// until a valid certificate has been issued.
|
||||
// - We wait for CreateSecurityConfig to finish since we need a certificate to operate.
|
||||
|
||||
// LoadOrCreateSecurityConfig is the point at which a new node joining a cluster will retrieve TLS
|
||||
// certificates and write them to disk
|
||||
securityConfig, err = ca.LoadOrCreateSecurityConfig(
|
||||
ctx, rootCA, n.config.JoinToken, ca.ManagerRole, n.remotes, issueResponseChan, krw)
|
||||
if err != nil {
|
||||
// Attempt to load certificate from disk
|
||||
securityConfig, err = ca.LoadSecurityConfig(ctx, rootCA, krw)
|
||||
if err == nil {
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"node.id": securityConfig.ClientTLSCreds.NodeID(),
|
||||
}).Debugf("loaded TLS certificate")
|
||||
} else {
|
||||
if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
|
||||
return nil, ErrInvalidUnlockKey
|
||||
}
|
||||
return nil, err
|
||||
log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())
|
||||
|
||||
securityConfig, err = rootCA.CreateSecurityConfig(ctx, krw, ca.CertificateRequestConfig{
|
||||
Token: n.config.JoinToken,
|
||||
Remotes: n.remotes,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
n.role = securityConfig.ClientTLSCreds.Role()
|
||||
n.nodeID = securityConfig.ClientTLSCreds.NodeID()
|
||||
n.nodeMembership = api.NodeMembershipAccepted
|
||||
n.roleCond.Broadcast()
|
||||
n.Unlock()
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче