fixed logging part of telemetry (#312)
* fixed logging part of telemetry * fixed an issue * added more test coverage * fixed an issue * fixed invalid condition and added UT * initialize telemetry logger as part of telemetry server * changed checkifsockexists to sockexists * changed interval time to push telemetry data to host
This commit is contained in:
Родитель
159cd855cb
Коммит
83945fd6bf
|
@ -133,6 +133,52 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
|
|||
return isupdate, nil
|
||||
}
|
||||
|
||||
// startTelemetryService - Kills if any telemetry service runs and start new telemetry service
|
||||
func startTelemetryService(path string) error {
|
||||
platform.KillProcessByName(telemetry.TelemetryServiceProcessName)
|
||||
|
||||
log.Printf("[cni] Starting telemetry service process")
|
||||
|
||||
if err := common.StartProcess(path); err != nil {
|
||||
log.Printf("[Telemetry] Failed to start telemetry service process :%v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[cni] Telemetry service started")
|
||||
|
||||
for attempt := 0; attempt < 5; attempt++ {
|
||||
if telemetry.SockExists() {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func connectToTelemetryService(tb *telemetry.TelemetryBuffer) {
|
||||
path := fmt.Sprintf("%v/%v", telemetry.CniInstallDir, telemetry.TelemetryServiceProcessName)
|
||||
|
||||
for attempt := 0; attempt < 2; attempt++ {
|
||||
if err := tb.Connect(); err != nil {
|
||||
log.Printf("Connection to telemetry socket failed: %v", err)
|
||||
tb.Cleanup(telemetry.FdName)
|
||||
|
||||
if isExists, _ := common.CheckIfFileExists(path); !isExists {
|
||||
log.Printf("Skip starting telemetry service as file didn't exist")
|
||||
return
|
||||
}
|
||||
|
||||
startTelemetryService(path)
|
||||
} else {
|
||||
tb.Connected = true
|
||||
log.Printf("Connected to telemetry service")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main is the entry point for CNI network plugin.
|
||||
func main() {
|
||||
|
||||
|
@ -170,20 +216,7 @@ func main() {
|
|||
}
|
||||
|
||||
tb := telemetry.NewTelemetryBuffer("")
|
||||
|
||||
for attempt := 0; attempt < 2; attempt++ {
|
||||
err = tb.Connect()
|
||||
if err != nil {
|
||||
log.Printf("Connection to telemetry socket failed: %v", err)
|
||||
tb.Cleanup(telemetry.FdName)
|
||||
telemetry.StartTelemetryService()
|
||||
} else {
|
||||
tb.Connected = true
|
||||
log.Printf("Connected to telemetry service")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
connectToTelemetryService(tb)
|
||||
defer tb.Close()
|
||||
|
||||
t := time.Now()
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
)
|
||||
|
||||
|
@ -236,29 +237,16 @@ func (report *NPMReport) GetReport(clusterID, nodeName, npmVersion, kubernetesVe
|
|||
// SendReport will send telemetry report to HostNetAgent.
|
||||
func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error {
|
||||
var err error
|
||||
var report []byte
|
||||
|
||||
if tb != nil && tb.Connected {
|
||||
telemetryLogger.Printf("[Telemetry] Going to send Telemetry report to hostnetagent")
|
||||
|
||||
switch reportMgr.Report.(type) {
|
||||
case *CNIReport:
|
||||
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*CNIReport))
|
||||
case *NPMReport:
|
||||
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*NPMReport))
|
||||
case *DNCReport:
|
||||
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*DNCReport))
|
||||
default:
|
||||
telemetryLogger.Printf("[Telemetry] Invalid report type")
|
||||
}
|
||||
|
||||
report, err := reportMgr.ReportToBytes()
|
||||
report, err = reportMgr.ReportToBytes()
|
||||
if err == nil {
|
||||
// If write fails, try to re-establish connections as server/client
|
||||
if _, err = tb.Write(report); err != nil {
|
||||
tb.Cancel()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("Not connected to telemetry server or tb is nil")
|
||||
}
|
||||
|
||||
return err
|
||||
|
@ -284,13 +272,12 @@ func (reportMgr *ReportManager) SetReportState(telemetryFile string) error {
|
|||
|
||||
_, err = f.Write(reportBytes)
|
||||
if err != nil {
|
||||
telemetryLogger.Printf("[Telemetry] Error while writing to file %v", err)
|
||||
log.Printf("[Telemetry] Error while writing to file %v", err)
|
||||
return fmt.Errorf("[Telemetry] Error while writing to file %v", err)
|
||||
}
|
||||
|
||||
// set IsNewInstance in report
|
||||
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(false)
|
||||
telemetryLogger.Printf("[Telemetry] SetReportState succeeded")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -298,7 +285,7 @@ func (reportMgr *ReportManager) SetReportState(telemetryFile string) error {
|
|||
func (reportMgr *ReportManager) GetReportState(telemetryFile string) bool {
|
||||
// try to set IsNewInstance in report
|
||||
if _, err := os.Stat(telemetryFile); os.IsNotExist(err) {
|
||||
telemetryLogger.Printf("[Telemetry] File not exist %v", telemetryFile)
|
||||
log.Printf("[Telemetry] File not exist %v", telemetryFile)
|
||||
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(true)
|
||||
return false
|
||||
}
|
||||
|
@ -430,7 +417,10 @@ func (report *CNIReport) GetOrchestratorDetails() {
|
|||
}
|
||||
|
||||
// ReportToBytes - returns the report bytes
|
||||
func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
|
||||
func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {
|
||||
var err error
|
||||
var report []byte
|
||||
|
||||
switch reportMgr.Report.(type) {
|
||||
case *CNIReport:
|
||||
case *NPMReport:
|
||||
|
@ -440,9 +430,10 @@ func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
|
|||
err = fmt.Errorf("[Telemetry] Invalid report type")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
report, err = json.Marshal(reportMgr.Report)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
return
|
||||
report, err = json.Marshal(reportMgr.Report)
|
||||
return report, err
|
||||
}
|
||||
|
|
|
@ -94,6 +94,10 @@ func TestMain(m *testing.M) {
|
|||
reportManager.ContentType = "application/json"
|
||||
reportManager.Report = &CNIReport{}
|
||||
|
||||
if err := InitTelemetryLogger(); err == nil {
|
||||
defer CloseTelemetryLogger()
|
||||
}
|
||||
|
||||
tb = NewTelemetryBuffer(hostAgentUrl)
|
||||
err = tb.StartServer()
|
||||
if err == nil {
|
||||
|
@ -158,6 +162,14 @@ func TestSendTelemetry(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("SendTelemetry failed due to %v", err)
|
||||
}
|
||||
|
||||
i := 3
|
||||
rpMgr := &ReportManager{}
|
||||
rpMgr.Report = &i
|
||||
err = rpMgr.SendReport(tb)
|
||||
if err == nil {
|
||||
t.Errorf("SendTelemetry not failed for incorrect report type")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReceiveTelemetryData(t *testing.T) {
|
||||
|
@ -214,6 +226,10 @@ func TestClientCloseTelemetryConnection(t *testing.T) {
|
|||
go tb.BufferAndPushData(0)
|
||||
}
|
||||
|
||||
if !SockExists() {
|
||||
t.Errorf("telemetry sock doesn't exist")
|
||||
}
|
||||
|
||||
// create client telemetrybuffer and connect to server
|
||||
tb1 := NewTelemetryBuffer(hostAgentUrl)
|
||||
if err := tb1.Connect(); err != nil {
|
||||
|
|
|
@ -15,9 +15,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
)
|
||||
|
||||
// FdName - file descriptor name
|
||||
|
@ -30,7 +28,7 @@ const (
|
|||
FdName = "azure-vnet-telemetry"
|
||||
Delimiter = '\n'
|
||||
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
|
||||
DefaultInterval = 10 * time.Second
|
||||
DefaultInterval = 30 * time.Second
|
||||
logName = "azure-vnet-telemetry"
|
||||
MaxPayloadSize uint16 = 65535
|
||||
dnc = "DNC"
|
||||
|
@ -64,6 +62,14 @@ type Payload struct {
|
|||
CNSReports []CNSReport
|
||||
}
|
||||
|
||||
func InitTelemetryLogger() error {
|
||||
return telemetryLogger.SetTarget(log.TargetLogfile)
|
||||
}
|
||||
|
||||
func CloseTelemetryLogger() {
|
||||
telemetryLogger.Close()
|
||||
}
|
||||
|
||||
// NewTelemetryBuffer - create a new TelemetryBuffer
|
||||
func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
|
||||
var tb TelemetryBuffer
|
||||
|
@ -80,11 +86,6 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
|
|||
tb.payload.NPMReports = make([]NPMReport, 0)
|
||||
tb.payload.CNSReports = make([]CNSReport, 0)
|
||||
|
||||
err := telemetryLogger.SetTarget(log.TargetLogfile)
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to configure logging: %v\n", err)
|
||||
}
|
||||
|
||||
return &tb
|
||||
}
|
||||
|
||||
|
@ -107,6 +108,8 @@ func (tb *TelemetryBuffer) StartServer() error {
|
|||
return err
|
||||
}
|
||||
|
||||
InitTelemetryLogger()
|
||||
|
||||
telemetryLogger.Printf("Telemetry service started")
|
||||
// Spawn server goroutine to handle incoming connections
|
||||
go func() {
|
||||
|
@ -251,7 +254,6 @@ func (tb *TelemetryBuffer) Cancel() {
|
|||
// Close - close all connections
|
||||
func (tb *TelemetryBuffer) Close() {
|
||||
if tb.client != nil {
|
||||
telemetryLogger.Printf("client close")
|
||||
tb.client.Close()
|
||||
tb.client = nil
|
||||
}
|
||||
|
@ -259,7 +261,7 @@ func (tb *TelemetryBuffer) Close() {
|
|||
if tb.listener != nil {
|
||||
telemetryLogger.Printf("server close")
|
||||
tb.listener.Close()
|
||||
tb.listener = nil
|
||||
CloseTelemetryLogger()
|
||||
}
|
||||
|
||||
tb.mutex.Lock()
|
||||
|
@ -267,7 +269,6 @@ func (tb *TelemetryBuffer) Close() {
|
|||
|
||||
for _, conn := range tb.connections {
|
||||
if conn != nil {
|
||||
telemetryLogger.Printf("connection close as server closed")
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
@ -390,11 +391,12 @@ func getHostMetadata() (Metadata, error) {
|
|||
if err == nil {
|
||||
var metadata Metadata
|
||||
if err = json.Unmarshal(content, &metadata); err == nil {
|
||||
telemetryLogger.Printf("[Telemetry] Returning hostmetadata from state")
|
||||
return metadata, nil
|
||||
}
|
||||
}
|
||||
|
||||
telemetryLogger.Printf("[Telemetry] Request metadata from wireserver")
|
||||
|
||||
req, err := http.NewRequest("GET", metadataURL, nil)
|
||||
if err != nil {
|
||||
return Metadata{}, err
|
||||
|
@ -424,27 +426,3 @@ func getHostMetadata() (Metadata, error) {
|
|||
|
||||
return metareport.Metadata, err
|
||||
}
|
||||
|
||||
// StartTelemetryService - Kills if any telemetry service runs and start new telemetry service
|
||||
func StartTelemetryService() error {
|
||||
platform.KillProcessByName(telemetryServiceProcessName)
|
||||
|
||||
telemetryLogger.Printf("[Telemetry] Starting telemetry service process")
|
||||
path := fmt.Sprintf("%v/%v", cniInstallDir, telemetryServiceProcessName)
|
||||
if err := common.StartProcess(path); err != nil {
|
||||
telemetryLogger.Printf("[Telemetry] Failed to start telemetry service process :%v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
telemetryLogger.Printf("[Telemetry] Telemetry service started")
|
||||
|
||||
for attempt := 0; attempt < 5; attempt++ {
|
||||
if checkIfSockExists() {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -10,9 +10,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
fdTemplate = "/tmp/%s.sock"
|
||||
telemetryServiceProcessName = "azure-vnet-telemetry"
|
||||
cniInstallDir = "/opt/cni/bin"
|
||||
fdTemplate = "/var/run/%s.sock"
|
||||
TelemetryServiceProcessName = "azure-vnet-telemetry"
|
||||
CniInstallDir = "/opt/cni/bin"
|
||||
metadataFile = "/tmp/azuremetadata.json"
|
||||
)
|
||||
|
||||
|
@ -41,7 +41,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error {
|
|||
return os.Remove(fmt.Sprintf(fdTemplate, name))
|
||||
}
|
||||
|
||||
func checkIfSockExists() bool {
|
||||
func SockExists() bool {
|
||||
if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
|
||||
const (
|
||||
fdTemplate = "\\\\.\\pipe\\%s"
|
||||
telemetryServiceProcessName = "azure-vnet-telemetry.exe"
|
||||
cniInstallDir = "c:\\k\\azurecni\\bin"
|
||||
TelemetryServiceProcessName = "azure-vnet-telemetry.exe"
|
||||
CniInstallDir = "c:\\k\\azurecni\\bin"
|
||||
metadataFile = "azuremetadata.json"
|
||||
)
|
||||
|
||||
|
@ -43,7 +43,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error {
|
|||
}
|
||||
|
||||
// Check if telemetry unix domain socket exists
|
||||
func checkIfSockExists() bool {
|
||||
func SockExists() bool {
|
||||
if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче