// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package maintner

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/build/maintner/internal/robustio"
	"golang.org/x/build/maintner/maintpb"
	"golang.org/x/build/maintner/reclog"
)

// NewNetworkMutationSource returns a mutation source from a master server.
// The server argument should be a URL to the JSON logs index.
func NewNetworkMutationSource(server, cacheDir string) MutationSource {
	base, err := url.Parse(server)
	if err != nil {
		panic(fmt.Sprintf("invalid URL: %q", server))
	}
	return &netMutSource{
		server:   server,
		base:     base,
		cacheDir: cacheDir,
	}
}
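
// A minimal construction sketch (illustrative, not part of this file's API):
// server is the JSON logs index and cacheDir holds downloaded segment files.
// Both values below are assumptions, not defaults baked into this package.
//
//	src := NewNetworkMutationSource("https://maintner.golang.org/logs", filepath.Join(os.TempDir(), "maintner-cache"))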

// TailNetworkMutationSource calls fn for all new mutations added to the log on server.
// Events with the End field set to true are not sent, so all events will
// have exactly one of Mutation or Err fields set to a non-zero value.
// It ignores prior events.
// If the server is restarted and its history diverges,
// TailNetworkMutationSource may return duplicate events. This therefore does not
// return a MutationSource, so it can't be accidentally misused for important things.
// TailNetworkMutationSource returns if fn returns an error, if ctx expires,
// or if it runs into a network error.
func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
	td, err := os.MkdirTemp("", "maintnertail")
	if err != nil {
		return err
	}
	defer robustio.RemoveAll(td)

	ns := NewNetworkMutationSource(server, td).(*netMutSource)
	ns.quiet = true
	getSegs := func(waitSizeNot int64) ([]LogSegmentJSON, error) {
		for {
			segs, err := ns.getServerSegments(ctx, waitSizeNot)
			if err != nil {
				if err := ctx.Err(); err != nil {
					return nil, err
				}
				// Sleep a minimum of 5 seconds before trying
				// again. The user's fn function might sleep
				// longer or shorter.
				timer := time.NewTimer(5 * time.Second)
				err := fn(MutationStreamEvent{Err: err})
				if err != nil {
					timer.Stop()
					return nil, err
				}
				<-timer.C
				continue
			}
			return segs, nil
		}
	}

	// See how long the log is at start. Then we'll only fetch
	// things after that.
	segs, err := getSegs(0)
	if err != nil {
		return err
	}
	segSize := sumJSONSegSize(segs)
	lastSeg := segs[len(segs)-1]
	if _, _, err := ns.syncSeg(ctx, lastSeg); err != nil {
		return err
	}

	ticker := time.NewTicker(time.Second) // max re-fetch interval
	defer ticker.Stop()
	for {
		segs, err := getSegs(segSize)
		if err != nil {
			return err
		}
		segSize = sumJSONSegSize(segs)

		for _, seg := range segs {
			if seg.Number < lastSeg.Number {
				continue
			}
			var off int64
			if seg.Number == lastSeg.Number {
				off = lastSeg.Size
			}
			_, newData, err := ns.syncSeg(ctx, seg)
			if err != nil {
				return err
			}
			if err := reclog.ForeachRecord(bytes.NewReader(newData), off, func(off int64, hdr, rec []byte) error {
				m := new(maintpb.Mutation)
				if err := proto.Unmarshal(rec, m); err != nil {
					return err
				}
				return fn(MutationStreamEvent{Mutation: m})
			}); err != nil {
				return err
			}
		}
		lastSeg = segs[len(segs)-1]

		<-ticker.C
	}
}
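
// A minimal tailing sketch (illustrative, not part of the original file): the
// callback sees each new mutation or a transient error; returning a non-nil
// error stops the tail. The server URL is an assumption, and handle is a
// hypothetical callback.
//
//	err := TailNetworkMutationSource(ctx, "https://maintner.golang.org/logs", func(ev MutationStreamEvent) error {
//		if ev.Err != nil {
//			log.Printf("tail error: %v", ev.Err)
//			return nil // keep tailing; a non-nil return would stop
//		}
//		return handle(ev.Mutation)
//	})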

type netMutSource struct {
	server   string
	base     *url.URL
	cacheDir string

	last  []fileSeg
	quiet bool // disable verbose logging

	// Hooks for testing. If nil, unused:
	testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
	testHookSyncSeg           func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
	testHookOnSplit           func(sumCommon int64)
	testHookFilePrefixSum224  func(file string, n int64) string
}

func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
	ch := make(chan MutationStreamEvent, 50)
	go func() {
		err := ns.fetchAndSendMutations(ctx, ch)
		final := MutationStreamEvent{Err: err}
		if err == nil {
			final.End = true
		}
		select {
		case ch <- final:
		case <-ctx.Done():
		}
	}()
	return ch
}
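
// A consumption sketch (illustrative): GetMutations delivers zero or more
// Mutation events followed by exactly one terminal event with either Err set
// (failure) or End set (success). The channel itself is never closed, so the
// caller must stop on the terminal event. apply is a hypothetical handler.
//
//	for ev := range src.GetMutations(ctx) {
//		if ev.Err != nil {
//			log.Fatalf("mutation stream: %v", ev.Err)
//		}
//		if ev.End {
//			break // caught up with the log
//		}
//		apply(ev.Mutation)
//	}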

// isNoInternetError reports whether the provided error is because there's no
// network connectivity.
func isNoInternetError(err error) bool {
	if err == nil {
		return false
	}
	switch err := err.(type) {
	case fetchError:
		return isNoInternetError(err.Err)
	case *url.Error:
		return isNoInternetError(err.Err)
	case *net.OpError:
		return isNoInternetError(err.Err)
	case *net.DNSError:
		// Fragile, but matching the error string is the only signal available here:
		return err.Err == "no such host"
	default:
		log.Printf("Unknown error type %T: %#v", err, err)
		return false
	}
}

func (ns *netMutSource) locallyCachedSegments() (segs []fileSeg, err error) {
	defer func() {
		if err != nil {
			log.Printf("No network connection and failed to use local cache: %v", err)
		} else {
			log.Printf("No network connection; using %d locally cached segments.", len(segs))
		}
	}()
	des, err := os.ReadDir(ns.cacheDir)
	if err != nil {
		return nil, err
	}
	fiMap := map[string]fs.FileInfo{}
	segHex := map[int]string{}
	segGrowing := map[int]bool{}
	for _, de := range des {
		name := de.Name()
		if !strings.HasSuffix(name, ".mutlog") {
			continue
		}
		fi, err := de.Info()
		if err != nil {
			return nil, err
		}
		fiMap[name] = fi

		if len(name) == len("0000.6897fab4d3afcda332424b2a2a1a4469021074282bc7be5606aaa221.mutlog") {
			num, err := strconv.Atoi(name[:4])
			if err != nil {
				continue
			}
			segHex[num] = strings.TrimSuffix(name[5:], ".mutlog")
		} else if strings.HasSuffix(name, ".growing.mutlog") {
			num, err := strconv.Atoi(name[:4])
			if err != nil {
				continue
			}
			segGrowing[num] = true
		}
	}
	for num := 0; ; num++ {
		if hex, ok := segHex[num]; ok {
			name := fmt.Sprintf("%04d.%s.mutlog", num, hex)
			segs = append(segs, fileSeg{
				seg:    num,
				file:   filepath.Join(ns.cacheDir, name),
				size:   fiMap[name].Size(),
				sha224: hex,
			})
			continue
		}
		if segGrowing[num] {
			name := fmt.Sprintf("%04d.growing.mutlog", num)
			slurp, err := robustio.ReadFile(filepath.Join(ns.cacheDir, name))
			if err != nil {
				return nil, err
			}
			segs = append(segs, fileSeg{
				seg:    num,
				file:   filepath.Join(ns.cacheDir, name),
				size:   int64(len(slurp)),
				sha224: fmt.Sprintf("%x", sha256.Sum224(slurp)),
			})
		}
		return segs, nil
	}
}

// getServerSegments fetches the JSON logs handler (ns.server, usually
// https://maintner.golang.org/logs) and returns the parsed JSON.
// It sends the "waitsizenot" URL parameter, which specifies that the
// request should long-poll waiting for the server to have a sum of
// log segment sizes different than the value specified. As a result,
// it blocks until the server has new data to send or ctx expires.
//
// getServerSegments returns an error that matches fetchError with
// PossiblyRetryable set to true when it has signal that repeating
// the same call after some time may succeed.
func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
	if fn := ns.testHookGetServerSegments; fn != nil {
		return fn(ctx, waitSizeNot)
	}
	logsURL := fmt.Sprintf("%s?waitsizenot=%d", ns.server, waitSizeNot)
	for {
		req, err := http.NewRequestWithContext(ctx, "GET", logsURL, nil)
		if err != nil {
			return nil, err
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		// When we're doing a long poll and the server replies
		// with a 304 response, that means the server is just
		// heart-beating us and trying to get a response back
		// within its various deadlines. But we should just
		// try again.
		if res.StatusCode == http.StatusNotModified {
			res.Body.Close()
			continue
		}
		defer res.Body.Close()
		if res.StatusCode/100 == 5 {
			// Consider a 5xx server response to possibly succeed later.
			return nil, fetchError{Err: fmt.Errorf("%s: %v", ns.server, res.Status), PossiblyRetryable: true}
		} else if res.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
		}
		b, err := io.ReadAll(res.Body)
		if err != nil {
			return nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		var segs []LogSegmentJSON
		err = json.Unmarshal(b, &segs)
		if err != nil {
			return nil, fmt.Errorf("unmarshaling %s JSON: %v", ns.server, err)
		}
		return segs, nil
	}
}
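
// A wire-level sketch of the long poll above (illustrative: the host, sizes,
// hashes, and URL values are assumptions; the query parameter and the JSON
// field names match LogSegmentJSON below):
//
//	GET https://maintner.golang.org/logs?waitsizenot=123456
//
//	[
//	  {"number": 0, "size": 16777216, "sha224": "6897fab4...", "url": "https://storage.googleapis.com/..."},
//	  {"number": 1, "size": 4242, "sha224": "ab12cd34...", "url": "..."}
//	]
//
// The server holds the request open until the sum of segment sizes differs
// from waitsizenot, or replies 304 as a heartbeat, in which case the loop
// above simply retries.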

// getNewSegments fetches new mutations from the network mutation source.
// It tries to absorb the expected network bumps by trying multiple times,
// and returns an error only when it considers the problem to be terminal.
//
// If there's no internet connectivity from the start, it returns locally
// cached segments that might be available from before. Otherwise it waits
// for internet connectivity to come back and keeps going when it does.
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
	sumLast := sumSegSize(ns.last)

	// First, fetch JSON metadata for the segments from the server.
	var serverSegs []LogSegmentJSON
	for try := 1; ; {
		segs, err := ns.getServerSegments(ctx, sumLast)
		if isNoInternetError(err) {
			if sumLast == 0 {
				return ns.locallyCachedSegments()
			}
			log.Printf("No internet; blocking.")
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(15 * time.Second):
				try = 1
				continue
			}
		} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
			// Fetching the JSON logs handler happens over an unreliable network connection,
			// and will fail at some point. Prefer to try again over reporting a terminal error.
			const maxTries = 5
			if try == maxTries {
				// At this point, promote it to a terminal error.
				return nil, fmt.Errorf("after %d attempts, fetching server segments still failed: %v", maxTries, err)
			}
			someDelay := time.Duration(try*try) * time.Second
			log.Printf("fetching server segments did not succeed on attempt %d, will try again in %v: %v", try, someDelay, err)
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(someDelay):
				try++
				continue
			}
		} else if err != nil {
			return nil, err
		}
		serverSegs = segs
		break
	}
	// TODO: optimization: if already on GCE, skip sync to disk part and just
	// read from network. fast & free network inside.

	// Second, fetch the new segments or their fragments
	// that we don't yet have locally.
	var fileSegs []fileSeg
	for _, seg := range serverSegs {
		for try := 1; ; {
			fileSeg, _, err := ns.syncSeg(ctx, seg)
			if isNoInternetError(err) {
				log.Printf("No internet; blocking.")
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(15 * time.Second):
					try = 1
					continue
				}
			} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
				// Syncing a segment fetches a good deal of data over a network connection,
				// and will fail at some point. Be very willing to try again at this layer,
				// since it's much more efficient than having GetMutations return an error
				// and possibly cause a higher level retry to redo significantly more work.
				const maxTries = 10
				if try == maxTries {
					// At this point, promote it to a terminal error.
					return nil, fmt.Errorf("after %d attempts, syncing segment %d still failed: %v", maxTries, seg.Number, err)
				}
				someDelay := time.Duration(try*try) * time.Second
				log.Printf("syncing segment %d did not succeed on attempt %d, will try again in %v: %v", seg.Number, try, someDelay, err)
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(someDelay):
					try++
					continue
				}
			} else if err != nil {
				return nil, err
			}
			fileSegs = append(fileSegs, fileSeg)
			break
		}
	}

	// Verify consistency of newly fetched data,
	// and check there is in fact something new.
	sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
	if sumCommon != sumLast {
		if fn := ns.testHookOnSplit; fn != nil {
			fn(sumCommon)
		}
		// Our history diverged from the source.
		return nil, ErrSplit
	} else if sumCur := sumSegSize(fileSegs); sumCommon == sumCur {
		// Nothing new. This shouldn't happen since the maintnerd server is required to handle
		// the "?waitsizenot=NNN" long polling parameter, so it's a problem if we get here.
		return nil, fmt.Errorf("maintner.netsource: maintnerd server returned unchanged log segments")
	}
	ns.last = fileSegs

	newSegs := trimLeadingSegBytes(fileSegs, sumCommon)
	return newSegs, nil
}

func trimLeadingSegBytes(in []fileSeg, trim int64) []fileSeg {
	// First trim off whole segments, sharing the same underlying memory.
	for len(in) > 0 && trim >= in[0].size {
		trim -= in[0].size
		in = in[1:]
	}
	if len(in) == 0 {
		return nil
	}
	// Now copy, since we'll be modifying the first element.
	out := append([]fileSeg(nil), in...)
	out[0].skip = trim
	return out
}
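
// A worked example with illustrative numbers: given segments of sizes 100 and
// 40 and trim=120, the first whole segment is dropped and the remaining 20
// bytes become the skip of the second segment; readers later consume
// size-skip bytes starting at offset skip.
//
//	in := []fileSeg{{size: 100}, {size: 40}}
//	out := trimLeadingSegBytes(in, 120) // out == []fileSeg{{size: 40, skip: 20}}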

// filePrefixSum224 returns the lowercase hex SHA-224 of the first n bytes of file.
func (ns *netMutSource) filePrefixSum224(file string, n int64) string {
	if fn := ns.testHookFilePrefixSum224; fn != nil {
		return fn(file, n)
	}
	f, err := os.Open(file)
	if err != nil {
		if !os.IsNotExist(err) {
			log.Print(err)
		}
		return ""
	}
	defer f.Close()
	h := sha256.New224()
	_, err = io.CopyN(h, f, n)
	if err != nil {
		log.Print(err)
		return ""
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func sumSegSize(segs []fileSeg) (sum int64) {
	for _, seg := range segs {
		sum += seg.size
	}
	return
}

func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
	for _, seg := range segs {
		sum += seg.Size
	}
	return
}

// sumCommonPrefixSize computes the size of the longest common prefix of file segments a and b
// that can be found quickly by checking for matching checksums between segment boundaries.
func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
	for len(a) > 0 && len(b) > 0 {
		sa, sb := a[0], b[0]
		if sa.sha224 == sb.sha224 {
			// Whole chunk in common.
			sum += sa.size
			a, b = a[1:], b[1:]
			continue
		}
		if sa.size == sb.size {
			// If they're the same size but different
			// sums, it must've forked.
			return
		}
		// See if one chunk is a prefix of the other.
		// Make sa be the smaller one.
		if sb.size < sa.size {
			sa, sb = sb, sa
		}
		// Hash the beginning of the bigger size.
		bPrefixSum := ns.filePrefixSum224(sb.file, sa.size)
		if bPrefixSum == sa.sha224 {
			sum += sa.size
		}
		break
	}
	return
}
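
// A worked example with illustrative values: if a is segments of sizes
// 100 ("aa") and 50 ("bb"), and b is segments of sizes 100 ("aa") and 30 ("cc"),
// the first pair matches whole (sum = 100). The second pair differs in size,
// so the 30-byte prefix of a's 50-byte segment file is hashed; if that prefix
// sum equals "cc" the result is 130, otherwise it stays 100.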

// fetchAndSendMutations fetches new mutations from the network mutation source
// and sends them to ch.
func (ns *netMutSource) fetchAndSendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
	newSegs, err := ns.getNewSegments(ctx)
	if err != nil {
		return err
	}
	return foreachFileSeg(newSegs, func(seg fileSeg) error {
		f, err := os.Open(seg.file)
		if err != nil {
			return err
		}
		defer f.Close()
		if seg.skip > 0 {
			if _, err := f.Seek(seg.skip, io.SeekStart); err != nil {
				return err
			}
		}
		return reclog.ForeachRecord(io.LimitReader(f, seg.size-seg.skip), seg.skip, func(off int64, hdr, rec []byte) error {
			m := new(maintpb.Mutation)
			if err := proto.Unmarshal(rec, m); err != nil {
				return err
			}
			select {
			case ch <- MutationStreamEvent{Mutation: m}:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	})
}

func foreachFileSeg(segs []fileSeg, fn func(seg fileSeg) error) error {
	for _, seg := range segs {
		if err := fn(seg); err != nil {
			return err
		}
	}
	return nil
}

// TODO: add a constructor for this? or simplify it. make it Size +
// File + embedded LogSegmentJSON?
type fileSeg struct {
	seg    int
	file   string // full path
	sha224 string
	skip   int64
	size   int64
}
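
// On-disk naming sketch for cacheDir, derived from locallyCachedSegments and
// syncSeg (the hash shown is the illustrative value used above): a completed,
// immutable segment is stored as "NNNN.<sha224>.mutlog", while the segment the
// server is still appending to is stored as "NNNN.growing.mutlog".
//
//	0000.6897fab4d3afcda332424b2a2a1a4469021074282bc7be5606aaa221.mutlog
//	0001.growing.mutlog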

// syncSeg syncs the provided log segment, returning its on-disk metadata.
// The newData result is the new data that was added to the segment in this sync.
//
// syncSeg returns an error that matches fetchError with PossiblyRetryable set
// to true when it has signal that repeating the same call after some time may
// succeed.
func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
	if fn := ns.testHookSyncSeg; fn != nil {
		return fn(ctx, seg)
	}

	isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
	relURL, err := url.Parse(seg.URL)
	if err != nil {
		return fileSeg{}, nil, err
	}
	segURL := ns.base.ResolveReference(relURL)

	frozen := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.%s.mutlog", seg.Number, seg.SHA224))

	// Do we already have it? Files named in their final form with the sha224 are considered
	// complete and immutable.
	if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
		return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil, nil
	}

	// See how much data we already have in the partial growing file.
	partial := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.growing.mutlog", seg.Number))
	have, _ := robustio.ReadFile(partial)
	if int64(len(have)) == seg.Size {
		got224 := fmt.Sprintf("%x", sha256.Sum224(have))
		if got224 == seg.SHA224 {
			if !isFinalSeg {
				// This was growing for us, but the server started a new growing segment.
				if err := robustio.Rename(partial, frozen); err != nil {
					return fileSeg{}, nil, err
				}
				return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil, nil
			}
			return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil, nil
		}
	}

	// Otherwise, download new data.
	if int64(len(have)) < seg.Size {
		req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
		if err != nil {
			return fileSeg{}, nil, err
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))

		if !ns.quiet {
			log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		defer res.Body.Close()
		if res.StatusCode/100 == 5 {
			// Consider a 5xx server response to possibly succeed later.
			return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
		} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
			return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
		}
		newData, err = io.ReadAll(res.Body)
		res.Body.Close()
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
	}

	// Commit to disk.
	var newContents []byte
	if int64(len(newData)) == seg.Size {
		newContents = newData
	} else if int64(len(have)+len(newData)) == seg.Size {
		newContents = append(have, newData...)
	} else if int64(len(have)) > seg.Size {
		// We have more data than the server; likely because it restarted with uncommitted
		// transactions, and so we're headed towards an ErrSplit. Reuse the longest common
		// prefix as long as its checksum matches.
		newContents = have[:seg.Size]
	}
	got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
	if got224 != seg.SHA224 {
		if len(have) == 0 {
			return fileSeg{}, nil, errors.New("corrupt download")
		}
		// Try again.
		os.Remove(partial)
		return ns.syncSeg(ctx, seg)
	}
	// TODO: this is a quadratic amount of write I/O as the 16 MB
	// segment grows. Switch to appending to the existing file,
	// then perhaps encoding the desired file size into the
	// filename suffix (instead of just *.growing.mutlog) so
	// concurrent readers know where to stop.
	tf, err := os.CreateTemp(ns.cacheDir, "tempseg")
	if err != nil {
		return fileSeg{}, nil, err
	}
	if _, err := tf.Write(newContents); err != nil {
		return fileSeg{}, nil, err
	}
	if err := tf.Close(); err != nil {
		return fileSeg{}, nil, err
	}
	finalName := partial
	if !isFinalSeg {
		finalName = frozen
	}
	if err := robustio.Rename(tf.Name(), finalName); err != nil {
		return fileSeg{}, nil, err
	}
	if !ns.quiet {
		log.Printf("wrote %v", finalName)
	}
	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, newData, nil
}

type LogSegmentJSON struct {
	Number int    `json:"number"`
	Size   int64  `json:"size"`
	SHA224 string `json:"sha224"`
	URL    string `json:"url"`
}

// fetchError records an error during a fetch operation over an unreliable network.
type fetchError struct {
	Err error // Non-nil.

	// PossiblyRetryable indicates whether Err is believed to be possibly caused by a
	// non-terminal network error, such that the caller can expect it may not happen
	// again if it simply tries the same fetch operation again after waiting a bit.
	PossiblyRetryable bool
}

func (e fetchError) Error() string { return e.Err.Error() }
func (e fetchError) Unwrap() error { return e.Err }