pull container builders into a separate package

This commit is contained in:
Matthew Fisher 2018-04-16 12:20:12 -07:00
Родитель 57c8714abd
Коммит 022e2200ca
3 изменённых файлов: 254 добавлений и 230 удалений

Просмотреть файл

@ -8,6 +8,7 @@ import (
"path/filepath"
"github.com/Azure/draft/pkg/builder"
"github.com/Azure/draft/pkg/builder/docker"
"github.com/Azure/draft/pkg/cmdline"
"github.com/Azure/draft/pkg/draft/draftpath"
"github.com/Azure/draft/pkg/storage/kube/configmap"
@ -137,10 +138,9 @@ func (u *upCmd) run(environment string) (err error) {
buildctx *builder.Context
kubeConfig *rest.Config
ctx = context.Background()
bldr = &builder.Builder{
LogsDir: u.home.Logs(),
}
bldr = builder.New()
)
bldr.LogsDir = u.home.Logs()
if buildctx, err = builder.LoadWithEnv(u.src, environment); err != nil {
return fmt.Errorf("failed loading build context with env %q: %v", environment, err)
}
@ -163,7 +163,9 @@ func (u *upCmd) run(environment string) (err error) {
if err := cli.Initialize(u.dockerClientOptions); err != nil {
return fmt.Errorf("failed to create docker client: %v", err)
}
bldr.DockerClient = cli
bldr.ContainerBuilder = &docker.Builder{
DockerClient: cli,
}
// setup kube
bldr.Kube, kubeConfig, err = getKubeClient(kubeContext)
@ -191,14 +193,14 @@ func (u *upCmd) run(environment string) (err error) {
// setup the storage engine
bldr.Storage = configmap.NewConfigMaps(bldr.Kube.CoreV1().ConfigMaps(tillerNamespace))
progressC := bldr.Up(ctx, buildctx)
buildID := bldr.ID()
cmdline.Display(ctx, buildctx.Env.Name, progressC, cmdline.WithBuildID(buildID))
cmdline.Display(ctx, buildctx.Env.Name, progressC, cmdline.WithBuildID(bldr.ID))
if buildctx.Env.AutoConnect || autoConnect {
c := newConnectCmd(u.out)
return c.RunE(c, []string{})
}
if err := runPostDeployTasks(taskList, buildID); err != nil {
if err := runPostDeployTasks(taskList, bldr.ID); err != nil {
debug(err.Error())
return nil
}

Просмотреть файл

@ -12,7 +12,6 @@ import (
"path/filepath"
"strings"
"sync"
"time"
"github.com/Azure/draft/pkg/draft/manifest"
"github.com/Azure/draft/pkg/draft/pack"
@ -21,13 +20,9 @@ import (
"github.com/Azure/draft/pkg/storage"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image/build"
"github.com/docker/docker/api/types"
"github.com/docker/docker/builder/dockerignore"
dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/term"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/api/core/v1"
@ -50,26 +45,26 @@ const (
// Builder contains information about the build environment
type Builder struct {
DockerClient command.Cli
Helm helm.Interface
Kube k8s.Interface
Storage storage.Store
LogsDir string
id string
ID string
ContainerBuilder ContainerBuilder
DockerClient command.Cli
Helm helm.Interface
Kube k8s.Interface
Storage storage.Store
LogsDir string
}
// ContainerBuilder defines how a container is built and pushed to a container registry using the supplied app context.
type ContainerBuilder interface {
Build(ctx context.Context, app *AppContext, out chan<- *Summary) error
Push(ctx context.Context, app *AppContext, out chan<- *Summary) error
}
// Logs returns the path to the build logs.
//
// Set after Up is called (otherwise "").
func (b *Builder) Logs(appName string) string {
return filepath.Join(b.LogsDir, appName, b.id)
}
// ID returns the build id.
//
// Set after Up is called (otherwise "").
func (b *Builder) ID() string {
return b.id
return filepath.Join(b.LogsDir, appName, b.ID)
}
// Context contains information about the application
@ -85,15 +80,21 @@ type Context struct {
// AppContext contains state information carried across the various draft stage boundaries.
type AppContext struct {
obj *storage.Object
bldr *Builder
ctx *Context
buf *bytes.Buffer
tags []string
img string
log io.WriteCloser
id string
vals chartutil.Values
Obj *storage.Object
Bldr *Builder
Ctx *Context
Buf *bytes.Buffer
Tags []string
Img string
Log io.WriteCloser
ID string
Vals chartutil.Values
}
func New() *Builder {
return &Builder{
ID: getulid(),
}
}
// newAppContext prepares state carried across the various draft stage boundaries.
@ -122,7 +123,7 @@ func newAppContext(b *Builder, buildCtx *Context) (*AppContext, error) {
// inject certain values into the chart such as the registry location,
// the application name, buildID and the application version.
tplstr := "image.repository=%s,image.tag=%s,%s=%s,%s=%s"
inject := fmt.Sprintf(tplstr, imageRepository, imgtag, local.DraftLabelKey, buildCtx.Env.Name, local.BuildIDKey, b.ID())
inject := fmt.Sprintf(tplstr, imageRepository, imgtag, local.DraftLabelKey, buildCtx.Env.Name, local.BuildIDKey, b.ID)
vals, err := chartutil.ReadValues([]byte(buildCtx.Values.Raw))
if err != nil {
@ -142,20 +143,20 @@ func newAppContext(b *Builder, buildCtx *Context) (*AppContext, error) {
return nil, err
}
state := &storage.Object{
BuildID: b.ID(),
BuildID: b.ID,
ContextID: ctxtID,
LogsFileRef: b.Logs(buildCtx.Env.Name),
}
return &AppContext{
obj: state,
id: b.ID(),
bldr: b,
ctx: buildCtx,
buf: buf,
tags: t,
img: image,
log: logf,
vals: vals,
Obj: state,
ID: b.ID,
Bldr: b,
Ctx: buildCtx,
Buf: buf,
Tags: t,
Img: image,
Log: logf,
Vals: vals,
}, nil
}
@ -329,7 +330,6 @@ func archiveSrc(ctx *Context) error {
// Up handles incoming draft up requests and returns a stream of summaries or error.
func (b *Builder) Up(ctx context.Context, bctx *Context) <-chan *Summary {
b.id = getulid()
ch := make(chan *Summary, 1)
var wg sync.WaitGroup
wg.Add(1)
@ -343,22 +343,20 @@ func (b *Builder) Up(ctx context.Context, bctx *Context) <-chan *Summary {
wg.Done()
}()
if app, err = newAppContext(b, bctx); err != nil {
log.Printf("buildApp: error creating app context: %v\n", err)
log.Printf("error creating app context: %v\n", err)
return
}
log.SetOutput(app.log)
if err := b.buildImg(ctx, app, ch); err != nil {
log.Printf("buildApp: buildImg error: %v\n", err)
log.SetOutput(app.Log)
if err := b.ContainerBuilder.Build(ctx, app, ch); err != nil {
log.Printf("error while building: %v\n", err)
return
}
if app.ctx.Env.Registry != "" {
if err := b.pushImg(ctx, app, ch); err != nil {
log.Printf("buildApp: pushImg error: %v\n", err)
return
}
if err := b.ContainerBuilder.Push(ctx, app, ch); err != nil {
log.Printf("error while pushing: %v\n", err)
return
}
if err := b.release(ctx, app, ch); err != nil {
log.Printf("buildApp: release error: %v\n", err)
log.Printf("error while releasing: %v\n", err)
return
}
}()
@ -371,163 +369,27 @@ func (b *Builder) Up(ctx context.Context, bctx *Context) <-chan *Summary {
// saveState saves information collected from a draft build.
func (b *Builder) saveState(app *AppContext) {
if err := b.Storage.UpdateBuild(context.Background(), app.ctx.Env.Name, app.obj); err != nil {
log.Printf("complete: failed to store build object for app %q: %v\n", app.ctx.Env.Name, err)
if err := b.Storage.UpdateBuild(context.Background(), app.Ctx.Env.Name, app.Obj); err != nil {
log.Printf("complete: failed to store build object for app %q: %v\n", app.Ctx.Env.Name, err)
return
}
if app.log != nil {
app.log.Close()
if app.Log != nil {
app.Log.Close()
}
}
// buildImg builds the docker image.
func (b *Builder) buildImg(ctx context.Context, app *AppContext, out chan<- *Summary) (err error) {
const stageDesc = "Building Docker Image"
defer complete(app.id, stageDesc, out, &err)
summary := summarize(app.id, stageDesc, out)
// notify that particular stage has started.
summary("started", SummaryStarted)
msgc := make(chan string)
errc := make(chan error)
go func() {
buildopts := types.ImageBuildOptions{
Tags: app.tags,
Dockerfile: app.ctx.Env.Dockerfile,
}
resp, err := b.DockerClient.Client().ImageBuild(ctx, app.buf, buildopts)
if err != nil {
errc <- err
return
}
defer func() {
resp.Body.Close()
close(msgc)
close(errc)
}()
outFd, isTerm := term.GetFdInfo(app.buf)
if err := jsonmessage.DisplayJSONMessagesStream(resp.Body, app.log, outFd, isTerm, nil); err != nil {
errc <- err
return
}
if _, _, err = b.DockerClient.Client().ImageInspectWithRaw(ctx, app.img); err != nil {
if dockerclient.IsErrNotFound(err) {
errc <- fmt.Errorf("Could not locate image for %s: %v", app.ctx.Env.Name, err)
return
}
errc <- fmt.Errorf("ImageInspectWithRaw error: %v", err)
return
}
}()
for msgc != nil || errc != nil {
select {
case msg, ok := <-msgc:
if !ok {
msgc = nil
continue
}
summary(msg, SummaryLogging)
case err, ok := <-errc:
if !ok {
errc = nil
continue
}
return err
default:
summary("ongoing", SummaryOngoing)
time.Sleep(time.Second)
}
}
return nil
}
// pushImg pushes the results of buildImg to the image repository.
func (b *Builder) pushImg(ctx context.Context, app *AppContext, out chan<- *Summary) (err error) {
const stageDesc = "Pushing Docker Image"
defer complete(app.id, stageDesc, out, &err)
summary := summarize(app.id, stageDesc, out)
// notify that particular stage has started.
summary("started", SummaryStarted)
msgc := make(chan string, 1)
errc := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(len(app.tags))
go func() {
registryAuth, err := command.RetrieveAuthTokenFromImage(ctx, b.DockerClient, app.img)
if err != nil {
errc <- err
return
}
for _, img := range app.tags {
go func(img string) {
defer wg.Done()
resp, err := b.DockerClient.Client().ImagePush(ctx, img, types.ImagePushOptions{RegistryAuth: registryAuth})
if err != nil {
errc <- err
return
}
defer resp.Close()
outFd, isTerm := term.GetFdInfo(app.log)
if err := jsonmessage.DisplayJSONMessagesStream(resp, app.log, outFd, isTerm, nil); err != nil {
errc <- err
return
}
}(img)
}
defer func() {
close(errc)
close(msgc)
}()
}()
for msgc != nil || errc != nil {
select {
case msg, ok := <-msgc:
if !ok {
msgc = nil
continue
}
summary(msg, SummaryLogging)
case err, ok := <-errc:
if !ok {
errc = nil
continue
}
return err
default:
summary("ongoing", SummaryOngoing)
time.Sleep(time.Second)
}
}
wg.Wait()
return nil
}
// release installs or updates the application deployment.
func (b *Builder) release(ctx context.Context, app *AppContext, out chan<- *Summary) (err error) {
const stageDesc = "Releasing Application"
defer complete(app.id, stageDesc, out, &err)
summary := summarize(app.id, stageDesc, out)
defer Complete(app.ID, stageDesc, out, &err)
summary := Summarize(app.ID, stageDesc, out)
// notify that particular stage has started.
summary("started", SummaryStarted)
// inject a registry secret only if a registry was configured
if app.ctx.Env.Registry != "" {
if app.Ctx.Env.Registry != "" {
if err := b.prepareReleaseEnvironment(ctx, app); err != nil {
return err
}
@ -539,46 +401,46 @@ func (b *Builder) release(ctx context.Context, app *AppContext, out chan<- *Summ
// The returned error is a gSummaryhat wraps the message from the original error.
// So we're stuck doing string matching against the wrapped error, which is nested inside
// of the gSummaryessage.
_, err = b.Helm.ReleaseContent(app.ctx.Env.Name, helm.ContentReleaseVersion(1))
_, err = b.Helm.ReleaseContent(app.Ctx.Env.Name, helm.ContentReleaseVersion(1))
if err != nil && strings.Contains(err.Error(), "not found") {
msg := fmt.Sprintf("Release %q does not exist. Installing it now.", app.ctx.Env.Name)
msg := fmt.Sprintf("Release %q does not exist. Installing it now.", app.Ctx.Env.Name)
summary(msg, SummaryLogging)
vals, err := app.vals.YAML()
vals, err := app.Vals.YAML()
if err != nil {
return err
}
opts := []helm.InstallOption{
helm.ReleaseName(app.ctx.Env.Name),
helm.ReleaseName(app.Ctx.Env.Name),
helm.ValueOverrides([]byte(vals)),
helm.InstallWait(app.ctx.Env.Wait),
helm.InstallWait(app.Ctx.Env.Wait),
}
rls, err := b.Helm.InstallReleaseFromChart(app.ctx.Chart, app.ctx.Env.Namespace, opts...)
rls, err := b.Helm.InstallReleaseFromChart(app.Ctx.Chart, app.Ctx.Env.Namespace, opts...)
if err != nil {
return fmt.Errorf("could not install release: %v", err)
}
app.obj.Release = rls.Release.Name
app.Obj.Release = rls.Release.Name
formatReleaseStatus(app, rls.Release, summary)
} else {
msg := fmt.Sprintf("Upgrading %s.", app.ctx.Env.Name)
msg := fmt.Sprintf("Upgrading %s.", app.Ctx.Env.Name)
summary(msg, SummaryLogging)
vals, err := app.vals.YAML()
vals, err := app.Vals.YAML()
if err != nil {
return err
}
opts := []helm.UpdateOption{
helm.UpdateValueOverrides([]byte(vals)),
helm.UpgradeWait(app.ctx.Env.Wait),
helm.UpgradeWait(app.Ctx.Env.Wait),
}
rls, err := b.Helm.UpdateReleaseFromChart(app.ctx.Env.Name, app.ctx.Chart, opts...)
rls, err := b.Helm.UpdateReleaseFromChart(app.Ctx.Env.Name, app.Ctx.Chart, opts...)
if err != nil {
return fmt.Errorf("could not upgrade release: %v", err)
}
app.obj.Release = rls.Release.Name
app.Obj.Release = rls.Release.Name
formatReleaseStatus(app, rls.Release, summary)
}
return nil
@ -586,21 +448,21 @@ func (b *Builder) release(ctx context.Context, app *AppContext, out chan<- *Summ
func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext) error {
// determine if the destination namespace exists, create it if not.
if _, err := b.Kube.CoreV1().Namespaces().Get(app.ctx.Env.Namespace, metav1.GetOptions{}); err != nil {
if _, err := b.Kube.CoreV1().Namespaces().Get(app.Ctx.Env.Namespace, metav1.GetOptions{}); err != nil {
if !apiErrors.IsNotFound(err) {
return err
}
_, err = b.Kube.CoreV1().Namespaces().Create(&v1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: app.ctx.Env.Namespace},
ObjectMeta: metav1.ObjectMeta{Name: app.Ctx.Env.Namespace},
})
if err != nil {
return fmt.Errorf("could not create namespace %q: %v", app.ctx.Env.Namespace, err)
return fmt.Errorf("could not create namespace %q: %v", app.Ctx.Env.Namespace, err)
}
}
regAuthToken, err := command.RetrieveAuthTokenFromImage(ctx, b.DockerClient, app.img)
regAuthToken, err := command.RetrieveAuthTokenFromImage(ctx, b.DockerClient, app.Img)
if err != nil {
return fmt.Errorf("failed to retrieve auth token from image %s: %v", app.img, err)
return fmt.Errorf("failed to retrieve auth token from image %s: %v", app.Img, err)
}
// we need to translate the auth token Docker gives us into a Kubernetes registry auth secret token.
@ -610,22 +472,22 @@ func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext
}
// create a new json string with the full dockerauth, including the registry URL.
js, err := json.Marshal(map[string]*DockerConfigEntryWithAuth{app.ctx.Env.Registry: regAuth})
js, err := json.Marshal(map[string]*DockerConfigEntryWithAuth{app.Ctx.Env.Registry: regAuth})
if err != nil {
return fmt.Errorf("could not json encode docker authentication string: %v", err)
}
// determine if the registry pull secret exists in the desired namespace, create it if not.
var secret *v1.Secret
if secret, err = b.Kube.CoreV1().Secrets(app.ctx.Env.Namespace).Get(PullSecretName, metav1.GetOptions{}); err != nil {
if secret, err = b.Kube.CoreV1().Secrets(app.Ctx.Env.Namespace).Get(PullSecretName, metav1.GetOptions{}); err != nil {
if !apiErrors.IsNotFound(err) {
return err
}
_, err = b.Kube.CoreV1().Secrets(app.ctx.Env.Namespace).Create(
_, err = b.Kube.CoreV1().Secrets(app.Ctx.Env.Namespace).Create(
&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: PullSecretName,
Namespace: app.ctx.Env.Namespace,
Namespace: app.Ctx.Env.Namespace,
},
Type: v1.SecretTypeDockercfg,
StringData: map[string]string{
@ -640,7 +502,7 @@ func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext
// the registry pull secret exists, check if it needs to be updated.
if data, ok := secret.StringData[".dockercfg"]; ok && data != string(js) {
secret.StringData[".dockercfg"] = string(js)
_, err = b.Kube.CoreV1().Secrets(app.ctx.Env.Namespace).Update(secret)
_, err = b.Kube.CoreV1().Secrets(app.Ctx.Env.Namespace).Update(secret)
if err != nil {
return fmt.Errorf("could not update registry pull secret: %v", err)
}
@ -649,7 +511,7 @@ func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext
// determine if the default service account in the desired namespace has the correct
// imagePullSecret. If not, add it.
svcAcct, err := b.Kube.CoreV1().ServiceAccounts(app.ctx.Env.Namespace).Get(DefaultServiceAccountName, metav1.GetOptions{})
svcAcct, err := b.Kube.CoreV1().ServiceAccounts(app.Ctx.Env.Namespace).Get(DefaultServiceAccountName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("could not load default service account: %v", err)
}
@ -664,7 +526,7 @@ func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext
svcAcct.ImagePullSecrets = append(svcAcct.ImagePullSecrets, v1.LocalObjectReference{
Name: PullSecretName,
})
_, err := b.Kube.CoreV1().ServiceAccounts(app.ctx.Env.Namespace).Update(svcAcct)
_, err := b.Kube.CoreV1().ServiceAccounts(app.Ctx.Env.Namespace).Update(svcAcct)
if err != nil {
return fmt.Errorf("could not modify default service account with registry pull secret: %v", err)
}
@ -674,7 +536,7 @@ func (b *Builder) prepareReleaseEnvironment(ctx context.Context, app *AppContext
}
func formatReleaseStatus(app *AppContext, rls *release.Release, summary func(string, SummaryStatusCode)) {
status := fmt.Sprintf("%s %v", app.ctx.Env.Name, rls.Info.Status.Code)
status := fmt.Sprintf("%s %v", app.Ctx.Env.Name, rls.Info.Status.Code)
summary(status, SummaryLogging)
if rls.Info.Status.Notes != "" {
notes := fmt.Sprintf("notes: %v", rls.Info.Status.Notes)
@ -682,16 +544,16 @@ func formatReleaseStatus(app *AppContext, rls *release.Release, summary func(str
}
}
// summarize returns a function closure that wraps writing SummaryStatusCode.
func summarize(id, desc string, out chan<- *Summary) func(string, SummaryStatusCode) {
// Summarize returns a function closure that wraps writing SummaryStatusCode.
func Summarize(id, desc string, out chan<- *Summary) func(string, SummaryStatusCode) {
return func(info string, code SummaryStatusCode) {
out <- &Summary{StageDesc: desc, StatusText: info, StatusCode: code, BuildID: id}
}
}
// complete marks the end of a draft build stage.
func complete(id, desc string, out chan<- *Summary, err *error) {
switch fn := summarize(id, desc, out); {
// Complete marks the end of a draft build stage.
func Complete(id, desc string, out chan<- *Summary, err *error) {
switch fn := Summarize(id, desc, out); {
case *err != nil:
fn(fmt.Sprintf("failure: %v", *err), SummaryFailure)
default:

Просмотреть файл

@ -0,0 +1,160 @@
package docker
import (
"fmt"
"sync"
"time"
"github.com/Azure/draft/pkg/builder"
"github.com/docker/cli/cli/command"
"github.com/docker/docker/api/types"
dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/term"
"golang.org/x/net/context"
)
// Builder contains information about the build environment
type Builder struct {
DockerClient command.Cli
}
// Build builds the docker image.
func (b *Builder) Build(ctx context.Context, app *builder.AppContext, out chan<- *builder.Summary) (err error) {
const stageDesc = "Building Docker Image"
defer builder.Complete(app.ID, stageDesc, out, &err)
summary := builder.Summarize(app.ID, stageDesc, out)
// notify that particular stage has started.
summary("started", builder.SummaryStarted)
msgc := make(chan string)
errc := make(chan error)
go func() {
buildopts := types.ImageBuildOptions{
Tags: app.Tags,
Dockerfile: app.Ctx.Env.Dockerfile,
}
resp, err := b.DockerClient.Client().ImageBuild(ctx, app.Buf, buildopts)
if err != nil {
errc <- err
return
}
defer func() {
resp.Body.Close()
close(msgc)
close(errc)
}()
outFd, isTerm := term.GetFdInfo(app.Buf)
if err := jsonmessage.DisplayJSONMessagesStream(resp.Body, app.Log, outFd, isTerm, nil); err != nil {
errc <- err
return
}
if _, _, err = b.DockerClient.Client().ImageInspectWithRaw(ctx, app.Img); err != nil {
if dockerclient.IsErrNotFound(err) {
errc <- fmt.Errorf("Could not locate image for %s: %v", app.Ctx.Env.Name, err)
return
}
errc <- fmt.Errorf("ImageInspectWithRaw error: %v", err)
return
}
}()
for msgc != nil || errc != nil {
select {
case msg, ok := <-msgc:
if !ok {
msgc = nil
continue
}
summary(msg, builder.SummaryLogging)
case err, ok := <-errc:
if !ok {
errc = nil
continue
}
return err
default:
summary("ongoing", builder.SummaryOngoing)
time.Sleep(time.Second)
}
}
return nil
}
// Push pushes the results of Build to the image repository.
func (b *Builder) Push(ctx context.Context, app *builder.AppContext, out chan<- *builder.Summary) (err error) {
if app.Ctx.Env.Registry == "" {
return
}
const stageDesc = "Pushing Docker Image"
defer builder.Complete(app.ID, stageDesc, out, &err)
summary := builder.Summarize(app.ID, stageDesc, out)
// notify that particular stage has started.
summary("started", builder.SummaryStarted)
msgc := make(chan string, 1)
errc := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(len(app.Tags))
go func() {
registryAuth, err := command.RetrieveAuthTokenFromImage(ctx, b.DockerClient, app.Img)
if err != nil {
errc <- err
return
}
for _, img := range app.Tags {
go func(img string) {
defer wg.Done()
resp, err := b.DockerClient.Client().ImagePush(ctx, img, types.ImagePushOptions{RegistryAuth: registryAuth})
if err != nil {
errc <- err
return
}
defer resp.Close()
outFd, isTerm := term.GetFdInfo(app.Log)
if err := jsonmessage.DisplayJSONMessagesStream(resp, app.Log, outFd, isTerm, nil); err != nil {
errc <- err
return
}
}(img)
}
defer func() {
close(errc)
close(msgc)
}()
}()
for msgc != nil || errc != nil {
select {
case msg, ok := <-msgc:
if !ok {
msgc = nil
continue
}
summary(msg, builder.SummaryLogging)
case err, ok := <-errc:
if !ok {
errc = nil
continue
}
return err
default:
summary("ongoing", builder.SummaryOngoing)
time.Sleep(time.Second)
}
}
wg.Wait()
return nil
}