Config support for Telemetry service (#317)

* 1. Start the telemetry service as a separate process for all components (cns/networkmonitor)
2. Added a telemetry config through which the report-to-host interval can be configured
3. Added unit tests for the changes

* added new files for telemetry testing

* added tests for invalid cases

* updated with dummy subid
This commit is contained in:
tamilmani1989 2019-03-25 14:12:32 -07:00 коммит произвёл GitHub
Родитель 0012ae5344
Коммит a11d5104a0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 262 добавлений и 72 удалений

Просмотреть файл

@ -244,14 +244,16 @@ publish-azure-npm-image:
.PHONY: cni-archive
cni-archive:
cp cni/azure-$(GOOS).conflist $(CNI_BUILD_DIR)/10-azure.conflist
cp telemetry/azure-vnet-telemetry.config $(CNI_BUILD_DIR)/azure-vnet-telemetry.config
chmod 0755 $(CNI_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT)
cd $(CNI_BUILD_DIR) && $(ARCHIVE_CMD) $(CNI_ARCHIVE_NAME) azure-vnet$(EXE_EXT) azure-vnet-ipam$(EXE_EXT) azure-vnet-telemetry$(EXE_EXT) 10-azure.conflist
cd $(CNI_BUILD_DIR) && $(ARCHIVE_CMD) $(CNI_ARCHIVE_NAME) azure-vnet$(EXE_EXT) azure-vnet-ipam$(EXE_EXT) azure-vnet-telemetry$(EXE_EXT) 10-azure.conflist azure-vnet-telemetry.config
chown $(BUILD_USER):$(BUILD_USER) $(CNI_BUILD_DIR)/$(CNI_ARCHIVE_NAME)
mkdir -p $(CNI_MULTITENANCY_BUILD_DIR)
cp cni/azure-$(GOOS)-multitenancy.conflist $(CNI_MULTITENANCY_BUILD_DIR)/10-azure.conflist
cp $(CNI_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)
cp telemetry/azure-vnet-telemetry.config $(CNI_MULTITENANCY_BUILD_DIR)/azure-vnet-telemetry.config
cp $(CNI_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT) $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)
chmod 0755 $(CNI_MULTITENANCY_BUILD_DIR)/azure-vnet$(EXE_EXT) $(CNI_MULTITENANCY_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT)
cd $(CNI_MULTITENANCY_BUILD_DIR) && $(ARCHIVE_CMD) $(CNI_MULTITENANCY_ARCHIVE_NAME) azure-vnet$(EXE_EXT) azure-vnet-ipam$(EXE_EXT) 10-azure.conflist
cd $(CNI_MULTITENANCY_BUILD_DIR) && $(ARCHIVE_CMD) $(CNI_MULTITENANCY_ARCHIVE_NAME) azure-vnet$(EXE_EXT) azure-vnet-ipam$(EXE_EXT) azure-vnet-telemetry$(EXE_EXT) 10-azure.conflist azure-vnet-telemetry.config
chown $(BUILD_USER):$(BUILD_USER) $(CNI_MULTITENANCY_BUILD_DIR)/$(CNI_MULTITENANCY_ARCHIVE_NAME)
# Create a CNM archive for the target platform.

Просмотреть файл

