368 строки
11 KiB
Go
368 строки
11 KiB
Go
// Copyright 2023 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
//go:build go1.22
|
|
|
|
package trace
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/bits"
|
|
"runtime/trace"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
_ "unsafe" // for go:linkname
|
|
|
|
"golang.org/x/exp/trace/internal/event/go122"
|
|
)
|
|
|
|
// FlightRecorder represents a flight recording configuration.
|
|
//
|
|
// Flight recording holds execution trace data in a circular buffer representing
|
|
// the most recent execution data.
|
|
//
|
|
// Only one flight recording may be active at any given time.
|
|
type FlightRecorder struct {
|
|
err error
|
|
|
|
// State specific to the recorder.
|
|
header [16]byte
|
|
active rawGeneration
|
|
ringMu sync.Mutex
|
|
ring []rawGeneration
|
|
|
|
// Externally-set options.
|
|
targetSize int
|
|
targetPeriod time.Duration
|
|
|
|
enabled bool // whether the flight recorder is enabled.
|
|
writing sync.Mutex // protects concurrent calls to WriteTo
|
|
|
|
// The values of targetSize and targetPeriod we've committed to since the last Start.
|
|
wantSize int
|
|
wantDur time.Duration
|
|
}
|
|
|
|
// NewFlightRecorder creates a new flight recording configuration.
|
|
func NewFlightRecorder() *FlightRecorder {
|
|
return &FlightRecorder{
|
|
// These are just some optimistic, reasonable defaults.
|
|
//
|
|
// In reality we're also bound by whatever the runtime defaults are, because
|
|
// we currently have no way to change them.
|
|
//
|
|
// TODO(mknyszek): Consider adding a function that allows mutating one or
|
|
// both of these values' equivalents in the runtime.
|
|
targetSize: 10 << 20, // 10 MiB.
|
|
targetPeriod: 10 * time.Second,
|
|
}
|
|
}
|
|
|
|
// SetPeriod sets the approximate time duration that the flight recorder's circular buffer
|
|
// represents.
|
|
//
|
|
// Note that SetPeriod does not make any guarantees on the amount of time the trace
|
|
// produced by WriteTo will represent.
|
|
// This is just a hint to the runtime to enable some control the resulting trace.
|
|
//
|
|
// The initial period is implementation defined, but can be assumed to be on the order
|
|
// of seconds.
|
|
//
|
|
// Adjustments to this value will not apply to an active flight recorder, and will not apply
|
|
// if tracing is already enabled via trace.Start. All tracing must be stopped and started
|
|
// again to change this value.
|
|
func (r *FlightRecorder) SetPeriod(d time.Duration) {
|
|
r.targetPeriod = d
|
|
}
|
|
|
|
// SetSize sets the approximate size of the flight recorder's circular buffer.
|
|
//
|
|
// This generally takes precedence over the duration passed to SetPeriod.
|
|
// However, it does not make any guarantees on the size of the data WriteTo will write.
|
|
// This is just a hint to the runtime to enable some control over the memory overheads
|
|
// of tracing.
|
|
//
|
|
// The initial size is implementation defined.
|
|
//
|
|
// Adjustments to this value will not apply to an active flight recorder, and will not apply
|
|
// if tracing is already enabled via trace.Start. All tracing must be stopped and started
|
|
// again to change this value.
|
|
func (r *FlightRecorder) SetSize(bytes int) {
|
|
r.targetSize = bytes
|
|
}
|
|
|
|
// A recorder receives bytes from the runtime tracer, processes it.
|
|
type recorder struct {
|
|
r *FlightRecorder
|
|
|
|
headerReceived bool
|
|
}
|
|
|
|
func (w *recorder) Write(p []byte) (n int, err error) {
|
|
r := w.r
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
// Propagate errors to the flightrecorder.
|
|
if r.err == nil {
|
|
r.err = err
|
|
}
|
|
trace.Stop() // Stop the tracer, preventing further writes.
|
|
}
|
|
}()
|
|
|
|
rd := bytes.NewReader(p)
|
|
|
|
if !w.headerReceived {
|
|
if len(p) < len(r.header) {
|
|
return 0, fmt.Errorf("expected at least %d bytes in the first write", len(r.header))
|
|
}
|
|
rd.Read(r.header[:])
|
|
w.headerReceived = true
|
|
}
|
|
|
|
b, gen, err := readBatch(rd) // Every write from the runtime is guaranteed to be a complete batch.
|
|
if err == io.EOF {
|
|
if rd.Len() > 0 {
|
|
return len(p) - rd.Len(), errors.New("short read")
|
|
}
|
|
return len(p), nil
|
|
}
|
|
if err != nil {
|
|
return len(p) - rd.Len(), err
|
|
}
|
|
|
|
// Check if we're entering a new generation.
|
|
if r.active.gen != 0 && r.active.gen+1 == gen {
|
|
r.ringMu.Lock()
|
|
|
|
// Validate r.active.freq before we use it. It's required for a generation
|
|
// to not be considered broken, and without it, we can't correctly handle
|
|
// SetPeriod.
|
|
if r.active.freq == 0 {
|
|
return len(p) - rd.Len(), fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
|
|
}
|
|
|
|
// Get the current trace clock time.
|
|
now := traceTimeNow(r.active.freq)
|
|
|
|
// Add the current generation to the ring. Make sure we always have at least one
|
|
// complete generation by putting the active generation onto the new list, regardless
|
|
// of whatever our settings are.
|
|
//
|
|
// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
|
|
// and not worry about aliasing. This creates allocations, but at a very low rate.
|
|
newRing := []rawGeneration{r.active}
|
|
size := r.active.size
|
|
for i := len(r.ring) - 1; i >= 0; i-- {
|
|
// Stop adding older generations if the new ring already exceeds the thresholds.
|
|
// This ensures we keep generations that cross a threshold, but not any that lie
|
|
// entirely outside it.
|
|
if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
|
|
break
|
|
}
|
|
size += r.ring[i].size
|
|
newRing = append(newRing, r.ring[i])
|
|
}
|
|
slices.Reverse(newRing)
|
|
r.ring = newRing
|
|
r.ringMu.Unlock()
|
|
|
|
// Start a new active generation.
|
|
r.active = rawGeneration{}
|
|
}
|
|
|
|
// Obtain the frequency if this is a frequency batch.
|
|
if b.isFreqBatch() {
|
|
freq, err := parseFreq(b)
|
|
if err != nil {
|
|
return len(p) - rd.Len(), err
|
|
}
|
|
r.active.freq = freq
|
|
}
|
|
|
|
// Append the batch to the current generation.
|
|
if r.active.gen == 0 {
|
|
r.active.gen = gen
|
|
}
|
|
if r.active.minTime == 0 || r.active.minTime > b.time {
|
|
r.active.minTime = b.time
|
|
}
|
|
r.active.size += 1
|
|
r.active.size += uvarintSize(gen)
|
|
r.active.size += uvarintSize(uint64(b.m))
|
|
r.active.size += uvarintSize(uint64(b.time))
|
|
r.active.size += uvarintSize(uint64(len(b.data)))
|
|
r.active.size += len(b.data)
|
|
r.active.batches = append(r.active.batches, b)
|
|
|
|
return len(p) - rd.Len(), nil
|
|
}
|
|
|
|
// Start begins flight recording. Only one flight recorder or one call to [runtime/trace.Start]
|
|
// may be active at any given time. Returns an error if starting the flight recorder would
|
|
// violate this rule.
|
|
func (r *FlightRecorder) Start() error {
|
|
if r.enabled {
|
|
return fmt.Errorf("cannot enable a enabled flight recorder")
|
|
}
|
|
|
|
r.wantSize = r.targetSize
|
|
r.wantDur = r.targetPeriod
|
|
r.err = nil
|
|
|
|
// Start tracing, data is sent to a recorder which forwards it to our own
|
|
// storage.
|
|
if err := trace.Start(&recorder{r: r}); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.enabled = true
|
|
return nil
|
|
}
|
|
|
|
// Stop ends flight recording. It waits until any concurrent [FlightRecorder.WriteTo] calls exit.
|
|
// Returns an error if the flight recorder is inactive.
|
|
func (r *FlightRecorder) Stop() error {
|
|
if !r.enabled {
|
|
return fmt.Errorf("cannot disable a disabled flight recorder")
|
|
}
|
|
r.enabled = false
|
|
trace.Stop()
|
|
|
|
// Reset all state. No need to lock because the reader has already exited.
|
|
r.active = rawGeneration{}
|
|
r.ring = nil
|
|
return r.err
|
|
}
|
|
|
|
// Enabled returns true if the flight recorder is active. Specifically, it will return true if
|
|
// Start did not return an error, and Stop has not yet been called.
|
|
// It is safe to call from multiple goroutines simultaneously.
|
|
func (r *FlightRecorder) Enabled() bool {
|
|
return r.enabled
|
|
}
|
|
|
|
// ErrSnapshotActive indicates that a call to WriteTo was made while one was already in progress.
// If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo.
//
// It is a sentinel value; test for it with errors.Is.
var ErrSnapshotActive = errors.New("call to WriteTo for trace.FlightRecorder already in progress")
|
|
|
|
// WriteTo takes a snapshots of the circular buffer's contents and writes the execution data to w.
|
|
// Returns the number of bytes written and an error.
|
|
// An error is returned upon failure to write to w or if the flight recorder is inactive.
|
|
// Only one goroutine may execute WriteTo at a time, but it is safe to call from multiple goroutines.
|
|
// If a goroutine calls WriteTo while another goroutine is currently executing it, WriteTo will return
|
|
// ErrSnapshotActive to that goroutine.
|
|
func (r *FlightRecorder) WriteTo(w io.Writer) (total int, err error) {
|
|
if !r.enabled {
|
|
return 0, fmt.Errorf("cannot snapshot a disabled flight recorder")
|
|
}
|
|
if !r.writing.TryLock() {
|
|
return 0, ErrSnapshotActive
|
|
}
|
|
defer r.writing.Unlock()
|
|
|
|
// Force a global buffer flush twice.
|
|
//
|
|
// This is pretty unfortunate, but because the signal that a generation is done is that a new
|
|
// generation appears in the trace *or* the trace stream ends, the recorder goroutine will
|
|
// have no idea when to add a generation to the ring if we just flush once. If we flush twice,
|
|
// at least the first one will end up on the ring, which is the one we wanted anyway.
|
|
//
|
|
// In a runtime-internal implementation this is a non-issue. The runtime is fully aware
|
|
// of what generations are complete, so only one flush is necessary.
|
|
runtime_traceAdvance(false)
|
|
runtime_traceAdvance(false)
|
|
|
|
// Now that everything has been flushed and written, grab whatever we have.
|
|
//
|
|
// N.B. traceAdvance blocks until the tracer goroutine has actually written everything
|
|
// out, which means the generation we just flushed must have been already been observed
|
|
// by the recorder goroutine. Because we flushed twice, the first flush is guaranteed to
|
|
// have been both completed *and* processed by the recorder goroutine.
|
|
r.ringMu.Lock()
|
|
gens := r.ring
|
|
r.ringMu.Unlock()
|
|
|
|
// Write the header.
|
|
total, err = w.Write(r.header[:])
|
|
if err != nil {
|
|
return total, err
|
|
}
|
|
|
|
// Helper for writing varints.
|
|
var varintBuf [binary.MaxVarintLen64]byte
|
|
writeUvarint := func(u uint64) error {
|
|
v := binary.PutUvarint(varintBuf[:], u)
|
|
n, err := w.Write(varintBuf[:v])
|
|
total += n
|
|
return err
|
|
}
|
|
|
|
// Write all the data.
|
|
for _, gen := range gens {
|
|
for _, batch := range gen.batches {
|
|
// Rewrite the batch header event with four arguments: gen, M ID, timestamp, and data length.
|
|
n, err := w.Write([]byte{byte(go122.EvEventBatch)})
|
|
total += n
|
|
if err != nil {
|
|
return total, err
|
|
}
|
|
if err := writeUvarint(gen.gen); err != nil {
|
|
return total, err
|
|
}
|
|
if err := writeUvarint(uint64(batch.m)); err != nil {
|
|
return total, err
|
|
}
|
|
if err := writeUvarint(uint64(batch.time)); err != nil {
|
|
return total, err
|
|
}
|
|
if err := writeUvarint(uint64(len(batch.data))); err != nil {
|
|
return total, err
|
|
}
|
|
|
|
// Write batch data.
|
|
n, err = w.Write(batch.data)
|
|
total += n
|
|
if err != nil {
|
|
return total, err
|
|
}
|
|
}
|
|
}
|
|
return total, nil
|
|
}
|
|
|
|
type rawGeneration struct {
|
|
gen uint64
|
|
size int
|
|
minTime timestamp
|
|
freq frequency
|
|
batches []batch
|
|
}
|
|
|
|
func (r *rawGeneration) minTraceTime() Time {
|
|
return r.freq.mul(r.minTime)
|
|
}
|
|
|
|
func traceTimeNow(freq frequency) Time {
|
|
// TODO(mknyszek): It's unfortunate that we have to rely on runtime-internal details
|
|
// like this. This would be better off in the runtime.
|
|
return freq.mul(timestamp(runtime_traceClockNow()))
|
|
}
|
|
|
|
// uvarintSize returns the number of bytes binary.PutUvarint uses to encode x.
//
// Each encoded byte carries 7 bits of payload, so the size is the bit length
// of x rounded up to a multiple of 7. Zero still takes one byte; OR-ing in 1
// handles that without changing the bit length of any non-zero x.
//
// The previous formula, 1 + bits.Len64(x)/7, over-counted by one whenever the
// bit length was a non-zero multiple of 7 (for example, 127 encodes in 1 byte).
func uvarintSize(x uint64) int {
	return (bits.Len64(x|1) + 6) / 7
}
|
|
|
|
// runtime_traceAdvance is bound to runtime.traceAdvance via linkname. WriteTo
// calls it with false to force the runtime to flush the current generation.
//
//go:linkname runtime_traceAdvance runtime.traceAdvance
func runtime_traceAdvance(stop bool)

// runtime_traceClockNow is bound to runtime.traceClockNow via linkname and
// returns the runtime's current trace clock reading.
//
//go:linkname runtime_traceClockNow runtime.traceClockNow
func runtime_traceClockNow() int64
|