Removing telemetry processed on HostNetAgent (#903)

Co-authored-by: Jaeryn <tsun.chu@microsoft.com>
This commit is contained in:
Jaeryn 2021-06-21 13:22:44 -07:00 коммит произвёл GitHub
Родитель ae9aedcfdf
Коммит 8acc37d6eb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 40 добавлений и 418 удалений

Просмотреть файл

@ -184,7 +184,7 @@ func main() {
// CNI Acquires lock
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
if tberr := tb.Connect(); tberr == nil {
reportPluginError(reportManager, tb, err)
tb.Close()
@ -212,7 +212,7 @@ func main() {
// Start the telemetry process if it is not already started. This should be done inside the lock; otherwise multiple
// processes can end up creating/killing the telemetry process, which results in an undesired state.
tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Просмотреть файл

@ -157,14 +157,14 @@ func main() {
log.Logf("Config after setting defaults %+v", config)
// Cleaning up orphan socket if present
tbtemp := telemetry.NewTelemetryBuffer("")
tbtemp := telemetry.NewTelemetryBuffer()
tbtemp.Cleanup(telemetry.FdName)
for {
tb = telemetry.NewTelemetryBuffer("")
tb = telemetry.NewTelemetryBuffer()
log.Logf("[Telemetry] Starting telemetry server")
err = tb.StartServer(config.DisableTelemetryToNetAgent)
err = tb.StartServer()
if err == nil || tb.FdExists {
break
}
@ -189,7 +189,7 @@ func main() {
err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric)
log.Printf("[Telemetry] AI Handle creation status:%v", err)
log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds)
tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second)
tb.PushData()
telemetry.CloseAITelemetryHandle()
log.Close()

Просмотреть файл

@ -135,7 +135,7 @@ func main() {
CNIReport: reportManager.Report.(*telemetry.CNIReport),
}
tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Просмотреть файл

@ -4,29 +4,11 @@
package logger
import (
"reflect"
"regexp"
"time"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/google/uuid"
)
const (
// cnsTelemetryFile - telemetry file path, built from platform.CNSRuntimePath.
// It is the path handed to GetReportState/SetReportState in SendToTelemetryService.
cnsTelemetryFile = platform.CNSRuntimePath + "AzureCNSTelemetry.json"
// errorcodePrefix - number of leading characters dropped from a codeRegex match
// before it is stored as the report's Errorcode (the match starts with the
// five-character literal "Code:").
errorcodePrefix = 5
// heartbeatIntervalInMinutes - NOTE(review): not referenced in the visible part
// of this file; presumably a default heartbeat period - confirm before relying on it.
heartbeatIntervalInMinutes = 30
// retryWaitTimeInSeconds - NOTE(review): not referenced in the visible part of this file.
retryWaitTimeInSeconds = 60
// telemetryNumRetries and telemetryWaitTimeInMilliseconds are the two arguments
// passed to TelemetryBuffer.ConnectToTelemetryService.
telemetryNumRetries = 5
telemetryWaitTimeInMilliseconds = 200
)
// codeRegex matches the literal "Code:" followed by zero or more word characters.
// It is compiled once at package scope and used to pull an error code out of a log message.
var codeRegex = regexp.MustCompile(`Code:(\w*)`)
func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) {
heartbeat := time.NewTicker(time.Minute * time.Duration(heartbeatIntervalInMins)).C
metric := aitelemetry.Metric{
@ -44,55 +26,3 @@ func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) {
}
}
}
// SendToTelemetryService - handles cns telemetry reports.
//
// It connects a TelemetryBuffer to the telemetry service and then, for every
// message received on reports, fills in a CNSReport (Errorcode, EventMessage,
// Timestamp, UUID) and writes its serialized form to the buffer. The loop ends
// when a value arrives on telemetryStopProcessing, at which point the buffer is
// closed and the function returns.
//
// If the connection attempt leaves tb.Connected false, the function returns
// immediately without consuming anything from reports.
func SendToTelemetryService(reports chan interface{}, telemetryStopProcessing chan bool) {
// CONNECT is the re-entry point used after a failed write: a brand-new buffer
// is created and connected from scratch.
CONNECT:
tb := telemetry.NewTelemetryBuffer("")
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
if tb.Connected {
reportMgr := telemetry.ReportManager{
ContentType: telemetry.ContentType,
Report: &telemetry.CNSReport{},
}
// NOTE(review): the return value of GetReportState is ignored here; what state
// it loads is not visible from this file.
reportMgr.GetReportState(cnsTelemetryFile)
reportMgr.GetKernelVersion()
for {
select {
case msg := <-reports:
// msg.(string) is an unchecked type assertion: a non-string value on the
// channel would panic here.
codeStr := codeRegex.FindString(msg.(string))
// Only store an error code when something follows the "Code:" prefix.
if len(codeStr) > errorcodePrefix {
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Errorcode").SetString(codeStr[errorcodePrefix:])
}
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("EventMessage").SetString(msg.(string))
case <-telemetryStopProcessing:
tb.Close()
return
}
// Fields are set by name through reflection, so these string names must stay
// in sync with the CNSReport struct fields.
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Timestamp").SetString(time.Now().UTC().String())
// A UUID generation failure is tolerated: the report keeps whatever UUID it had.
if id, err := uuid.NewUUID(); err == nil {
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("UUID").SetString(id.String())
}
if !reportMgr.GetReportState(cnsTelemetryFile) {
reportMgr.SetReportState(cnsTelemetryFile)
}
report, err := reportMgr.ReportToBytes()
// A serialization error is silently skipped; the loop just waits for the next message.
if err == nil {
// If write fails, try to re-establish connections as server/client.
// The report whose write failed is not retried after reconnecting.
if _, err = tb.Write(report); err != nil {
log.Logf("[CNS-Telemetry] Telemetry write failed: %v", err)
tb.Close()
goto CONNECT
}
}
}
}
}

Просмотреть файл

@ -51,10 +51,6 @@ func InitAI(aiConfig aitelemetry.AIConfig, disableTraceLogging, disableMetricLog
Log.DisableEventLogging = disableEventLogging
}
// InitReportChannel hands the given reports channel to the package-level Log's
// underlying logger by calling its SetChannel method.
func InitReportChannel(reports chan interface{}) {
Log.logger.SetChannel(reports)
}
// Close CNS and AI telemetry handle
func Close() {
Log.logger.Close()

Просмотреть файл

@ -57,9 +57,6 @@ const (
// Version is populated by make during build.
var version string
// Reports channel (unbuffered). It is registered with the logger through
// logger.InitReportChannel and drained by logger.SendToTelemetryService.
var reports = make(chan interface{})
// telemetryStopProcessing is passed to logger.SendToTelemetryService as its stop signal.
var telemetryStopProcessing = make(chan bool)
// stopheartbeat is passed to logger.SendHeartBeat as its stop signal.
var stopheartbeat = make(chan bool)
// stopSnapshots is passed to httpRestService.SendNCSnapShotPeriodically as its stop signal.
var stopSnapshots = make(chan bool)
@ -436,7 +433,6 @@ func main() {
}
logger.InitAI(aiConfig, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent)
logger.InitReportChannel(reports)
}
// Log platform information.
@ -537,7 +533,6 @@ func main() {
}
if !disableTelemetry {
go logger.SendToTelemetryService(reports, telemetryStopProcessing)
go logger.SendHeartBeat(cnsconfig.TelemetrySettings.HeartBeatIntervalInMins, stopheartbeat)
go httpRestService.SendNCSnapShotPeriodically(cnsconfig.TelemetrySettings.SnapshotIntervalInMins, stopSnapshots)
}
@ -859,8 +854,6 @@ func InitializeCRDState(httpRestService cns.HTTPService, cnsconfig configuration
return
}
}
logger.Printf("[Azure CNS] Exiting SyncHostNCVersion")
}()
return nil

Просмотреть файл

@ -53,7 +53,6 @@ type Logger struct {
maxFileCount int
callCount int
directory string
reports chan interface{}
mutex *sync.Mutex
}
@ -91,11 +90,6 @@ func (logger *Logger) SetLogFileLimits(maxFileSize int, maxFileCount int) {
logger.maxFileCount = maxFileCount
}
// SetChannel sets the channel for error message reports.
// The channel is stored on the logger as-is; while it is non-nil, Printf, Debugf
// and Errorf each send their formatted message to it from a separate goroutine.
func (logger *Logger) SetChannel(reports chan interface{}) {
logger.reports = reports
}
// Close closes the log stream.
func (logger *Logger) Close() {
if logger.out != nil {
@ -223,14 +217,7 @@ func (logger *Logger) Printf(format string, args ...interface{}) {
return
}
logger.mutex.Lock()
logger.logf(format, args...)
logger.mutex.Unlock()
go func() {
if logger.reports != nil {
logger.reports <- fmt.Sprintf(format, args...)
}
}()
logger.Logf(format, args...)
}
// Debugf logs a formatted string at info level.
@ -239,22 +226,10 @@ func (logger *Logger) Debugf(format string, args ...interface{}) {
return
}
logger.mutex.Lock()
logger.logf(format, args...)
logger.mutex.Unlock()
go func() {
if logger.reports != nil {
logger.reports <- fmt.Sprintf(format, args...)
}
}()
logger.Logf(format, args...)
}
// Errorf writes the formatted message through Logf and then, from a separate
// goroutine, forwards the same formatted text to the logger's reports channel.
// Nothing is forwarded when no reports channel has been set.
func (logger *Logger) Errorf(format string, args ...interface{}) {
	logger.Logf(format, args...)

	go func() {
		// No channel registered: nothing to forward.
		if logger.reports == nil {
			return
		}
		logger.reports <- fmt.Sprintf(format, args...)
	}()
}

Просмотреть файл

@ -105,20 +105,6 @@ type AIMetric struct {
Metric aitelemetry.Metric
}
// CNSReport - Azure CNS Telemetry Report structure.
//
// Errorcode, EventMessage, Timestamp and UUID are written by field name through
// reflection in the logger's SendToTelemetryService, so those names must not be
// changed independently. The telemetry server recognizes an incoming JSON
// payload as a CNSReport by the presence of the "DncPartitionKey" key.
type CNSReport struct {
IsNewInstance bool
CPUUsage string
MemoryUsage string
Processes string
EventMessage string
DncPartitionKey string
Timestamp string
UUID string
Errorcode string
// Metadata is serialized under the JSON key "compute".
Metadata common.Metadata `json:"compute"`
}
// ClusterState contains the current kubernetes cluster state.
type ClusterState struct {
PodCount int
@ -142,24 +128,6 @@ type NPMReport struct {
Metadata common.Metadata `json:"compute"`
}
// DNCReport structure.
//
// The telemetry server recognizes an incoming JSON payload as a DNCReport by
// the presence of the "Allocations" key.
type DNCReport struct {
IsNewInstance bool
CPUUsage string
MemoryUsage string
Processes string
EventMessage string
PartitionKey string
Allocations string
Timestamp string
NumberOfNodes int
NumberOfNCs int
Orchestrator string
ContainerType string
Errorcode string
// Metadata is serialized under the JSON key "compute".
Metadata common.Metadata `json:"compute"`
}
// ReportManager structure.
type ReportManager struct {
HostNetAgentURL string
@ -374,9 +342,6 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {
switch reportMgr.Report.(type) {
case *CNIReport:
case *NPMReport:
case *DNCReport:
case *CNSReport:
case *AIMetric:
default:
err = fmt.Errorf("[Telemetry] Invalid report type")

Просмотреть файл

@ -105,10 +105,10 @@ func TestMain(m *testing.M) {
reportManager.ContentType = "application/json"
reportManager.Report = &CNIReport{}
tb = NewTelemetryBuffer(hostAgentUrl)
err = tb.StartServer(false)
tb = NewTelemetryBuffer()
err = tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}
if err := tb.Connect(); err != nil {
@ -200,14 +200,14 @@ func TestCloseTelemetryConnection(t *testing.T) {
func TestServerCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer(false)
tb = NewTelemetryBuffer()
err := tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}
// create client telemetrybuffer and connect to server
tb1 := NewTelemetryBuffer(hostAgentUrl)
tb1 := NewTelemetryBuffer()
if err := tb1.Connect(); err != nil {
t.Errorf("connection to telemetry server failed %v", err)
}
@ -233,10 +233,10 @@ func TestServerCloseTelemetryConnection(t *testing.T) {
func TestClientCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer(false)
tb = NewTelemetryBuffer()
err := tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}
if !SockExists() {
@ -244,7 +244,7 @@ func TestClientCloseTelemetryConnection(t *testing.T) {
}
// create client telemetrybuffer and connect to server
tb1 := NewTelemetryBuffer(hostAgentUrl)
tb1 := NewTelemetryBuffer()
if err := tb1.Connect(); err != nil {
t.Errorf("connection to telemetry server failed %v", err)
}

Просмотреть файл

@ -5,16 +5,12 @@ package telemetry
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
@ -43,27 +39,12 @@ type TelemetryConfig struct {
// FdName - file descriptor name
// Delimiter - delimiter for socket reads/writes
// azureHostReportURL - host net agent url of type buffer
// DefaultInterval - default interval for sending buffer to host
// logName - telemetry log name
// MaxPayloadSize - max buffer size in bytes
const (
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
minInterval = 10 * time.Second
logName = "azure-vnet-telemetry"
MaxPayloadSize = 4096
MaxNumReports = 1000
dnc = "DNC"
cns = "CNS"
npm = "NPM"
cni = "CNI"
)
var (
payloadSize uint16 = 0
disableTelemetryToNetAgent bool
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
MaxPayloadSize = 4096
MaxNumReports = 1000
)
// TelemetryBuffer object
@ -72,7 +53,6 @@ type TelemetryBuffer struct {
listener net.Listener
connections []net.Conn
azureHostReportURL string
buffer Buffer
FdExists bool
Connected bool
data chan interface{}
@ -82,27 +62,16 @@ type TelemetryBuffer struct {
// Buffer object holds the different types of reports, one slice per report type.
// NewTelemetryBuffer pre-allocates each slice with capacity MaxNumReports, and
// push stops appending to a slice once it already holds MaxNumReports entries.
// Buffer is also the JSON document that sendToHost encodes and posts to the host.
type Buffer struct {
DNCReports []DNCReport
CNIReports []CNIReport
NPMReports []NPMReport
CNSReports []CNSReport
}
// NewTelemetryBuffer - create a new TelemetryBuffer
func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
func NewTelemetryBuffer() *TelemetryBuffer {
var tb TelemetryBuffer
if hostReportURL == "" {
tb.azureHostReportURL = azureHostReportURL
}
tb.data = make(chan interface{}, MaxNumReports)
tb.cancel = make(chan bool, 1)
tb.connections = make([]net.Conn, 0)
tb.buffer.DNCReports = make([]DNCReport, 0, MaxNumReports)
tb.buffer.CNIReports = make([]CNIReport, 0, MaxNumReports)
tb.buffer.NPMReports = make([]NPMReport, 0, MaxNumReports)
tb.buffer.CNSReports = make([]CNSReport, 0, MaxNumReports)
return &tb
}
@ -118,8 +87,7 @@ func remove(s []net.Conn, i int) []net.Conn {
}
// Starts Telemetry server listening on unix domain socket
func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error {
disableTelemetryToNetAgent = disableNetAgentChannel
func (tb *TelemetryBuffer) StartServer() error {
err := tb.Listen(FdName)
if err != nil {
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
@ -143,11 +111,7 @@ func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error {
if err == nil {
var tmp map[string]interface{}
json.Unmarshal(reportStr, &tmp)
if _, ok := tmp["NpmVersion"]; ok {
var npmReport NPMReport
json.Unmarshal([]byte(reportStr), &npmReport)
tb.data <- npmReport
} else if _, ok := tmp["CniSucceeded"]; ok {
if _, ok := tmp["CniSucceeded"]; ok {
var cniReport CNIReport
json.Unmarshal([]byte(reportStr), &cniReport)
tb.data <- cniReport
@ -155,14 +119,6 @@ func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error {
var aiMetric AIMetric
json.Unmarshal([]byte(reportStr), &aiMetric)
tb.data <- aiMetric
} else if _, ok := tmp["Allocations"]; ok {
var dncReport DNCReport
json.Unmarshal([]byte(reportStr), &dncReport)
tb.data <- dncReport
} else if _, ok := tmp["DncPartitionKey"]; ok {
var cnsReport CNSReport
json.Unmarshal([]byte(reportStr), &cnsReport)
tb.data <- cnsReport
}
} else {
var index int
@ -209,39 +165,22 @@ func (tb *TelemetryBuffer) Connect() error {
return err
}
// BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere
func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
defer tb.Close()
if !tb.FdExists {
log.Logf("[Telemetry] Buffer telemetry data and send it to host")
if intervalms < minInterval {
intervalms = minInterval
// PushData - PushData running an instance if it isn't already being run elsewhere
func (tb *TelemetryBuffer) PushData() {
for {
select {
case report := <-tb.data:
tb.mutex.Lock()
push(report)
tb.mutex.Unlock()
case <-tb.cancel:
log.Logf("[Telemetry] server cancel event")
goto EXIT
}
interval := time.NewTicker(intervalms).C
for {
select {
case <-interval:
// Send buffer to host and clear cache when sent successfully
// To-do : if we hit max slice size in buffer, write to disk and process the logs on disk on future sends
tb.mutex.Lock()
tb.sendToHost()
tb.mutex.Unlock()
case report := <-tb.data:
tb.mutex.Lock()
tb.buffer.push(report)
tb.mutex.Unlock()
case <-tb.cancel:
log.Logf("[Telemetry] server cancel event")
goto EXIT
}
}
} else {
<-tb.cancel
log.Logf("[Telemetry] Received cancel event")
}
EXIT:
tb.Close()
}
// read - read from the file descriptor
@ -296,145 +235,8 @@ func (tb *TelemetryBuffer) Close() {
tb.connections = make([]net.Conn, 0)
}
// sendToHost - send buffer to host.
//
// Moves queued reports out of tb.buffer into a local payload, taking one report
// per type in round-robin order, until every queue has been drained or the next
// report would push the payload past MaxPayloadSize. The payload is then
// JSON-encoded and POSTed to tb.azureHostReportURL.
//
// Returns nil without doing anything when disableTelemetryToNetAgent is set.
// Returns an error when the POST fails or the response status is not 200 OK.
//
// The caller in BufferAndPushData holds tb.mutex around this call; this function
// does no locking of its own.
//
// NOTE(review): reports are removed from tb.buffer before the POST and are not
// put back by this function if the POST fails.
func (tb *TelemetryBuffer) sendToHost() error {
if disableTelemetryToNetAgent {
return nil
}
buf := Buffer{
DNCReports: make([]DNCReport, 0),
CNIReports: make([]CNIReport, 0),
NPMReports: make([]NPMReport, 0),
CNSReports: make([]CNSReport, 0),
}
seed := rand.NewSource(time.Now().UnixNano())
// Start the round-robin at a random report type (index in [0, number of Buffer fields)).
// The local payloadSize declared here shadows the package-level variable of the same name.
i, payloadSize, maxPayloadSizeReached := rand.New(seed).Intn(reflect.ValueOf(&buf).Elem().NumField()), 0, false
isDNCReportsEmpty, isCNIReportsEmpty, isCNSReportsEmpty, isNPMReportsEmpty := false, false, false, false
for {
// craft payload in a round-robin manner.
// Every `break` inside the cases below leaves only the switch; the loop itself
// is ended by the two checks that follow the switch.
// NOTE(review): the modulus is hard-coded to 4 and must match the number of Buffer fields.
switch i % 4 {
case 0:
reportLen := len(tb.buffer.DNCReports)
if reportLen == 0 || isDNCReportsEmpty {
isDNCReportsEmpty = true
break
}
// Taking the last queued report: mark the queue as drained for later rounds.
if reportLen == 1 {
isDNCReportsEmpty = true
}
report := tb.buffer.DNCReports[0]
// A marshal error is ignored: the report is still appended, just not counted.
if bytes, err := json.Marshal(report); err == nil {
payloadSize += len(bytes)
if payloadSize > MaxPayloadSize {
// The report that would overflow the payload stays at the head of its queue.
maxPayloadSizeReached = true
break
}
}
buf.DNCReports = append(buf.DNCReports, report)
tb.buffer.DNCReports = tb.buffer.DNCReports[1:]
case 1:
// Same steps as case 0, for CNI reports.
reportLen := len(tb.buffer.CNIReports)
if reportLen == 0 || isCNIReportsEmpty {
isCNIReportsEmpty = true
break
}
if reportLen == 1 {
isCNIReportsEmpty = true
}
report := tb.buffer.CNIReports[0]
if bytes, err := json.Marshal(report); err == nil {
payloadSize += len(bytes)
if payloadSize > MaxPayloadSize {
maxPayloadSizeReached = true
break
}
}
buf.CNIReports = append(buf.CNIReports, report)
tb.buffer.CNIReports = tb.buffer.CNIReports[1:]
case 2:
// Same steps as case 0, for CNS reports.
reportLen := len(tb.buffer.CNSReports)
if reportLen == 0 || isCNSReportsEmpty {
isCNSReportsEmpty = true
break
}
if reportLen == 1 {
isCNSReportsEmpty = true
}
report := tb.buffer.CNSReports[0]
if bytes, err := json.Marshal(report); err == nil {
payloadSize += len(bytes)
if payloadSize > MaxPayloadSize {
maxPayloadSizeReached = true
break
}
}
buf.CNSReports = append(buf.CNSReports, report)
tb.buffer.CNSReports = tb.buffer.CNSReports[1:]
case 3:
// Same steps as case 0, for NPM reports.
reportLen := len(tb.buffer.NPMReports)
if reportLen == 0 || isNPMReportsEmpty {
isNPMReportsEmpty = true
break
}
if reportLen == 1 {
isNPMReportsEmpty = true
}
report := tb.buffer.NPMReports[0]
if bytes, err := json.Marshal(report); err == nil {
payloadSize += len(bytes)
if payloadSize > MaxPayloadSize {
maxPayloadSizeReached = true
break
}
}
buf.NPMReports = append(buf.NPMReports, report)
tb.buffer.NPMReports = tb.buffer.NPMReports[1:]
}
// Stop once every report type has been marked drained.
if isDNCReportsEmpty && isCNIReportsEmpty && isCNSReportsEmpty && isNPMReportsEmpty {
break
}
// Stop as soon as one report did not fit.
if maxPayloadSizeReached {
break
}
i++
}
// NOTE(review): a fresh client with no Timeout is created on every call; a zero
// Timeout means the POST is not bounded by the client.
httpc := &http.Client{}
var body bytes.Buffer
log.Logf("Sending buffer %+v", buf)
// An encode error is only logged; the POST below is still attempted.
if err := json.NewEncoder(&body).Encode(buf); err != nil {
log.Logf("[Telemetry] Encode buffer error %v", err)
}
resp, err := httpc.Post(tb.azureHostReportURL, ContentType, &body)
log.Logf("[Telemetry] Got response %v", resp)
if err != nil {
return fmt.Errorf("[Telemetry] HTTP Post returned error %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("[Telemetry] HTTP Post returned statuscode %d", resp.StatusCode)
}
return nil
}
// push - push the report (x) to corresponding slice
func (buf *Buffer) push(x interface{}) {
func push(x interface{}) {
metadata, err := common.GetHostMetadata(metadataFile)
if err != nil {
log.Logf("Error getting metadata %v", err)
@ -453,56 +255,17 @@ func (buf *Buffer) push(x interface{}) {
}
switch x.(type) {
case DNCReport:
if len(buf.DNCReports) >= MaxNumReports {
return
}
dncReport := x.(DNCReport)
dncReport.Metadata = metadata
buf.DNCReports = append(buf.DNCReports, dncReport)
case CNIReport:
if len(buf.CNIReports) >= MaxNumReports {
return
}
cniReport := x.(CNIReport)
cniReport.Metadata = metadata
SendAITelemetry(cniReport)
buf.CNIReports = append(buf.CNIReports, cniReport)
case AIMetric:
aiMetric := x.(AIMetric)
SendAIMetric(aiMetric)
case NPMReport:
if len(buf.NPMReports) >= MaxNumReports {
return
}
npmReport := x.(NPMReport)
npmReport.Metadata = metadata
buf.NPMReports = append(buf.NPMReports, npmReport)
case CNSReport:
if len(buf.CNSReports) >= MaxNumReports {
return
}
cnsReport := x.(CNSReport)
cnsReport.Metadata = metadata
buf.CNSReports = append(buf.CNSReports, cnsReport)
}
}
// reset - discards every queued report, leaving each slice empty but non-nil,
// and sets the package-level payloadSize back to 0.
func (buf *Buffer) reset() {
	buf.DNCReports = []DNCReport{}
	buf.CNIReports = []CNIReport{}
	buf.NPMReports = []NPMReport{}
	buf.CNSReports = []CNSReport{}

	payloadSize = 0
}
// WaitForTelemetrySocket - Block until the pipe/sock is created or until the max number of attempts has been retried
func WaitForTelemetrySocket(maxAttempt int, waitTimeInMillisecs time.Duration) {
for attempt := 0; attempt < maxAttempt; attempt++ {