This commit is contained in:
David Albertson 2018-11-02 14:47:24 -07:00
Родитель 3cb0ca4dd9
Коммит 4de1feaaba
8 изменённых файлов: 380 добавлений и 24 удалений

Просмотреть файл

@ -14,6 +14,8 @@ const (
DEFAULT_workerVersion = "2.0.0"
DEFAULT_sandboxExecutableName = "sandbox"
DEFAULT_jrdsPollingFrequencyInSeconds = 10
DEFAULT_component = "worker"
DEFAULT_debugTraces = false
)
type Configuration struct {
@ -29,6 +31,10 @@ type Configuration struct {
SandboxExecutablePath string `json:"sandbox_executable_path"`
JrdsPollingFrequency string `json:"jrds_polling_frequency"`
DebugTraces bool `json:"debug_traces"`
// runtime configuration
Component string `json:"component"`
}
func LoadConfiguration(path string) error {
@ -73,14 +79,13 @@ var clearConfiguration = func() {
var getEnvironmentConfiguration = func() Configuration {
value, exists := os.LookupEnv(environmentConfigurationKey)
if exists == false {
panic("unable to get configuration from environment")
}
configuration := Configuration{}
err := deserializeConfiguration([]byte(value), &configuration)
if err != nil {
panic("unable to deserialize configuration from environment")
if exists {
err := deserializeConfiguration([]byte(value), &configuration)
if err != nil {
panic("unable to deserialize configuration from environment")
}
}
return configuration
}
@ -103,7 +108,9 @@ var getDefaultConfiguration = func() Configuration {
HybridWorkerGroupName: DEFAULT_empty,
WorkerVersion: DEFAULT_workerVersion,
WorkerWorkingDirectory: DEFAULT_empty,
SandboxExecutablePath: DEFAULT_sandboxExecutableName}
SandboxExecutablePath: DEFAULT_sandboxExecutableName,
Component: DEFAULT_component,
DebugTraces: DEFAULT_debugTraces}
}
var GetJrdsCertificatePath = func() string {
@ -155,3 +162,13 @@ var GetJrdsPollingFrequencyInSeconds = func() int64 {
return int64(freq)
}
var GetComponent = func() string {
config := getEnvironmentConfiguration()
return config.Component
}
var GetDebugTraces = func() bool {
config := getEnvironmentConfiguration()
return config.DebugTraces
}

Просмотреть файл

@ -24,7 +24,7 @@ const (
appjson_headerValue = "application/json"
keepalive_headerValue = "keep-alive"
datetimeFormat = "2018-10-20T01:00:00.0000000"
datetimeFormat = "2006-01-02T15:04:05.999999"
)
type httpClient interface {
@ -119,7 +119,7 @@ func (jrds *JrdsClient) SetJobStream(jobId string, runbookVersionId string, text
return nil
}
func (jrds *JrdsClient) SetLog(eventId string, activityId string, logType string, args ...string) error {
func (jrds *JrdsClient) SetLog(eventId int, activityId string, logType int, args ...string) error {
log := Log{EventId: &eventId, Arguments: &args, LogType: &logType, ActivityId: &activityId}
url := fmt.Sprintf("%s/automationAccounts/%s/logs?api-version=%s", jrds.baseUri, jrds.accountId, jrds.protocolVersion)
err := jrds.issuePostRequest(url, log, nil)

Просмотреть файл

@ -18,20 +18,31 @@ type HttpClient struct {
httpClient *http.Client
}
func NewInSecureHttpClient() HttpClient {
httpClient := &http.Client{}
return HttpClient{httpClient}
}
func NewSecureHttpClient(certificate string, key string) HttpClient {
cert, err := tls.LoadX509KeyPair(certificate, key)
if err != nil {
log.Fatal(err)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
Renegotiation: tls.RenegotiateFreelyAsClient,
}
transport := &http.Transport{TLSClientConfig: tlsConfig}
httpClient := &http.Client{Transport: transport}
return HttpClient{httpClient}
}
func NewInsecureHttpClient(certificate string, key string) HttpClient {
cert, err := tls.LoadX509KeyPair(certificate, key)
if err != nil {
log.Fatal(err)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
InsecureSkipVerify: true, // TODO: remove for prod
Renegotiation: tls.RenegotiateFreelyAsClient,
}

Просмотреть файл

@ -92,8 +92,8 @@ type Stream struct {
type Log struct {
ActivityId *string `json:"activityId"`
Arguments *[]string `json:"args"`
EventId *string `json:"eventId"`
LogType *string `json:"logtype"`
EventId *int `json:"eventId"`
LogType *int `json:"logtype"`
}
type UnloadJob struct {

200
internal/tracer/tracer.go Normal file
Просмотреть файл

@ -0,0 +1,200 @@
package tracer
import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"math/rand"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"time"
)
const (
logPrefix = "Log"
empty = ""
traceDatetimeFormat = "2006-01-02T15:04:05.99"
cloudDebugLogType = 0
cloudHybridTraceEventId = 16000
keywordError = "Error"
keywordDebug = "Debug"
keywordStartup = "Startup"
keywordRoutine = "Routine"
tasknameTraceError = "TraceError"
)
var (
jrdsClient jrdsTracer
activityId = generateActivityId()
tracerPackageName = reflect.TypeOf(tracer{}).PkgPath()
)
type tracer struct {
}
type trace struct {
component string
threadId int
processId int
eventId int
taskName string
message string
keyword string
activityId string
accountId string
subscriptionId string
machineId string
hybridWorkerGroupName string
hybridWorkerVersion string
}
type jrdsTracer interface {
SetLog(eventId int, activityId string, logType int, args ...string) error
}
func InitializeTracer(client jrdsTracer) {
jrdsClient = client
}
var traceGenericHybridWorkerEvent = func(eventId int, taskName string, message string, keyword string) {
trace := NewTrace(eventId, taskName, message, keyword)
go traceGenericHybridWorkerEventRoutine(trace)
}
var traceGenericHybridWorkerEventRoutine = func(trace trace) {
// local stdout
traceLocally(trace)
// cloud stdout
err := formatAndIssueTrace(trace)
if err != nil {
traceErrorLocally(fmt.Sprintf("error while calling formatAndIssueTrace : %v \n", err))
}
}
var NewTrace = func(eventId int, taskName string, message string, keyword string) trace {
return trace{
component: configuration.GetComponent(),
threadId: 1,
processId: os.Getpid(),
eventId: eventId,
taskName: taskName,
message: message,
keyword: keyword,
activityId: activityId,
accountId: configuration.GetAccountId(),
subscriptionId: empty,
machineId: empty,
hybridWorkerGroupName: configuration.GetHybridWorkerGroupName(),
hybridWorkerVersion: configuration.GetWorkerVersion()}
}
var traceErrorLocally = func(message string) {
errorTrace := NewTrace(9, tasknameTraceError, message, keywordError)
traceLocally(errorTrace)
}
var traceLocally = func(trace trace) {
const format = "%s (%v)[%s] : [%s] %v \n"
var now = time.Now().Format(traceDatetimeFormat)
fmt.Printf(format, now, trace.processId, trace.component, trace.taskName, trace.message)
}
var formatAndIssueTrace = func(trace trace) error {
// this format matches the cloud etw manifest; do not reorder
cloudTraceFormat := []string{
trace.accountId,
trace.subscriptionId,
trace.hybridWorkerGroupName,
trace.machineId,
trace.component,
strconv.Itoa(trace.eventId),
trace.taskName,
trace.keyword,
strconv.Itoa(trace.threadId),
strconv.Itoa(trace.processId),
trace.activityId,
trace.hybridWorkerVersion,
trace.message,
}
err := issueJrdsTrace(cloudHybridTraceEventId, activityId, cloudDebugLogType, cloudTraceFormat)
if err != nil {
return err
}
return nil
}
var issueJrdsTrace = func(eventId int, activityId string, logType int, arg []string) error {
if jrdsClient == nil {
return fmt.Errorf("error emitting trace; nil jrds client in tracer package \n")
}
err := jrdsClient.SetLog(eventId, activityId, logType, arg...)
if err != nil {
return fmt.Errorf("error emitting trace to jrds : %v \n", err)
}
return nil
}
// generateActivityId generates a somewhat unique uuid; this isn't a proper uuid implementation and should only
// be used temporally until we have time to implement a proper uuid generation algorithm
var generateActivityId = func() string {
b := make([]byte, 16)
rand.Read(b)
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}
var getTraceName = func() string {
pc := make([]uintptr, 10)
// skip 2 frames (Callers() and getTraceName()); assumes this is called from the tracing function directly only
callers := runtime.Callers(2, pc)
frames := runtime.CallersFrames(pc[:callers])
frame, _ := frames.Next()
replacer := strings.NewReplacer(logPrefix, empty, fmt.Sprintf("%s.", tracerPackageName), empty)
return replacer.Replace(frame.Function)
}
func LogWorkerTraceError(message string) {
traceGenericHybridWorkerEvent(20000, getTraceName(), message, keywordStartup)
}
func LogDebugTrace(message string) {
traceGenericHybridWorkerEvent(20001, getTraceName(), message, keywordDebug)
}
func LogWorkerStarting() {
message := "Worker Starting"
traceGenericHybridWorkerEvent(20020, getTraceName(), message, keywordStartup)
}
func LogWorkerSandboxActionsFound(actions jrds.SandboxActions) {
message := fmt.Sprintf("Get sandbox actions found %v new action(s).", len(actions.Value))
traceGenericHybridWorkerEvent(20100, getTraceName(), message, keywordRoutine)
}
func LogWorkerErrorGettingSandboxActions(err error) {
message := fmt.Sprintf("Error getting sandbox actions. [error=%v]", err.Error())
traceGenericHybridWorkerEvent(20101, getTraceName(), message, keywordRoutine)
}
func LogWorkerFailedToCreateSandbox(err error) {
message := fmt.Sprintf("Error creating sandbox. [error=%v]", err.Error())
traceGenericHybridWorkerEvent(20102, getTraceName(), message, keywordRoutine)
}

Просмотреть файл

@ -0,0 +1,122 @@
package tracer
import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"strconv"
"testing"
"time"
)
func TestInitializeTracer(t *testing.T) {
configuration.LoadConfiguration("C:\\Users\\dalbe\\go\\src\\github.com\\Azure\\azure-automation-go-worker\\misc\\windows-test.json")
httpClient := jrds.NewInsecureHttpClient(configuration.GetJrdsCertificatePath(), configuration.GetJrdsKeyPath())
jrdsClient := jrds.NewJrdsClient(&httpClient, configuration.GetJrdsBaseUri(), configuration.GetAccountId(), configuration.GetHybridWorkerGroupName())
InitializeTracer(&jrdsClient)
LogWorkerStarting()
}
type jrdsMock struct {
setLog_f func(eventId int, activityId string, logType int, args ...string) error
}
func (j *jrdsMock) SetLog(eventId int, activityId string, logType int, args ...string) error {
return j.setLog_f(eventId, activityId, logType, args...)
}
func TestLogWorkerStarting(t *testing.T) {
mock := jrdsMock{}
var (
setLogCalled = false
traceLocallyCalled = false
)
mock.setLog_f = func(eventId int, activityId string, logType int, args ...string) error {
setLogCalled = true
return nil
}
traceLocally = func(trace trace) {
traceLocallyCalled = true
}
InitializeTracer(&mock)
LogWorkerStarting()
// tracing are background routine
time.Sleep(10 * time.Millisecond)
if !setLogCalled || !traceLocallyCalled {
t.Fatal("unexpected missing call to local or cloud trace")
}
}
func Test_LocalTraceOnJrdsTraceError(t *testing.T) {
mock := jrdsMock{}
mock.setLog_f = func(eventId int, activityId string, logType int, args ...string) error {
return fmt.Errorf(empty)
}
localTraceCount := 0
traceLocally = func(trace trace) {
localTraceCount += 1
}
InitializeTracer(&mock)
LogWorkerStarting()
// tracing are background routine
time.Sleep(10 * time.Millisecond)
if localTraceCount != 2 {
t.Fatal("missing local trace on jrds trace exception")
}
}
func Test_CloudTraceFormat(t *testing.T) {
// we need to follow a strict contract for cloud trace format;
// this test is simply to ensure that this format is enforced; do not change this format
trace := NewTrace(0, empty, empty, empty)
issueJrdsTrace = func(eventId int, activityId string, logType int, arg []string) error {
if eventId != 16000 ||
logType != 0 ||
arg[0] != trace.accountId ||
arg[1] != trace.subscriptionId ||
arg[2] != trace.hybridWorkerGroupName ||
arg[3] != trace.machineId ||
arg[4] != trace.component ||
arg[5] != strconv.Itoa(trace.eventId) ||
arg[6] != trace.taskName ||
arg[7] != trace.keyword ||
arg[8] != strconv.Itoa(trace.threadId) ||
arg[9] != strconv.Itoa(trace.processId) ||
arg[10] != trace.activityId ||
arg[11] != trace.hybridWorkerVersion ||
arg[12] != trace.message {
t.Fatal("unexpected cloud trace format")
}
return nil
}
formatAndIssueTrace(trace)
// tracing are background routine
time.Sleep(10 * time.Millisecond)
}
func LogWorkerTestMethod() string {
return getTraceName()
}
func Test_GetTraceName(t *testing.T) {
expectedTraceName := "WorkerTestMethod"
traceName := LogWorkerTestMethod()
if traceName != expectedTraceName {
t.Fatal("unexpected trace name")
}
}

Просмотреть файл

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"github.com/Azure/azure-automation-go-worker/internal/tracer"
"github.com/Azure/azure-automation-go-worker/main/worker/sandbox"
"os"
"time"
@ -40,13 +41,14 @@ func (worker *Worker) routine() {
actions := jrds.SandboxActions{}
err := worker.jrdsClient.GetSandboxActions(&actions)
if err != nil {
fmt.Printf("error getting sandbox.exe action")
tracer.LogWorkerErrorGettingSandboxActions(err)
return
}
// start a new sandbox for each actions returned by jrds
fmt.Printf("Get sandox action : %v actions \n", len(actions.Value))
tracer.LogDebugTrace(fmt.Sprintf("Get sandbox action. Found %v action(s).", len(actions.Value)))
if len(actions.Value) > 0 {
tracer.LogWorkerSandboxActionsFound(actions)
for _, action := range actions.Value {
sandboxId := *action.SandboxId
if _, tracked := worker.sandboxCollection[sandboxId]; tracked {
@ -58,7 +60,7 @@ func (worker *Worker) routine() {
worker.sandboxCollection[sandbox.Id] = &sandbox
err := createAndStartSandbox(&sandbox)
if err != nil {
fmt.Printf("error calling create and start sandbox %v", err)
tracer.LogWorkerFailedToCreateSandbox(err)
}
}
}
@ -96,13 +98,17 @@ var monitorSandbox = func(sandbox *sandbox.Sandbox) {
}
func main() {
fmt.Println("Worker starting")
// always load configuration and initialize tracer before anything else
err := configuration.LoadConfiguration(os.Args[1])
if err != nil {
panic(err)
}
// always load configuration before anything else
configuration.LoadConfiguration(os.Args[1])
httpClient := jrds.NewSecureHttpClient(configuration.GetJrdsCertificatePath(), configuration.GetJrdsKeyPath())
jrdsClient := jrds.NewJrdsClient(&httpClient, configuration.GetJrdsBaseUri(), configuration.GetAccountId(), configuration.GetHybridWorkerGroupName())
tracer.InitializeTracer(&jrdsClient)
tracer.LogWorkerStarting()
worker := NewWorker(&jrdsClient)
worker.Start()
}

Просмотреть файл

@ -9,7 +9,7 @@
"worker_version" : "",
"working_directory_path" : "",
"debug_traces" : "",
"debug_traces" : false,
"bypass_certificate_verification" : "",
"enforce_runbook_signature_validation" : "",
"gpg_public_keyring_path" : "",