This commit is contained in:
Mangirdas Judeikis 2020-01-09 20:34:15 +00:00
Родитель 3758345e31
Коммит d36f0e72c6
8 изменённых файлов: 189 добавлений и 23 удалений

Просмотреть файл

@ -24,17 +24,17 @@ func monitor(ctx context.Context, log *logrus.Entry) error {
return err
}
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, uuid)
if err != nil {
return err
}
m, err := statsd.New(ctx, log.WithField("component", "metrics"), env)
if err != nil {
return err
}
defer m.Close()
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, m, uuid)
if err != nil {
return err
}
mon := pkgmonitor.NewMonitor(log.WithField("component", "monitor"), env, db, m)
return mon.Run(ctx)

Просмотреть файл

@ -30,7 +30,13 @@ func rp(ctx context.Context, log *logrus.Entry) error {
return err
}
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, uuid)
m, err := statsd.New(ctx, log.WithField("component", "metrics"), env)
if err != nil {
return err
}
defer m.Close()
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, m, uuid)
if err != nil {
return err
}
@ -40,12 +46,6 @@ func rp(ctx context.Context, log *logrus.Entry) error {
done := make(chan struct{})
signal.Notify(sigterm, syscall.SIGTERM)
m, err := statsd.New(ctx, log.WithField("component", "metrics"), env)
if err != nil {
return err
}
defer m.Close()
f, err := frontend.NewFrontend(ctx, log.WithField("component", "frontend"), env, db, api.APIs, m)
if err != nil {
return err

Просмотреть файл

@ -15,6 +15,7 @@ import (
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics/noop"
utillog "github.com/Azure/ARO-RP/pkg/util/log"
)
@ -28,7 +29,7 @@ func run(ctx context.Context, log *logrus.Entry) error {
return err
}
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, "")
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, &noop.Noop{}, "")
if err != nil {
return err
}

Просмотреть файл

@ -15,10 +15,15 @@ import (
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics"
dbmetrics "github.com/Azure/ARO-RP/pkg/metrics/statsd/cosmosdb"
)
// Database represents a database
type Database struct {
log *logrus.Entry
m metrics.Interface
AsyncOperations AsyncOperations
Monitors Monitors
OpenShiftClusters OpenShiftClusters
@ -26,7 +31,7 @@ type Database struct {
}
// NewDatabase returns a new Database
func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, uuid string) (db *Database, err error) {
func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, m metrics.Interface, uuid string) (db *Database, err error) {
databaseAccount, masterKey := env.CosmosDB()
h := &codec.JsonHandle{
@ -43,10 +48,11 @@ func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, uuid
}
c := &http.Client{
Transport: &http.Transport{
Transport: dbmetrics.New(log, &http.Transport{
// disable HTTP/2 for now: https://github.com/golang/go/issues/36026
TLSNextProto: map[string]func(string, *tls.Conn) http.RoundTripper{},
},
TLSNextProto: map[string]func(string, *tls.Conn) http.RoundTripper{},
MaxIdleConnsPerHost: 20,
}, m),
Timeout: 30 * time.Second,
}
@ -55,7 +61,10 @@ func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, uuid
return nil, err
}
db = &Database{}
db = &Database{
log: log,
m: m,
}
db.AsyncOperations, err = NewAsyncOperations(uuid, dbc, env.DatabaseName(), "AsyncOperations")
if err != nil {
@ -77,5 +86,7 @@ func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, uuid
return nil, err
}
go db.emitMetrics(ctx)
return db, nil
}

26
pkg/database/metrics.go Normal file
Просмотреть файл

@ -0,0 +1,26 @@
package database
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"time"
"github.com/Azure/ARO-RP/pkg/util/recover"
)
func (db *Database) emitMetrics(ctx context.Context) {
defer recover.Panic(db.log)
t := time.NewTicker(time.Minute)
defer t.Stop()
for range t.C {
i, err := db.OpenShiftClusters.QueueLength(ctx, "OpenShiftClusters")
if err != nil {
db.log.Error(err)
} else {
db.m.EmitGauge("database.openshiftclusters.queue.length", int64(i), nil)
}
}
}

Просмотреть файл

