initial commit of monitoring tool

Jim Minter 2020-01-18 17:46:58 -06:00
Parent 072ce1c441
Commit 3eb28fe10b
No known key found for this signature
GPG key ID: 0730CBDA10D1A2D3
18 changed files with 1280 additions and 3 deletions


@@ -16,6 +16,7 @@ var (
)
func main() {
ctx := context.Background()
log := utillog.GetLogger()
log.Printf("starting, git commit %s", gitCommit)
@@ -27,9 +28,14 @@ func main() {
var err error
switch strings.ToLower(os.Args[1]) {
case "mirror":
err = mirror(context.Background(), log)
err = mirror(ctx, log)
case "monitor":
err = monitor(ctx, log)
case "rp":
err = rp(context.Background(), log)
err = rp(ctx, log)
default:
usage()
os.Exit(2)
}
if err != nil {

cmd/aro/monitor.go Normal file

@@ -0,0 +1,41 @@
package main
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
"github.com/Azure/ARO-RP/pkg/database"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics/statsd"
pkgmonitor "github.com/Azure/ARO-RP/pkg/monitor"
)
func monitor(ctx context.Context, log *logrus.Entry) error {
uuid := uuid.NewV4().String()
log.Printf("uuid %s", uuid)
env, err := env.NewEnv(ctx, log)
if err != nil {
return err
}
db, err := database.NewDatabase(ctx, log.WithField("component", "database"), env, uuid)
if err != nil {
return err
}
m, err := statsd.New(ctx, log.WithField("component", "metrics"), env)
if err != nil {
return err
}
defer m.Close()
mon := pkgmonitor.NewMonitor(log.WithField("component", "monitor"), env, db, m)
return mon.Run(ctx)
}


@@ -46,6 +46,28 @@
"[resourceId('Microsoft.DocumentDB/databaseAccounts/sqlDatabases', parameters('databaseAccountName'), parameters('databaseName'))]"
]
},
{
"properties": {
"resource": {
"id": "Monitors",
"partitionKey": {
"paths": [
"/id"
],
"kind": "Hash"
},
"defaultTtl": -1
},
"options": {}
},
"name": "[concat(parameters('databaseAccountName'), '/', parameters('databaseName'), '/Monitors')]",
"type": "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers",
"location": "[resourceGroup().location]",
"apiVersion": "2019-08-01",
"dependsOn": [
"[resourceId('Microsoft.DocumentDB/databaseAccounts/sqlDatabases', parameters('databaseAccountName'), parameters('databaseName'))]"
]
},
{
"properties": {
"resource": {


@@ -398,6 +398,29 @@
"[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('databaseAccountName'))]"
]
},
{
"properties": {
"resource": {
"id": "Monitors",
"partitionKey": {
"paths": [
"/id"
],
"kind": "Hash"
},
"defaultTtl": -1
},
"options": {}
},
"name": "[concat(parameters('databaseAccountName'), '/', 'ARO', '/Monitors')]",
"type": "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers",
"location": "[resourceGroup().location]",
"apiVersion": "2019-08-01",
"dependsOn": [
"[resourceId('Microsoft.DocumentDB/databaseAccounts/sqlDatabases', parameters('databaseAccountName'), 'ARO')]",
"[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('databaseAccountName'))]"
]
},
{
"properties": {
"resource": {

docs/monitoring.md Normal file

@@ -0,0 +1,46 @@
# Monitoring
## Initial goals
* Jump start our monitoring capabilities to get basic visibility quickly and
enable rapid iteration.
* Enable capabilities which cluster admins cannot significantly tamper with.
* Straightforward operational prerequisites (reliability, upgradeability,
observability, basic scalability, state management, management in multiple
regions, etc.)
The first two goals imply external monitoring, but this does not exclude adding
monitoring from inside the cluster as a complementary near-term goal.
## Implementation
* Monitoring is horizontally scalable, active/active.
* Every monitor process advertises its liveness to the database by updating its
own MonitorDocument named with its UUID. These MonitorDocuments have a ttl
set, so each will disappear from the database if it is not regularly
refreshed.
* Every monitor process competes for a lease on a MonitorDocument called
"master". The master lease owner lists the advertised monitors (hopefully
including itself) and shares ownership of 256 monitoring buckets evenly across
the monitors.
* Every monitor process regularly checks the "master" MonitorDocument to learn
what buckets it has been assigned.
* Every cluster is placed at create time into one of the 256 buckets using a
uniform random distribution (see the sketch after this list).
* Each monitor uses a Cosmos DB change feed to keep track of database state
locally (like k8s list/watch). At startup, the Cosmos DB change feed returns
the current state of all of the OpenShiftClusterDocuments; subsequently, as
OpenShiftClusterDocuments are updated, it returns the updated documents.
* Each monitor aims to check each cluster it "owns" every 5 minutes; it walks
the local database map and distributes checking over lots of local goroutine
workers.
* Monitoring stats are output to mdm via statsd.
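
Neither the bucket assignment at cluster create time nor the ownership check is
spelled out in code in this commit, so here is a minimal sketch of both. It
assumes `bucket.Buckets` is 256 and that the RP simply draws from a uniform
random source when it first persists the cluster document; all names here are
illustrative, not the RP's actual create path.

```go
// Sketch only, not code from this commit. Assumes bucket.Buckets == 256 and
// that the RP picks a bucket once, when the cluster document is first created.
package main

import (
	"fmt"
	"math/rand"
)

const bucketCount = 256 // assumed value of bucket.Buckets

// assignBucket places a new cluster into one of the 256 buckets with a
// uniform random distribution; the result is stored on the cluster document.
func assignBucket() int {
	return rand.Intn(bucketCount)
}

// owns is how a monitor decides whether a cached cluster document is its
// responsibility, given the bucket set it last read from the "master" document.
func owns(myBuckets map[int]struct{}, docBucket int) bool {
	_, ok := myBuckets[docBucket]
	return ok
}

func main() {
	myBuckets := map[int]struct{}{3: {}, 17: {}, 242: {}} // hypothetical allocation
	b := assignBucket()
	fmt.Printf("bucket %d, monitored by this process: %v\n", b, owns(myBuckets, b))
}
```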
## Back-of-envelope calculations
* To support 50,000 clusters/RP with (say) 3 monitors, and check every cluster
every 5 minutes, each monitor will need to retire 55 checks per second.
* If each check is allowed up to 30 seconds to run, that implies 1650 active
goroutines per monitor.
* If each cluster's cached data model takes 2KB and each goroutine takes 2KB,
memory usage per monitor would be around 103MB.
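
As a cross-check, the arithmetic can be reproduced directly. The snippet below
only restates the assumptions above (50,000 clusters, 3 monitors, a 5 minute
interval, 30 second checks, 2KB per cached document and per goroutine, and
every monitor caching all documents via the change feed); it is not part of
the commit.

```go
package main

import "fmt"

func main() {
	const (
		clusters    = 50000
		monitors    = 3
		intervalSec = 5 * 60
		checkSec    = 30
		docKB       = 2 // cached data model per cluster
		goroutineKB = 2 // budget per in-flight check
	)

	checksPerSec := clusters / monitors / intervalSec // 55 (rounded down)
	activeGoroutines := checksPerSec * checkSec       // 1650

	// every monitor caches all cluster documents via the change feed
	memoryMB := (clusters*docKB + activeGoroutines*goroutineKB) / 1000 // ~103

	fmt.Println(checksPerSec, "checks/s,", activeGoroutines, "goroutines, ~", memoryMB, "MB")
}
```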

pkg/api/monitor.go Normal file

@@ -0,0 +1,11 @@
package api
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
// Monitor represents a monitor
type Monitor struct {
MissingFields
Buckets []string `json:"buckets,omitempty"`
}


@@ -0,0 +1,33 @@
package api
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
// MonitorDocuments represents monitor documents.
// pkg/database/cosmosdb requires its definition.
type MonitorDocuments struct {
Count int `json:"_count,omitempty"`
ResourceID string `json:"_rid,omitempty"`
MonitorDocuments []*MonitorDocument `json:"Documents,omitempty"`
}
// MonitorDocument represents a monitor document.
// pkg/database/cosmosdb requires its definition.
type MonitorDocument struct {
MissingFields
ID string `json:"id,omitempty"`
ResourceID string `json:"_rid,omitempty"`
Timestamp int `json:"_ts,omitempty"`
Self string `json:"_self,omitempty"`
ETag string `json:"_etag,omitempty"`
Attachments string `json:"_attachments,omitempty"`
TTL int `json:"ttl,omitempty"`
LSN int `json:"_lsn,omitempty"`
Metadata map[string]interface{} `json:"_metadata,omitempty"`
LeaseOwner string `json:"leaseOwner,omitempty"`
LeaseExpires int `json:"leaseExpires,omitempty"`
Monitor *Monitor `json:"monitor,omitempty"`
}
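
For orientation, a populated "master" document might look roughly like the
literal below once one monitor holds the lease; the UUID and timestamp are
invented, and the real document is of course stored as JSON in Cosmos DB.
Field meanings follow from how pkg/database/monitors.go and pkg/monitor use
them later in this commit.

```go
package main

import (
	"fmt"

	"github.com/Azure/ARO-RP/pkg/api"
)

func main() {
	master := &api.MonitorDocument{
		ID:           "master",
		LeaseOwner:   "11111111-2222-3333-4444-555555555555", // hypothetical monitor UUID
		LeaseExpires: 1579391218,                              // unix seconds; the renewLease pre-trigger sets now+60
		Monitor: &api.Monitor{
			// one entry per bucket (256 in total), each naming the UUID of
			// the monitor that owns it
			Buckets: []string{"11111111-2222-3333-4444-555555555555" /* , ... */},
		},
	}
	fmt.Println(master.LeaseOwner)
}
```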


@@ -1,4 +1,4 @@
//go:generate go run ../../../vendor/github.com/jim-minter/go-cosmosdb/cmd/gencosmosdb github.com/Azure/ARO-RP/pkg/api,AsyncOperationDocument github.com/Azure/ARO-RP/pkg/api,OpenShiftClusterDocument github.com/Azure/ARO-RP/pkg/api,SubscriptionDocument
//go:generate go run ../../../vendor/github.com/jim-minter/go-cosmosdb/cmd/gencosmosdb github.com/Azure/ARO-RP/pkg/api,AsyncOperationDocument github.com/Azure/ARO-RP/pkg/api,MonitorDocument github.com/Azure/ARO-RP/pkg/api,OpenShiftClusterDocument github.com/Azure/ARO-RP/pkg/api,SubscriptionDocument
//go:generate go run ../../../vendor/github.com/golang/mock/mockgen -destination=../../util/mocks/database/$GOPACKAGE/$GOPACKAGE.go github.com/Azure/ARO-RP/pkg/database/$GOPACKAGE OpenShiftClusterDocumentIterator
//go:generate go run ../../../vendor/golang.org/x/tools/cmd/goimports -local=github.com/Azure/ARO-RP -e -w ../../util/mocks/database/$GOPACKAGE/$GOPACKAGE.go


@@ -0,0 +1,289 @@
// Code generated by github.com/jim-minter/go-cosmosdb, DO NOT EDIT.
package cosmosdb
import (
"context"
"net/http"
"strings"
pkg "github.com/Azure/ARO-RP/pkg/api"
)
type monitorDocumentClient struct {
*databaseClient
path string
}
// MonitorDocumentClient is a monitorDocument client
type MonitorDocumentClient interface {
Create(context.Context, string, *pkg.MonitorDocument, *Options) (*pkg.MonitorDocument, error)
List(*Options) MonitorDocumentRawIterator
ListAll(context.Context, *Options) (*pkg.MonitorDocuments, error)
Get(context.Context, string, string, *Options) (*pkg.MonitorDocument, error)
Replace(context.Context, string, *pkg.MonitorDocument, *Options) (*pkg.MonitorDocument, error)
Delete(context.Context, string, *pkg.MonitorDocument, *Options) error
Query(string, *Query, *Options) MonitorDocumentRawIterator
QueryAll(context.Context, string, *Query, *Options) (*pkg.MonitorDocuments, error)
ChangeFeed(*Options) MonitorDocumentIterator
}
type monitorDocumentChangeFeedIterator struct {
*monitorDocumentClient
continuation string
options *Options
}
type monitorDocumentListIterator struct {
*monitorDocumentClient
continuation string
done bool
options *Options
}
type monitorDocumentQueryIterator struct {
*monitorDocumentClient
partitionkey string
query *Query
continuation string
done bool
options *Options
}
// MonitorDocumentIterator is a monitorDocument iterator
type MonitorDocumentIterator interface {
Next(context.Context) (*pkg.MonitorDocuments, error)
}
// MonitorDocumentRawIterator is a monitorDocument raw iterator
type MonitorDocumentRawIterator interface {
MonitorDocumentIterator
NextRaw(context.Context, interface{}) error
}
// NewMonitorDocumentClient returns a new monitorDocument client
func NewMonitorDocumentClient(collc CollectionClient, collid string) MonitorDocumentClient {
return &monitorDocumentClient{
databaseClient: collc.(*collectionClient).databaseClient,
path: collc.(*collectionClient).path + "/colls/" + collid,
}
}
func (c *monitorDocumentClient) all(ctx context.Context, i MonitorDocumentIterator) (*pkg.MonitorDocuments, error) {
allmonitorDocuments := &pkg.MonitorDocuments{}
for {
monitorDocuments, err := i.Next(ctx)
if err != nil {
return nil, err
}
if monitorDocuments == nil {
break
}
allmonitorDocuments.Count += monitorDocuments.Count
allmonitorDocuments.ResourceID = monitorDocuments.ResourceID
allmonitorDocuments.MonitorDocuments = append(allmonitorDocuments.MonitorDocuments, monitorDocuments.MonitorDocuments...)
}
return allmonitorDocuments, nil
}
func (c *monitorDocumentClient) Create(ctx context.Context, partitionkey string, newmonitorDocument *pkg.MonitorDocument, options *Options) (monitorDocument *pkg.MonitorDocument, err error) {
headers := http.Header{}
headers.Set("X-Ms-Documentdb-Partitionkey", `["`+partitionkey+`"]`)
if options == nil {
options = &Options{}
}
options.NoETag = true
err = c.setOptions(options, newmonitorDocument, headers)
if err != nil {
return
}
err = c.do(ctx, http.MethodPost, c.path+"/docs", "docs", c.path, http.StatusCreated, &newmonitorDocument, &monitorDocument, headers)
return
}
func (c *monitorDocumentClient) List(options *Options) MonitorDocumentRawIterator {
return &monitorDocumentListIterator{monitorDocumentClient: c, options: options}
}
func (c *monitorDocumentClient) ListAll(ctx context.Context, options *Options) (*pkg.MonitorDocuments, error) {
return c.all(ctx, c.List(options))
}
func (c *monitorDocumentClient) Get(ctx context.Context, partitionkey, monitorDocumentid string, options *Options) (monitorDocument *pkg.MonitorDocument, err error) {
headers := http.Header{}
headers.Set("X-Ms-Documentdb-Partitionkey", `["`+partitionkey+`"]`)
err = c.setOptions(options, nil, headers)
if err != nil {
return
}
err = c.do(ctx, http.MethodGet, c.path+"/docs/"+monitorDocumentid, "docs", c.path+"/docs/"+monitorDocumentid, http.StatusOK, nil, &monitorDocument, headers)
return
}
func (c *monitorDocumentClient) Replace(ctx context.Context, partitionkey string, newmonitorDocument *pkg.MonitorDocument, options *Options) (monitorDocument *pkg.MonitorDocument, err error) {
headers := http.Header{}
headers.Set("X-Ms-Documentdb-Partitionkey", `["`+partitionkey+`"]`)
err = c.setOptions(options, newmonitorDocument, headers)
if err != nil {
return
}
err = c.do(ctx, http.MethodPut, c.path+"/docs/"+newmonitorDocument.ID, "docs", c.path+"/docs/"+newmonitorDocument.ID, http.StatusOK, &newmonitorDocument, &monitorDocument, headers)
return
}
func (c *monitorDocumentClient) Delete(ctx context.Context, partitionkey string, monitorDocument *pkg.MonitorDocument, options *Options) (err error) {
headers := http.Header{}
headers.Set("X-Ms-Documentdb-Partitionkey", `["`+partitionkey+`"]`)
err = c.setOptions(options, monitorDocument, headers)
if err != nil {
return
}
err = c.do(ctx, http.MethodDelete, c.path+"/docs/"+monitorDocument.ID, "docs", c.path+"/docs/"+monitorDocument.ID, http.StatusNoContent, nil, nil, headers)
return
}
func (c *monitorDocumentClient) Query(partitionkey string, query *Query, options *Options) MonitorDocumentRawIterator {
return &monitorDocumentQueryIterator{monitorDocumentClient: c, partitionkey: partitionkey, query: query, options: options}
}
func (c *monitorDocumentClient) QueryAll(ctx context.Context, partitionkey string, query *Query, options *Options) (*pkg.MonitorDocuments, error) {
return c.all(ctx, c.Query(partitionkey, query, options))
}
func (c *monitorDocumentClient) ChangeFeed(options *Options) MonitorDocumentIterator {
return &monitorDocumentChangeFeedIterator{monitorDocumentClient: c}
}
func (c *monitorDocumentClient) setOptions(options *Options, monitorDocument *pkg.MonitorDocument, headers http.Header) error {
if options == nil {
return nil
}
if monitorDocument != nil && !options.NoETag {
if monitorDocument.ETag == "" {
return ErrETagRequired
}
headers.Set("If-Match", monitorDocument.ETag)
}
if len(options.PreTriggers) > 0 {
headers.Set("X-Ms-Documentdb-Pre-Trigger-Include", strings.Join(options.PreTriggers, ","))
}
if len(options.PostTriggers) > 0 {
headers.Set("X-Ms-Documentdb-Post-Trigger-Include", strings.Join(options.PostTriggers, ","))
}
if len(options.PartitionKeyRangeID) > 0 {
headers.Set("X-Ms-Documentdb-PartitionKeyRangeID", options.PartitionKeyRangeID)
}
return nil
}
func (i *monitorDocumentChangeFeedIterator) Next(ctx context.Context) (monitorDocuments *pkg.MonitorDocuments, err error) {
headers := http.Header{}
headers.Set("A-IM", "Incremental feed")
headers.Set("X-Ms-Max-Item-Count", "-1")
if i.continuation != "" {
headers.Set("If-None-Match", i.continuation)
}
err = i.setOptions(i.options, nil, headers)
if err != nil {
return
}
err = i.do(ctx, http.MethodGet, i.path+"/docs", "docs", i.path, http.StatusOK, nil, &monitorDocuments, headers)
if IsErrorStatusCode(err, http.StatusNotModified) {
err = nil
}
if err != nil {
return
}
i.continuation = headers.Get("Etag")
return
}
func (i *monitorDocumentListIterator) Next(ctx context.Context) (monitorDocuments *pkg.MonitorDocuments, err error) {
err = i.NextRaw(ctx, &monitorDocuments)
return
}
func (i *monitorDocumentListIterator) NextRaw(ctx context.Context, raw interface{}) (err error) {
if i.done {
return
}
headers := http.Header{}
headers.Set("X-Ms-Max-Item-Count", "-1")
if i.continuation != "" {
headers.Set("X-Ms-Continuation", i.continuation)
}
err = i.setOptions(i.options, nil, headers)
if err != nil {
return
}
err = i.do(ctx, http.MethodGet, i.path+"/docs", "docs", i.path, http.StatusOK, nil, &raw, headers)
if err != nil {
return
}
i.continuation = headers.Get("X-Ms-Continuation")
i.done = i.continuation == ""
return
}
func (i *monitorDocumentQueryIterator) Next(ctx context.Context) (monitorDocuments *pkg.MonitorDocuments, err error) {
err = i.NextRaw(ctx, &monitorDocuments)
return
}
func (i *monitorDocumentQueryIterator) NextRaw(ctx context.Context, raw interface{}) (err error) {
if i.done {
return
}
headers := http.Header{}
headers.Set("X-Ms-Max-Item-Count", "-1")
headers.Set("X-Ms-Documentdb-Isquery", "True")
headers.Set("Content-Type", "application/query+json")
if i.partitionkey != "" {
headers.Set("X-Ms-Documentdb-Partitionkey", `["`+i.partitionkey+`"]`)
} else {
headers.Set("X-Ms-Documentdb-Query-Enablecrosspartition", "True")
}
if i.continuation != "" {
headers.Set("X-Ms-Continuation", i.continuation)
}
err = i.setOptions(i.options, nil, headers)
if err != nil {
return
}
err = i.do(ctx, http.MethodPost, i.path+"/docs", "docs", i.path, http.StatusOK, &i.query, &raw, headers)
if err != nil {
return
}
i.continuation = headers.Get("X-Ms-Continuation")
i.done = i.continuation == ""
return
}


@@ -20,6 +20,7 @@ import (
// Database represents a database
type Database struct {
AsyncOperations AsyncOperations
Monitors Monitors
OpenShiftClusters OpenShiftClusters
Subscriptions Subscriptions
}
@@ -61,6 +62,11 @@ func NewDatabase(ctx context.Context, log *logrus.Entry, env env.Interface, uuid
return nil, err
}
db.Monitors, err = NewMonitors(ctx, uuid, dbc, env.DatabaseName(), "Monitors")
if err != nil {
return nil, err
}
db.OpenShiftClusters, err = NewOpenShiftClusters(ctx, uuid, dbc, env.DatabaseName(), "OpenShiftClusters")
if err != nil {
return nil, err

pkg/database/monitors.go Normal file

@@ -0,0 +1,179 @@
package database
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
)
type monitors struct {
c cosmosdb.MonitorDocumentClient
uuid string
}
// Monitors is the database interface for MonitorDocuments
type Monitors interface {
Create(context.Context, *api.MonitorDocument) (*api.MonitorDocument, error)
PatchWithLease(context.Context, string, func(*api.MonitorDocument) error) (*api.MonitorDocument, error)
TryLease(context.Context) (*api.MonitorDocument, error)
ListBuckets(context.Context) ([]int, error)
ListMonitors(context.Context) (*api.MonitorDocuments, error)
MonitorHeartbeat(context.Context) error
}
// NewMonitors returns a new Monitors
func NewMonitors(ctx context.Context, uuid string, dbc cosmosdb.DatabaseClient, dbid, collid string) (Monitors, error) {
collc := cosmosdb.NewCollectionClient(dbc, dbid)
triggers := []*cosmosdb.Trigger{
{
ID: "renewLease",
TriggerOperation: cosmosdb.TriggerOperationAll,
TriggerType: cosmosdb.TriggerTypePre,
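// renewLease runs server-side as a pre-trigger on any write that names it,
// rewriting the incoming document so that leaseExpires is pushed 60 seconds
// into the future.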
Body: `function trigger() {
var request = getContext().getRequest();
var body = request.getBody();
var date = new Date();
body["leaseExpires"] = Math.floor(date.getTime() / 1000) + 60;
request.setBody(body);
}`,
},
}
triggerc := cosmosdb.NewTriggerClient(collc, collid)
for _, trigger := range triggers {
_, err := triggerc.Create(ctx, trigger)
if err != nil && !cosmosdb.IsErrorStatusCode(err, http.StatusConflict) {
return nil, err
}
}
return &monitors{
c: cosmosdb.NewMonitorDocumentClient(collc, collid),
uuid: uuid,
}, nil
}
func (c *monitors) Create(ctx context.Context, doc *api.MonitorDocument) (*api.MonitorDocument, error) {
if doc.ID != strings.ToLower(doc.ID) {
return nil, fmt.Errorf("id %q is not lower case", doc.ID)
}
doc, err := c.c.Create(ctx, doc.ID, doc, nil)
if err, ok := err.(*cosmosdb.Error); ok && err.StatusCode == http.StatusConflict {
err.StatusCode = http.StatusPreconditionFailed
}
return doc, err
}
func (c *monitors) get(ctx context.Context, id string) (*api.MonitorDocument, error) {
if id != strings.ToLower(id) {
return nil, fmt.Errorf("id %q is not lower case", id)
}
return c.c.Get(ctx, id, id, nil)
}
func (c *monitors) patch(ctx context.Context, id string, f func(*api.MonitorDocument) error, options *cosmosdb.Options) (*api.MonitorDocument, error) {
var doc *api.MonitorDocument
err := cosmosdb.RetryOnPreconditionFailed(func() (err error) {
doc, err = c.get(ctx, id)
if err != nil {
return
}
err = f(doc)
if err != nil {
return
}
doc, err = c.update(ctx, doc, options)
return
})
return doc, err
}
func (c *monitors) PatchWithLease(ctx context.Context, id string, f func(*api.MonitorDocument) error) (*api.MonitorDocument, error) {
return c.patch(ctx, id, func(doc *api.MonitorDocument) error {
if doc.LeaseOwner != c.uuid {
return fmt.Errorf("lost lease")
}
return f(doc)
}, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
}
func (c *monitors) update(ctx context.Context, doc *api.MonitorDocument, options *cosmosdb.Options) (*api.MonitorDocument, error) {
if doc.ID != strings.ToLower(doc.ID) {
return nil, fmt.Errorf("id %q is not lower case", doc.ID)
}
return c.c.Replace(ctx, doc.ID, doc, options)
}
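// TryLease attempts to acquire the lease on the "master" document: the query
// only matches the document once its current lease has expired, and the
// subsequent update (with the renewLease pre-trigger) writes this monitor in
// as leaseOwner. It returns nil if another monitor holds or wins the lease.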
func (c *monitors) TryLease(ctx context.Context) (*api.MonitorDocument, error) {
docs, err := c.c.QueryAll(ctx, "", &cosmosdb.Query{
Query: `SELECT * FROM Monitors doc WHERE doc.id = "master" AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`,
}, nil)
if err != nil {
return nil, err
}
if docs == nil {
return nil, nil
}
for _, doc := range docs.MonitorDocuments {
doc.LeaseOwner = c.uuid
doc, err = c.update(ctx, doc, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
if cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) { // someone else got there first
continue
}
return doc, err
}
return nil, nil
}
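// ListBuckets reads the "master" document and returns the bucket numbers
// currently assigned to this monitor's UUID.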
func (c *monitors) ListBuckets(ctx context.Context) (buckets []int, err error) {
doc, err := c.get(ctx, "master")
if err != nil || doc == nil {
return nil, err
}
for i, monitor := range doc.Monitor.Buckets {
if monitor == c.uuid {
buckets = append(buckets, i)
}
}
return buckets, nil
}
func (c *monitors) ListMonitors(ctx context.Context) (*api.MonitorDocuments, error) {
return c.c.QueryAll(ctx, "", &cosmosdb.Query{
Query: `SELECT * FROM Monitors doc WHERE doc.id != "master"`,
}, nil)
}
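// MonitorHeartbeat upserts this monitor's own MonitorDocument with a 60
// second TTL, advertising its liveness; if the heartbeat stops, the document
// ages out of the collection.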
func (c *monitors) MonitorHeartbeat(ctx context.Context) error {
doc := &api.MonitorDocument{
ID: c.uuid,
TTL: 60,
}
_, err := c.update(ctx, doc, &cosmosdb.Options{NoETag: true})
if err != nil && cosmosdb.IsErrorStatusCode(err, http.StatusNotFound) {
_, err = c.Create(ctx, doc)
}
return err
}


@@ -23,6 +23,7 @@ type openShiftClusters struct {
// OpenShiftClusters is the database interface for OpenShiftClusterDocuments
type OpenShiftClusters interface {
Create(context.Context, *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
ListAll(context.Context) (*api.OpenShiftClusterDocuments, error)
Get(context.Context, string) (*api.OpenShiftClusterDocument, error)
Patch(context.Context, string, func(*api.OpenShiftClusterDocument) error) (*api.OpenShiftClusterDocument, error)
PatchWithLease(context.Context, string, func(*api.OpenShiftClusterDocument) error) (*api.OpenShiftClusterDocument, error)
@@ -88,6 +89,10 @@ func (c *openShiftClusters) Create(ctx context.Context, doc *api.OpenShiftCluste
return doc, err
}
func (c *openShiftClusters) ListAll(ctx context.Context) (*api.OpenShiftClusterDocuments, error) {
return c.c.ListAll(ctx, nil)
}
func (c *openShiftClusters) Get(ctx context.Context, key string) (*api.OpenShiftClusterDocument, error) {
if key != strings.ToLower(key) {
return nil, fmt.Errorf("key %q is not lower case", key)


@@ -614,6 +614,30 @@ func (g *generator) database(databaseName string, addDependsOn bool) []*arm.Reso
"[resourceId('Microsoft.DocumentDB/databaseAccounts/sqlDatabases', parameters('databaseAccountName'), " + databaseName + ")]",
},
},
{
Resource: &mgmtdocumentdb.SQLContainerCreateUpdateParameters{
SQLContainerCreateUpdateProperties: &mgmtdocumentdb.SQLContainerCreateUpdateProperties{
Resource: &mgmtdocumentdb.SQLContainerResource{
ID: to.StringPtr("Monitors"),
PartitionKey: &mgmtdocumentdb.ContainerPartitionKey{
Paths: &[]string{
"/id",
},
Kind: mgmtdocumentdb.PartitionKindHash,
},
DefaultTTL: to.Int32Ptr(-1),
},
Options: map[string]*string{},
},
Name: to.StringPtr("[concat(parameters('databaseAccountName'), '/', " + databaseName + ", '/Monitors')]"),
Type: to.StringPtr("Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers"),
Location: to.StringPtr("[resourceGroup().location]"),
},
APIVersion: apiVersions["documentdb"],
DependsOn: []string{
"[resourceId('Microsoft.DocumentDB/databaseAccounts/sqlDatabases', parameters('databaseAccountName'), " + databaseName + ")]",
},
},
{
Resource: &mgmtdocumentdb.SQLContainerCreateUpdateParameters{
SQLContainerCreateUpdateProperties: &mgmtdocumentdb.SQLContainerCreateUpdateProperties{

pkg/monitor/master.go Normal file

@@ -0,0 +1,124 @@
package monitor
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"github.com/Azure/ARO-RP/pkg/api"
)
// master updates the monitor document with the list of buckets balanced between
// registered monitors
func (mon *monitor) master(ctx context.Context) error {
// if we know we're not the master, attempt to gain the lease on the monitor
// document
if !mon.isMaster {
doc, err := mon.db.Monitors.TryLease(ctx)
if err != nil || doc == nil {
return err
}
mon.isMaster = true
}
// we know we're not the master; give up
if !mon.isMaster {
return nil
}
// we think we're the master. Gather up all the registered monitors
// including ourself, balance buckets between them and write the bucket
// allocations to the database. If it turns out that we're not the master,
// the patch will fail
_, err := mon.db.Monitors.PatchWithLease(ctx, "master", func(doc *api.MonitorDocument) error {
docs, err := mon.db.Monitors.ListMonitors(ctx)
if err != nil {
return err
}
var monitors []string
if docs != nil {
monitors = make([]string, 0, len(docs.MonitorDocuments))
for _, doc := range docs.MonitorDocuments {
monitors = append(monitors, doc.ID)
}
}
mon.balance(monitors, doc)
return nil
})
if err != nil && err.Error() == "lost lease" {
mon.isMaster = false
}
return err
}
// balance shares out buckets over a slice of registered monitors
func (mon *monitor) balance(monitors []string, doc *api.MonitorDocument) {
// initialise doc.Monitor
if doc.Monitor == nil {
doc.Monitor = &api.Monitor{}
}
// ensure len(doc.Monitor.Buckets) == m.bucketCount: this should only do
// anything on the very first run
if len(doc.Monitor.Buckets) < mon.bucketCount {
doc.Monitor.Buckets = append(doc.Monitor.Buckets, make([]string, mon.bucketCount-len(doc.Monitor.Buckets))...)
}
if len(doc.Monitor.Buckets) > mon.bucketCount { // should never happen
doc.Monitor.Buckets = doc.Monitor.Buckets[:mon.bucketCount]
}
var unallocated []int
m := make(map[string][]int, len(monitors)) // map of monitor to list of buckets it owns
for _, monitor := range monitors {
m[monitor] = nil
}
var target int // target number of buckets per monitor
if len(monitors) > 0 {
target = mon.bucketCount / len(monitors)
if mon.bucketCount%len(monitors) != 0 {
target++
}
}
// load the current bucket allocations into the map
for i, monitor := range doc.Monitor.Buckets {
if buckets, found := m[monitor]; found && len(buckets) < target {
// if the current bucket is allocated to a known monitor and doesn't
// take its number of buckets above the target, keep it there...
m[monitor] = append(m[monitor], i)
} else {
// ...otherwise we'll reallocate it below
unallocated = append(unallocated, i)
}
}
// reallocate all unallocated buckets, appending to the least loaded monitor
if len(monitors) > 0 {
for _, i := range unallocated {
var leastMonitor string
for monitor := range m {
if leastMonitor == "" ||
len(m[monitor]) < len(m[leastMonitor]) {
leastMonitor = monitor
}
}
m[leastMonitor] = append(m[leastMonitor], i)
}
}
// write the updated bucket allocations back to the document
for _, i := range unallocated {
doc.Monitor.Buckets[i] = "" // should only happen if there are no known monitors
}
for monitor, buckets := range m {
for _, i := range buckets {
doc.Monitor.Buckets[i] = monitor
}
}
}

pkg/monitor/master_test.go Normal file

@@ -0,0 +1,186 @@
package monitor
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"reflect"
"testing"
"github.com/Azure/ARO-RP/pkg/api"
)
func TestBalance(t *testing.T) {
type test struct {
name string
monitors []string
doc func() *api.MonitorDocument
validate func(*testing.T, *test, *api.MonitorDocument)
}
for _, tt := range []*test{
{
name: "0->1",
monitors: []string{"one"},
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{}
},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
for i, bucket := range doc.Monitor.Buckets {
if bucket != "one" {
t.Error(i, bucket)
}
}
},
},
{
name: "3->1",
monitors: []string{"one"},
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{
Monitor: &api.Monitor{
Buckets: []string{"one", "two", "one", "three", "one", "two", "two", "one", "two"},
},
}
},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
for i, bucket := range doc.Monitor.Buckets {
if bucket != "one" {
t.Error(i, bucket)
}
}
},
},
{
name: "3->0",
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{
Monitor: &api.Monitor{
Buckets: []string{"one", "one", "one", "one", "one", "one", "two", "three"},
},
}
},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
for i, bucket := range doc.Monitor.Buckets {
if bucket != "" {
t.Error(i, bucket)
}
}
},
},
{
name: "imbalanced",
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{
Monitor: &api.Monitor{
Buckets: []string{"one", "one", "", "two", "one", "one", "one", "one"},
},
}
},
monitors: []string{"one", "two"},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
old := tt.doc()
m := map[string]int{}
for i, bucket := range doc.Monitor.Buckets {
m[bucket]++
switch bucket {
case "one":
if old.Monitor.Buckets[i] != bucket {
t.Error(i)
}
case "two":
default:
t.Error(i, bucket)
}
}
for k, v := range m {
switch k {
case "one", "two":
default:
t.Error(k)
}
if v != 4 {
t.Error(k, v)
}
}
},
},
{
name: "stable",
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{
Monitor: &api.Monitor{
Buckets: []string{"one", "two", "three", "one", "two", "three", "one", "three"},
},
}
},
monitors: []string{"one", "two", "three"},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
old := tt.doc()
if !reflect.DeepEqual(old, doc) {
t.Error(doc.Monitor.Buckets)
}
},
},
{
name: "3->5",
doc: func() *api.MonitorDocument {
return &api.MonitorDocument{
Monitor: &api.Monitor{
Buckets: []string{"one", "two", "three", "one", "two", "three", "one", "three"},
},
}
},
monitors: []string{"one", "two", "three", "four", "five"},
validate: func(t *testing.T, tt *test, doc *api.MonitorDocument) {
old := tt.doc()
m := map[string]int{}
for i, bucket := range doc.Monitor.Buckets {
m[bucket]++
switch bucket {
case "one", "two", "three":
if old.Monitor.Buckets[i] != bucket {
t.Error(i)
}
case "four", "five":
default:
t.Error(i, bucket)
}
}
for k, v := range m {
switch k {
case "one", "two", "three", "four", "five":
default:
t.Error(k)
}
if v > 2 {
t.Error(k, v)
}
}
},
},
} {
t.Run(tt.name, func(t *testing.T) {
mon := &monitor{
bucketCount: 8,
}
doc := tt.doc()
mon.balance(tt.monitors, doc)
if doc.Monitor == nil {
t.Fatal(doc.Monitor)
}
if len(doc.Monitor.Buckets) != 8 {
t.Fatal(len(doc.Monitor.Buckets))
}
tt.validate(t, tt, doc)
})
}
}

pkg/monitor/monitor.go Normal file

@@ -0,0 +1,99 @@
package monitor
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/env"
"github.com/Azure/ARO-RP/pkg/metrics"
"github.com/Azure/ARO-RP/pkg/util/bucket"
)
type monitor struct {
baseLog *logrus.Entry
env env.Interface
db *database.Database
m metrics.Interface
mu sync.Mutex
docs sync.Map
isMaster bool
bucketCount int
buckets map[int]struct{}
ch chan string
}
type Runnable interface {
Run(context.Context) error
}
func NewMonitor(log *logrus.Entry, env env.Interface, db *database.Database, m metrics.Interface) Runnable {
return &monitor{
baseLog: log,
env: env,
db: db,
m: m,
bucketCount: bucket.Buckets,
buckets: map[int]struct{}{},
ch: make(chan string),
}
}
func (mon *monitor) Run(ctx context.Context) error {
_, err := mon.db.Monitors.Create(ctx, &api.MonitorDocument{
ID: "master",
})
if err != nil && !cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) {
return err
}
// fill the cache from the database change feed
go mon.changefeed(ctx, mon.baseLog.WithField("component", "changefeed"), nil)
// schedule work across the workers
go mon.schedule(ctx, mon.baseLog.WithField("component", "schedule"), nil)
// populate the workers
for i := 0; i < 100; i++ {
go mon.worker(ctx, mon.baseLog.WithField("component", fmt.Sprintf("worker-%d", i)))
}
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
// register ourself as a monitor
err = mon.db.Monitors.MonitorHeartbeat(ctx)
if err != nil {
mon.baseLog.Error(err)
}
// try to become master and share buckets across registered monitors
err = mon.master(ctx)
if err != nil {
mon.baseLog.Error(err)
}
// read our bucket allocation from the master
err = mon.listBuckets(ctx)
if err != nil {
mon.baseLog.Error(err)
}
<-t.C
}
}

pkg/monitor/worker.go Normal file

@@ -0,0 +1,168 @@
package monitor
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"strconv"
"time"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/util/recover"
"github.com/Azure/ARO-RP/pkg/util/restconfig"
)
// listBuckets reads our bucket allocation from the master
func (mon *monitor) listBuckets(ctx context.Context) error {
buckets, err := mon.db.Monitors.ListBuckets(ctx)
mon.baseLog.Printf("servicing %d buckets", len(buckets))
mon.mu.Lock()
defer mon.mu.Unlock()
mon.buckets = map[int]struct{}{}
if err != nil {
return err
}
for _, i := range buckets {
mon.buckets[i] = struct{}{}
}
return nil
}
// changefeed tracks the OpenShiftClusters change feed and keeps mon.docs
// up-to-date. We don't monitor clusters in Creating state, hence we don't add
// them to mon.docs. We also don't monitor clusters in Deleting state; when
// this state is reached we delete from mon.docs
func (mon *monitor) changefeed(ctx context.Context, log *logrus.Entry, stop <-chan struct{}) {
defer recover.Panic(log)
i := mon.db.OpenShiftClusters.ChangeFeed()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
for {
docs, err := i.Next(ctx)
if err != nil {
log.Error(err)
break
}
if docs == nil {
break
}
for _, doc := range docs.OpenShiftClusterDocuments {
mon.baseLog.WithField("resource", doc.OpenShiftCluster.ID).Debugf("cluster in provisioningState %s", doc.OpenShiftCluster.Properties.ProvisioningState)
switch doc.OpenShiftCluster.Properties.ProvisioningState {
case api.ProvisioningStateCreating:
case api.ProvisioningStateDeleting:
mon.docs.Delete(doc.ID)
default:
// TODO: improve memory usage by storing a subset of doc in mon.docs
mon.docs.Store(doc.ID, doc)
}
}
}
select {
case <-t.C:
case <-stop:
return
}
}
}
// schedule walks mon.docs and schedules work across the worker goroutines. It
// aims for every cluster to be monitored every five minutes
func (mon *monitor) schedule(ctx context.Context, log *logrus.Entry, stop <-chan struct{}) {
defer recover.Panic(log)
t := time.NewTicker(5 * time.Minute)
defer t.Stop()
for {
mon.docs.Range(func(key, value interface{}) bool {
doc := value.(*api.OpenShiftClusterDocument)
mon.mu.Lock()
_, found := mon.buckets[doc.Bucket]
mon.mu.Unlock()
if found {
mon.ch <- doc.ID
}
return true
})
select {
case <-t.C:
case <-stop:
close(mon.ch)
return
}
}
}
// worker reads clusters to be monitored and monitors them
func (mon *monitor) worker(ctx context.Context, log *logrus.Entry) {
defer recover.Panic(log)
for id := range mon.ch {
_doc, found := mon.docs.Load(id)
if !found {
continue
}
doc := _doc.(*api.OpenShiftClusterDocument)
err := mon.workOne(ctx, mon.baseLog.WithField("resource", doc.OpenShiftCluster.ID), doc)
if err != nil {
log.Error(err)
}
}
}
// workOne checks the API server health of a cluster
func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument) error {
log.Debug("monitoring")
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
restConfig, err := restconfig.RestConfig(ctx, mon.env, doc.OpenShiftCluster)
if err != nil {
return err
}
cli, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return err
}
var statusCode int
err = cli.RESTClient().
Get().
Context(ctx).
AbsPath("/healthz").
Do().
StatusCode(&statusCode).
Error()
if err != nil && statusCode == 0 {
return err
}
return mon.m.EmitGauge("monitoring.apiserver.health", 1, map[string]string{
"resource": doc.OpenShiftCluster.ID,
"code": strconv.FormatInt(int64(statusCode), 10),
})
}


@@ -208,6 +208,21 @@ func (mr *MockOpenShiftClustersMockRecorder) Lease(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lease", reflect.TypeOf((*MockOpenShiftClusters)(nil).Lease), arg0, arg1)
}
// ListAll mocks base method
func (m *MockOpenShiftClusters) ListAll(arg0 context.Context) (*api.OpenShiftClusterDocuments, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListAll", arg0)
ret0, _ := ret[0].(*api.OpenShiftClusterDocuments)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListAll indicates an expected call of ListAll
func (mr *MockOpenShiftClustersMockRecorder) ListAll(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAll", reflect.TypeOf((*MockOpenShiftClusters)(nil).ListAll), arg0)
}
// ListByPrefix mocks base method
func (m *MockOpenShiftClusters) ListByPrefix(arg0, arg1 string) (cosmosdb.OpenShiftClusterDocumentIterator, error) {
m.ctrl.T.Helper()