зеркало из https://github.com/github/vitess-gh.git
Change the internal tracing API
The old API had an issue with contexts not being passed around correctly. This new API should make it easier to remember to do it correctly. Signed-off-by: Andres Taylor <antaylor@squareup.com>
This commit is contained in:
Родитель
b4cd56f9f4
Коммит
4392122102
|
@ -75,12 +75,4 @@ func (jf OpenTracingFactory) NewContext(parent context.Context, s Span) context.
|
|||
type nilCloser struct {
|
||||
}
|
||||
|
||||
func (c *nilCloser) Close() error { return nil }
|
||||
|
||||
type SpanType int
|
||||
|
||||
const (
|
||||
Local SpanType = iota
|
||||
Client
|
||||
Server
|
||||
)
|
||||
func (c *nilCloser) Close() error { return nil }
|
|
@ -28,26 +28,28 @@ import (
|
|||
// represented by this Span. Call Finish() when that work is done to record the
|
||||
// Span. A Span may be reused by calling Start again.
|
||||
type Span interface {
|
||||
// StartLocal marks the beginning of a span representing time spent doing
|
||||
// work locally.
|
||||
StartLocal(label string)
|
||||
// StartClient marks the beginning of a span representing time spent acting as
|
||||
// a client and waiting for a response.
|
||||
StartClient(label string)
|
||||
// StartServer marks the beginning of a span representing time spent doing
|
||||
// work in service of a remote client request.
|
||||
StartServer(label string)
|
||||
// Finish marks the span as complete.
|
||||
Finish()
|
||||
// Annotate records a key/value pair associated with a Span. It should be
|
||||
// called between Start and Finish.
|
||||
Annotate(key string, value interface{})
|
||||
}
|
||||
|
||||
type SpanType int
|
||||
|
||||
const (
|
||||
Local SpanType = iota
|
||||
Client
|
||||
Server
|
||||
)
|
||||
|
||||
// NewSpan creates a new Span with the currently installed tracing plugin.
|
||||
// If no tracing plugin is installed, it returns a fake Span that does nothing.
|
||||
func NewSpan(parent Span) Span {
|
||||
return spanFactory.New(parent)
|
||||
func NewSpan(inCtx context.Context, label string, spanType SpanType) (Span, context.Context) {
|
||||
parent, _ := spanFactory.FromContext(inCtx)
|
||||
span := spanFactory.New(parent, label, spanType)
|
||||
outCtx := spanFactory.NewContext(inCtx, span)
|
||||
|
||||
return span, outCtx
|
||||
}
|
||||
|
||||
// FromContext returns the Span from a Context if present. The bool return
|
||||
|
@ -61,15 +63,6 @@ func NewContext(parent context.Context, span Span) context.Context {
|
|||
return spanFactory.NewContext(parent, span)
|
||||
}
|
||||
|
||||
// NewSpanFromContext returns a new Span whose parent is the Span from the given
|
||||
// Context if present, or a new Span with no parent if not.
|
||||
func NewSpanFromContext(ctx context.Context) Span {
|
||||
if parent, ok := FromContext(ctx); ok {
|
||||
return NewSpan(parent)
|
||||
}
|
||||
return NewSpan(nil)
|
||||
}
|
||||
|
||||
// CopySpan creates a new context from parentCtx, with only the trace span
|
||||
// copied over from spanCtx, if it has any. If not, parentCtx is returned.
|
||||
func CopySpan(parentCtx, spanCtx context.Context) context.Context {
|
||||
|
@ -81,13 +74,11 @@ func CopySpan(parentCtx, spanCtx context.Context) context.Context {
|
|||
|
||||
// SpanFactory is an interface for creating spans or extracting them from Contexts.
|
||||
type SpanFactory interface {
|
||||
New(parent Span) Span
|
||||
FromContext(ctx context.Context) (Span, bool)
|
||||
NewContext(parent context.Context, span Span) context.Context
|
||||
New(parent Span, label string, spanType SpanType) Span
|
||||
FromContext(ctx context.Context) (Span, bool)
|
||||
NewContext(parent context.Context, span Span) context.Context
|
||||
}
|
||||
|
||||
var spanFactory SpanFactory = fakeSpanFactory{}
|
||||
|
||||
// RegisterSpanFactory should be called by a plugin during init() to install a
|
||||
// factory that creates Spans for that plugin's tracing framework. Each call to
|
||||
// RegisterSpanFactory will overwrite any previous setting. If no factory is
|
||||
|
@ -97,17 +88,16 @@ func RegisterSpanFactory(sf SpanFactory) {
|
|||
spanFactory = sf
|
||||
}
|
||||
|
||||
var spanFactory SpanFactory = fakeSpanFactory{}
|
||||
|
||||
type fakeSpanFactory struct{}
|
||||
|
||||
func (fakeSpanFactory) New(parent Span) Span { return fakeSpan{} }
|
||||
func (fakeSpanFactory) FromContext(ctx context.Context) (Span, bool) { return nil, false }
|
||||
func (fakeSpanFactory) NewContext(parent context.Context, span Span) context.Context { return parent }
|
||||
func (fakeSpanFactory) New(Span, string, SpanType) Span { return fakeSpan{} }
|
||||
func (fakeSpanFactory) FromContext(context.Context) (Span, bool) { return nil, false }
|
||||
func (fakeSpanFactory) NewContext(parent context.Context, _ Span) context.Context { return parent }
|
||||
|
||||
// fakeSpan implements Span with no-op methods.
|
||||
type fakeSpan struct{}
|
||||
|
||||
func (fakeSpan) StartLocal(string) {}
|
||||
func (fakeSpan) StartClient(string) {}
|
||||
func (fakeSpan) StartServer(string) {}
|
||||
func (fakeSpan) Finish() {}
|
||||
func (fakeSpan) Annotate(string, interface{}) {}
|
||||
|
|
|
@ -27,12 +27,14 @@ func TestFakeSpan(t *testing.T) {
|
|||
RegisterSpanFactory(fakeSpanFactory{})
|
||||
|
||||
// It should be safe to call all the usual methods as if a plugin were installed.
|
||||
span := NewSpanFromContext(ctx)
|
||||
span.StartLocal("label")
|
||||
span.StartClient("label")
|
||||
span.StartServer("label")
|
||||
span.Annotate("key", "value")
|
||||
span.Finish()
|
||||
NewContext(ctx, span)
|
||||
CopySpan(ctx, ctx)
|
||||
span1, ctx := NewSpan(ctx, "label", Local)
|
||||
span1.Finish()
|
||||
|
||||
span2, ctx := NewSpan(ctx, "label", Client)
|
||||
span2.Annotate("key", 42)
|
||||
span2.Finish()
|
||||
|
||||
span3, ctx := NewSpan(ctx, "label", Server)
|
||||
span3.Annotate("key", 42)
|
||||
span3.Finish()
|
||||
}
|
||||
|
|
|
@ -220,8 +220,7 @@ func (l *Lock) lockKeyspace(ctx context.Context, ts *Server, keyspace string) (L
|
|||
ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
|
||||
defer cancel()
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.LockKeyspaceForAction")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.LockKeyspaceForAction", trace.Client)
|
||||
span.Annotate("action", l.Action)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
defer span.Finish()
|
||||
|
@ -245,8 +244,7 @@ func (l *Lock) unlockKeyspace(ctx context.Context, ts *Server, keyspace string,
|
|||
ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
|
||||
defer cancel()
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UnlockKeyspaceForAction")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockKeyspaceForAction", trace.Client)
|
||||
span.Annotate("action", l.Action)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
defer span.Finish()
|
||||
|
@ -363,8 +361,7 @@ func (l *Lock) lockShard(ctx context.Context, ts *Server, keyspace, shard string
|
|||
ctx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
|
||||
defer cancel()
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.LockShardForAction")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.LockShardForAction", trace.Client)
|
||||
span.Annotate("action", l.Action)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
span.Annotate("shard", shard)
|
||||
|
@ -388,8 +385,7 @@ func (l *Lock) unlockShard(ctx context.Context, ts *Server, keyspace, shard stri
|
|||
ctx, cancel := context.WithTimeout(ctx, defaultLockTimeout)
|
||||
defer cancel()
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UnlockShardForAction")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UnlockShardForAction", trace.Client)
|
||||
span.Annotate("action", l.Action)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
span.Annotate("shard", shard)
|
||||
|
|
|
@ -78,8 +78,7 @@ func (sri *ShardReplicationInfo) GetShardReplicationNode(tabletAlias *topodatapb
|
|||
// UpdateShardReplicationRecord is a low level function to add / update an
|
||||
// entry to the ShardReplication object.
|
||||
func UpdateShardReplicationRecord(ctx context.Context, ts *Server, keyspace, shard string, tabletAlias *topodatapb.TabletAlias) error {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UpdateShardReplicationFields")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UpdateShardReplicationFields", trace.Client)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
span.Annotate("shard", shard)
|
||||
span.Annotate("tablet", topoproto.TabletAliasString(tabletAlias))
|
||||
|
|
|
@ -177,8 +177,7 @@ func (si *ShardInfo) HasMaster() bool {
|
|||
// GetShard is a high level function to read shard data.
|
||||
// It generates trace spans.
|
||||
func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardInfo, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.GetShard")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.GetShard", trace.Local)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
span.Annotate("shard", shard)
|
||||
defer span.Finish()
|
||||
|
@ -204,8 +203,7 @@ func (ts *Server) GetShard(ctx context.Context, keyspace, shard string) (*ShardI
|
|||
// updateShard updates the shard data, with the right version.
|
||||
// It also creates a span, and dispatches the event.
|
||||
func (ts *Server) updateShard(ctx context.Context, si *ShardInfo) error {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UpdateShard")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UpdateShard", trace.Client)
|
||||
span.Annotate("keyspace", si.keyspace)
|
||||
span.Annotate("shard", si.shardName)
|
||||
defer span.Finish()
|
||||
|
@ -478,8 +476,7 @@ func (ts *Server) FindAllTabletAliasesInShard(ctx context.Context, keyspace, sha
|
|||
//
|
||||
// The tablet aliases are sorted by cell, then by UID.
|
||||
func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspace, shard string, cells []string) ([]*topodatapb.TabletAlias, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartLocal("topo.FindAllTabletAliasesInShardbyCell")
|
||||
span, ctx := trace.NewSpan(ctx, "topo.FindAllTabletAliasesInShardbyCell", trace.Client)
|
||||
span.Annotate("keyspace", keyspace)
|
||||
span.Annotate("shard", shard)
|
||||
span.Annotate("num_cells", len(cells))
|
||||
|
|
|
@ -222,8 +222,7 @@ func (ts *Server) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.GetTablet")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.GetTablet", trace.Client)
|
||||
span.Annotate("tablet", topoproto.TabletAliasString(alias))
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -251,8 +250,7 @@ func (ts *Server) UpdateTablet(ctx context.Context, ti *TabletInfo) error {
|
|||
return err
|
||||
}
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UpdateTablet")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UpdateTablet", trace.Local)
|
||||
span.Annotate("tablet", topoproto.TabletAliasString(ti.Alias))
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -281,8 +279,7 @@ func (ts *Server) UpdateTablet(ctx context.Context, ti *TabletInfo) error {
|
|||
// If the update method returns ErrNoUpdateNeeded, nothing is written,
|
||||
// and nil,nil is returned.
|
||||
func (ts *Server) UpdateTabletFields(ctx context.Context, alias *topodatapb.TabletAlias, update func(*topodatapb.Tablet) error) (*topodatapb.Tablet, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("TopoServer.UpdateTabletFields")
|
||||
span, ctx := trace.NewSpan(ctx, "TopoServer.UpdateTabletFields", trace.Local)
|
||||
span.Annotate("tablet", topoproto.TabletAliasString(alias))
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -405,8 +402,7 @@ func DeleteTabletReplicationData(ctx context.Context, ts *Server, tablet *topoda
|
|||
// incomplete, meaning some tablets couldn't be read.
|
||||
// The map is indexed by topoproto.TabletAliasString(tablet alias).
|
||||
func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias) (map[string]*TabletInfo, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartLocal("topo.GetTabletMap")
|
||||
span, ctx := trace.NewSpan(ctx, "topo.GetTabletMap", trace.Local)
|
||||
span.Annotate("num_tablets", len(tabletAliases))
|
||||
defer span.Finish()
|
||||
|
||||
|
|
|
@ -130,8 +130,7 @@ func (agent *ActionAgent) refreshTablet(ctx context.Context, reason string) erro
|
|||
agent.checkLock()
|
||||
log.Infof("Executing post-action state refresh: %v", reason)
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartLocal("ActionAgent.refreshTablet")
|
||||
span, ctx := trace.NewSpan(ctx, "ActionAgent.refreshTablet", trace.Local)
|
||||
span.Annotate("reason", reason)
|
||||
defer span.Finish()
|
||||
ctx = trace.NewContext(ctx, span)
|
||||
|
@ -191,8 +190,7 @@ func (agent *ActionAgent) updateState(ctx context.Context, newTablet *topodatapb
|
|||
func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTablet *topodatapb.Tablet) {
|
||||
agent.checkLock()
|
||||
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartLocal("ActionAgent.changeCallback")
|
||||
span, ctx := trace.NewSpan(ctx, "ActionAgent.changeCallback", trace.Local)
|
||||
defer span.Finish()
|
||||
|
||||
allowQuery := topo.IsRunningQueryService(newTablet.Type)
|
||||
|
|
|
@ -94,8 +94,7 @@ func NewDBConnNoPool(params *mysql.ConnParams, dbaPool *dbconnpool.ConnectionPoo
|
|||
// Exec executes the specified query. If there is a connection error, it will reconnect
|
||||
// and retry. A failed reconnect will trigger a CheckMySQL.
|
||||
func (dbc *DBConn) Exec(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("DBConn.Exec")
|
||||
span, ctx := trace.NewSpan(ctx, "DBConn.Exec", trace.Client)
|
||||
defer span.Finish()
|
||||
|
||||
for attempt := 1; attempt <= 2; attempt++ {
|
||||
|
@ -161,8 +160,7 @@ func (dbc *DBConn) ExecOnce(ctx context.Context, query string, maxrows int, want
|
|||
|
||||
// Stream executes the query and streams the results.
|
||||
func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqltypes.Result) error, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartClient("DBConn.Stream")
|
||||
span, ctx := trace.NewSpan(ctx, "DBConn.Stream", trace.Client)
|
||||
defer span.Finish()
|
||||
|
||||
resultSent := false
|
||||
|
|
|
@ -318,8 +318,7 @@ func (qe *QueryEngine) Close() {
|
|||
|
||||
// GetPlan returns the TabletPlan that for the query. Plans are cached in a cache.LRUCache.
|
||||
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool) (*TabletPlan, error) {
|
||||
span := trace.NewSpanFromContext(ctx)
|
||||
span.StartLocal("QueryEngine.GetPlan")
|
||||
span, ctx := trace.NewSpan(ctx, "QueryEngine.GetPlan", trace.Local)
|
||||
defer span.Finish()
|
||||
|
||||
if plan := qe.getQuery(sql); plan != nil {
|
||||
|
|
|
@ -789,12 +789,11 @@ func (qre *QueryExecutor) execSet() (*sqltypes.Result, error) {
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
|
||||
span := trace.NewSpanFromContext(qre.ctx)
|
||||
span.StartLocal("QueryExecutor.getConn")
|
||||
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getConn", trace.Local)
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
conn, err := qre.tsv.qe.getQueryConn(qre.ctx)
|
||||
conn, err := qre.tsv.qe.getQueryConn(ctx)
|
||||
switch err {
|
||||
case nil:
|
||||
qre.logStats.WaitingForConnection += time.Since(start)
|
||||
|
@ -806,12 +805,11 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
|
|||
}
|
||||
|
||||
func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
|
||||
span := trace.NewSpanFromContext(qre.ctx)
|
||||
span.StartLocal("QueryExecutor.getStreamConn")
|
||||
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getStreamConn", trace.Local)
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
conn, err := qre.tsv.qe.streamConns.Get(qre.ctx)
|
||||
conn, err := qre.tsv.qe.streamConns.Get(ctx)
|
||||
switch err {
|
||||
case nil:
|
||||
qre.logStats.WaitingForConnection += time.Since(start)
|
||||
|
|
Загрузка…
Ссылка в новой задаче