зеркало из https://github.com/github/vitess-gh.git
Adds an optional plugin that logs all queries to syslog
Removes superfluous copyright text, tweaks an error logging statement. Further fleshes out sysloglogger unit test suite (still incomplete) Makes tabletserver.newLogStats(...) public, and completes unit testing of the syslog query logger plugin. Reworks unit test logic to eliminate unnecessary time.Sleep() calls Backs the mock syslog with a buffered channel rather than a map, to help eliminate remaining `time.Sleep()` calls. Fixes broken unit test Adds mutex to fix race condition in unit test Creates a separate mock syslog type for one unit test
This commit is contained in:
Родитель
8c9b8a228c
Коммит
888797aece
|
@ -0,0 +1,7 @@
|
|||
package main
|
||||
|
||||
// Imports and register the syslog-based query logger
|
||||
|
||||
import (
|
||||
_ "github.com/youtube/vitess/go/vt/tabletserver/sysloglogger"
|
||||
)
|
|
@ -50,7 +50,9 @@ type LogStats struct {
|
|||
Error *TabletError
|
||||
}
|
||||
|
||||
func newLogStats(methodName string, ctx context.Context) *LogStats {
|
||||
// NewLogStats constructs a new LogStats with supplied Method and ctx field values, and the StartTime field set to
|
||||
// the present time.
|
||||
func NewLogStats(methodName string, ctx context.Context) *LogStats {
|
||||
return &LogStats{
|
||||
Method: methodName,
|
||||
StartTime: time.Now(),
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
func TestLogStats(t *testing.T) {
|
||||
logStats := newLogStats("test", context.Background())
|
||||
logStats := NewLogStats("test", context.Background())
|
||||
logStats.AddRewrittenSQL("sql1", time.Now())
|
||||
|
||||
if !strings.Contains(logStats.RewrittenSQL(), "sql1") {
|
||||
|
@ -41,7 +41,7 @@ func TestLogStats(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLogStatsFormatBindVariables(t *testing.T) {
|
||||
logStats := newLogStats("test", context.Background())
|
||||
logStats := NewLogStats("test", context.Background())
|
||||
logStats.BindVariables = make(map[string]interface{})
|
||||
logStats.BindVariables["key_1"] = "val_1"
|
||||
logStats.BindVariables["key_2"] = 789
|
||||
|
@ -71,7 +71,7 @@ func TestLogStatsFormatBindVariables(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLogStatsFormatQuerySources(t *testing.T) {
|
||||
logStats := newLogStats("test", context.Background())
|
||||
logStats := NewLogStats("test", context.Background())
|
||||
if logStats.FmtQuerySources() != "none" {
|
||||
t.Fatalf("should return none since log stats does not have any query source, but got: %s", logStats.FmtQuerySources())
|
||||
}
|
||||
|
@ -93,14 +93,14 @@ func TestLogStatsContextHTML(t *testing.T) {
|
|||
html: html,
|
||||
}
|
||||
ctx := callinfo.NewContext(context.Background(), callInfo)
|
||||
logStats := newLogStats("test", ctx)
|
||||
logStats := NewLogStats("test", ctx)
|
||||
if string(logStats.ContextHTML()) != html {
|
||||
t.Fatalf("expect to get html: %s, but got: %s", html, string(logStats.ContextHTML()))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogStatsErrorStr(t *testing.T) {
|
||||
logStats := newLogStats("test", context.Background())
|
||||
logStats := NewLogStats("test", context.Background())
|
||||
if logStats.ErrorStr() != "" {
|
||||
t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr())
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ func TestLogStatsErrorStr(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLogStatsRemoteAddrUsername(t *testing.T) {
|
||||
logStats := newLogStats("test", context.Background())
|
||||
logStats := NewLogStats("test", context.Background())
|
||||
addr, user := logStats.RemoteAddrUsername()
|
||||
if addr != "" {
|
||||
t.Fatalf("remote addr should be empty")
|
||||
|
@ -131,7 +131,7 @@ func TestLogStatsRemoteAddrUsername(t *testing.T) {
|
|||
username: username,
|
||||
}
|
||||
ctx := callinfo.NewContext(context.Background(), callInfo)
|
||||
logStats = newLogStats("test", ctx)
|
||||
logStats = NewLogStats("test", ctx)
|
||||
addr, user = logStats.RemoteAddrUsername()
|
||||
if addr != remoteAddr {
|
||||
t.Fatalf("expected to get remote addr: %s, but got: %s", remoteAddr, addr)
|
||||
|
|
|
@ -1206,7 +1206,7 @@ func newTransaction(tsv *TabletServer) int64 {
|
|||
}
|
||||
|
||||
func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor {
|
||||
logStats := newLogStats("TestQueryExecutor", ctx)
|
||||
logStats := NewLogStats("TestQueryExecutor", ctx)
|
||||
return &QueryExecutor{
|
||||
ctx: ctx,
|
||||
query: sql,
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestQuerylogzHandlerInvalidLogStats(t *testing.T) {
|
|||
|
||||
func TestQuerylogzHandler(t *testing.T) {
|
||||
req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
|
||||
logStats := newLogStats("Execute", context.Background())
|
||||
logStats := NewLogStats("Execute", context.Background())
|
||||
logStats.PlanType = planbuilder.PlanPassSelect.String()
|
||||
logStats.OriginalSQL = "select name from test_table limit 1000"
|
||||
logStats.RowsAffected = 1000
|
||||
|
|
|
@ -307,7 +307,7 @@ func TestSchemaInfoGetPlanPanicDuetoEmptyQuery(t *testing.T) {
|
|||
defer schemaInfo.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("GetPlanStats", ctx)
|
||||
logStats := NewLogStats("GetPlanStats", ctx)
|
||||
defer handleAndVerifyTabletError(
|
||||
t,
|
||||
"schema info GetPlan should fail because of empty query",
|
||||
|
@ -350,7 +350,7 @@ func TestSchemaInfoQueryCache(t *testing.T) {
|
|||
defer schemaInfo.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("GetPlanStats", ctx)
|
||||
logStats := NewLogStats("GetPlanStats", ctx)
|
||||
schemaInfo.SetQueryCacheCap(1)
|
||||
firstPlan := schemaInfo.GetPlan(ctx, logStats, firstQuery)
|
||||
if firstPlan == nil {
|
||||
|
@ -471,7 +471,7 @@ func TestSchemaInfoStatsURL(t *testing.T) {
|
|||
defer schemaInfo.Close()
|
||||
// warm up cache
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("GetPlanStats", ctx)
|
||||
logStats := NewLogStats("GetPlanStats", ctx)
|
||||
schemaInfo.GetPlan(ctx, logStats, query)
|
||||
|
||||
request, _ := http.NewRequest("GET", schemaInfo.endpoints[debugQueryPlansKey], nil)
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
// Package sysloglogger implements an optional plugin that logs all queries to syslog.
|
||||
package sysloglogger
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log/syslog"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
)
|
||||
|
||||
// syslogWriter is an interface that wraps syslog.Writer, so it can be mocked in unit tests.
|
||||
type syslogWriter interface {
|
||||
Info(string) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// writer holds a persistent connection to the syslog daemon (or a mock when under test).
|
||||
var writer syslogWriter
|
||||
|
||||
// ch holds the tabletserver.StatsLogger channel to which this plugin subscribes (or a mock when under test).
|
||||
var ch chan interface{}
|
||||
|
||||
// logQueries is the vttablet startup flag that must be set for this plugin to be active.
|
||||
var logQueries = flag.Bool("log_queries", false, "Enable query logging to syslog.")
|
||||
|
||||
func init() {
|
||||
servenv.OnRun(func() {
|
||||
if *logQueries {
|
||||
var err error
|
||||
writer, err = syslog.New(syslog.LOG_INFO, "vtquerylogger")
|
||||
if err != nil {
|
||||
log.Errorf("Query logger is unable to connect to syslog: %v", err)
|
||||
return
|
||||
}
|
||||
go run()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Run logs queries to syslog, if the "log_queries" flag is set to true when starting vttablet.
|
||||
func run() {
|
||||
defer writer.Close()
|
||||
|
||||
// ch will only be non-nil in a unit test context, when a mock has been populated
|
||||
if ch == nil {
|
||||
ch := tabletserver.StatsLogger.Subscribe("gwslog")
|
||||
defer tabletserver.StatsLogger.Unsubscribe(ch)
|
||||
}
|
||||
|
||||
formatParams := map[string][]string{"full": {}}
|
||||
for out := range ch {
|
||||
stats, ok := out.(*tabletserver.LogStats)
|
||||
if !ok {
|
||||
log.Errorf("Unexpected value in query logs: %#v (expecting value of type %T)", out, &tabletserver.LogStats{})
|
||||
continue
|
||||
}
|
||||
if err := writer.Info(stats.Format(formatParams)); err != nil {
|
||||
log.Errorf("Error writing to syslog: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
package sysloglogger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/syslog"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
)
|
||||
|
||||
// fakeWriter is a mock of the real syslog writer, to enable capturing and playing back of log messages in unit testing.
|
||||
type fakeWriter struct {
|
||||
messages map[string]bool
|
||||
}
|
||||
|
||||
func newFakeWriter() *fakeWriter {
|
||||
return &fakeWriter{
|
||||
messages: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
func (fw *fakeWriter) write(pri syslog.Priority, msg string) error {
|
||||
fw.messages[strings.TrimSpace(msg)] = true
|
||||
return nil
|
||||
}
|
||||
func (fw *fakeWriter) Info(msg string) error { return fw.write(syslog.LOG_INFO, msg) }
|
||||
func (fw *fakeWriter) Close() error { return nil }
|
||||
|
||||
// mockLogStats generates a dummy tabletserver.LogStats message for testing.
|
||||
func mockLogStats(originalSQL string) *tabletserver.LogStats {
|
||||
logstats := tabletserver.NewLogStats("Execute", context.Background())
|
||||
logstats.StartTime = time.Time{}
|
||||
logstats.PlanType = "PASS_SELECT"
|
||||
logstats.OriginalSQL = originalSQL
|
||||
return logstats
|
||||
}
|
||||
|
||||
// failingFakeWriter is a mock syslog writer that deliberately simulates an intermittent syslog outage, which causes
|
||||
// every 4th message log message to be dropped.
|
||||
type failingFakeWriter struct {
|
||||
messages map[string]bool
|
||||
numberProcessed int
|
||||
}
|
||||
|
||||
func newFailingFakeWriter() *failingFakeWriter {
|
||||
return &failingFakeWriter{
|
||||
messages: make(map[string]bool),
|
||||
numberProcessed: 0,
|
||||
}
|
||||
}
|
||||
func (fw *failingFakeWriter) write(pri syslog.Priority, msg string) error {
|
||||
fw.numberProcessed++
|
||||
if fw.numberProcessed%4 == 0 {
|
||||
return errors.New("Cannot connect to syslog")
|
||||
}
|
||||
fw.messages[strings.TrimSpace(msg)] = true
|
||||
return nil
|
||||
}
|
||||
func (fw *failingFakeWriter) Info(msg string) error { return fw.write(syslog.LOG_INFO, msg) }
|
||||
func (fw *failingFakeWriter) Close() error { return nil }
|
||||
|
||||
// expectedLogStatsText returns the results expected from the plugin processing a dummy message generated by mockLogStats(...).
|
||||
func expectedLogStatsText(originalSQL string) string {
|
||||
return fmt.Sprintf("Execute\t\t\t''\t''\tJan 1 00:00:00.000000\tJan 1 00:00:00.000000\t0.000000\tPASS_SELECT\t"+
|
||||
"\"%s\"\tnull\t0\t\"\"\tnone\t0.000000\t0.000000\t0\t0\t\"\"", originalSQL)
|
||||
}
|
||||
|
||||
// TestSyslog sends a stream of five query records to the plugin, and verifies that they are logged.
|
||||
func TestSyslog(t *testing.T) {
|
||||
// Overwrite the usual syslog writer and StatsLogger subscription channel with mocks
|
||||
mock := newFakeWriter()
|
||||
writer = mock
|
||||
ch = make(chan interface{}, 10)
|
||||
|
||||
// Start running the plugin loop
|
||||
syncChannel := make(chan bool)
|
||||
go func() {
|
||||
run()
|
||||
close(syncChannel)
|
||||
}()
|
||||
|
||||
// Send fake messages to the mock channel, and then close the channel to end the plugin loop
|
||||
ch <- mockLogStats("select 1")
|
||||
ch <- mockLogStats("select 2")
|
||||
ch <- mockLogStats("select 3")
|
||||
ch <- mockLogStats("select 4")
|
||||
ch <- mockLogStats("select 5")
|
||||
close(ch)
|
||||
<-syncChannel
|
||||
|
||||
// Collect everything that the plugin logged
|
||||
queriesLogged := make(map[string]bool)
|
||||
for received := range mock.messages {
|
||||
queriesLogged[received] = true
|
||||
}
|
||||
|
||||
// Verify the count and contents
|
||||
if len(queriesLogged) != 5 {
|
||||
t.Fatalf("Expected 5 queries to be logged, but found %d", len(queriesLogged))
|
||||
}
|
||||
for i := 1; i <= 5; i++ {
|
||||
if _, ok := queriesLogged[expectedLogStatsText("select "+strconv.Itoa(i))]; !ok {
|
||||
t.Fatalf("Expected query \"%s\" was not logged", expectedLogStatsText("select "+strconv.Itoa(i)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyslogWithBadData sends a stream of query records, including one that doesn't fit the type specification...
|
||||
// verifying that the bad record is gracefully skipped and the others are still logged successfully.
|
||||
func TestSyslogWithBadData(t *testing.T) {
|
||||
mock := newFakeWriter()
|
||||
writer = mock
|
||||
ch = make(chan interface{}, 10)
|
||||
|
||||
syncChannel := make(chan bool)
|
||||
go func() {
|
||||
run()
|
||||
close(syncChannel)
|
||||
}()
|
||||
|
||||
// Send 5 records for logging, one of which is bad
|
||||
ch <- mockLogStats("select 1")
|
||||
ch <- mockLogStats("select 2")
|
||||
ch <- mockLogStats("select 3")
|
||||
ch <- "Wait... this is just a garbage 'string', not of type '*tabletserver.LogStats'!"
|
||||
ch <- mockLogStats("select 5")
|
||||
close(ch)
|
||||
<-syncChannel
|
||||
|
||||
// Collect everything that the plugin logged
|
||||
queriesLogged := make(map[string]bool)
|
||||
for received := range mock.messages {
|
||||
queriesLogged[received] = true
|
||||
}
|
||||
|
||||
// Verify the count and contents
|
||||
if len(queriesLogged) != 4 {
|
||||
t.Fatalf("Expected 4 queries to be logged, but found %d", len(queriesLogged))
|
||||
}
|
||||
validNums := []int{1, 2, 3, 5}
|
||||
for _, num := range validNums {
|
||||
if _, ok := queriesLogged[expectedLogStatsText("select "+strconv.Itoa(num))]; !ok {
|
||||
t.Fatalf("Expected query \"%s\" was not logged", expectedLogStatsText("select "+strconv.Itoa(num)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyslogWithInterruptedConnection sends a stream of query records, simulating temporary syslog outage
|
||||
// while they're processing. Verifies that the plugin gracefully handles and recovers from the broken connectivity,
|
||||
// and that all messages received while the connection is alive are logged successfully.
|
||||
func TestSyslogWithInterruptedConnection(t *testing.T) {
|
||||
|
||||
// This mock will simulate a broken syslog connection when processing every 4th record
|
||||
mock := newFailingFakeWriter()
|
||||
writer = mock
|
||||
ch = make(chan interface{}, 10)
|
||||
|
||||
syncChannel := make(chan bool)
|
||||
go func() {
|
||||
run()
|
||||
close(syncChannel)
|
||||
}()
|
||||
|
||||
ch <- mockLogStats("select 1")
|
||||
ch <- mockLogStats("select 2")
|
||||
ch <- mockLogStats("select 3")
|
||||
ch <- mockLogStats("select 4") // This record will get dropped due to a syslog outage
|
||||
ch <- mockLogStats("select 5")
|
||||
close(ch)
|
||||
<-syncChannel
|
||||
|
||||
queriesLogged := make(map[string]bool)
|
||||
for received := range mock.messages {
|
||||
queriesLogged[received] = true
|
||||
}
|
||||
if len(queriesLogged) != 4 {
|
||||
t.Fatalf("Expected 4 queries to be logged, but found %d", len(queriesLogged))
|
||||
}
|
||||
expectedLogs := []int{1, 2, 3, 5}
|
||||
for _, num := range expectedLogs {
|
||||
if _, ok := queriesLogged[expectedLogStatsText("select "+strconv.Itoa(num))]; !ok {
|
||||
t.Fatalf("Expected query \"%s\" was not logged", expectedLogStatsText("select "+strconv.Itoa(num)))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1100,7 +1100,7 @@ func (tsv *TabletServer) execRequest(
|
|||
target *querypb.Target, isTx, allowOnShutdown bool,
|
||||
exec func(ctx context.Context, logStats *LogStats) error,
|
||||
) (err error) {
|
||||
logStats := newLogStats(requestName, ctx)
|
||||
logStats := NewLogStats(requestName, ctx)
|
||||
logStats.OriginalSQL = sql
|
||||
logStats.BindVariables = bindVariables
|
||||
defer tsv.handleError(sql, bindVariables, &err, logStats)
|
||||
|
|
|
@ -1488,7 +1488,7 @@ func TestTabletServerSplitQueryInvalidParams(t *testing.T) {
|
|||
|
||||
func TestHandleExecUnknownError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestHandleExecError", ctx)
|
||||
logStats := NewLogStats("TestHandleExecError", ctx)
|
||||
var err error
|
||||
testUtils := newTestUtils()
|
||||
config := testUtils.newQueryServiceConfig()
|
||||
|
@ -1499,7 +1499,7 @@ func TestHandleExecUnknownError(t *testing.T) {
|
|||
|
||||
func TestHandleExecTabletError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestHandleExecError", ctx)
|
||||
logStats := NewLogStats("TestHandleExecError", ctx)
|
||||
var err error
|
||||
defer func() {
|
||||
want := "fatal: tablet error"
|
||||
|
@ -1516,7 +1516,7 @@ func TestHandleExecTabletError(t *testing.T) {
|
|||
|
||||
func TestTerseErrorsNonSQLError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestHandleExecError", ctx)
|
||||
logStats := NewLogStats("TestHandleExecError", ctx)
|
||||
var err error
|
||||
defer func() {
|
||||
want := "fatal: tablet error"
|
||||
|
@ -1534,7 +1534,7 @@ func TestTerseErrorsNonSQLError(t *testing.T) {
|
|||
|
||||
func TestTerseErrorsBindVars(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestHandleExecError", ctx)
|
||||
logStats := NewLogStats("TestHandleExecError", ctx)
|
||||
var err error
|
||||
defer func() {
|
||||
want := "error: (errno 10) (sqlstate HY000) during query: select * from test_table"
|
||||
|
@ -1557,7 +1557,7 @@ func TestTerseErrorsBindVars(t *testing.T) {
|
|||
|
||||
func TestTerseErrorsNoBindVars(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestHandleExecError", ctx)
|
||||
logStats := NewLogStats("TestHandleExecError", ctx)
|
||||
var err error
|
||||
defer func() {
|
||||
want := "error: msg"
|
||||
|
|
|
@ -453,7 +453,7 @@ func TestNoTwopc(t *testing.T) {
|
|||
func newTestTxExecutor() (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
|
||||
db = setUpQueryExecutorTest()
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestTxExecutor", ctx)
|
||||
logStats := NewLogStats("TestTxExecutor", ctx)
|
||||
tsv = newTestTabletServer(ctx, smallTxPool, db)
|
||||
db.AddQueryPattern("insert into `_vt`\\.redo_log_transaction\\(dtid, state, time_created\\) values \\('aa', 'Prepared',.*", &sqltypes.Result{})
|
||||
db.AddQueryPattern("insert into `_vt`\\.redo_log_statement.*", &sqltypes.Result{})
|
||||
|
@ -471,7 +471,7 @@ func newTestTxExecutor() (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB)
|
|||
func newShortAgeExecutor() (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
|
||||
db = setUpQueryExecutorTest()
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestTxExecutor", ctx)
|
||||
logStats := NewLogStats("TestTxExecutor", ctx)
|
||||
tsv = newTestTabletServer(ctx, smallTxPool|shortTwopcAge, db)
|
||||
db.AddQueryPattern("insert into `_vt`\\.redo_log_transaction\\(dtid, state, time_created\\) values \\('aa', 'Prepared',.*", &sqltypes.Result{})
|
||||
db.AddQueryPattern("insert into `_vt`\\.redo_log_statement.*", &sqltypes.Result{})
|
||||
|
@ -489,7 +489,7 @@ func newShortAgeExecutor() (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB
|
|||
func newNoTwopcExecutor() (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
|
||||
db = setUpQueryExecutorTest()
|
||||
ctx := context.Background()
|
||||
logStats := newLogStats("TestTxExecutor", ctx)
|
||||
logStats := NewLogStats("TestTxExecutor", ctx)
|
||||
tsv = newTestTabletServer(ctx, noTwopc, db)
|
||||
return &TxExecutor{
|
||||
ctx: ctx,
|
||||
|
|
Загрузка…
Ссылка в новой задаче