@ -16,8 +16,9 @@ import (
)
type openShiftClusters struct {
c cosmosdb.OpenShiftClusterDocumentClient
uuid string
c cosmosdb.OpenShiftClusterDocumentClient
collc cosmosdb.CollectionClient
uuid string
}
// OpenShiftClusters is the database interface for OpenShiftClusterDocuments
@ -25,6 +26,7 @@ type OpenShiftClusters interface {
Create(context.Context, *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
ListAll(context.Context) (*api.OpenShiftClusterDocuments, error)
Get(context.Context, string) (*api.OpenShiftClusterDocument, error)
QueueLength(context.Context, string) (int, error)
Patch(context.Context, string, func(*api.OpenShiftClusterDocument) error) (*api.OpenShiftClusterDocument, error)
PatchWithLease(context.Context, string, func(*api.OpenShiftClusterDocument) error) (*api.OpenShiftClusterDocument, error)
Update(context.Context, *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
@ -64,8 +66,9 @@ func NewOpenShiftClusters(ctx context.Context, uuid string, dbc cosmosdb.Databas
}
return &openShiftClusters{
c: cosmosdb.NewOpenShiftClusterDocumentClient(collc, collid),
uuid: uuid,
c: cosmosdb.NewOpenShiftClusterDocumentClient(collc, collid),
collc: collc,
uuid: uuid,
}, nil
}
@ -126,6 +129,37 @@ func (c *openShiftClusters) Get(ctx context.Context, key string) (*api.OpenShift
}
}
// QueueLength returns OpenShiftClusters un-queued document count.
// If error occurs, 0 is returned with error message
func (c *openShiftClusters) QueueLength(ctx context.Context, collid string) (int, error) {
partitions, err := c.collc.PartitionKeyRanges(ctx, collid)
if err != nil {
return 0, err
}
var countTotal int
for _, r := range partitions.PartitionKeyRanges {
result := c.c.Query("", &cosmosdb.Query{
Query: `SELECT VALUE COUNT(1) FROM OpenShiftClusters doc WHERE doc.openShiftCluster.properties.provisioningState IN ("Creating", "Deleting", "Updating") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`,
}, &cosmosdb.Options{
PartitionKeyRangeID: r.ID,
})
// because we aggregate count we don't expect pagination in this query result,
// so we gonna call Next() only once per partition.
var data struct {
api.MissingFields
Document []int `json:"Documents,omitempty"`
}
err := result.NextRaw(ctx, &data)
if err != nil {
return 0, err
}
countTotal = countTotal + data.Document[0]
}
return countTotal, nil
}
func (c *openShiftClusters) Patch(ctx context.Context, key string, f func(*api.OpenShiftClusterDocument) error) (*api.OpenShiftClusterDocument, error) {
return c.patch(ctx, key, f, nil)
}
@ -207,7 +241,7 @@ func (c *openShiftClusters) ListByPrefix(subscriptionID string, prefix string) (
func (c *openShiftClusters) Dequeue(ctx context.Context) (*api.OpenShiftClusterDocument, error) {
i := c.c.Query("", &cosmosdb.Query{
Query: `SELECT * FROM OpenShiftClusters doc WHERE NOT (doc.openShiftCluster.properties.provisioningState IN ("Succeeded", "Failed")) AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`,
Query: `SELECT * FROM OpenShiftClusters doc WHERE doc.openShiftCluster.properties.provisioningState IN ("Creating", "Deleting", "Updating") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`,
}, nil)
for {

Просмотреть файл

@ -0,0 +1,79 @@
package cosmodb
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"net/http"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/Azure/ARO-RP/pkg/metrics"
)
var _ http.RoundTripper = (*tracerRoundTripper)(nil)
type tracerRoundTripper struct {
log *logrus.Entry
m metrics.Interface
tr http.RoundTripper
}
func New(log *logrus.Entry, tr *http.Transport, m metrics.Interface) *tracerRoundTripper {
return &tracerRoundTripper{
log: log,
m: m,
tr: tr,
}
}
func (t *tracerRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
start := time.Now()
defer func() {
var ru float64
// Sometimes we get request-charge="" because pkranges API is free
// We log this on debug mode only and ignore
requestCharge := strings.Trim(resp.Header.Get("x-ms-request-charge"), `"`)
if requestCharge != "" {
ru, err = strconv.ParseFloat(requestCharge, 64)
if err != nil {
// we don't want to kill all DB calls if this fails
t.log.Error(err)
}
}
parts := strings.Split(req.URL.Path, "/")
if len(parts) >= 2 && parts[len(parts)-2] == "docs" {
parts[len(parts)-1] = "{id}"
}
path := strings.Join(parts, "/")
// emit RU only if we managed to parse RU value
if err == nil {
t.m.EmitFloat("client.cosmosdb.requestunits", ru, map[string]string{
"code": strconv.Itoa(resp.StatusCode),
"verb": req.Method,
"path": path,
})
}
t.m.EmitGauge("client.cosmosdb.count", 1, map[string]string{
"code": strconv.Itoa(resp.StatusCode),
"verb": req.Method,
"path": path,
})
t.m.EmitFloat("client.cosmosdb.duration", time.Now().Sub(start).Seconds(), map[string]string{
"code": strconv.Itoa(resp.StatusCode),
"verb": req.Method,
"path": path,
})
}()
return t.tr.RoundTrip(req)
}

Просмотреть файл

@ -268,6 +268,21 @@ func (mr *MockOpenShiftClustersMockRecorder) PatchWithLease(arg0, arg1, arg2 int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchWithLease", reflect.TypeOf((*MockOpenShiftClusters)(nil).PatchWithLease), arg0, arg1, arg2)
}
// QueueLength mocks base method
func (m *MockOpenShiftClusters) QueueLength(arg0 context.Context, arg1 string) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueueLength", arg0, arg1)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// QueueLength indicates an expected call of QueueLength
func (mr *MockOpenShiftClustersMockRecorder) QueueLength(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueLength", reflect.TypeOf((*MockOpenShiftClusters)(nil).QueueLength), arg0, arg1)
}
// Update mocks base method
func (m *MockOpenShiftClusters) Update(arg0 context.Context, arg1 *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error) {
m.ctrl.T.Helper()