AppInsightTelemetry support for CNI (#458)

* Added AITelemetry support for CNI

* added new files

* added other configs in config file

* fixed ut

* updated disableall similar to cns

* added container name to report

* addressed review comments

* addressed review comments

* added check for azure environment

* added log

* close log handle in unit test

* addressed review comments

* addressed review comments

* fixed a condition

* keep the netagent channel for logs

* fixed error

* addressed review comments
This commit is contained in:
tamilmani1989 2020-01-14 14:53:24 -08:00 коммит произвёл GitHub
Родитель a59a9aa92b
Коммит 2d619b78fe
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 468 добавлений и 60 удалений

Просмотреть файл

@ -78,6 +78,8 @@ CNI_MULTITENANCY_BUILD_DIR = $(BUILD_DIR)/cni-multitenancy
CNS_BUILD_DIR = $(BUILD_DIR)/cns
NPM_BUILD_DIR = $(BUILD_DIR)/npm
NPM_TELEMETRY_DIR = $(NPM_BUILD_DIR)/telemetry
CNI_AI_ID = 5515a1eb-b2bc-406a-98eb-ba462e6f0411
ACN_PACKAGE_PATH = github.com/Azure/azure-container-networking
# Containerized build parameters.
BUILD_CONTAINER_IMAGE = acn-build
@ -169,7 +171,7 @@ $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT): $(CNIFILES)
# Build the Azure CNI telemetry plugin.
$(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT): $(CNIFILES)
go build -v -o $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -s -w" $(CNI_TELEMETRY_DIR)/*.go
go build -v -o $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -X $(ACN_PACKAGE_PATH)/telemetry.aiMetadata=$(CNI_AI_ID) -s -w" $(CNI_TELEMETRY_DIR)/*.go
# Build the Azure CNS Service.
$(CNS_BUILD_DIR)/azure-cns$(EXE_EXT): $(CNSFILES)

Просмотреть файл

@ -28,6 +28,8 @@ type AIConfig struct {
BatchInterval int
DisableMetadataRefreshThread bool
RefreshTimeout int
GetEnvRetryCount int
GetEnvRetryWaitTimeInSecs int
DebugMode bool
}

Просмотреть файл

