335 строки
10 KiB
Go
335 строки
10 KiB
Go
package omsnozzle
|
|
|
|
import (
|
|
"encoding/json"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"code.cloudfoundry.org/lager"
|
|
"github.com/Azure/oms-log-analytics-firehose-nozzle/caching"
|
|
"github.com/Azure/oms-log-analytics-firehose-nozzle/client"
|
|
"github.com/Azure/oms-log-analytics-firehose-nozzle/firehose"
|
|
"github.com/Azure/oms-log-analytics-firehose-nozzle/messages"
|
|
"github.com/cloudfoundry/sonde-go/events"
|
|
)
|
|
|
|
type OmsNozzle struct {
|
|
logger lager.Logger
|
|
errChan <-chan error
|
|
msgChan <-chan *events.Envelope
|
|
signalChan chan os.Signal
|
|
omsClient client.Client
|
|
firehoseClient firehose.Client
|
|
nozzleConfig *NozzleConfig
|
|
goroutineSem chan int // to control the number of active post goroutines
|
|
cachingClient caching.CachingClient
|
|
totalEventsReceived uint64
|
|
totalEventsSent uint64
|
|
totalEventsLost uint64
|
|
totalDataSent uint64
|
|
mutex *sync.Mutex
|
|
}
|
|
|
|
// NozzleConfig holds the settings that control how OmsNozzle filters,
// batches, and reports firehose events.
type NozzleConfig struct {
	// OmsTypePrefix, when non-empty, is prepended to each event type name
	// before it is passed to the OMS client along with the batch payload.
	OmsTypePrefix string
	// OmsBatchTime is the interval at which pending events are flushed.
	// It must be positive (time.NewTicker rejects non-positive intervals).
	OmsBatchTime time.Duration
	// OmsMaxMsgNumPerBatch triggers an immediate flush as soon as any single
	// event type accumulates this many pending events. NewOmsNozzle also uses
	// it to bound the number of concurrent post goroutines.
	OmsMaxMsgNumPerBatch int
	// ExcludeMetricEvents stops ValueMetric, CounterEvent and ContainerMetric
	// envelopes from being posted (CounterEvents are still inspected for
	// slow-consumer alerts).
	ExcludeMetricEvents bool
	// ExcludeLogEvents stops LogMessage and Error envelopes from being posted.
	ExcludeLogEvents bool
	// ExcludeHttpEvents stops HttpStartStop envelopes from being posted.
	ExcludeHttpEvents bool
	// LogEventCount enables periodic posting of the nozzle's own
	// received/sent/lost event counters.
	LogEventCount bool
	// LogEventCountInterval is the period between event-count reports and
	// should be positive.
	LogEventCountInterval time.Duration
}
|
|
|
|
func NewOmsNozzle(logger lager.Logger, firehoseClient firehose.Client, omsClient client.Client, nozzleConfig *NozzleConfig, caching caching.CachingClient) *OmsNozzle {
|
|
maxPostGoroutines := int(100000 / nozzleConfig.OmsMaxMsgNumPerBatch)
|
|
return &OmsNozzle{
|
|
logger: logger,
|
|
errChan: make(<-chan error),
|
|
msgChan: make(<-chan *events.Envelope),
|
|
signalChan: make(chan os.Signal, 2),
|
|
omsClient: omsClient,
|
|
firehoseClient: firehoseClient,
|
|
nozzleConfig: nozzleConfig,
|
|
goroutineSem: make(chan int, maxPostGoroutines),
|
|
cachingClient: caching,
|
|
totalEventsReceived: uint64(0),
|
|
totalEventsSent: uint64(0),
|
|
totalEventsLost: uint64(0),
|
|
totalDataSent: uint64(0),
|
|
mutex: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func (o *OmsNozzle) Start() error {
|
|
o.cachingClient.Initialize(true)
|
|
|
|
// setup for termination signal from CF
|
|
signal.Notify(o.signalChan, syscall.SIGTERM, syscall.SIGINT)
|
|
|
|
o.msgChan, o.errChan = o.firehoseClient.Connect()
|
|
|
|
if o.nozzleConfig.LogEventCount {
|
|
o.logTotalEvents(o.nozzleConfig.LogEventCountInterval)
|
|
}
|
|
err := o.routeEvents()
|
|
return err
|
|
}
|
|
|
|
func (o *OmsNozzle) logTotalEvents(interval time.Duration) {
|
|
logEventCountTicker := time.NewTicker(interval)
|
|
lastReceivedCount := uint64(0)
|
|
lastSentCount := uint64(0)
|
|
lastLostCount := uint64(0)
|
|
|
|
go func() {
|
|
for range logEventCountTicker.C {
|
|
timeStamp := time.Now().UnixNano()
|
|
totalReceivedCount := o.totalEventsReceived
|
|
totalSentCount := o.totalEventsSent
|
|
totalLostCount := o.totalEventsLost
|
|
currentEvents := make(map[string][]interface{})
|
|
|
|
// Generate CounterEvent
|
|
o.addEventCountEvent("eventsReceived", totalReceivedCount-lastReceivedCount, totalReceivedCount, &timeStamp, ¤tEvents)
|
|
o.addEventCountEvent("eventsSent", totalSentCount-lastSentCount, totalSentCount, &timeStamp, ¤tEvents)
|
|
o.addEventCountEvent("eventsLost", totalLostCount-lastLostCount, totalLostCount, &timeStamp, ¤tEvents)
|
|
|
|
o.goroutineSem <- 1
|
|
o.postData(¤tEvents, false)
|
|
|
|
lastReceivedCount = totalReceivedCount
|
|
lastSentCount = totalSentCount
|
|
lastLostCount = totalLostCount
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (o *OmsNozzle) addEventCountEvent(name string, deltaCount uint64, count uint64, timeStamp *int64, currentEvents *map[string][]interface{}) {
|
|
counterEvent := &events.CounterEvent{
|
|
Name: &name,
|
|
Delta: &deltaCount,
|
|
Total: &count,
|
|
}
|
|
|
|
eventType := events.Envelope_CounterEvent
|
|
job := "nozzle"
|
|
origin := "stats"
|
|
envelope := &events.Envelope{
|
|
EventType: &eventType,
|
|
Timestamp: timeStamp,
|
|
Job: &job,
|
|
Origin: &origin,
|
|
CounterEvent: counterEvent,
|
|
}
|
|
|
|
var omsMsg OMSMessage
|
|
eventTypeString := eventType.String()
|
|
omsMsg = messages.NewCounterEvent(envelope, o.cachingClient)
|
|
(*currentEvents)[eventTypeString] = append((*currentEvents)[eventTypeString], omsMsg)
|
|
}
|
|
|
|
func (o *OmsNozzle) postData(events *map[string][]interface{}, addCount bool) {
|
|
for k, v := range *events {
|
|
if len(v) > 0 {
|
|
if msgAsJson, err := json.Marshal(&v); err != nil {
|
|
o.logger.Error("error marshalling message to JSON", err,
|
|
lager.Data{"event type": k},
|
|
lager.Data{"event count": len(v)})
|
|
} else {
|
|
o.logger.Debug("Posting to OMS",
|
|
lager.Data{"event type": k},
|
|
lager.Data{"event count": len(v)},
|
|
lager.Data{"total size": len(msgAsJson)})
|
|
if len(o.nozzleConfig.OmsTypePrefix) > 0 {
|
|
k = o.nozzleConfig.OmsTypePrefix + k
|
|
}
|
|
nRetries := 4
|
|
for nRetries > 0 {
|
|
requestStartTime := time.Now()
|
|
if err = o.omsClient.PostData(&msgAsJson, k); err != nil {
|
|
nRetries--
|
|
elapsedTime := time.Since(requestStartTime)
|
|
o.logger.Error("error posting message to OMS", err,
|
|
lager.Data{"event type": k},
|
|
lager.Data{"elapse time": elapsedTime.String()},
|
|
lager.Data{"event count": len(v)},
|
|
lager.Data{"total size": len(msgAsJson)},
|
|
lager.Data{"remaining attempts": nRetries})
|
|
time.Sleep(time.Second * 1)
|
|
} else {
|
|
if addCount {
|
|
o.mutex.Lock()
|
|
o.totalEventsSent += uint64(len(v))
|
|
o.totalDataSent += uint64(len(msgAsJson))
|
|
o.mutex.Unlock()
|
|
}
|
|
break
|
|
}
|
|
}
|
|
if nRetries == 0 && addCount {
|
|
o.mutex.Lock()
|
|
o.totalEventsLost += uint64(len(v))
|
|
o.mutex.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
<-o.goroutineSem
|
|
}
|
|
|
|
func (o *OmsNozzle) routeEvents() error {
|
|
pendingEvents := make(map[string][]interface{})
|
|
// Firehose message processing loop
|
|
ticker := time.NewTicker(o.nozzleConfig.OmsBatchTime)
|
|
for {
|
|
// loop over message and signal channel
|
|
select {
|
|
case s := <-o.signalChan:
|
|
o.logger.Info("exiting", lager.Data{"signal caught": s.String()})
|
|
err := o.firehoseClient.CloseConsumer()
|
|
if err != nil {
|
|
o.logger.Error("error closing consumer", err)
|
|
}
|
|
os.Exit(1)
|
|
case <-ticker.C:
|
|
// get the pending as current
|
|
currentEvents := pendingEvents
|
|
// reset the pending events
|
|
pendingEvents = make(map[string][]interface{})
|
|
o.goroutineSem <- 1
|
|
go o.postData(¤tEvents, true)
|
|
case msg := <-o.msgChan:
|
|
o.totalEventsReceived++
|
|
// process message
|
|
var omsMessage OMSMessage
|
|
var omsMessageType = msg.GetEventType().String()
|
|
switch msg.GetEventType() {
|
|
// Metrics
|
|
case events.Envelope_ValueMetric:
|
|
if !o.nozzleConfig.ExcludeMetricEvents {
|
|
omsMessage = messages.NewValueMetric(msg, o.cachingClient)
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
case events.Envelope_CounterEvent:
|
|
m := messages.NewCounterEvent(msg, o.cachingClient)
|
|
if strings.Contains(m.Name, "TruncatingBuffer.DroppedMessage") {
|
|
o.logger.Error("received TruncatingBuffer alert", nil)
|
|
o.logSlowConsumerAlert()
|
|
}
|
|
if strings.Contains(m.Name, "doppler_proxy.slow_consumer") && m.Delta > 0 {
|
|
o.logger.Error("received slow_consumer alert", nil)
|
|
o.logSlowConsumerAlert()
|
|
}
|
|
if !o.nozzleConfig.ExcludeMetricEvents {
|
|
omsMessage = m
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
|
|
case events.Envelope_ContainerMetric:
|
|
if !o.nozzleConfig.ExcludeMetricEvents {
|
|
omsMessage = messages.NewContainerMetric(msg, o.cachingClient)
|
|
if omsMessage != nil {
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
}
|
|
|
|
// Logs Errors
|
|
case events.Envelope_LogMessage:
|
|
if !o.nozzleConfig.ExcludeLogEvents {
|
|
omsMessage = messages.NewLogMessage(msg, o.cachingClient)
|
|
if omsMessage != nil {
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
}
|
|
|
|
case events.Envelope_Error:
|
|
if !o.nozzleConfig.ExcludeLogEvents {
|
|
omsMessage = messages.NewError(msg, o.cachingClient)
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
|
|
// HTTP Start/Stop
|
|
case events.Envelope_HttpStartStop:
|
|
if !o.nozzleConfig.ExcludeHttpEvents {
|
|
omsMessage = messages.NewHTTPStartStop(msg, o.cachingClient)
|
|
if omsMessage != nil {
|
|
pendingEvents[omsMessageType] = append(pendingEvents[omsMessageType], omsMessage)
|
|
}
|
|
}
|
|
default:
|
|
o.logger.Info("uncategorized message", lager.Data{"message": msg.String()})
|
|
continue
|
|
}
|
|
// When the number of one type of events reaches the max per batch, trigger the post immediately
|
|
doPost := false
|
|
for _, v := range pendingEvents {
|
|
if len(v) >= o.nozzleConfig.OmsMaxMsgNumPerBatch {
|
|
doPost = true
|
|
break
|
|
}
|
|
}
|
|
if doPost {
|
|
currentEvents := pendingEvents
|
|
pendingEvents = make(map[string][]interface{})
|
|
o.goroutineSem <- 1
|
|
go o.postData(¤tEvents, true)
|
|
}
|
|
case err := <-o.errChan:
|
|
o.logger.Error("Error while reading from the firehose", err)
|
|
|
|
if strings.Contains(err.Error(), "close 1008 (policy violation)") {
|
|
o.logger.Error("Disconnected because nozzle couldn't keep up. Please try scaling up the nozzle.", nil)
|
|
o.logSlowConsumerAlert()
|
|
}
|
|
|
|
// post the buffered messages
|
|
o.goroutineSem <- 1
|
|
o.postData(&pendingEvents, true)
|
|
|
|
o.logger.Error("Closing connection with traffic controller", nil)
|
|
o.firehoseClient.CloseConsumer()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Log slowConsumerAlert as a ValueMetric event to OMS
|
|
func (o *OmsNozzle) logSlowConsumerAlert() {
|
|
name := "slowConsumerAlert"
|
|
value := float64(1)
|
|
unit := "b"
|
|
valueMetric := &events.ValueMetric{
|
|
Name: &name,
|
|
Value: &value,
|
|
Unit: &unit,
|
|
}
|
|
|
|
timeStamp := time.Now().UnixNano()
|
|
eventType := events.Envelope_ValueMetric
|
|
job := "nozzle"
|
|
origin := "alert"
|
|
envelope := &events.Envelope{
|
|
EventType: &eventType,
|
|
Timestamp: &timeStamp,
|
|
Job: &job,
|
|
Origin: &origin,
|
|
ValueMetric: valueMetric,
|
|
}
|
|
|
|
var omsMsg OMSMessage
|
|
omsMsg = messages.NewValueMetric(envelope, o.cachingClient)
|
|
currentEvents := make(map[string][]interface{})
|
|
currentEvents[eventType.String()] = append(currentEvents[eventType.String()], omsMsg)
|
|
|
|
o.goroutineSem <- 1
|
|
o.postData(¤tEvents, false)
|
|
}
|
|
|
|
// OMSMessage is a marker interface for JSON-formatted messages published to
// OMS. It declares no methods, so any value satisfies it.
type OMSMessage interface{}
|