Remove azure-vnet-telemetry for windows multitenancy (#1430)
* Remove azure-vnet-telemetry for windows multitenancy; the telemetry service for windows multitenancy will be started from cns. * start telemetry service from cns * lint and log fix * minor change * addressed comment
This commit is contained in:
Родитель
fa487c4255
Коммит
ba3bbe0f26
7
Makefile
7
Makefile
|
@ -146,7 +146,7 @@ acncli-binary:
|
|||
|
||||
# Build the Azure CNS binary.
|
||||
azure-cns-binary:
|
||||
cd $(CNS_DIR) && CGO_ENABLED=0 go build -v -o $(CNS_BUILD_DIR)/azure-cns$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -X $(CNS_AI_PATH)=$(CNS_AI_ID)" -gcflags="-dwarflocationlists=true"
|
||||
cd $(CNS_DIR) && CGO_ENABLED=0 go build -v -o $(CNS_BUILD_DIR)/azure-cns$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -X $(CNS_AI_PATH)=$(CNS_AI_ID) -X $(CNI_AI_PATH)=$(CNI_AI_ID)" -gcflags="-dwarflocationlists=true"
|
||||
|
||||
# Build the Azure NPM binary.
|
||||
azure-npm-binary:
|
||||
|
@ -392,8 +392,11 @@ cni-archive: azure-vnet-binary azure-vnet-ipam-binary azure-vnet-ipamv6-binary a
|
|||
|
||||
$(MKDIR) $(CNI_MULTITENANCY_BUILD_DIR)
|
||||
cp cni/azure-$(GOOS)-multitenancy.conflist $(CNI_MULTITENANCY_BUILD_DIR)/10-azure.conflist
|
||||
cp $(CNI_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)
|
||||
ifeq ($(GOOS),linux)
|
||||
cp telemetry/azure-vnet-telemetry.config $(CNI_MULTITENANCY_BUILD_DIR)/azure-vnet-telemetry.config
|
||||
cp $(CNI_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)
|
||||
cp $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)
|
||||
endif
|
||||
cd $(CNI_MULTITENANCY_BUILD_DIR) && $(ARCHIVE_CMD) $(CNI_MULTITENANCY_ARCHIVE_NAME) azure-vnet$(EXE_EXT) azure-vnet-ipam$(EXE_EXT) azure-vnet-telemetry$(EXE_EXT) 10-azure.conflist azure-vnet-telemetry.config
|
||||
|
||||
$(MKDIR) $(CNI_SWIFT_BUILD_DIR)
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
type Report struct {
|
||||
Message string
|
||||
Context string
|
||||
AppVersion string
|
||||
CustomDimensions map[string]string
|
||||
}
|
||||
|
||||
|
@ -25,6 +26,7 @@ type Event struct {
|
|||
type Metric struct {
|
||||
Name string
|
||||
Value float64
|
||||
AppVersion string
|
||||
CustomDimensions map[string]string
|
||||
}
|
||||
|
||||
|
|
|
@ -205,6 +205,11 @@ func (th *telemetryHandle) TrackLog(report Report) {
|
|||
// Initialize new trace message
|
||||
trace := appinsights.NewTraceTelemetry(report.Message, appinsights.Warning)
|
||||
|
||||
// will be empty if cns used as telemetry service for cni
|
||||
if th.appVersion == "" {
|
||||
th.appVersion = report.AppVersion
|
||||
}
|
||||
|
||||
// Override few of existing columns with metadata
|
||||
trace.Tags.User().SetAuthUserId(runtime.GOOS)
|
||||
trace.Tags.Operation().SetId(report.Context)
|
||||
|
@ -295,6 +300,10 @@ func (th *telemetryHandle) TrackMetric(metric Metric) {
|
|||
metadata := th.metadata
|
||||
th.rwmutex.RUnlock()
|
||||
|
||||
if th.appVersion == "" {
|
||||
th.appVersion = metric.AppVersion
|
||||
}
|
||||
|
||||
// Check if metadata is populated
|
||||
if metadata.SubscriptionID != "" {
|
||||
aimetric.Properties[locationStr] = metadata.Location
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-container-networking/aitelemetry"
|
||||
|
@ -161,6 +162,19 @@ func (plugin *NetPlugin) Start(config *common.PluginConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// This function for sending CNI metrics to telemetry service
|
||||
func logAndSendEvent(plugin *NetPlugin, msg string) {
|
||||
log.Printf(msg)
|
||||
sendEvent(plugin, msg)
|
||||
}
|
||||
|
||||
func sendEvent(plugin *NetPlugin, msg string) {
|
||||
eventMsg := fmt.Sprintf("[%d] %s", os.Getpid(), msg)
|
||||
plugin.report.Version = plugin.Version
|
||||
plugin.report.EventMessage = eventMsg
|
||||
telemetry.SendCNIEvent(plugin.tb, plugin.report)
|
||||
}
|
||||
|
||||
func (plugin *NetPlugin) GetAllEndpointState(networkid string) (*api.AzureCNIState, error) {
|
||||
st := api.AzureCNIState{
|
||||
ContainerInterfaces: make(map[string]api.PodNetworkInterfaceInfo),
|
||||
|
@ -321,7 +335,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
|
|||
|
||||
startTime := time.Now()
|
||||
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.",
|
||||
logAndSendEvent(plugin, fmt.Sprintf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.",
|
||||
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData))
|
||||
|
||||
// Parse network configuration from stdin.
|
||||
|
@ -339,6 +353,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
|
|||
cniMetric.Metric = aitelemetry.Metric{
|
||||
Name: telemetry.CNIAddTimeMetricStr,
|
||||
Value: float64(operationTimeMs),
|
||||
AppVersion: plugin.Version,
|
||||
CustomDimensions: make(map[string]string),
|
||||
}
|
||||
SetCustomDimensions(&cniMetric, nwCfg, err)
|
||||
|
@ -501,7 +516,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
|
|||
}
|
||||
}
|
||||
|
||||
telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("Allocated IPAddress from ipam:%+v v6:%+v", ipamAddResult.ipv4Result, ipamAddResult.ipv6Result))
|
||||
sendEvent(plugin, fmt.Sprintf("Allocated IPAddress from ipam:%+v v6:%+v", ipamAddResult.ipv4Result, ipamAddResult.ipv6Result))
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
|
@ -512,14 +527,14 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
|
|||
// Create network
|
||||
if nwInfoErr != nil {
|
||||
// Network does not exist.
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating network %v.", networkID))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("[cni-net] Creating network %v.", networkID))
|
||||
// opts map needs to get passed in here
|
||||
if nwInfo, err = plugin.createNetworkInternal(networkID, policies, ipamAddConfig, ipamAddResult); err != nil {
|
||||
log.Errorf("Create network failed: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Created network %v with subnet %v.", networkID, ipamAddResult.hostSubnetPrefix.String()))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("[cni-net] Created network %v with subnet %v.", networkID, ipamAddResult.hostSubnetPrefix.String()))
|
||||
}
|
||||
|
||||
natInfo := getNATInfo(nwCfg.ExecutionMode, options[network.SNATIPKey], nwCfg.MultiTenancy, enableSnatForDNS)
|
||||
|
@ -546,7 +561,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
|
|||
return err
|
||||
}
|
||||
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("CNI ADD succeeded : IP:%+v, VlanID: %v, podname %v, namespace %v numendpoints:%d",
|
||||
sendEvent(plugin, fmt.Sprintf("CNI ADD succeeded : IP:%+v, VlanID: %v, podname %v, namespace %v numendpoints:%d",
|
||||
ipamAddResult.ipv4Result.IPs, epInfo.Data[network.VlanIDKey], k8sPodName, k8sNamespace, plugin.nm.GetNumberOfEndpoints("", nwCfg.Name)))
|
||||
|
||||
return nil
|
||||
|
@ -749,7 +764,7 @@ func (plugin *NetPlugin) createEndpointInternal(opt *createEndpointInternalOpt)
|
|||
}
|
||||
|
||||
// Create the endpoint.
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating endpoint %s.", epInfo.PrettyString()))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("[cni-net] Creating endpoint %s.", epInfo.PrettyString()))
|
||||
err = plugin.nm.CreateEndpoint(cnsclient, opt.nwInfo.Id, &epInfo)
|
||||
if err != nil {
|
||||
err = plugin.Errorf("Failed to create endpoint: %v", err)
|
||||
|
@ -859,12 +874,11 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
nwInfo network.NetworkInfo
|
||||
epInfo *network.EndpointInfo
|
||||
cniMetric telemetry.AIMetric
|
||||
msg string
|
||||
)
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.",
|
||||
logAndSendEvent(plugin, fmt.Sprintf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.",
|
||||
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData))
|
||||
|
||||
defer func() {
|
||||
|
@ -883,6 +897,8 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
}
|
||||
|
||||
plugin.setCNIReportDetails(nwCfg, CNI_DEL, "")
|
||||
plugin.report.ContainerName = k8sPodName + ":" + k8sNamespace
|
||||
|
||||
iptables.DisableIPTableLock = nwCfg.DisableIPTableLock
|
||||
|
||||
sendMetricFunc := func() {
|
||||
|
@ -890,6 +906,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
cniMetric.Metric = aitelemetry.Metric{
|
||||
Name: telemetry.CNIDelTimeMetricStr,
|
||||
Value: float64(operationTimeMs),
|
||||
AppVersion: plugin.Version,
|
||||
CustomDimensions: make(map[string]string),
|
||||
}
|
||||
SetCustomDimensions(&cniMetric, nwCfg, err)
|
||||
|
@ -957,7 +974,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
// attempt to release address associated with this Endpoint id
|
||||
// This is to ensure clean up is done even in failure cases
|
||||
log.Printf("[cni-net] Failed to query endpoint %s: %v", endpointID, err)
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip by ContainerID (endpoint not found):%v", args.ContainerID))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("Release ip by ContainerID (endpoint not found):%v", args.ContainerID))
|
||||
if err = plugin.ipamInvoker.Delete(nil, nwCfg, args, nwInfo.Options); err != nil {
|
||||
return plugin.RetriableError(fmt.Errorf("failed to release address(no endpoint): %w", err))
|
||||
}
|
||||
|
@ -970,7 +987,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
|
||||
// schedule send metric before attempting delete
|
||||
defer sendMetricFunc()
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Deleting endpoint:%v", endpointID))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("Deleting endpoint:%v", endpointID))
|
||||
// Delete the endpoint.
|
||||
if err = plugin.nm.DeleteEndpoint(networkID, endpointID); err != nil {
|
||||
// return a retriable error so the container runtime will retry this DEL later
|
||||
|
@ -982,7 +999,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
if !nwCfg.MultiTenancy {
|
||||
// Call into IPAM plugin to release the endpoint's addresses.
|
||||
for _, address := range epInfo.IPAddresses {
|
||||
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip:%s", address.IP.String()))
|
||||
logAndSendEvent(plugin, fmt.Sprintf("Release ip:%s", address.IP.String()))
|
||||
err = plugin.ipamInvoker.Delete(&address, nwCfg, args, nwInfo.Options)
|
||||
if err != nil {
|
||||
return plugin.RetriableError(fmt.Errorf("failed to release address: %w", err))
|
||||
|
@ -997,8 +1014,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
|
|||
}
|
||||
}
|
||||
|
||||
plugin.setCNIReportDetails(nwCfg, CNI_DEL, msg)
|
||||
telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace))
|
||||
sendEvent(plugin, fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace))
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -1038,6 +1054,7 @@ func (plugin *NetPlugin) Update(args *cniSkel.CmdArgs) error {
|
|||
cniMetric.Metric = aitelemetry.Metric{
|
||||
Name: telemetry.CNIUpdateTimeMetricStr,
|
||||
Value: float64(operationTimeMs),
|
||||
AppVersion: plugin.Version,
|
||||
CustomDimensions: make(map[string]string),
|
||||
}
|
||||
SetCustomDimensions(&cniMetric, nwCfg, err)
|
||||
|
|
|
@ -152,6 +152,7 @@ func rootExecute() error {
|
|||
SystemDetails: telemetry.SystemInfo{},
|
||||
InterfaceDetails: telemetry.InterfaceInfo{},
|
||||
BridgeDetails: telemetry.BridgeInfo{},
|
||||
Version: version,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
// Entry point of the telemetry service if started by CNI
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
|
@ -189,7 +190,7 @@ func main() {
|
|||
err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric)
|
||||
log.Printf("[Telemetry] AI Handle creation status:%v", err)
|
||||
log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds)
|
||||
tb.PushData()
|
||||
tb.PushData(context.Background())
|
||||
telemetry.CloseAITelemetryHandle()
|
||||
|
||||
log.Close()
|
||||
|
|
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/Azure/azure-container-networking/processlock"
|
||||
localtls "github.com/Azure/azure-container-networking/server/tls"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
"github.com/Azure/azure-container-networking/telemetry"
|
||||
"github.com/avast/retry-go/v3"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
|
@ -266,6 +267,13 @@ var args = acn.ArgumentList{
|
|||
Type: "string",
|
||||
DefaultValue: "",
|
||||
},
|
||||
{
|
||||
Name: acn.OptTelemetryService,
|
||||
Shorthand: acn.OptTelemetryServiceAlias,
|
||||
Description: "Flag to start telemetry service to receive telemetry events from CNI. Default, disabled.",
|
||||
Type: "bool",
|
||||
DefaultValue: false,
|
||||
},
|
||||
}
|
||||
|
||||
// init() is executed before main() whenever this package is imported
|
||||
|
@ -367,6 +375,28 @@ func sendRegisterNodeRequest(httpc *http.Client, httpRestService cns.HTTPService
|
|||
return nil
|
||||
}
|
||||
|
||||
func startTelemetryService(ctx context.Context) {
|
||||
var config aitelemetry.AIConfig
|
||||
|
||||
err := telemetry.CreateAITelemetryHandle(config, false, false, false)
|
||||
if err != nil {
|
||||
log.Errorf("AI telemetry handle creation failed..:%w", err)
|
||||
return
|
||||
}
|
||||
|
||||
tbtemp := telemetry.NewTelemetryBuffer()
|
||||
//nolint:errcheck // best effort to cleanup leaked pipe/socket before start
|
||||
tbtemp.Cleanup(telemetry.FdName)
|
||||
|
||||
tb := telemetry.NewTelemetryBuffer()
|
||||
err = tb.StartServer()
|
||||
if err != nil {
|
||||
log.Errorf("Telemetry service failed to start: %w", err)
|
||||
return
|
||||
}
|
||||
tb.PushData(rootCtx)
|
||||
}
|
||||
|
||||
// Main is the entry point for CNS.
|
||||
func main() {
|
||||
// Initialize and parse command line arguments.
|
||||
|
@ -396,6 +426,7 @@ func main() {
|
|||
clientDebugCmd := acn.GetArg(acn.OptDebugCmd).(string)
|
||||
clientDebugArg := acn.GetArg(acn.OptDebugArg).(string)
|
||||
cmdLineConfigPath := acn.GetArg(acn.OptCNSConfigPath).(string)
|
||||
telemetryDaemonEnabled := acn.GetArg(acn.OptTelemetryService).(bool)
|
||||
|
||||
if vers {
|
||||
printVersion()
|
||||
|
@ -475,6 +506,10 @@ func main() {
|
|||
logger.InitAI(aiConfig, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent)
|
||||
}
|
||||
|
||||
if telemetryDaemonEnabled {
|
||||
go startTelemetryService(rootCtx)
|
||||
}
|
||||
|
||||
// Log platform information.
|
||||
logger.Printf("Running on %v", platform.GetOSInfo())
|
||||
|
||||
|
|
|
@ -87,6 +87,10 @@ const (
|
|||
OptTelemetry = "telemetry"
|
||||
OptTelemetryAlias = "dt"
|
||||
|
||||
// Enable Telemetry service
|
||||
OptTelemetryService = "telemetry-service"
|
||||
OptTelemetryServiceAlias = "ts"
|
||||
|
||||
// HTTP connection timeout
|
||||
OptHttpConnectionTimeout = "http-connection-timeout"
|
||||
OptHttpConnectionTimeoutAlias = "httpcontimeout"
|
||||
|
|
|
@ -101,9 +101,9 @@ type apipaClient interface {
|
|||
}
|
||||
|
||||
func (epInfo *EndpointInfo) PrettyString() string {
|
||||
return fmt.Sprintf("Id:%s ContainerID:%s NetNsPath:%s IfName:%s IfIndex:%d MacAddr:%s IPAddrs:%v Gateways:%v",
|
||||
return fmt.Sprintf("Id:%s ContainerID:%s NetNsPath:%s IfName:%s IfIndex:%d MacAddr:%s IPAddrs:%v Gateways:%v Data:%+v",
|
||||
epInfo.Id, epInfo.ContainerID, epInfo.NetNsPath, epInfo.IfName, epInfo.IfIndex, epInfo.MacAddress.String(), epInfo.IPAddresses,
|
||||
epInfo.Gateways)
|
||||
epInfo.Gateways, epInfo.Data)
|
||||
}
|
||||
|
||||
// NewEndpoint creates a new endpoint in the network.
|
||||
|
|
|
@ -318,7 +318,7 @@ func (nw *network) newEndpointImplHnsV2(cli apipaClient, epInfo *EndpointInfo) (
|
|||
}
|
||||
|
||||
// Create the HCN endpoint.
|
||||
log.Printf("[net] Creating hcn endpoint: %+v", hcnEndpoint)
|
||||
log.Printf("[net] Creating hcn endpoint: %s computenetwork:%s", hcnEndpoint.Name, hcnEndpoint.HostComputeNetwork)
|
||||
hnsResponse, err := Hnsv2.CreateEndpoint(hcnEndpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create endpoint: %s due to error: %v", hcnEndpoint.Name, err)
|
||||
|
|
|
@ -205,9 +205,8 @@ func (nm *networkManager) restore(isRehydrationRequired bool) error {
|
|||
}
|
||||
}
|
||||
|
||||
log.Printf("[net] Restored state, %+v\n", nm)
|
||||
log.Printf("[net] Restored state")
|
||||
for _, extIf := range nm.ExternalInterfaces {
|
||||
log.Printf("External Interface %+v", extIf)
|
||||
for _, nw := range extIf.Networks {
|
||||
log.Printf("Number of endpoints: %d", len(nw.Endpoints))
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ func SendAITelemetry(cnireport CNIReport) {
|
|||
report := aitelemetry.Report{
|
||||
Message: msg,
|
||||
Context: cnireport.ContainerName,
|
||||
AppVersion: cnireport.Version,
|
||||
CustomDimensions: make(map[string]string),
|
||||
}
|
||||
|
||||
|
|
|
@ -5,8 +5,6 @@ package telemetry
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/Azure/azure-container-networking/aitelemetry"
|
||||
"github.com/Azure/azure-container-networking/common"
|
||||
|
@ -157,26 +155,13 @@ func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// This function for sending CNI metrics to telemetry service
|
||||
func LogAndSendEvent(tb *TelemetryBuffer, msg string) {
|
||||
log.Printf(msg)
|
||||
SendCNIEvent(tb, msg)
|
||||
}
|
||||
|
||||
func SendCNIEvent(tb *TelemetryBuffer, msg string) {
|
||||
var err error
|
||||
var report []byte
|
||||
|
||||
eventMsg := fmt.Sprintf("[%d] %s", os.Getpid(), msg)
|
||||
cniReport := &CNIReport{
|
||||
EventMessage: eventMsg,
|
||||
}
|
||||
func SendCNIEvent(tb *TelemetryBuffer, report *CNIReport) {
|
||||
if tb != nil && tb.Connected {
|
||||
reportMgr := &ReportManager{Report: cniReport}
|
||||
report, err = reportMgr.ReportToBytes()
|
||||
reportMgr := &ReportManager{Report: report}
|
||||
reportBytes, err := reportMgr.ReportToBytes()
|
||||
if err == nil {
|
||||
// If write fails, try to re-establish connections as server/client
|
||||
if _, err = tb.Write(report); err != nil {
|
||||
if _, err = tb.Write(reportBytes); err != nil {
|
||||
log.Printf("Error writing to telemetry socket:%v", err)
|
||||
tb.Cancel()
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ package telemetry
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
|
@ -17,8 +18,6 @@ import (
|
|||
"github.com/Azure/azure-container-networking/common"
|
||||
"github.com/Azure/azure-container-networking/log"
|
||||
"github.com/Azure/azure-container-networking/platform"
|
||||
"github.com/Azure/azure-container-networking/processlock"
|
||||
"github.com/Azure/azure-container-networking/store"
|
||||
)
|
||||
|
||||
// TelemetryConfig - telemetry config read by telemetry service
|
||||
|
@ -171,7 +170,9 @@ func (tb *TelemetryBuffer) Connect() error {
|
|||
}
|
||||
|
||||
// PushData - PushData running an instance if it isn't already being run elsewhere
|
||||
func (tb *TelemetryBuffer) PushData() {
|
||||
func (tb *TelemetryBuffer) PushData(ctx context.Context) {
|
||||
defer tb.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case report := <-tb.data:
|
||||
|
@ -180,12 +181,12 @@ func (tb *TelemetryBuffer) PushData() {
|
|||
tb.mutex.Unlock()
|
||||
case <-tb.cancel:
|
||||
log.Logf("[Telemetry] server cancel event")
|
||||
goto EXIT
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Logf("[Telemetry] received context done event")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
EXIT:
|
||||
tb.Close()
|
||||
}
|
||||
|
||||
// read - read from the file descriptor
|
||||
|
@ -245,43 +246,8 @@ func (tb *TelemetryBuffer) Close() {
|
|||
|
||||
// push - push the report (x) to corresponding slice
|
||||
func push(x interface{}) {
|
||||
metadata, err := common.GetHostMetadata(metadataFile)
|
||||
if err != nil {
|
||||
log.Logf("Error getting metadata %v", err)
|
||||
|
||||
var lockclient processlock.Interface
|
||||
lockclient, err = processlock.NewFileLock(metadataFile + store.LockExtension)
|
||||
if err != nil {
|
||||
log.Printf("Error initializing file lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var kvs store.KeyValueStore
|
||||
kvs, err = store.NewJsonFileStore(metadataFile, lockclient)
|
||||
if err != nil {
|
||||
log.Printf("Error acuiring lock for writing metadata file: %v", err)
|
||||
}
|
||||
|
||||
err = kvs.Lock(store.DefaultLockTimeout)
|
||||
if err != nil {
|
||||
log.Errorf("push: Not able to acquire lock:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = common.SaveHostMetadata(metadata, metadataFile)
|
||||
if err != nil {
|
||||
log.Logf("saving host metadata failed with :%v", err)
|
||||
}
|
||||
|
||||
err = kvs.Unlock()
|
||||
if err != nil {
|
||||
log.Errorf("push: Not able to release lock:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
switch y := x.(type) {
|
||||
case CNIReport:
|
||||
y.Metadata = metadata
|
||||
SendAITelemetry(y)
|
||||
|
||||
case AIMetric:
|
||||
|
|
Загрузка…
Ссылка в новой задаче