416 строки
10 KiB
Go
416 строки
10 KiB
Go
// Copyright 2018 Microsoft. All rights reserved.
|
|
// MIT License
|
|
|
|
package telemetry
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-container-networking/common"
|
|
"github.com/Azure/azure-container-networking/log"
|
|
"github.com/Azure/azure-container-networking/platform"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// TelemetryConfig - telemetry config read by telemetry service
|
|
type TelemetryConfig struct {
|
|
ReportToHostIntervalInSeconds time.Duration `json:"reportToHostIntervalInSeconds"`
|
|
DisableAll bool
|
|
DisableTrace bool
|
|
DisableMetric bool
|
|
DisableMetadataThread bool
|
|
DebugMode bool
|
|
DisableTelemetryToNetAgent bool
|
|
RefreshTimeoutInSecs int
|
|
BatchIntervalInSecs int
|
|
BatchSizeInBytes int
|
|
GetEnvRetryCount int
|
|
GetEnvRetryWaitTimeInSecs int
|
|
}
|
|
|
|
// FdName - file descriptor name
|
|
// Delimiter - delimiter for socket reads/writes
|
|
// MaxPayloadSize - max buffer size in bytes
|
|
const (
|
|
FdName = "azure-vnet-telemetry"
|
|
Delimiter = '\n'
|
|
MaxPayloadSize = 4096
|
|
MaxNumReports = 1000
|
|
)
|
|
|
|
// TelemetryBuffer object
|
|
type TelemetryBuffer struct {
|
|
client net.Conn
|
|
listener net.Listener
|
|
connections []net.Conn
|
|
FdExists bool
|
|
Connected bool
|
|
data chan interface{}
|
|
cancel chan bool
|
|
mutex sync.Mutex
|
|
logger *zap.Logger
|
|
plc platform.ExecClient
|
|
}
|
|
|
|
// Buffer object holds the different types of reports
|
|
type Buffer struct {
|
|
CNIReports []CNIReport
|
|
}
|
|
|
|
// NewTelemetryBuffer - create a new TelemetryBuffer
|
|
func NewTelemetryBuffer(logger *zap.Logger) *TelemetryBuffer {
|
|
var tb TelemetryBuffer
|
|
|
|
tb.data = make(chan interface{}, MaxNumReports)
|
|
tb.cancel = make(chan bool, 1)
|
|
tb.connections = make([]net.Conn, 0)
|
|
tb.logger = logger
|
|
tb.plc = platform.NewExecClient(tb.logger)
|
|
|
|
return &tb
|
|
}
|
|
|
|
func remove(s []net.Conn, i int) []net.Conn {
|
|
if len(s) > 0 && i < len(s) {
|
|
s[i] = s[len(s)-1]
|
|
return s[:len(s)-1]
|
|
}
|
|
|
|
log.Logf("tb connections remove failed index %v len %v", i, len(s))
|
|
return s
|
|
}
|
|
|
|
// Starts Telemetry server listening on unix domain socket
|
|
func (tb *TelemetryBuffer) StartServer() error {
|
|
err := tb.Listen(FdName)
|
|
if err != nil {
|
|
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
|
|
if tb.logger != nil {
|
|
tb.logger.Error("Listen returns", zap.Error(err))
|
|
} else {
|
|
log.Logf("Listen returns: %v", err.Error())
|
|
}
|
|
return err
|
|
}
|
|
|
|
if tb.logger != nil {
|
|
tb.logger.Info("Telemetry service started")
|
|
} else {
|
|
log.Logf("Telemetry service started")
|
|
}
|
|
// Spawn server goroutine to handle incoming connections
|
|
go func() {
|
|
for {
|
|
// Spawn worker goroutines to communicate with client
|
|
conn, err := tb.listener.Accept()
|
|
if err == nil {
|
|
tb.mutex.Lock()
|
|
tb.connections = append(tb.connections, conn)
|
|
tb.mutex.Unlock()
|
|
go func() {
|
|
defer func() {
|
|
var index int
|
|
var value net.Conn
|
|
var found bool
|
|
|
|
tb.mutex.Lock()
|
|
defer tb.mutex.Unlock()
|
|
|
|
for index, value = range tb.connections {
|
|
if value == conn {
|
|
conn.Close()
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if found {
|
|
tb.connections = remove(tb.connections, index)
|
|
}
|
|
}()
|
|
reader := bufio.NewReader(conn)
|
|
for {
|
|
reportStr, readErr := reader.ReadBytes(Delimiter)
|
|
if readErr != nil {
|
|
return
|
|
}
|
|
reportStr = reportStr[:len(reportStr)-1]
|
|
|
|
var tmp map[string]interface{}
|
|
err = json.Unmarshal(reportStr, &tmp)
|
|
if err != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Error("StartServer: unmarshal error", zap.Error(err))
|
|
} else {
|
|
log.Logf("StartServer: unmarshal error:%v", err)
|
|
}
|
|
return
|
|
}
|
|
if _, ok := tmp["CniSucceeded"]; ok {
|
|
var cniReport CNIReport
|
|
err = json.Unmarshal([]byte(reportStr), &cniReport)
|
|
if err != nil {
|
|
return
|
|
}
|
|
tb.data <- cniReport
|
|
} else if _, ok := tmp["Metric"]; ok {
|
|
var aiMetric AIMetric
|
|
err = json.Unmarshal([]byte(reportStr), &aiMetric)
|
|
if err != nil {
|
|
return
|
|
}
|
|
tb.data <- aiMetric
|
|
} else {
|
|
if tb.logger != nil {
|
|
tb.logger.Info("StartServer: default", zap.Any("case", tmp))
|
|
} else {
|
|
log.Logf("StartServer: default case:%+v...", tmp)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
} else {
|
|
if tb.logger != nil {
|
|
tb.logger.Error("Telemetry Server accept error", zap.Error(err))
|
|
} else {
|
|
log.Logf("Telemetry Server accept error %v", err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tb *TelemetryBuffer) Connect() error {
|
|
err := tb.Dial(FdName)
|
|
if err == nil {
|
|
tb.Connected = true
|
|
} else if tb.FdExists {
|
|
tb.Cleanup(FdName)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// PushData - PushData running an instance if it isn't already being run elsewhere
|
|
func (tb *TelemetryBuffer) PushData(ctx context.Context) {
|
|
defer tb.Close()
|
|
|
|
for {
|
|
select {
|
|
case report := <-tb.data:
|
|
tb.mutex.Lock()
|
|
push(report)
|
|
tb.mutex.Unlock()
|
|
case <-tb.cancel:
|
|
if tb.logger != nil {
|
|
tb.logger.Info("server cancel event")
|
|
} else {
|
|
log.Logf("[Telemetry] server cancel event")
|
|
}
|
|
return
|
|
case <-ctx.Done():
|
|
if tb.logger != nil {
|
|
tb.logger.Info("received context done event")
|
|
} else {
|
|
log.Logf("[Telemetry] received context done event")
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Write - write to the file descriptor.
|
|
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
|
|
buf := make([]byte, len(b))
|
|
copy(buf, b)
|
|
//nolint:makezero //keeping old code
|
|
buf = append(buf, Delimiter)
|
|
w := bufio.NewWriter(tb.client)
|
|
c, err = w.Write(buf)
|
|
if err == nil {
|
|
err = w.Flush()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Cancel - signal to tear down telemetry buffer
|
|
func (tb *TelemetryBuffer) Cancel() {
|
|
tb.cancel <- true
|
|
}
|
|
|
|
// Close - close all connections
|
|
func (tb *TelemetryBuffer) Close() {
|
|
if tb.client != nil {
|
|
tb.client.Close()
|
|
tb.client = nil
|
|
}
|
|
|
|
if tb.listener != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Info("server close")
|
|
} else {
|
|
log.Logf("server close")
|
|
}
|
|
tb.listener.Close()
|
|
}
|
|
|
|
tb.mutex.Lock()
|
|
defer tb.mutex.Unlock()
|
|
|
|
for _, conn := range tb.connections {
|
|
if conn != nil {
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
tb.connections = nil
|
|
tb.connections = make([]net.Conn, 0)
|
|
}
|
|
|
|
// push - push the report (x) to corresponding slice
|
|
func push(x interface{}) {
|
|
switch y := x.(type) {
|
|
case CNIReport:
|
|
SendAITelemetry(y)
|
|
|
|
case AIMetric:
|
|
SendAIMetric(y)
|
|
default:
|
|
log.Printf("Push fn: Default case:%+v", y)
|
|
}
|
|
}
|
|
|
|
// WaitForTelemetrySocket - Block still pipe/sock created or until max attempts retried
|
|
func WaitForTelemetrySocket(maxAttempt int, waitTimeInMillisecs time.Duration) {
|
|
for attempt := 0; attempt < maxAttempt; attempt++ {
|
|
if SockExists() {
|
|
break
|
|
}
|
|
|
|
time.Sleep(waitTimeInMillisecs * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// StartTelemetryService - Kills if any telemetry service runs and start new telemetry service
|
|
func (tb *TelemetryBuffer) StartTelemetryService(path string, args []string) error {
|
|
err := tb.plc.KillProcessByName(TelemetryServiceProcessName)
|
|
if err != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Error("Failed to kill process by", zap.String("TelemetryServiceProcessName", TelemetryServiceProcessName), zap.Error(err))
|
|
} else {
|
|
log.Logf("[Telemetry] Failed to kill process by telemetryServiceProcessName %s due to %v", TelemetryServiceProcessName, err)
|
|
}
|
|
}
|
|
|
|
if tb.logger != nil {
|
|
tb.logger.Info("Starting telemetry service process", zap.String("path", path), zap.Any("args", args))
|
|
} else {
|
|
log.Logf("[Telemetry] Starting telemetry service process :%v args:%v", path, args)
|
|
}
|
|
|
|
if err := common.StartProcess(path, args); err != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Error("Failed to start telemetry service process", zap.Error(err))
|
|
} else {
|
|
log.Logf("[Telemetry] Failed to start telemetry service process :%v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
if tb.logger != nil {
|
|
tb.logger.Info("Telemetry service started")
|
|
} else {
|
|
log.Logf("[Telemetry] Telemetry service started")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReadConfigFile - Read telemetry config file and populate to structure
|
|
func ReadConfigFile(filePath string) (TelemetryConfig, error) {
|
|
config := TelemetryConfig{}
|
|
|
|
b, err := os.ReadFile(filePath)
|
|
if err != nil {
|
|
return config, err
|
|
}
|
|
|
|
if err = json.Unmarshal(b, &config); err != nil {
|
|
return config, err // nolint
|
|
}
|
|
|
|
return config, err
|
|
}
|
|
|
|
// ConnectToTelemetryService - Attempt to spawn telemetry process if it's not already running.
|
|
func (tb *TelemetryBuffer) ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds int) {
|
|
path, dir := getTelemetryServiceDirectory()
|
|
args := []string{"-d", dir}
|
|
|
|
for attempt := 0; attempt < 2; attempt++ {
|
|
if err := tb.Connect(); err != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Error("Connection to telemetry socket failed", zap.Error(err))
|
|
} else {
|
|
log.Logf("Connection to telemetry socket failed: %v", err)
|
|
}
|
|
if _, exists := os.Stat(path); exists != nil {
|
|
if tb.logger != nil {
|
|
tb.logger.Info("Skip starting telemetry service as file didn't exist")
|
|
} else {
|
|
log.Logf("Skip starting telemetry service as file didn't exist")
|
|
}
|
|
return
|
|
}
|
|
tb.Cleanup(FdName)
|
|
tb.StartTelemetryService(path, args) // nolint
|
|
WaitForTelemetrySocket(telemetryNumRetries, time.Duration(telemetryWaitTimeInMilliseconds))
|
|
} else {
|
|
tb.Connected = true
|
|
if tb.logger != nil {
|
|
tb.logger.Info("Connected to telemetry service")
|
|
} else {
|
|
log.Logf("Connected to telemetry service")
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// ConnectToTelemetry - attempt to connect to telemetry service
|
|
func (tb *TelemetryBuffer) ConnectToTelemetry() {
|
|
if err := tb.Connect(); err != nil {
|
|
log.Logf("Connection to telemetry socket failed: %v", err)
|
|
return
|
|
}
|
|
tb.Connected = true
|
|
log.Logf("Connected to telemetry service")
|
|
}
|
|
|
|
// getTelemetryServiceDirectory - check CNI install directory and Executable location for telemetry binary
|
|
func getTelemetryServiceDirectory() (path string, dir string) {
|
|
path = filepath.Join(CniInstallDir, TelemetryServiceProcessName)
|
|
|
|
if _, exists := os.Stat(path); exists != nil {
|
|
ex, _ := os.Executable()
|
|
exDir := filepath.Dir(ex)
|
|
path = filepath.Join(exDir, TelemetryServiceProcessName)
|
|
dir = exDir
|
|
} else {
|
|
dir = CniInstallDir
|
|
}
|
|
return
|
|
}
|