@ -1,6 +1,7 @@
package aitelemetry
import (
"fmt"
"runtime"
"time"
@ -18,6 +19,8 @@ const (
appNameStr = "AppName"
subscriptionIDStr = "SubscriptionID"
vmNameStr = "VMName"
versionStr = "AppVersion"
azurePublicCloudStr = "AzurePublicCloud"
defaultTimeout = 10
)
@ -26,7 +29,7 @@ var debugMode bool
func messageListener() appinsights.DiagnosticsMessageListener {
if debugMode {
return appinsights.NewDiagnosticsMessageListener(func(msg string) error {
debuglog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg)
debugLog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg)
return nil
})
}
@ -34,7 +37,7 @@ func messageListener() appinsights.DiagnosticsMessageListener {
return nil
}
func debuglog(format string, args ...interface{}) {
func debugLog(format string, args ...interface{}) {
if debugMode {
log.Printf(format, args...)
}
@ -55,12 +58,12 @@ func getMetadata(th *telemetryHandle) {
break
}
debuglog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout)
debugLog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout)
time.Sleep(time.Duration(th.refreshTimeout) * time.Second)
}
if err != nil {
debuglog("[AppInsights] Error getting metadata %v", err)
debugLog("[AppInsights] Error getting metadata %v", err)
return
}
@ -72,7 +75,7 @@ func getMetadata(th *telemetryHandle) {
// Save metadata retrieved from wireserver to a file
kvs, err := store.NewJsonFileStore(metadataFile)
if err != nil {
debuglog("[AppInsights] Error initializing kvs store: %v", err)
debugLog("[AppInsights] Error initializing kvs store: %v", err)
return
}
@ -80,20 +83,55 @@ func getMetadata(th *telemetryHandle) {
err = common.SaveHostMetadata(th.metadata, metadataFile)
kvs.Unlock(true)
if err != nil {
debuglog("[AppInsights] saving host metadata failed with :%v", err)
debugLog("[AppInsights] saving host metadata failed with :%v", err)
}
}
func isPublicEnvironment(url string, retryCount, waitTimeInSecs int) (bool, error) {
var (
cloudName string
err error
)
for i := 0; i < retryCount; i++ {
cloudName, err = common.GetAzureCloud(url)
if cloudName == azurePublicCloudStr {
debugLog("[AppInsights] CloudName: %s\n", cloudName)
return true, nil
} else if err == nil {
debugLog("[AppInsights] This is not azure public cloud:%s", cloudName)
return false, fmt.Errorf("Not an azure public cloud: %s", cloudName)
}
debugLog("GetAzureCloud returned err :%v", err)
time.Sleep(time.Duration(waitTimeInSecs) * time.Second)
}
return false, err
}
// NewAITelemetry creates telemetry handle with user specified appinsights id.
func NewAITelemetry(
azEnvUrl string,
id string,
aiConfig AIConfig,
) TelemetryHandle {
) (TelemetryHandle, error) {
debugMode = aiConfig.DebugMode
if id == "" {
debugLog("Empty AI key")
return nil, fmt.Errorf("AI key is empty")
}
// check if azure instance is in public cloud
isPublic, err := isPublicEnvironment(azEnvUrl, aiConfig.GetEnvRetryCount, aiConfig.GetEnvRetryWaitTimeInSecs)
if !isPublic {
return nil, err
}
telemetryConfig := appinsights.NewTelemetryConfiguration(id)
telemetryConfig.MaxBatchSize = aiConfig.BatchSize
telemetryConfig.MaxBatchInterval = time.Duration(aiConfig.BatchInterval) * time.Second
debugMode = aiConfig.DebugMode
th := &telemetryHandle{
client: appinsights.NewTelemetryClientFromConfig(telemetryConfig),
@ -110,7 +148,7 @@ func NewAITelemetry(
go getMetadata(th)
}
return th
return th, nil
}
// TrackLog function sends report (trace) to appinsights resource. It overrides few of the existing columns with app information
@ -167,6 +205,7 @@ func (th *telemetryHandle) TrackMetric(metric Metric) {
aimetric.Properties[locationStr] = th.metadata.Location
aimetric.Properties[subscriptionIDStr] = th.metadata.SubscriptionID
aimetric.Properties[vmNameStr] = th.metadata.VMName
aimetric.Properties[versionStr] = th.appVersion
}
// copy custom dimensions

Просмотреть файл

@ -2,17 +2,33 @@ package aitelemetry
import (
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"testing"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
)
var th TelemetryHandle
var (
th TelemetryHandle
hostAgentUrl = "localhost:3501"
getCloudResponse = "AzurePublicCloud"
httpURL = "http://" + hostAgentUrl
)
func TestMain(m *testing.M) {
log.SetLogDirectory("/var/log/")
log.SetName("testaitelemetry")
log.SetLevel(log.LevelInfo)
err := log.SetTarget(log.TargetLogfile)
if err == nil {
fmt.Printf("TestST LogDir configuration succeeded\n")
}
if runtime.GOOS == "linux" {
platform.ExecuteCommand("cp metadata_test.json /tmp/azuremetadata.json")
@ -22,6 +38,20 @@ func TestMain(m *testing.M) {
platform.ExecuteCommand(cmd)
}
hostu, _ := url.Parse("tcp://" + hostAgentUrl)
hostAgent, err := common.NewListener(hostu)
if err != nil {
fmt.Printf("Failed to create agent, err:%v.\n", err)
return
}
hostAgent.AddHandler("/", handleGetCloud)
err = hostAgent.Start(make(chan error, 1))
if err != nil {
fmt.Printf("Failed to start agent, err:%v.\n", err)
return
}
exitCode := m.Run()
if runtime.GOOS == "linux" {
@ -32,10 +62,17 @@ func TestMain(m *testing.M) {
platform.ExecuteCommand(cmd)
}
log.Close()
os.Exit(exitCode)
}
func handleGetCloud(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(getCloudResponse))
}
func TestEmptyAIKey(t *testing.T) {
var err error
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
@ -45,26 +82,29 @@ func TestEmptyAIKey(t *testing.T) {
DebugMode: true,
DisableMetadataRefreshThread: true,
}
th := NewAITelemetry("", aiConfig)
if th == nil {
t.Errorf("Error intializing AI telemetry")
_, err = NewAITelemetry(httpURL, "", aiConfig)
if err == nil {
t.Errorf("Error intializing AI telemetry:%v", err)
}
th.Close(10)
}
func TestNewAITelemetry(t *testing.T) {
var err error
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
BatchSize: 4096,
BatchInterval: 2,
RefreshTimeout: 10,
GetEnvRetryCount: 1,
GetEnvRetryWaitTimeInSecs: 2,
DebugMode: true,
DisableMetadataRefreshThread: true,
}
th = NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
th, err = NewAITelemetry(httpURL, "00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
if th == nil {
t.Errorf("Error intializing AI telemetry")
t.Errorf("Error intializing AI telemetry: %v", err)
}
}
@ -95,6 +135,8 @@ func TestClose(t *testing.T) {
}
func TestClosewithoutSend(t *testing.T) {
var err error
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
@ -102,11 +144,13 @@ func TestClosewithoutSend(t *testing.T) {
BatchInterval: 2,
DisableMetadataRefreshThread: true,
RefreshTimeout: 10,
GetEnvRetryCount: 1,
GetEnvRetryWaitTimeInSecs: 2,
}
thtest := NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
thtest, err := NewAITelemetry(httpURL, "00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
if thtest == nil {
t.Errorf("Error intializing AI telemetry")
t.Errorf("Error intializing AI telemetry:%v", err)
}
thtest.Close(10)

Просмотреть файл

@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/cnsclient"
@ -61,6 +62,7 @@ type netPlugin struct {
*cni.Plugin
nm network.NetworkManager
report *telemetry.CNIReport
tb *telemetry.TelemetryBuffer
}
// snatConfiguration contains a bool that determines whether CNI enables snat on host and snat for dns
@ -91,8 +93,9 @@ func NewPlugin(name string, config *common.PluginConfig) (*netPlugin, error) {
}, nil
}
func (plugin *netPlugin) SetCNIReport(report *telemetry.CNIReport) {
func (plugin *netPlugin) SetCNIReport(report *telemetry.CNIReport, tb *telemetry.TelemetryBuffer) {
plugin.report = report
plugin.tb = tb
}
// Starts the plugin.
@ -188,6 +191,29 @@ func (plugin *netPlugin) getPodInfo(args string) (string, string, error) {
return k8sPodName, k8sNamespace, nil
}
func SetCustomDimensions(cniMetric *telemetry.AIMetric, nwCfg *cni.NetworkConfig, err error) {
if cniMetric == nil {
log.Errorf("[CNI] Unable to set custom dimension. Report is nil")
return
}
if err != nil {
cniMetric.Metric.CustomDimensions[telemetry.StatusStr] = telemetry.FailedStr
} else {
cniMetric.Metric.CustomDimensions[telemetry.StatusStr] = telemetry.SucceededStr
}
if nwCfg != nil {
if nwCfg.MultiTenancy {
cniMetric.Metric.CustomDimensions[telemetry.CNIModeStr] = telemetry.MultiTenancyStr
} else {
cniMetric.Metric.CustomDimensions[telemetry.CNIModeStr] = telemetry.SingleTenancyStr
}
cniMetric.Metric.CustomDimensions[telemetry.CNINetworkModeStr] = nwCfg.Mode
}
}
func (plugin *netPlugin) setCNIReportDetails(nwCfg *cni.NetworkConfig, opType string, msg string) {
if nwCfg.MultiTenancy {
plugin.report.Context = "AzureCNIMultitenancy"
@ -220,8 +246,11 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error {
enableInfraVnet bool
enableSnatForDns bool
nwDNSInfo network.DNSInfo
cniMetric telemetry.AIMetric
)
startTime := time.Now()
log.Printf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)
@ -245,6 +274,15 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error {
plugin.setCNIReportDetails(nwCfg, CNI_ADD, "")
defer func() {
operationTimeMs := time.Since(startTime).Milliseconds()
cniMetric.Metric = aitelemetry.Metric{
Name: telemetry.CNIAddTimeMetricStr,
Value: float64(operationTimeMs),
CustomDimensions: make(map[string]string),
}
SetCustomDimensions(&cniMetric, nwCfg, err)
telemetry.SendCNIMetric(&cniMetric, plugin.tb)
// Add Interfaces to result.
if result == nil {
result = &cniTypesCurr.Result{}
@ -255,9 +293,7 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error {
}
result.Interfaces = append(result.Interfaces, iface)
addSnatInterface(nwCfg, result)
// Convert result to the requested CNI version.
res, vererr := result.GetAsVersion(nwCfg.CNIVersion)
if vererr != nil {
@ -279,6 +315,8 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error {
return err
}
plugin.report.ContainerName = k8sPodName + ":" + k8sNamespace
if nwCfg.MultiTenancy {
// Initialize CNSClient
cnsclient.InitCnsClient(nwCfg.CNSUrl)
@ -655,8 +693,11 @@ func (plugin *netPlugin) Delete(args *cniSkel.CmdArgs) error {
networkId string
nwInfo *network.NetworkInfo
epInfo *network.EndpointInfo
cniMetric telemetry.AIMetric
)
startTime := time.Now()
log.Printf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)
@ -736,6 +777,15 @@ func (plugin *netPlugin) Delete(args *cniSkel.CmdArgs) error {
msg := fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace)
plugin.setCNIReportDetails(nwCfg, CNI_DEL, msg)
operationTimeMs := time.Since(startTime).Milliseconds()
cniMetric.Metric = aitelemetry.Metric{
Name: telemetry.CNIDelTimeMetricStr,
Value: float64(operationTimeMs),
CustomDimensions: make(map[string]string),
}
SetCustomDimensions(&cniMetric, nwCfg, nil)
telemetry.SendCNIMetric(&cniMetric, plugin.tb)
return nil
}
@ -751,8 +801,11 @@ func (plugin *netPlugin) Update(args *cniSkel.CmdArgs) error {
cnsClient *cnsclient.CNSClient
orchestratorContext []byte
targetNetworkConfig *cns.GetNetworkContainerResponse
cniMetric telemetry.AIMetric
)
startTime := time.Now()
log.Printf("[cni-net] Processing UPDATE command with args {Netns:%v Args:%v Path:%v}.",
args.Netns, args.Args, args.Path)
@ -768,6 +821,15 @@ func (plugin *netPlugin) Update(args *cniSkel.CmdArgs) error {
plugin.setCNIReportDetails(nwCfg, CNI_UPDATE, "")
defer func() {
operationTimeMs := time.Since(startTime).Milliseconds()
cniMetric.Metric = aitelemetry.Metric{
Name: telemetry.CNIUpdateTimeMetricStr,
Value: float64(operationTimeMs),
CustomDimensions: make(map[string]string),
}
SetCustomDimensions(&cniMetric, nwCfg, err)
telemetry.SendCNIMetric(&cniMetric, plugin.tb)
if result == nil {
result = &cniTypesCurr.Result{}
}

Просмотреть файл

@ -11,6 +11,7 @@ import (
"reflect"
"time"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/cni/network"
"github.com/Azure/azure-container-networking/common"
@ -126,6 +127,8 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
// Main is the entry point for CNI network plugin.
func main() {
startTime := time.Now()
// Initialize and parse command line arguments.
acn.ParseArgs(&args, printVersion)
vers := acn.GetArg(acn.OptVersion).(bool)
@ -138,6 +141,7 @@ func main() {
var (
config common.PluginConfig
err error
cnimetric telemetry.AIMetric
)
log.SetName(name)
@ -169,7 +173,6 @@ func main() {
}
cniReport.GetReport(pluginName, version, ipamQueryURL)
startTime := time.Now().UnixNano() / int64(time.Millisecond)
netPlugin, err := network.NewPlugin(name, &config)
if err != nil {
@ -177,8 +180,6 @@ func main() {
return
}
netPlugin.SetCNIReport(cniReport)
// CNI Acquires lock
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
@ -214,6 +215,8 @@ func main() {
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()
netPlugin.SetCNIReport(cniReport, tb)
t := time.Now()
cniReport.Timestamp = t.Format("2006-01-02 15:04:05")
@ -230,9 +233,6 @@ func main() {
log.Errorf("Failed to execute network plugin, err:%v.\n", err)
}
endTime := time.Now().UnixNano() / int64(time.Millisecond)
reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(int64(endTime - startTime))
netPlugin.Stop()
// release cni lock
@ -240,6 +240,15 @@ func main() {
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
}
executionTimeMs := time.Since(startTime).Milliseconds()
cnimetric.Metric = aitelemetry.Metric{
Name: telemetry.CNIExecutimeMetricStr,
Value: float64(executionTimeMs),
CustomDimensions: make(map[string]string),
}
network.SetCustomDimensions(&cnimetric, nil, err)
telemetry.SendCNIMetric(&cnimetric, tb)
if err != nil {
reportPluginError(reportManager, tb, err)
panic("network plugin execute fatal error")
@ -247,6 +256,7 @@ func main() {
// Report CNI successfully finished execution.
reflect.ValueOf(reportManager.Report).Elem().FieldByName("CniSucceeded").SetBool(true)
reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(executionTimeMs)
if err = reportManager.SendReport(tb); err != nil {
log.Errorf("SendReport failed due to %v", err)

Просмотреть файл

@ -8,13 +8,20 @@ import (
"runtime"
"time"
"github.com/Azure/azure-container-networking/aitelemetry"
acn "github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/telemetry"
)
const (
reportToHostIntervalInSeconds = 30
defaultReportToHostIntervalInSecs = 30
defaultRefreshTimeoutInSecs = 15
defaultBatchSizeInBytes = 16384
defaultBatchIntervalInSecs = 15
defaultGetEnvRetryCount = 2
defaultGetEnvRetryWaitTimeInSecs = 3
pluginName = "AzureCNI"
azureVnetTelemetry = "azure-vnet-telemetry"
configExtension = ".config"
)
@ -76,6 +83,32 @@ func printVersion() {
fmt.Printf("Version %v\n", version)
}
func setTelemetryDefaults(config *telemetry.TelemetryConfig) {
if config.ReportToHostIntervalInSeconds == 0 {
config.ReportToHostIntervalInSeconds = defaultReportToHostIntervalInSecs
}
if config.RefreshTimeoutInSecs == 0 {
config.RefreshTimeoutInSecs = defaultRefreshTimeoutInSecs
}
if config.BatchIntervalInSecs == 0 {
config.BatchIntervalInSecs = defaultBatchIntervalInSecs
}
if config.BatchSizeInBytes == 0 {
config.BatchSizeInBytes = defaultBatchSizeInBytes
}
if config.GetEnvRetryCount == 0 {
config.GetEnvRetryCount = defaultGetEnvRetryCount
}
if config.GetEnvRetryWaitTimeInSecs == 0 {
config.GetEnvRetryWaitTimeInSecs = defaultGetEnvRetryWaitTimeInSecs
}
}
func main() {
var tb *telemetry.TelemetryBuffer
var config telemetry.TelemetryConfig
@ -123,6 +156,10 @@ func main() {
log.Logf("read config returned %+v", config)
setTelemetryDefaults(&config)
log.Logf("Config after setting defaults %+v", config)
// Cleaning up orphan socket if present
tbtemp := telemetry.NewTelemetryBuffer("")
tbtemp.Cleanup(telemetry.FdName)
@ -131,7 +168,7 @@ func main() {
tb = telemetry.NewTelemetryBuffer("")
log.Logf("[Telemetry] Starting telemetry server")
err = tb.StartServer()
err = tb.StartServer(config.DisableTelemetryToNetAgent)
if err == nil || tb.FdExists {
break
}
@ -141,11 +178,23 @@ func main() {
time.Sleep(time.Millisecond * 200)
}
if config.ReportToHostIntervalInSeconds == 0 {
config.ReportToHostIntervalInSeconds = reportToHostIntervalInSeconds
aiConfig := aitelemetry.AIConfig{
AppName: pluginName,
AppVersion: version,
BatchSize: config.BatchSizeInBytes,
BatchInterval: config.BatchIntervalInSecs,
RefreshTimeout: config.RefreshTimeoutInSecs,
DisableMetadataRefreshThread: config.DisableMetadataThread,
DebugMode: config.DebugMode,
GetEnvRetryCount: config.GetEnvRetryCount,
GetEnvRetryWaitTimeInSecs: config.GetEnvRetryWaitTimeInSecs,
}
err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric)
log.Printf("[Telemetry] AI Handle creation status:%v", err)
log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds)
tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second)
telemetry.CloseAITelemetryHandle()
log.Close()
}

Просмотреть файл

@ -14,6 +14,7 @@ import (
"net"
"net/http"
"os"
"strings"
"time"
"github.com/Azure/azure-container-networking/log"
@ -21,8 +22,9 @@ import (
const (
metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json"
httpConnectionTimeout = 10
headerTimeout = 20
azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text"
httpConnectionTimeout = 7
headerTimeout = 7
)
// XmlDocument - Azure host agent XML document format.
@ -288,3 +290,36 @@ func SaveHostMetadata(metadata Metadata, fileName string) error {
return err
}
func GetAzureCloud(url string) (string, error) {
if url == "" {
url = azCloudUrl
}
log.Printf("GetAzureCloud querying url: %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return "", err
}
req.Header.Set("Metadata", "True")
client := InitHttpClient(httpConnectionTimeout, headerTimeout)
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("Bad http status:%v", resp.Status)
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return strings.TrimSpace(string(bodyBytes)), nil
}

82
telemetry/aiwrapper.go Normal file
Просмотреть файл

@ -0,0 +1,82 @@
// Copyright Microsoft. All rights reserved.
package telemetry
import (
"fmt"
"runtime"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/log"
)
var (
aiMetadata string
th aitelemetry.TelemetryHandle
gDisableTrace bool
gDisableMetric bool
)
const (
// Wait time for AI to gracefully close AI telemetry session
waitTimeInSecs = 10
)
func CreateAITelemetryHandle(aiConfig aitelemetry.AIConfig, disableAll, disableMetric, disableTrace bool) error {
var err error
if disableAll {
log.Printf("Telemetry is disabled")
return fmt.Errorf("Telmetry disabled")
}
th, err = aitelemetry.NewAITelemetry("", aiMetadata, aiConfig)
if err != nil {
return err
}
gDisableMetric = disableMetric
gDisableTrace = disableTrace
return nil
}
func SendAITelemetry(cnireport CNIReport) {
if th == nil || gDisableTrace {
return
}
var msg string
if cnireport.ErrorMessage != "" {
msg = cnireport.ErrorMessage
} else {
msg = cnireport.EventMessage
}
report := aitelemetry.Report{
Message: "CNI:" + msg,
Context: cnireport.ContainerName,
CustomDimensions: make(map[string]string),
}
report.CustomDimensions[ContextStr] = cnireport.Context
report.CustomDimensions[SubContextStr] = cnireport.SubContext
report.CustomDimensions[VMUptimeStr] = cnireport.VMUptime
report.CustomDimensions[OperationTypeStr] = cnireport.OperationType
report.CustomDimensions[VersionStr] = cnireport.Version
th.TrackLog(report)
}
func SendAIMetric(aiMetric AIMetric) {
if th == nil || gDisableMetric {
return
}
aiMetric.Metric.CustomDimensions[OSTypeStr] = runtime.GOOS
th.TrackMetric(aiMetric.Metric)
}
func CloseAITelemetryHandle() {
if th != nil {
th.Close(waitTimeInSecs)
}
}

Просмотреть файл

@ -1,3 +1,8 @@
{
"reportToHostIntervalInSeconds": 30
"reportToHostIntervalInSeconds": 30,
"BatchSizeInBytes":16384,
"BatchIntervalInSecs":15,
"RefreshTimeoutInSecs": 15,
"DisableAll": false,
"DebugMode":false
}

29
telemetry/constants.go Normal file
Просмотреть файл

@ -0,0 +1,29 @@
// Copyright Microsoft. All rights reserved.
package telemetry
const (
// Metric Names
CNIExecutimeMetricStr = "CNIExecutionTimeMs"
CNIAddTimeMetricStr = "CNIAddTimeMs"
CNIDelTimeMetricStr = "CNIDelTimeMs"
CNIUpdateTimeMetricStr = "CNIUpdateTimeMs"
// Dimension Names
ContextStr = "Context"
SubContextStr = "SubContext"
VMUptimeStr = "VMUptime"
OperationTypeStr = "OperationType"
VersionStr = "Version"
StatusStr = "Status"
CNIModeStr = "CNIMode"
CNINetworkModeStr = "CNINetworkMode"
OSTypeStr = "OSType"
// Values
SucceededStr = "Succeeded"
FailedStr = "Failed"
SingleTenancyStr = "SingleTenancy"
MultiTenancyStr = "MultiTenancy"
)

Просмотреть файл

@ -14,6 +14,7 @@ import (
"reflect"
"strings"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
@ -100,6 +101,10 @@ type CNIReport struct {
Metadata common.Metadata `json:"compute"`
}
type AIMetric struct {
Metric aitelemetry.Metric
}
// Azure CNS Telemetry Report structure.
type CNSReport struct {
IsNewInstance bool
@ -372,6 +377,7 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {
case *NPMReport:
case *DNCReport:
case *CNSReport:
case *AIMetric:
default:
err = fmt.Errorf("[Telemetry] Invalid report type")
}
@ -383,3 +389,24 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {
report, err = json.Marshal(reportMgr.Report)
return report, err
}
// This function for sending CNI metrics to telemetry service
func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error {
var (
err error
report []byte
)
if tb != nil && tb.Connected {
reportMgr := &ReportManager{Report: cniMetric}
report, err = reportMgr.ReportToBytes()
if err == nil {
// If write fails, try to re-establish connections as server/client
if _, err = tb.Write(report); err != nil {
tb.Cancel()
}
}
}
return err
}

Просмотреть файл

@ -106,7 +106,7 @@ func TestMain(m *testing.M) {
reportManager.Report = &CNIReport{}
tb = NewTelemetryBuffer(hostAgentUrl)
err = tb.StartServer()
err = tb.StartServer(false)
if err == nil {
go tb.BufferAndPushData(0)
}
@ -186,13 +186,6 @@ func TestSendTelemetry(t *testing.T) {
}
}
func TestReceiveTelemetryData(t *testing.T) {
time.Sleep(300 * time.Millisecond)
if len(tb.buffer.CNIReports) != 1 {
t.Errorf("buffer doesn't contain CNI report")
}
}
func TestCloseTelemetryConnection(t *testing.T) {
tb.Cancel()
time.Sleep(300 * time.Millisecond)
@ -204,7 +197,7 @@ func TestCloseTelemetryConnection(t *testing.T) {
func TestServerCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer()
err := tb.StartServer(false)
if err == nil {
go tb.BufferAndPushData(0)
}
@ -235,7 +228,7 @@ func TestServerCloseTelemetryConnection(t *testing.T) {
func TestClientCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer()
err := tb.StartServer(false)
if err == nil {
go tb.BufferAndPushData(0)
}

Просмотреть файл

@ -28,6 +28,17 @@ import (
// TelemetryConfig - telemetry config read by telemetry service
type TelemetryConfig struct {
ReportToHostIntervalInSeconds time.Duration `json:"reportToHostIntervalInSeconds"`
DisableAll bool
DisableTrace bool
DisableMetric bool
DisableMetadataThread bool
DebugMode bool
DisableTelemetryToNetAgent bool
RefreshTimeoutInSecs int
BatchIntervalInSecs int
BatchSizeInBytes int
GetEnvRetryCount int
GetEnvRetryWaitTimeInSecs int
}
// FdName - file descriptor name
@ -50,7 +61,10 @@ const (
cni = "CNI"
)
var payloadSize uint16 = 0
var (
payloadSize uint16 = 0
disableTelemetryToNetAgent bool
)
// TelemetryBuffer object
type TelemetryBuffer struct {
@ -104,7 +118,8 @@ func remove(s []net.Conn, i int) []net.Conn {
}
// Starts Telemetry server listening on unix domain socket
func (tb *TelemetryBuffer) StartServer() error {
func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error {
disableTelemetryToNetAgent = disableNetAgentChannel
err := tb.Listen(FdName)
if err != nil {
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
@ -136,6 +151,10 @@ func (tb *TelemetryBuffer) StartServer() error {
var cniReport CNIReport
json.Unmarshal([]byte(reportStr), &cniReport)
tb.data <- cniReport
} else if _, ok := tmp["Metric"]; ok {
var aiMetric AIMetric
json.Unmarshal([]byte(reportStr), &aiMetric)
tb.data <- aiMetric
} else if _, ok := tmp["Allocations"]; ok {
var dncReport DNCReport
json.Unmarshal([]byte(reportStr), &dncReport)
@ -279,6 +298,10 @@ func (tb *TelemetryBuffer) Close() {
// sendToHost - send buffer to host
func (tb *TelemetryBuffer) sendToHost() error {
if disableTelemetryToNetAgent {
return nil
}
buf := Buffer{
DNCReports: make([]DNCReport, 0),
CNIReports: make([]CNIReport, 0),
@ -443,7 +466,13 @@ func (buf *Buffer) push(x interface{}) {
}
cniReport := x.(CNIReport)
cniReport.Metadata = metadata
SendAITelemetry(cniReport)
buf.CNIReports = append(buf.CNIReports, cniReport)
case AIMetric:
aiMetric := x.(AIMetric)
SendAIMetric(aiMetric)
case NPMReport:
if len(buf.NPMReports) >= MaxNumReports {
return