Add support for OpenTelemetry tracing of certain
important functions, using the event package.

We don't have a burning need for traces, but they
are nice to have, and this was an important exercise
for validating the approach of the event package.

Change-Id: I37d1f56f06f425f3b1eb885877a0d2f5ac85a098
Reviewed-on: https://go-review.googlesource.com/c/vulndb/+/380440
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
This commit is contained in:
Jonathan Amsterdam 2022-01-08 15:53:43 -05:00
Родитель fa811db071
Коммит 821b6d568a
11 изменённых файлов: 104 добавлений и 20 удалений

Просмотреть файл

@ -61,10 +61,8 @@ check_headers() {
# check_unparam runs unparam on source files.
check_unparam() {
if [[ $(go version) = *go1.17* ]]; then
ensure_go_binary mvdan.cc/unparam
runcmd unparam ./...
fi
ensure_go_binary mvdan.cc/unparam
runcmd unparam ./...
}
# check_vet runs go vet on source files.

Просмотреть файл

@ -21,6 +21,7 @@ import (
"text/tabwriter"
"time"
"golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/gitrepo"
"golang.org/x/vulndb/internal/issues"
@ -85,7 +86,8 @@ func main() {
dieWithUsage("%v", err)
}
ctx := log.WithLineLogger(context.Background())
ctx := event.WithExporter(context.Background(),
event.NewExporter(log.NewLineHandler(os.Stderr), nil))
if img := os.Getenv("DOCKER_IMAGE"); img != "" {
log.Infof(ctx, "running in docker image %s", img)
}

10
go.mod
Просмотреть файл

@ -10,6 +10,8 @@ require (
require (
cloud.google.com/go/errorreporting v0.1.0
cloud.google.com/go/firestore v1.6.1
github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.0.0
github.com/client9/misspell v0.3.4
github.com/go-git/go-billy/v5 v5.3.1
github.com/go-git/go-git/v5 v5.4.2
@ -17,6 +19,9 @@ require (
github.com/google/go-github/v41 v41.0.0
github.com/google/safehtml v0.0.2
github.com/jba/templatecheck v0.6.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/sdk v1.3.0
go.opentelemetry.io/otel/trace v1.3.0
golang.org/x/exp v0.0.0-20220124173137-7a6bfc487013
golang.org/x/exp/vulncheck v0.0.0-20220114162006-9d54fb35363c
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57
@ -33,6 +38,7 @@ require (
require (
cloud.google.com/go v0.97.0 // indirect
cloud.google.com/go/trace v1.0.0 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Microsoft/go-winio v0.4.16 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect
@ -45,7 +51,9 @@ require (
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
github.com/go-git/gcfg v1.5.0 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect

Просмотреть файл

@ -16,6 +16,7 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/storage/memory"
"golang.org/x/exp/event"
"golang.org/x/tools/txtar"
"golang.org/x/vulndb/internal/derrors"
"golang.org/x/vulndb/internal/worker/log"
@ -24,6 +25,9 @@ import (
// Clone returns a repo by cloning the repo at repoURL.
func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Clone(%q)", repoURL)
ctx = event.Start(ctx, "gitrepo.Clone")
defer event.End(ctx)
log.Infof(ctx, "Cloning repo %q at HEAD", repoURL)
return git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: repoURL,
@ -37,6 +41,9 @@ func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error
// Open returns a repo by opening the repo at the local path dirpath.
func Open(ctx context.Context, dirpath string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Open(%q)", dirpath)
ctx = event.Start(ctx, "gitrepo.Open")
defer event.End(ctx)
log.Infof(ctx, "Opening repo at %q", dirpath)
repo, err = git.PlainOpen(dirpath)
if err != nil {

Просмотреть файл

@ -7,6 +7,7 @@ package worker
import (
"context"
"golang.org/x/exp/event"
"golang.org/x/vulndb/internal/derrors"
"golang.org/x/vulndb/internal/worker/store"
)
@ -14,6 +15,8 @@ import (
// updateFalsePositives makes sure the store reflects the list of false positives.
func updateFalsePositives(ctx context.Context, st store.Store) (err error) {
defer derrors.Wrap(&err, "updateFalsePositives")
ctx = event.Start(ctx, "updateFalsePositives")
defer event.End(ctx)
for i := 0; i < len(falsePositives); i += maxTransactionWrites {
j := i + maxTransactionWrites

Просмотреть файл

@ -8,18 +8,16 @@ import (
"context"
"fmt"
"io"
"os"
"sync"
"time"
"golang.org/x/exp/event"
)
// WithGCPJSONLogger returns a context which will log events in a format that is
// NewGCPJSONLogger returns a handler which logs events in a format that is
// understood by Google Cloud Platform logging.
func WithGCPJSONLogger(ctx context.Context, traceID string) context.Context {
return event.WithExporter(ctx,
event.NewExporter(&gcpJSONHandler{w: os.Stderr, traceID: traceID}, nil))
func NewGCPJSONHandler(w io.Writer, traceID string) event.Handler {
return &gcpJSONHandler{w: w, traceID: traceID}
}
type gcpJSONHandler struct {

Просмотреть файл

@ -9,7 +9,6 @@ import (
"context"
"fmt"
"io"
"os"
"reflect"
"strings"
"sync"
@ -19,12 +18,13 @@ import (
"golang.org/x/exp/event/severity"
)
func WithLineLogger(ctx context.Context) context.Context {
return event.WithExporter(ctx, event.NewExporter(&lineHandler{w: os.Stderr}, nil))
// NewLineHandler returns an event Handler that writes log events one per line
// in an easy-to-read format:
// time level message label1=value1 label2=value2 ...
func NewLineHandler(w io.Writer) event.Handler {
return &lineHandler{w: w}
}
// lineHandler writes log events one per line in an easy-to-read format:
// time level message label1=value1 label2=value2 ...
type lineHandler struct {
mu sync.Mutex // ensure a log line is not interrupted
w io.Writer

Просмотреть файл

@ -19,6 +19,7 @@ import (
"cloud.google.com/go/errorreporting"
"github.com/google/safehtml/template"
"golang.org/x/exp/event"
"golang.org/x/sync/errgroup"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/derrors"
@ -26,6 +27,13 @@ import (
"golang.org/x/vulndb/internal/issues"
"golang.org/x/vulndb/internal/worker/log"
"golang.org/x/vulndb/internal/worker/store"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
otrace "go.opentelemetry.io/otel/trace"
eotel "golang.org/x/exp/event/otel"
)
const pkgsiteURL = "https://pkg.go.dev"
@ -36,6 +44,9 @@ type Server struct {
cfg Config
indexTemplate *template.Template
issueClient issues.Client
traceHandler event.Handler
propagator propagation.TextMapPropagator
afterRequest func()
}
func NewServer(ctx context.Context, cfg Config) (_ *Server, err error) {
@ -43,6 +54,20 @@ func NewServer(ctx context.Context, cfg Config) (_ *Server, err error) {
s := &Server{cfg: cfg}
tracerProvider, err := initOpenTelemetry(cfg.Project)
if err != nil {
return nil, err
}
s.traceHandler = eotel.NewTraceHandler(tracerProvider.Tracer("vulndb-worker"))
s.afterRequest = func() { tracerProvider.ForceFlush(ctx) }
// The propagator extracts incoming trace IDs so that we can connect our trace spans
// to the incoming ones constructed by Cloud Run.
s.propagator = propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
gcppropagator.New(),
)
if cfg.UseErrorReporting {
reportingClient, err := errorreporting.NewClient(ctx, cfg.Project, errorreporting.Config{
ServiceName: serviceID,
@ -89,9 +114,16 @@ func NewServer(ctx context.Context, cfg Config) (_ *Server, err error) {
func (s *Server) handle(_ context.Context, pattern string, handler func(w http.ResponseWriter, r *http.Request) error) {
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
log.Debugf(r.Context(), "#### SpanContext: %+v\n", otrace.SpanContextFromContext(r.Context()))
start := time.Now()
defer s.afterRequest()
traceID := r.Header.Get("X-Cloud-Trace-Context")
ctx := log.WithGCPJSONLogger(r.Context(), traceID)
exporter := event.NewExporter(eventHandlers{
log.NewGCPJSONHandler(os.Stderr, traceID),
s.traceHandler,
}, nil)
ctx := event.WithExporter(r.Context(), exporter)
ctx = s.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
r = r.WithContext(ctx)
log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
@ -314,3 +346,27 @@ func (s *Server) handleUpdateAndIssues(w http.ResponseWriter, r *http.Request) e
}
return s.handleIssues(w, r)
}
func initOpenTelemetry(projectID string) (tp *sdktrace.TracerProvider, err error) {
defer derrors.Wrap(&err, "initOpenTelemetry(%q)", projectID)
exporter, err := texporter.New(texporter.WithProjectID(projectID))
if err != nil {
return nil, err
}
tp = sdktrace.NewTracerProvider(
// Enable tracing if there is no incoming request, or if the incoming
// request is sampled.
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
sdktrace.WithBatcher(exporter))
return tp, nil
}
type eventHandlers []event.Handler
func (eh eventHandlers) Event(ctx context.Context, ev *event.Event) context.Context {
for _, h := range eh {
ctx = h.Event(ctx, ev)
}
return ctx
}

Просмотреть файл

@ -13,6 +13,7 @@ import (
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/object"
"golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/cveschema"
"golang.org/x/vulndb/internal/derrors"
@ -66,6 +67,8 @@ func (u *updater) update(ctx context.Context) (ur *store.CommitUpdateRecord, err
// transaction can do, so the CVE files in the repo are processed in
// batches, one transaction per batch.
defer derrors.Wrap(&err, "updater.update(%s)", u.commit.Hash)
ctx = event.Start(ctx, "updater.update")
defer event.End(ctx)
defer func() {
if err != nil {
@ -122,7 +125,7 @@ func (u *updater) update(ctx context.Context) (ur *store.CommitUpdateRecord, err
}
if stats.skipped {
skippedDirs = append(skippedDirs, dirFiles[0].DirPath)
if len(skippedDirs) > logSkippedEvery {
if len(skippedDirs) >= logSkippedEvery {
log.Infof(ctx, "skipping directory %s and %d others because the hashes match",
skippedDirs[0], len(skippedDirs)-1)
skippedDirs = nil

Просмотреть файл

@ -17,6 +17,7 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
"golang.org/x/exp/event"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
vulnc "golang.org/x/vuln/client"
@ -78,6 +79,9 @@ func UpdateCommit(ctx context.Context, repoPath, commitHashString string, st sto
// It verifies that there is not an update currently in progress,
// and it makes sure that the update is to a more recent commit.
func checkUpdate(ctx context.Context, commit *object.Commit, st store.Store) error {
ctx = event.Start(ctx, "checkUpdate")
defer event.End(ctx)
urs, err := st.ListCommitUpdateRecords(ctx, 1)
if err != nil {
return err
@ -171,7 +175,9 @@ const issueQPS = 1
var issueRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1000/float64(issueQPS))*time.Millisecond), 1)
func CreateIssues(ctx context.Context, st store.Store, ic issues.Client, limit int) (err error) {
derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
defer derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
ctx = event.Start(ctx, "CreateIssues")
defer event.End(ctx)
needsIssue, err := st.ListCVERecordsWithTriageState(ctx, store.TriageStateNeedsIssue)
if err != nil {

Просмотреть файл

@ -10,12 +10,14 @@ package worker
import (
"context"
"math"
"os"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cveschema"
"golang.org/x/vulndb/internal/gitrepo"
"golang.org/x/vulndb/internal/issues"
@ -95,7 +97,8 @@ func TestCheckUpdate(t *testing.T) {
}
func TestCreateIssues(t *testing.T) {
ctx := log.WithLineLogger(context.Background())
ctx := event.WithExporter(context.Background(),
event.NewExporter(log.NewLineHandler(os.Stderr), nil))
mstore := store.NewMemStore()
ic := issues.NewFakeClient()
ctime := time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC)