зеркало из https://github.com/golang/build.git
240 строки
6.2 KiB
Go
240 строки
6.2 KiB
Go
// Copyright 2023 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package buildlet
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"golang.org/x/build/internal/gomote/protos"
|
|
"golang.org/x/build/types"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
type GRPCCoordinatorClient struct {
|
|
Client protos.GomoteServiceClient
|
|
}
|
|
|
|
func (c *GRPCCoordinatorClient) CreateBuildlet(ctx context.Context, builderType string) (RemoteClient, error) {
|
|
return c.CreateBuildletWithStatus(ctx, builderType, func(types.BuildletWaitStatus) {})
|
|
}
|
|
|
|
func (c *GRPCCoordinatorClient) CreateBuildletWithStatus(ctx context.Context, builderType string, status func(types.BuildletWaitStatus)) (RemoteClient, error) {
|
|
stream, err := c.Client.CreateInstance(ctx, &protos.CreateInstanceRequest{BuilderType: builderType})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var instance *protos.Instance
|
|
for {
|
|
update, err := stream.Recv()
|
|
switch {
|
|
case err == io.EOF:
|
|
return &grpcBuildlet{
|
|
client: c.Client,
|
|
id: instance.GetGomoteId(),
|
|
workDir: instance.GetWorkingDir(),
|
|
}, nil
|
|
case err != nil:
|
|
return nil, err
|
|
case update.GetStatus() != protos.CreateInstanceResponse_COMPLETE:
|
|
status(types.BuildletWaitStatus{
|
|
Ahead: int(update.WaitersAhead),
|
|
})
|
|
|
|
case update.GetStatus() == protos.CreateInstanceResponse_COMPLETE:
|
|
instance = update.GetInstance()
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
type grpcBuildlet struct {
|
|
client protos.GomoteServiceClient
|
|
id string
|
|
workDir string
|
|
}
|
|
|
|
var _ RemoteClient = (*grpcBuildlet)(nil)
|
|
|
|
func (b *grpcBuildlet) Close() error {
|
|
_, err := b.client.DestroyInstance(context.TODO(), &protos.DestroyInstanceRequest{
|
|
GomoteId: b.id,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (b *grpcBuildlet) Exec(ctx context.Context, cmd string, opts ExecOpts) (remoteErr error, execErr error) {
|
|
stream, err := b.client.ExecuteCommand(ctx, &protos.ExecuteCommandRequest{
|
|
GomoteId: b.id,
|
|
Command: cmd,
|
|
SystemLevel: opts.SystemLevel,
|
|
Debug: opts.Debug,
|
|
AppendEnvironment: opts.ExtraEnv,
|
|
Path: opts.Path,
|
|
Directory: opts.Dir,
|
|
Args: opts.Args,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if opts.OnStartExec != nil {
|
|
opts.OnStartExec()
|
|
}
|
|
for {
|
|
update, err := stream.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
// Execution error.
|
|
if status.Code(err) == codes.Aborted {
|
|
return nil, err
|
|
}
|
|
// Unknown, presumed command error.
|
|
return err, nil
|
|
}
|
|
if opts.Output != nil {
|
|
opts.Output.Write(update.Output)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *grpcBuildlet) GetTar(ctx context.Context, dir string) (io.ReadCloser, error) {
|
|
resp, err := b.client.ReadTGZToURL(ctx, &protos.ReadTGZToURLRequest{
|
|
GomoteId: b.id,
|
|
Directory: dir,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, resp.GetUrl(), nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("server returned invalid URL %q: %v", resp.GetUrl(), err)
|
|
}
|
|
r, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error fetching tgz: %v", err)
|
|
}
|
|
if r.StatusCode != http.StatusOK {
|
|
return nil, fmt.Errorf("unexpected status reading tgz: %v", r.Status)
|
|
}
|
|
return r.Body, nil
|
|
}
|
|
|
|
func (b *grpcBuildlet) ListDir(ctx context.Context, dir string, opts ListDirOpts, fn func(DirEntry)) error {
|
|
resp, err := b.client.ListDirectory(ctx, &protos.ListDirectoryRequest{
|
|
GomoteId: b.id,
|
|
Directory: dir,
|
|
Recursive: opts.Recursive,
|
|
SkipFiles: opts.Skip,
|
|
Digest: opts.Digest,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, ent := range resp.Entries {
|
|
fn(DirEntry{ent})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *grpcBuildlet) Put(ctx context.Context, r io.Reader, path string, mode os.FileMode) error {
|
|
url, err := b.upload(ctx, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = b.client.WriteFileFromURL(ctx, &protos.WriteFileFromURLRequest{
|
|
GomoteId: b.id,
|
|
Url: url,
|
|
Filename: path,
|
|
Mode: uint32(mode),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (b *grpcBuildlet) PutTar(ctx context.Context, r io.Reader, dir string) error {
|
|
url, err := b.upload(ctx, r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = b.client.WriteTGZFromURL(ctx, &protos.WriteTGZFromURLRequest{
|
|
GomoteId: b.id,
|
|
Url: url,
|
|
Directory: dir,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (b *grpcBuildlet) PutTarFromURL(ctx context.Context, url string, dir string) error {
|
|
_, err := b.client.WriteTGZFromURL(ctx, &protos.WriteTGZFromURLRequest{
|
|
GomoteId: b.id,
|
|
Url: url,
|
|
Directory: dir,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (b *grpcBuildlet) upload(ctx context.Context, r io.Reader) (string, error) {
|
|
resp, err := b.client.UploadFile(ctx, &protos.UploadFileRequest{})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
mw := multipart.NewWriter(buf)
|
|
for k, v := range resp.Fields {
|
|
if err := mw.WriteField(k, v); err != nil {
|
|
return "", fmt.Errorf("unable to write field: %s", err)
|
|
}
|
|
}
|
|
_, err = mw.CreateFormFile("file", resp.ObjectName)
|
|
if err != nil {
|
|
return "", fmt.Errorf("unable to create form file: %s", err)
|
|
}
|
|
// Write our own boundary to avoid buffering entire file into the multipart Writer
|
|
bound := fmt.Sprintf("\r\n--%s--\r\n", mw.Boundary())
|
|
req, err := http.NewRequestWithContext(ctx, "POST", resp.Url, io.NopCloser(io.MultiReader(buf, r, strings.NewReader(bound))))
|
|
if err != nil {
|
|
return "", fmt.Errorf("unable to create request: %s", err)
|
|
}
|
|
req.Header.Set("Content-Type", mw.FormDataContentType())
|
|
res, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return "", fmt.Errorf("http request failed: %s", err)
|
|
}
|
|
if res.StatusCode != http.StatusNoContent {
|
|
return "", fmt.Errorf("http post failed: status code=%d", res.StatusCode)
|
|
}
|
|
return resp.Url + resp.ObjectName, nil
|
|
}
|
|
|
|
func (b *grpcBuildlet) ProxyTCP(port int) (io.ReadWriteCloser, error) {
|
|
return nil, fmt.Errorf("TCP proxying unimplemented in grpc")
|
|
}
|
|
|
|
func (b *grpcBuildlet) RemoteName() string {
|
|
return b.id
|
|
}
|
|
|
|
func (b *grpcBuildlet) RemoveAll(ctx context.Context, paths ...string) error {
|
|
_, err := b.client.RemoveFiles(ctx, &protos.RemoveFilesRequest{
|
|
GomoteId: b.id,
|
|
Paths: paths,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (b *grpcBuildlet) WorkDir(ctx context.Context) (string, error) {
|
|
return b.workDir, nil
|
|
}
|