@ -22,9 +22,11 @@ import (
)
const (
hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport"
ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1"
pluginName = "CNI"
hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport"
ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1"
pluginName = "CNI"
telemetryNumRetries = 5
telemetryWaitTimeInMilliseconds = 200
)
// Version is populated by make during build.
@ -133,32 +135,9 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
return isupdate, nil
}
// startTelemetryService - Kills if any telemetry service runs and start new telemetry service
func startTelemetryService(path string) error {
platform.KillProcessByName(telemetry.TelemetryServiceProcessName)
log.Printf("[cni] Starting telemetry service process")
if err := common.StartProcess(path); err != nil {
log.Printf("[Telemetry] Failed to start telemetry service process :%v", err)
return err
}
log.Printf("[cni] Telemetry service started")
for attempt := 0; attempt < 5; attempt++ {
if telemetry.SockExists() {
break
}
time.Sleep(200 * time.Millisecond)
}
return nil
}
func connectToTelemetryService(tb *telemetry.TelemetryBuffer) {
path := fmt.Sprintf("%v/%v", telemetry.CniInstallDir, telemetry.TelemetryServiceProcessName)
args := []string{"-d", telemetry.CniInstallDir}
for attempt := 0; attempt < 2; attempt++ {
if err := tb.Connect(); err != nil {
@ -170,7 +149,8 @@ func connectToTelemetryService(tb *telemetry.TelemetryBuffer) {
return
}
startTelemetryService(path)
telemetry.StartTelemetryService(path, args)
telemetry.WaitForTelemetrySocket(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
} else {
tb.Connected = true
log.Printf("Connected to telemetry service")

Просмотреть файл

@ -3,30 +3,145 @@ package main
// Entry point of the telemetry service if started by CNI
import (
"fmt"
"os"
"runtime"
"time"
acn "github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/telemetry"
)
const (
reportToHostIntervalInSeconds = 60 * time.Second
azurecnitelemetry = "azure-vnet-telemetry"
reportToHostIntervalInSeconds = 30
azureVnetTelemetry = "azure-vnet-telemetry"
configExtension = ".config"
)
var version string
// args is the command-line option table handed to acn.ParseArgs by main.
// It declares five options: log level, log target, log directory, version,
// and the telemetry config directory.
var args = acn.ArgumentList{
{
// Log level; the ValueMap translates the option's string values into
// log package levels.
Name: acn.OptLogLevel,
Shorthand: acn.OptLogLevelAlias,
Description: "Set the logging level",
Type: "int",
DefaultValue: acn.OptLogLevelInfo,
ValueMap: map[string]interface{}{
acn.OptLogLevelInfo: log.LevelInfo,
acn.OptLogLevelDebug: log.LevelDebug,
},
},
{
// Log target; defaults to the log file target.
Name: acn.OptLogTarget,
Shorthand: acn.OptLogTargetAlias,
Description: "Set the logging target",
Type: "int",
DefaultValue: acn.OptLogTargetFile,
ValueMap: map[string]interface{}{
acn.OptLogTargetSyslog: log.TargetSyslog,
acn.OptLogTargetStderr: log.TargetStderr,
acn.OptLogTargetFile: log.TargetLogfile,
acn.OptLogStdout: log.TargetStdout,
acn.OptLogMultiWrite: log.TargetStdOutAndLogFile,
},
},
{
// Log directory; main only calls log.SetLogDirectory when this is
// non-empty.
Name: acn.OptLogLocation,
Shorthand: acn.OptLogLocationAlias,
Description: "Set the directory location where logs will be saved",
Type: "string",
DefaultValue: "",
},
{
// Version flag; when set, main prints the version banner and exits.
Name: acn.OptVersion,
Shorthand: acn.OptVersionAlias,
Description: "Print version information",
Type: "bool",
DefaultValue: false,
},
{
// Directory in which main looks for the telemetry config file;
// defaults to telemetry.CniInstallDir.
// NOTE(review): "telmetry" in the description below is a typo for
// "telemetry"; it is a runtime string and is left unchanged here.
Name: acn.OptTelemetryConfigDir,
Shorthand: acn.OptTelemetryConfigDirAlias,
Description: "Set the telmetry config directory",
Type: "string",
DefaultValue: telemetry.CniInstallDir,
},
}
// printVersion writes the service banner followed by the build version to
// standard output.
func printVersion() {
	fmt.Print("Azure Container Telemetry Service\n")
	fmt.Printf("Version %v\n", version)
}
// main is the entry point of the telemetry service. It parses the
// command-line options, configures logging, loads the telemetry config file
// from the configured directory, starts the telemetry server, and then
// buffers reports and pushes them to the host at the configured interval.
func main() {
var tb *telemetry.TelemetryBuffer
var config telemetry.TelemetryConfig
var configPath string
var err error
acn.ParseArgs(&args, printVersion)
logTarget := acn.GetArg(acn.OptLogTarget).(int)
logDirectory := acn.GetArg(acn.OptLogLocation).(string)
logLevel := acn.GetArg(acn.OptLogLevel).(int)
configDirectory := acn.GetArg(acn.OptTelemetryConfigDir).(string)
vers := acn.GetArg(acn.OptVersion).(bool)
// Version request: print the banner and exit without starting the service.
if vers {
printVersion()
os.Exit(0)
}
log.SetName(azureVnetTelemetry)
log.SetLevel(logLevel)
// Only override the log directory when one was supplied on the command line.
if logDirectory != "" {
log.SetLogDirectory(logDirectory)
}
err = log.SetTarget(logTarget)
if err != nil {
// The logger is not usable at this point, so report on stdout and give up.
fmt.Printf("Failed to configure logging: %v\n", err)
return
}
log.Printf("args %+v", os.Args)
// The config file is <configDirectory>/azure-vnet-telemetry.config; a
// backslash separator is used on every OS other than linux.
if runtime.GOOS == "linux" {
configPath = fmt.Sprintf("%s/%s%s", configDirectory, azureVnetTelemetry, configExtension)
} else {
configPath = fmt.Sprintf("%s\\%s%s", configDirectory, azureVnetTelemetry, configExtension)
}
log.Printf("[Telemetry] Config path: %s", configPath)
config, err = telemetry.ReadConfigFile(configPath)
// A config read failure is only logged; a zero interval is replaced with the
// built-in default further down.
if err != nil {
log.Printf("[Telemetry] Error reading telemetry config: %v", err)
}
log.Printf("read config returned %+v", config)
// Retry every 200 ms until the server starts, or until StartServer has set
// tb.FdExists; any other failure cleans up the FdName endpoint and tries again.
for {
tb = telemetry.NewTelemetryBuffer("")
log.Printf("[Telemetry] Starting telemetry server")
err = tb.StartServer()
if err == nil || tb.FdExists {
break
}
log.Printf("[Telemetry] Telemetry service starting failed: %v", err)
tb.Cleanup(telemetry.FdName)
time.Sleep(time.Millisecond * 200)
}
// NOTE(review): this call passes the bare constant (not multiplied by
// time.Second) and runs before the config-driven call below. In this diff
// view it looks like the pre-change line that the config-driven call
// replaces - confirm against the resulting file.
tb.BufferAndPushData(reportToHostIntervalInSeconds)
// Fall back to the built-in default when the config supplied no interval.
if config.ReportToHostIntervalInSeconds == 0 {
config.ReportToHostIntervalInSeconds = reportToHostIntervalInSeconds
}
log.Printf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds)
// The config field is a time.Duration holding a plain count of seconds, so it
// is scaled by time.Second here to obtain the actual interval.
tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second)
log.Close()
}

Просмотреть файл

@ -67,4 +67,8 @@ const (
// CNI binary location
OptCNIConfigFile = "cni-config-file"
OptCNIConfigFileAlias = "cniconfig"
// Telemetry config Location
OptTelemetryConfigDir = "telemetry-config-file"
OptTelemetryConfigDirAlias = "d"
)

Просмотреть файл

@ -104,7 +104,7 @@ func GetInterfaceSubnetWithSpecificIp(ipAddr string) *net.IPNet {
return nil
}
func StartProcess(path string) error {
func StartProcess(path string, args []string) error {
var attr = os.ProcAttr{
Env: os.Environ(),
Files: []*os.File{
@ -114,8 +114,8 @@ func StartProcess(path string) error {
},
}
args := []string{path}
process, err := os.StartProcess(path, args, &attr)
processArgs := append([]string{path}, args...)
process, err := os.StartProcess(path, processArgs, &attr)
if err == nil {
// Release detaches the process
return process.Release()

Просмотреть файл

@ -0,0 +1,3 @@
{
"reportToHostIntervalInSeconds": 30
}

Просмотреть файл

@ -0,0 +1 @@
{"location":"eastus","name":"k8s-agentpool1-42685608-0","offer":"aks","osType":"Linux","placementGroupId":"","platformFaultDomain":"0","platformUpdateDomain":"0","publisher":"microsoft-aks","resourceGroupName":"rgcnideftesttamil","sku":"aks-ubuntu-1604-201902","subscriptionId":"ea821859-912a-4d20-a4dd-e18a3ce5ba2c","tags":"aksEngineVersion:canary;creationSource:aksengine-k8s-agentpool1-42685608-0;orchestrator:Kubernetes:1.10.13;poolName:agentpool1;resourceNameSuffix:42685608","version":"2019.02.12","vmId":"6baf785b-397c-4967-9f75-cdb3d0df66c4","vmSize":"Standard_DS2_v2","KernelVersion":""}

Просмотреть файл

@ -325,7 +325,7 @@ func (report *CNIReport) GetInterfaceDetails(queryUrl string) {
if resp.StatusCode != http.StatusOK {
errMsg := fmt.Sprintf("Error while getting interface details. http code :%d", resp.StatusCode)
report.InterfaceDetails.ErrorMessage = errMsg
telemetryLogger.Printf(errMsg)
log.Printf(errMsg)
return
}

Просмотреть файл

@ -11,10 +11,16 @@ import (
"net/http"
"net/url"
"os"
"runtime"
"testing"
"time"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/platform"
)
const (
telemetryConfig = "azure-vnet-telemetry.config"
)
var reportManager *ReportManager
@ -89,15 +95,17 @@ func TestMain(m *testing.M) {
return
}
if runtime.GOOS == "linux" {
platform.ExecuteCommand("cp metadata_test.json /tmp/azuremetadata.json")
} else {
platform.ExecuteCommand("copy metadata_test.json azuremetadata.json")
}
reportManager = &ReportManager{}
reportManager.HostNetAgentURL = "http://" + hostAgentUrl
reportManager.ContentType = "application/json"
reportManager.Report = &CNIReport{}
if err := InitTelemetryLogger(); err == nil {
defer CloseTelemetryLogger()
}
tb = NewTelemetryBuffer(hostAgentUrl)
err = tb.StartServer()
if err == nil {
@ -109,6 +117,13 @@ func TestMain(m *testing.M) {
}
exitCode := m.Run()
if runtime.GOOS == "linux" {
platform.ExecuteCommand("rm /tmp/azuremetadata.json")
} else {
platform.ExecuteCommand("del azuremetadata.json")
}
tb.Cleanup(FdName)
os.Exit(exitCode)
}
@ -248,6 +263,38 @@ func TestClientCloseTelemetryConnection(t *testing.T) {
tb.Cancel()
}
// TestReadConfigFile covers three cases: the shipped config file must load
// with a 30 second report interval, a missing file must produce an error, and
// a file that is not valid JSON (a Go source file) must produce an error.
func TestReadConfigFile(t *testing.T) {
	cfg, readErr := ReadConfigFile(telemetryConfig)
	if readErr != nil {
		t.Errorf("Read telemetry config failed with error %v", readErr)
	}
	if got := cfg.ReportToHostIntervalInSeconds; got != 30 {
		t.Errorf("ReportToHostIntervalInSeconds not expected value. Got %d", got)
	}

	// Non-existent file.
	if _, readErr = ReadConfigFile("a.config"); readErr == nil {
		t.Errorf("[Telemetry] Didn't throw not found error: %v", readErr)
	}

	// Existing file whose content is not a JSON config.
	if _, readErr = ReadConfigFile("telemetry.go"); readErr == nil {
		t.Errorf("[Telemetry] Didn't report invalid telemetry config: %v", readErr)
	}
}
// TestStartTelemetryService verifies that starting the service with an empty
// binary path and no arguments is reported as an error.
func TestStartTelemetryService(t *testing.T) {
	if startErr := StartTelemetryService("", nil); startErr == nil {
		t.Errorf("StartTelemetryService didnt return error for incorrect service name %v", startErr)
	}
}
// TestWaitForTelemetrySocket calls WaitForTelemetrySocket with a single
// attempt and a wait value of 10 (milliseconds). It makes no assertions; it
// only checks that the call returns.
func TestWaitForTelemetrySocket(t *testing.T) {
WaitForTelemetrySocket(1, 10)
}
func TestSetReportState(t *testing.T) {
err := reportManager.SetReportState("a.json")
if err != nil {

Просмотреть файл

@ -15,9 +15,16 @@ import (
"sync"
"time"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
)
// TelemetryConfig - telemetry config read by telemetry service
type TelemetryConfig struct {
// ReportToHostIntervalInSeconds is decoded from the JSON key
// "reportToHostIntervalInSeconds". Although typed time.Duration, it holds a
// plain count of seconds; the telemetry service multiplies it by time.Second
// before use.
ReportToHostIntervalInSeconds time.Duration `json:"reportToHostIntervalInSeconds"`
}
// FdName - file descriptor name
// Delimiter - delimiter for socket reads/writes
// azureHostReportURL - host net agent url of type payload
@ -28,7 +35,7 @@ const (
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
DefaultInterval = 30 * time.Second
minInterval = 10 * time.Second
logName = "azure-vnet-telemetry"
MaxPayloadSize uint16 = 65535
dnc = "DNC"
@ -37,7 +44,6 @@ const (
cni = "CNI"
)
var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr)
var payloadSize uint16 = 0
// TelemetryBuffer object
@ -62,14 +68,6 @@ type Payload struct {
CNSReports []CNSReport
}
func InitTelemetryLogger() error {
return telemetryLogger.SetTarget(log.TargetLogfile)
}
func CloseTelemetryLogger() {
telemetryLogger.Close()
}
// NewTelemetryBuffer - create a new TelemetryBuffer
func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
var tb TelemetryBuffer
@ -95,7 +93,7 @@ func remove(s []net.Conn, i int) []net.Conn {
return s[:len(s)-1]
}
telemetryLogger.Printf("tb connections remove failed index %v len %v", i, len(s))
log.Printf("tb connections remove failed index %v len %v", i, len(s))
return s
}
@ -104,13 +102,11 @@ func (tb *TelemetryBuffer) StartServer() error {
err := tb.Listen(FdName)
if err != nil {
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
telemetryLogger.Printf("Listen returns: %v", err.Error())
log.Printf("Listen returns: %v", err.Error())
return err
}
InitTelemetryLogger()
telemetryLogger.Printf("Telemetry service started")
log.Printf("Telemetry service started")
// Spawn server goroutine to handle incoming connections
go func() {
for {
@ -153,7 +149,6 @@ func (tb *TelemetryBuffer) StartServer() error {
for index, value = range tb.connections {
if value == conn {
telemetryLogger.Printf("Server closing client connection")
conn.Close()
found = true
break
@ -169,7 +164,7 @@ func (tb *TelemetryBuffer) StartServer() error {
}
}()
} else {
telemetryLogger.Printf("Telemetry Server accept error %v", err)
log.Printf("Telemetry Server accept error %v", err)
return
}
}
@ -193,9 +188,9 @@ func (tb *TelemetryBuffer) Connect() error {
func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
defer tb.Close()
if !tb.FdExists {
telemetryLogger.Printf("[Telemetry] Buffer telemetry data and send it to host")
if intervalms < DefaultInterval {
intervalms = DefaultInterval
log.Printf("[Telemetry] Buffer telemetry data and send it to host")
if intervalms < minInterval {
intervalms = minInterval
}
interval := time.NewTicker(intervalms).C
@ -207,18 +202,18 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
if err := tb.sendToHost(); err == nil {
tb.payload.reset()
} else {
telemetryLogger.Printf("[Telemetry] sending to host failed with error %+v", err)
log.Printf("[Telemetry] sending to host failed with error %+v", err)
}
case report := <-tb.data:
tb.payload.push(report)
case <-tb.cancel:
telemetryLogger.Printf("server cancel event")
log.Printf("server cancel event")
goto EXIT
}
}
} else {
<-tb.cancel
telemetryLogger.Printf("Received cancel event")
log.Printf("Received cancel event")
}
EXIT:
@ -259,9 +254,8 @@ func (tb *TelemetryBuffer) Close() {
}
if tb.listener != nil {
telemetryLogger.Printf("server close")
log.Printf("server close")
tb.listener.Close()
CloseTelemetryLogger()
}
tb.mutex.Lock()
@ -281,7 +275,7 @@ func (tb *TelemetryBuffer) Close() {
func (tb *TelemetryBuffer) sendToHost() error {
httpc := &http.Client{}
var body bytes.Buffer
telemetryLogger.Printf("Sending payload %+v", tb.payload)
log.Printf("Sending payload %+v", tb.payload)
json.NewEncoder(&body).Encode(tb.payload)
resp, err := httpc.Post(tb.azureHostReportURL, ContentType, &body)
if err != nil {
@ -301,11 +295,11 @@ func (tb *TelemetryBuffer) sendToHost() error {
func (pl *Payload) push(x interface{}) {
metadata, err := getHostMetadata()
if err != nil {
telemetryLogger.Printf("Error getting metadata %v", err)
log.Printf("Error getting metadata %v", err)
} else {
err = saveHostMetadata(metadata)
if err != nil {
telemetryLogger.Printf("saving host metadata failed with :%v", err)
log.Printf("saving host metadata failed with :%v", err)
}
}
@ -379,7 +373,7 @@ func saveHostMetadata(metadata Metadata) error {
}
if err = ioutil.WriteFile(metadataFile, dataBytes, 0644); err != nil {
telemetryLogger.Printf("[Telemetry] Writing metadata to file failed: %v", err)
log.Printf("[Telemetry] Writing metadata to file failed: %v", err)
}
return err
@ -395,7 +389,7 @@ func getHostMetadata() (Metadata, error) {
}
}
telemetryLogger.Printf("[Telemetry] Request metadata from wireserver")
log.Printf("[Telemetry] Request metadata from wireserver")
req, err := http.NewRequest("GET", metadataURL, nil)
if err != nil {
@ -426,3 +420,47 @@ func getHostMetadata() (Metadata, error) {
return metareport.Metadata, err
}
// WaitForTelemetrySocket - Block until the pipe/sock is created or until the
// maximum number of attempts has been used up.
//
// maxAttempt is the number of times the socket is checked.
// waitTimeInMillisecs is a plain count of milliseconds (despite its
// time.Duration type); it is scaled by time.Millisecond to obtain the pause
// taken after every unsuccessful check.
func WaitForTelemetrySocket(maxAttempt int, waitTimeInMillisecs time.Duration) {
	pause := waitTimeInMillisecs * time.Millisecond
	for remaining := maxAttempt; remaining > 0; remaining-- {
		if SockExists() {
			return
		}
		time.Sleep(pause)
	}
}
// StartTelemetryService - Kills any process named TelemetryServiceProcessName
// and then launches the binary at path with the given args.
//
// The start error, if any, is logged and returned; nil is returned once the
// process has been started.
func StartTelemetryService(path string, args []string) error {
	platform.KillProcessByName(TelemetryServiceProcessName)

	log.Printf("[Telemetry] Starting telemetry service process :%v args:%v", path, args)
	startErr := common.StartProcess(path, args)
	if startErr == nil {
		log.Printf("[Telemetry] Telemetry service started")
		return nil
	}

	log.Printf("[Telemetry] Failed to start telemetry service process :%v", startErr)
	return startErr
}
// ReadConfigFile - Read the telemetry config file at filePath and decode its
// JSON content into a TelemetryConfig.
//
// On any failure (the file cannot be read, or its content cannot be decoded
// into the struct) the error is logged and returned together with a
// zero-valued TelemetryConfig, so callers never observe a half-decoded
// configuration.
func ReadConfigFile(filePath string) (TelemetryConfig, error) {
	b, err := ioutil.ReadFile(filePath)
	if err != nil {
		log.Printf("[Telemetry] Failed to read telemetry config: %v", err)
		return TelemetryConfig{}, err
	}

	var config TelemetryConfig
	if err = json.Unmarshal(b, &config); err != nil {
		log.Printf("[Telemetry] unmarshal failed with %v", err)
		// json.Unmarshal can populate some fields before reporting a type
		// error; discard them rather than hand back a partial config.
		return TelemetryConfig{}, err
	}

	return config, nil
}