* Adding sandbox/job

* Adding asynccommand support

* Fixing sandbox is running

* Adding build.cmd for windows build

* Adding runtime component

* Adressing feedback
This commit is contained in:
David Albertson 2018-11-07 18:11:01 -08:00 коммит произвёл GitHub
Родитель 9a1e6760ed
Коммит 7528eff301
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 1007 добавлений и 115 удалений

14
build.cmd Normal file
Просмотреть файл

@ -0,0 +1,14 @@
@echo off
set BINDIR=bin
set BIN_WORKER=worker.exe
set BIN_SANDBOX=sandbox.exe
REM clean
rmdir /s /q %BINDIR%
REM build
set GOOS=windows
set GOARCH=amd64
go build -v -o %BINDIR%\%BIN_WORKER% .\main\worker
go build -v -o %BINDIR%\%BIN_SANDBOX% .\main\sandbox

Просмотреть файл

@ -7,18 +7,20 @@ import (
"encoding/json"
"io/ioutil"
"os"
"strconv"
)
const (
environmentConfigurationKey = "WORKERCONF"
EnvironmentConfigurationKey = "WORKERCONF"
DEFAULT_empty = ""
DEFAULT_workerVersion = "2.0.0"
DEFAULT_sandboxExecutableName = "sandbox"
DEFAULT_jrdsPollingFrequencyInSeconds = 10
DEFAULT_component = "worker"
DEFAULT_component = Component_worker
DEFAULT_debugTraces = false
Component_sandbox = "sandbox"
Component_worker = "worker"
)
type Configuration struct {
@ -33,8 +35,8 @@ type Configuration struct {
WorkerWorkingDirectory string `json:"working_directory_path"`
SandboxExecutablePath string `json:"sandbox_executable_path"`
JrdsPollingFrequency string `json:"jrds_polling_frequency"`
DebugTraces bool `json:"debug_traces"`
JrdsPollingFrequency int `json:"jrds_polling_frequency"`
DebugTraces bool `json:"debug_traces"`
// runtime configuration
Component string `json:"component"`
@ -46,15 +48,23 @@ func LoadConfiguration(path string) error {
if err != nil {
return err
}
err = deserializeConfiguration(content, &configuration)
err = DeserializeConfiguration(content, &configuration)
if err != nil {
return err
}
setConfiguration(configuration)
setConfiguration(&configuration)
return nil
}
func SetConfiguration(configuration *Configuration) {
setConfiguration(configuration)
}
func GetConfiguration() Configuration {
return getEnvironmentConfiguration()
}
var readDiskConfiguration = func(path string) ([]byte, error) {
content, err := ioutil.ReadFile(path)
if err != nil {
@ -64,28 +74,28 @@ var readDiskConfiguration = func(path string) ([]byte, error) {
return content, nil
}
var setConfiguration = func(config Configuration) {
configuration, err := serializeConfiguration(config)
var setConfiguration = func(config *Configuration) {
configuration, err := SerializeConfiguration(config)
if err != nil {
panic("unable to serialize configuration from environment")
}
err = os.Setenv(environmentConfigurationKey, string(configuration))
err = os.Setenv(EnvironmentConfigurationKey, string(configuration))
if err != nil {
panic("unable to set configuration to environment")
}
}
var clearConfiguration = func() {
os.Unsetenv(environmentConfigurationKey)
os.Unsetenv(EnvironmentConfigurationKey)
}
var getEnvironmentConfiguration = func() Configuration {
value, exists := os.LookupEnv(environmentConfigurationKey)
value, exists := os.LookupEnv(EnvironmentConfigurationKey)
configuration := Configuration{}
if exists {
err := deserializeConfiguration([]byte(value), &configuration)
err := DeserializeConfiguration([]byte(value), &configuration)
if err != nil {
panic("unable to deserialize configuration from environment")
}
@ -93,11 +103,11 @@ var getEnvironmentConfiguration = func() Configuration {
return configuration
}
var serializeConfiguration = func(configuration Configuration) ([]byte, error) {
var SerializeConfiguration = func(configuration *Configuration) ([]byte, error) {
return json.Marshal(configuration)
}
var deserializeConfiguration = func(data []byte, configuration *Configuration) error {
var DeserializeConfiguration = func(data []byte, configuration *Configuration) error {
return json.Unmarshal(data, &configuration)
}
@ -113,7 +123,8 @@ var getDefaultConfiguration = func() Configuration {
WorkerWorkingDirectory: DEFAULT_empty,
SandboxExecutablePath: DEFAULT_sandboxExecutableName,
Component: DEFAULT_component,
DebugTraces: DEFAULT_debugTraces}
DebugTraces: DEFAULT_debugTraces,
JrdsPollingFrequency: DEFAULT_jrdsPollingFrequencyInSeconds}
}
var GetJrdsCertificatePath = func() string {
@ -158,12 +169,7 @@ var GetWorkerVersion = func() string {
var GetJrdsPollingFrequencyInSeconds = func() int64 {
config := getEnvironmentConfiguration()
freq, err := strconv.Atoi(config.JrdsPollingFrequency)
if err != nil {
return DEFAULT_jrdsPollingFrequencyInSeconds
}
return int64(freq)
return int64(config.JrdsPollingFrequency)
}
var GetComponent = func() string {

Просмотреть файл

@ -62,3 +62,22 @@ func TestLoadConfiguration_OverrideDefaultValues(t *testing.T) {
t.Fatal("unexpected configuration value")
}
}
func TestSetConfiguration(t *testing.T) {
clearConfiguration()
config := GetConfiguration()
config.Component = Component_worker
SetConfiguration(&config)
updatedConfig := GetConfiguration()
if updatedConfig.Component != Component_worker {
t.Fatal("unexpected configuration value")
}
updatedConfig.Component = Component_sandbox
SetConfiguration(&updatedConfig)
updatedConfig = GetConfiguration()
if updatedConfig.Component != Component_sandbox {
t.Fatal("unexpected configuration value")
}
}

Просмотреть файл

@ -49,13 +49,27 @@ func (jrds *JrdsClient) GetSandboxActions(sandboxAction *SandboxActions) error {
return nil
}
func (jrds *JrdsClient) GetJobActions(sandboxId string, jobData *JobActions) error {
func (jrds *JrdsClient) GetJobActions(sandboxId string, jobActions *JobActions) error {
url := fmt.Sprintf("%s/automationAccounts/%s/Sandboxes/%s/jobs/getJobActions?api-version=%s", jrds.baseUri, jrds.accountId, sandboxId, jrds.protocolVersion)
err := jrds.issueGetRequest(url, jobData)
err := jrds.issueGetRequest(url, jobActions)
if err != nil {
return err
}
actions := (*jobActions).Value
if len(actions) > 0 {
var arr []MessageMetadata
for _, jobaction := range actions {
arr = append(arr, *jobaction.MessageMetadata)
}
metadatas := MessageMetadatas{arr}
err = jrds.AcknowledgeJobAction(sandboxId, metadatas)
if err != nil {
fmt.Printf("error getting messageMetadata %v", err)
}
}
return nil
}
@ -133,8 +147,9 @@ func (jrds *JrdsClient) SetLog(eventId int, activityId string, logType int, args
return nil
}
func (jrds *JrdsClient) UnloadJob(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime string, executionTimeInSeconds int) error {
payload := UnloadJob{JobId: &jobId, IsTest: &isTest, StartTime: &startTime, SubscriptionId: &subscriptionId, ExecutionTimeInSeconds: &executionTimeInSeconds}
func (jrds *JrdsClient) UnloadJob(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime time.Time, executionTimeInSeconds int) error {
jobStartTime := startTime.Format(datetimeFormat)
payload := UnloadJob{JobId: &jobId, IsTest: &isTest, StartTime: &jobStartTime, SubscriptionId: &subscriptionId, ExecutionTimeInSeconds: &executionTimeInSeconds}
url := fmt.Sprintf("%s/automationAccounts/%s/Sandboxes/%s/jobs/%s/unload?api-version=%s", jrds.baseUri, jrds.accountId, sandboxId, jobId, jrds.protocolVersion)
err := jrds.issuePostRequest(url, payload, nil)
if err != nil {
@ -166,15 +181,15 @@ func (jrds *JrdsClient) issuePostRequest(url string, payload interface{}, out in
code, _, err := jrds.client.Post(url, headers, body)
if err != nil {
return NewRequestError(fmt.Sprintf("request error : %v \n", err))
return NewRequestError(fmt.Sprintf("request error %v : %v\n", url, code))
}
if code == 401 {
return NewRequestAuthorizationError(fmt.Sprintf("authorization error : %v\n", code))
return NewRequestAuthorizationError(fmt.Sprintf("authorization error %v : %v\n", url, code))
}
if code != 200 {
return NewRequestInvalidStatusError(fmt.Sprintf("invalid return code : %v\n", code))
return NewRequestInvalidStatusError(fmt.Sprintf("invalid return code for %v : %v\n", url, code))
}
if out != nil {
@ -190,15 +205,15 @@ func (jrds *JrdsClient) issueGetRequest(url string, out interface{}) error {
code, body, err := jrds.client.Get(url, jrds.getDefaultHeaders())
if err != nil {
return NewRequestError(fmt.Sprintf("request error : %v \n", err))
return NewRequestError(fmt.Sprintf("request error %v : %v\n", url, code))
}
if code == 401 {
return NewRequestAuthorizationError(fmt.Sprintf("authorization error : %v\n", code))
return NewRequestAuthorizationError(fmt.Sprintf("authorization error %v : %v\n", url, code))
}
if code != 200 {
return NewRequestInvalidStatusError(fmt.Sprintf("invalid return code : %v\n", code))
return NewRequestInvalidStatusError(fmt.Sprintf("invalid return code for %v : %v\n", url, code))
}
if out != nil {

Просмотреть файл

@ -28,6 +28,10 @@ type MessageMetadatas struct {
type JobData struct {
RunbookVersionId *string `json:"runbookVersionId"`
JobId *string `json:"jobId"`
SubscriptionId *string `json:"subscriptionId"`
PendingAction *int `json:"pendingAction"`
JobStatus *int `json:"jobStatus"`
}
type JobUpdatableData struct {
@ -57,13 +61,13 @@ type JobUpdatableData struct {
}
type RunbookData struct {
Name *string `json:"name"`
AccountId *string `json:"accountId"`
RunbookId *string `json:"runbookId"`
Definition *string `json:"definition"`
RunbookDefinitionKind *int `json:"runbookDefinitionKind"`
RunbookVersionId *int `json:"runbookVersionId"`
Parameters *bool `json:"parameters"`
Name *string `json:"name"`
AccountId *string `json:"accountId"`
RunbookId *string `json:"runbookId"`
Definition *string `json:"definition"`
RunbookDefinitionKind *int `json:"runbookDefinitionKind"`
RunbookVersionId *string `json:"runbookVersionId"`
Parameters *[]string `json:"parameters"`
}
type MessageMetadata struct {

Просмотреть файл

@ -17,20 +17,26 @@ import (
)
const (
logPrefix = "Log"
empty = ""
empty = ""
logPrefix = "Log"
debugTracePrefix = "[DebugTrace]"
traceDatetimeFormat = "2006-01-02T15:04:05.99"
cloudDebugLogType = 0
cloudHybridTraceEventId = 16000
keywordError = "Error"
keywordDebug = "Debug"
keywordStartup = "Startup"
keywordRoutine = "Routine"
keywordError = "Error"
keywordDebug = "Debug"
keywordStartup = "Startup"
keywordRoutine = "Routine"
keywordInformational = "Informational"
keywordJob = "Job"
tasknameTraceError = "TraceError"
tasknameTraceError = "TraceError"
trasknameSandboxStdout = "SandboxStdout"
trasknameSandboxStderr = "SandboxStderr"
)
var (
@ -72,15 +78,15 @@ func InitializeTracer(client jrdsTracer) {
var traceGenericHybridWorkerDebugEvent = func(eventId int, taskName string, message string, keyword string) {
trace := NewTrace(eventId, taskName, message, keyword)
go traceGenericHybridWorkerEventRoutine(trace, true)
go traceGenericHybridWorkerEventRoutine(trace, true, false)
}
var traceGenericHybridWorkerEvent = func(eventId int, taskName string, message string, keyword string) {
trace := NewTrace(eventId, taskName, message, keyword)
go traceGenericHybridWorkerEventRoutine(trace, false)
go traceGenericHybridWorkerEventRoutine(trace, false, false)
}
var traceGenericHybridWorkerEventRoutine = func(trace trace, debug bool) {
var traceGenericHybridWorkerEventRoutine = func(trace trace, debug bool, localonly bool) {
// do not log debug traces based on configuration
if !configuration.GetDebugTraces() && debug {
return
@ -89,6 +95,10 @@ var traceGenericHybridWorkerEventRoutine = func(trace trace, debug bool) {
// local stdout
traceLocally(trace)
if localonly {
return
}
// cloud stdout
err := formatAndIssueTrace(trace)
if err != nil {
@ -119,9 +129,19 @@ var traceErrorLocally = func(message string) {
}
var traceLocally = func(trace trace) {
const format = "%s (%v)[%s] : [%s] %v \n"
var now = time.Now().Format(traceDatetimeFormat)
fmt.Printf(format, now, trace.processId, trace.component, trace.taskName, trace.message)
traceOutput := ""
if configuration.GetComponent() == configuration.Component_worker &&
trace.component == configuration.Component_sandbox {
const format = "%v \n"
traceOutput = fmt.Sprintf(format, trace.message)
} else {
const format = "%s (%v)[%s] : [%s] %v \n"
var now = time.Now().Format(traceDatetimeFormat)
traceOutput = fmt.Sprintf(format, now, trace.processId, trace.component, trace.taskName, trace.message)
}
fmt.Print(traceOutput)
}
var formatAndIssueTrace = func(trace trace) error {
@ -184,6 +204,18 @@ var getTraceName = func() string {
return replacer.Replace(frame.Function)
}
func LogSandboxStdout(message string) {
trace := NewTrace(0, trasknameSandboxStdout, message, keywordInformational)
trace.component = configuration.Component_sandbox
go traceGenericHybridWorkerEventRoutine(trace, strings.Contains(message, debugTracePrefix), true)
}
func LogSandboxStderr(message string) {
trace := NewTrace(0, trasknameSandboxStderr, message, keywordInformational)
trace.component = configuration.Component_sandbox
go traceGenericHybridWorkerEventRoutine(trace, strings.Contains(message, debugTracePrefix), true)
}
func LogWorkerTraceError(message string) {
traceGenericHybridWorkerEvent(20000, getTraceName(), message, keywordStartup)
}
@ -192,8 +224,13 @@ func LogDebugTrace(message string) {
traceGenericHybridWorkerDebugEvent(20001, getTraceName(), message, keywordDebug)
}
func LogErrorTrace(error string) {
message := fmt.Sprintf("Error : %v", error)
traceGenericHybridWorkerDebugEvent(20001, getTraceName(), message, keywordDebug)
}
func LogWorkerStarting() {
message := "Worker Starting"
message := "Worker starting."
traceGenericHybridWorkerEvent(20020, getTraceName(), message, keywordStartup)
}
@ -211,3 +248,28 @@ func LogWorkerFailedToCreateSandbox(err error) {
message := fmt.Sprintf("Error creating sandbox. [error=%v]", err.Error())
traceGenericHybridWorkerEvent(20102, getTraceName(), message, keywordRoutine)
}
func LogWorkerSandboxProcessExited(sandboxId string, pid, exitCode int) {
message := fmt.Sprintf("Sandbox process exited. [sandboxId=%v][pId=%v][exitCode=%v]", sandboxId, pid, exitCode)
traceGenericHybridWorkerEvent(20102, getTraceName(), message, keywordRoutine)
}
func LogSandboxStarting(id string) {
message := fmt.Sprintf("Sandbox starting [sandboxId=%v]", id)
traceGenericHybridWorkerEvent(25000, getTraceName(), message, keywordStartup)
}
func LogSandboxGetJobActions(actions *jrds.JobActions) {
message := fmt.Sprintf("Get job actions. Found %v new action(s).", len(actions.Value))
traceGenericHybridWorkerEvent(25001, getTraceName(), message, keywordRoutine)
}
func LogSandboxJobLoaded(sandboxId, jobId string) {
message := fmt.Sprintf("Job loaded. [sandboxId=%v][jobId=%v]", sandboxId, jobId)
traceGenericHybridWorkerEvent(25010, getTraceName(), message, keywordJob)
}
func LogSandboxJobUnloaded(sandboxId, jobId string) {
message := fmt.Sprintf("Job unloaded. [sandboxId=%v][jobId=%v]", sandboxId, jobId)
traceGenericHybridWorkerEvent(25013, getTraceName(), message, keywordJob)
}

186
main/sandbox/job.go Normal file
Просмотреть файл

@ -0,0 +1,186 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package main
import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"github.com/Azure/azure-automation-go-worker/internal/tracer"
"github.com/Azure/azure-automation-go-worker/main/sandbox/runtime"
"os"
"path/filepath"
"time"
)
const (
enum_statusActivating = 1
enum_statusRunning = 2
enum_statusCompleted = 3
enum_statusFailed = 4
enum_statusStopped = 5
)
type Job struct {
Id string
// runtime
StartTime time.Time
Completed bool
// channels
PendingActions chan int
Exceptions chan string
jobData jrds.JobData
jobUpdatableData jrds.JobUpdatableData
runbookData jrds.RunbookData
sandboxId string
workingDirectory string
jrdsClient jrdsClient
}
func NewJob(sandboxId string, jobData jrds.JobData, jrdsClient jrdsClient) Job {
workingDirectory := filepath.Join(configuration.GetWorkingDirectory(), *jobData.JobId)
err := os.MkdirAll(workingDirectory, 0750)
panicOnError("Unable to create job working directory", err)
return Job{
Id: *jobData.JobId,
jobData: jobData,
sandboxId: sandboxId,
workingDirectory: workingDirectory,
jrdsClient: jrdsClient,
StartTime: time.Now(),
Completed: false,
PendingActions: make(chan int),
Exceptions: make(chan string)}
}
func (job *Job) Run() {
err := loadJob(job)
panicOnError(fmt.Sprintf("error loading job %v", err), err)
err = initializeRuntime(job)
panicOnError(fmt.Sprintf("error initializing runtime %v", err), err)
err = executeRunbook(job)
panicOnError(fmt.Sprintf("error executing runbook %v", err), err)
}
var loadJob = func(job *Job) error {
err := job.jrdsClient.SetJobStatus(job.sandboxId, job.Id, enum_statusActivating, false, nil)
if err != nil {
return err
}
jobUpdatableData := jrds.JobUpdatableData{}
err = job.jrdsClient.GetUpdatableJobData(job.Id, &jobUpdatableData)
if err != nil {
return err
}
runbookData := jrds.RunbookData{}
err = job.jrdsClient.GetRunbookData(*job.jobData.RunbookVersionId, &runbookData)
if err != nil {
return err
}
job.jobUpdatableData = jobUpdatableData
job.runbookData = runbookData
tracer.LogSandboxJobLoaded(job.sandboxId, job.Id)
return nil
}
var initializeRuntime = func(job *Job) error {
// create runbook
runbook, err := runtime.NewRunbook(
*job.runbookData.Name,
*job.runbookData.RunbookVersionId,
runtime.DefinitionKind(*job.runbookData.RunbookDefinitionKind),
*job.runbookData.Definition)
if err != nil {
return err
}
// create language; failed the job if the language isn't supported by the worker
language, err := runtime.GetLanguage(runbook.Kind)
if err != nil {
return err
}
// create runtime
runtime := runtime.NewRuntime(language, runbook, job.jobData, job.workingDirectory)
err = runtime.Initialize()
if err != nil {
return err
}
// test if is the runtime supported by the os
supp := runtime.IsSupported()
if !supp {
tracer.LogErrorTrace("Runbook definition kind not supported")
}
runtime.StartRunbook()
return nil // TODO
}
var executeRunbook = func(job *Job) error {
err := job.jrdsClient.SetJobStatus(job.sandboxId, job.Id, enum_statusRunning, false, nil)
panicOnError(fmt.Sprintf("error setting job status %v", err), err)
// temporary
// running job
time.Sleep(time.Second * 10)
// temporary
// check pending action while job is running
if action, found := getPendingActions(job); found {
if action == 5 {
err = job.jrdsClient.SetJobStatus(job.sandboxId, job.Id, enum_statusStopped, true, nil)
panicOnError(fmt.Sprintf("error stopping job %v", err), err)
return nil
}
}
err = job.jrdsClient.SetJobStatus(job.sandboxId, job.Id, enum_statusCompleted, true, nil)
panicOnError(fmt.Sprintf("error setting job status %v", err), err)
err = unloadJob(job)
panicOnError(fmt.Sprintf("error unloading job %v", err), err)
job.Completed = true
return nil
}
var unloadJob = func(job *Job) error {
executionTimeInSeconds := int((time.Now().Sub(job.StartTime)).Seconds())
err := job.jrdsClient.UnloadJob(*job.jobData.SubscriptionId, job.sandboxId, job.Id, false, job.StartTime, executionTimeInSeconds)
if err != nil {
return err
}
tracer.LogSandboxJobUnloaded(job.sandboxId, job.Id)
return nil
}
var panicOnError = func(message string, err error) {
if err != nil {
panic(err)
}
}
var getPendingActions = func(job *Job) (pendingAction int, found bool) {
select {
case action := <-job.PendingActions:
return action, true
default:
}
return -1, false
}

Просмотреть файл

@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package runtime
import (
"github.com/Azure/azure-automation-go-worker/pkg/executil"
"runtime"
)
type Interpreter struct {
language string
commandName string
arguments []string
}
var getPowerShellInterpreter = func() Interpreter {
commandName := "pwsh"
if runtime.GOOS == "windows" {
commandName = "powershell"
}
return Interpreter{
language: "PowerShell",
commandName: commandName,
arguments: []string{"-File"}}
}
var getPython2Interpreter = func() Interpreter {
return Interpreter{
language: "Python2",
commandName: "python",
arguments: []string{}}
}
var getPython3Interpreter = func() Interpreter {
return Interpreter{
language: "Python3",
commandName: "python3",
arguments: []string{}}
}
var getBashInterpreter = func() Interpreter {
return Interpreter{
language: "Bash",
commandName: "bash",
arguments: []string{}}
}
func (i *Interpreter) isSupported() bool {
handler := executil.GetCommandHandler()
command := executil.NewCommand(i.commandName)
handler.Execute(&command)
return command.IsSuccessful
}

Просмотреть файл

@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package runtime
import "fmt"
const (
pythonExtension Extension = "py"
powershellExtension Extension = "ps1"
bashExtension Extension = "sh"
)
type Extension string
type Name string
type Language struct {
extension Extension
interpreter Interpreter
}
func (l *Language) GetExtension() Extension {
return l.extension
}
func (l *Language) GetInterpreter() Interpreter {
return l.interpreter
}
var GetLanguage = func(definitionKind DefinitionKind) (Language, error) {
var language Language
switch definitionKind {
case PowerShell:
language = Language{extension: powershellExtension, interpreter: getPowerShellInterpreter()}
break
case Python2:
language = Language{extension: pythonExtension, interpreter: getPython2Interpreter()}
break
case Python3:
language = Language{extension: pythonExtension, interpreter: getPython3Interpreter()}
break
case Bash:
language = Language{extension: bashExtension, interpreter: getBashInterpreter()}
break
default:
return Language{}, fmt.Errorf("unsupported language")
}
return language, nil
}

Просмотреть файл

@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package runtime
import (
"fmt"
)
const (
PowerShell DefinitionKind = 5
Python2 DefinitionKind = 9
Python3 DefinitionKind = 10
Bash DefinitionKind = 11
)
type Runbook struct {
Name string
Kind DefinitionKind
Definition string
FileName string
}
type DefinitionKind int
var NewRunbook = func(Name string, versionId string, kind DefinitionKind, definition string) (Runbook, error) {
language, err := GetLanguage(kind)
if err != nil {
return Runbook{}, err
}
runbook := Runbook{
Name: Name,
Kind: kind,
Definition: definition,
FileName: fmt.Sprintf("%v-%v.%v", Name, versionId, language.extension)}
return runbook, nil
}

Просмотреть файл

@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package runtime
import (
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"github.com/Azure/azure-automation-go-worker/internal/tracer"
"github.com/Azure/azure-automation-go-worker/pkg/executil"
"os"
"path/filepath"
"time"
)
type Runtime struct {
runbook Runbook
language Language
jobData jrds.JobData
workingDirectory string
}
func NewRuntime(language Language, runbook Runbook, jobData jrds.JobData, workingDirectory string) Runtime {
return Runtime{
runbook: runbook,
language: language,
jobData: jobData,
workingDirectory: workingDirectory}
}
func (runtime *Runtime) Initialize() error {
runbookPath := getRunbookPathOnDisk(runtime.workingDirectory, runtime.runbook)
err := writeRunbookToDisk(runbookPath, runtime.runbook)
if err != nil {
return err
}
return nil
}
func (runtime *Runtime) IsSupported() bool {
return runtime.language.interpreter.isSupported()
}
func (runtime *Runtime) StartRunbook() {
arguments := append(runtime.language.interpreter.arguments, getRunbookPathOnDisk(runtime.workingDirectory, runtime.runbook))
handler := executil.GetAsyncCommandHandler()
cmd := executil.NewAsyncCommand(
tracer.LogDebugTrace,
tracer.LogDebugTrace,
runtime.workingDirectory,
nil,
runtime.language.interpreter.commandName,
arguments...)
handler.ExecuteAsync(&cmd)
for cmd.IsRunning {
time.Sleep(10 * time.Millisecond)
}
}
var getRunbookPathOnDisk = func(workingDirectory string, runbook Runbook) string {
return filepath.Join(workingDirectory, runbook.FileName)
}
var writeRunbookToDisk = func(path string, runbook Runbook) error {
const permission = 0640
file, err := os.OpenFile(path, os.O_RDONLY|os.O_CREATE, permission)
if err != nil {
return err
}
_, err = file.Write([]byte(runbook.Definition))
if err != nil {
return err
}
file.Close()
return nil
}

Просмотреть файл

@ -3,8 +3,120 @@
package main
import "fmt"
import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"github.com/Azure/azure-automation-go-worker/internal/tracer"
"os"
"time"
)
type Sandbox struct {
id string
isAlive bool
jrdsClient jrdsClient
jrdsPollingFrequency time.Duration
jobs map[string]*Job
}
func NewSandbox(sandboxId string, jrdsClient jrdsClient) Sandbox {
return Sandbox{id: sandboxId,
isAlive: true,
jrdsClient: jrdsClient,
jrdsPollingFrequency: time.Duration(int64(time.Second) * configuration.GetJrdsPollingFrequencyInSeconds()),
jobs: make(map[string]*Job, 1)}
}
type jrdsClient interface {
GetJobActions(sandboxId string, jobData *jrds.JobActions) error
GetJobData(jobId string, jobData *jrds.JobData) error
GetUpdatableJobData(jobId string, jobData *jrds.JobUpdatableData) error
GetRunbookData(runbookVersionId string, runbookData *jrds.RunbookData) error
AcknowledgeJobAction(sandboxId string, messageMetadata jrds.MessageMetadatas) error
SetJobStatus(sandboxId string, jobId string, status int, isTermial bool, exception *string) error
SetJobStream(jobId string, runbookVersionId string, text string, streamType string, sequence int) error
SetLog(eventId int, activityId string, logType int, args ...string) error
UnloadJob(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime time.Time, executionTimeInSeconds int) error
}
func (sandbox *Sandbox) Start() {
for sandbox.isAlive {
routine(sandbox)
time.Sleep(sandbox.jrdsPollingFrequency)
}
}
var routine = func(sandbox *Sandbox) {
jobActions := jrds.JobActions{}
err := sandbox.jrdsClient.GetJobActions(sandbox.id, &jobActions)
if err != nil {
sandbox.isAlive = false
tracer.LogErrorTrace(err.Error())
}
tracer.LogDebugTrace(fmt.Sprintf("Get job action. Found %v new action(s).", len(jobActions.Value)))
for _, action := range jobActions.Value {
tracer.LogSandboxGetJobActions(&jobActions)
jobData := jrds.JobData{}
err := sandbox.jrdsClient.GetJobData(*action.JobId, &jobData)
if err != nil {
fmt.Printf("error getting jobData %v", err)
}
if (jobData.PendingAction != nil && *jobData.PendingAction == 1) ||
(jobData.PendingAction == nil && *jobData.JobStatus == 1) ||
(jobData.PendingAction == nil && *jobData.JobStatus == 2) {
// new job
job := NewJob(sandbox.id, jobData, sandbox.jrdsClient)
sandbox.jobs[job.Id] = &job
go job.Run()
} else if jobData.PendingAction != nil && *jobData.PendingAction == 5 {
// stop pending action
if job, ok := sandbox.jobs[*jobData.JobId]; ok {
job.PendingActions <- *jobData.PendingAction
}
} else if jobData.PendingAction == nil {
// no pending action
tracer.LogDebugTrace("no pending action")
} else {
//unsupported pending action
tracer.LogDebugTrace("unsupported pending action")
}
}
stopTrackingCompletedJobs(sandbox)
}
var stopTrackingCompletedJobs = func(sandbox *Sandbox) {
completedJob := make([]string, 1)
for jobId, job := range sandbox.jobs {
if job.Completed {
completedJob = append(completedJob, jobId)
}
}
// stop tracking jobs
for _, jobId := range completedJob {
delete(sandbox.jobs, jobId)
}
}
func main() {
fmt.Println("Sandbox starting")
if len(os.Args) < 2 {
panic("missing sandbox id parameter")
}
sandboxId := os.Args[1]
httpClient := jrds.NewSecureHttpClient(configuration.GetJrdsCertificatePath(), configuration.GetJrdsKeyPath())
jrdsClient := jrds.NewJrdsClient(&httpClient, configuration.GetJrdsBaseUri(), configuration.GetAccountId(), configuration.GetHybridWorkerGroupName())
tracer.InitializeTracer(&jrdsClient)
tracer.LogSandboxStarting(sandboxId)
sandbox := NewSandbox(sandboxId, &jrdsClient)
sandbox.Start()
}

Просмотреть файл

@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package main
import (
"github.com/Azure/azure-automation-go-worker/internal/jrds"
"testing"
"time"
)
var (
sandboxId = "ccb7bc90-20e9-4e5c-bbf0-f265c1de7000"
jobId = "ccb7bc90-20e9-4e5c-bbf0-f265c1de7111"
subscriptionId = "ccb7bc90-20e9-4e5c-bbf0-f265c1de7222"
)
type jrdsMock struct {
unloadJob_f func(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime time.Time, executionTimeInSeconds int) error
}
func (jrds *jrdsMock) GetJobActions(sandboxId string, jobData *jrds.JobActions) error {
panic("implement me")
}
func (jrds *jrdsMock) GetJobData(jobId string, jobData *jrds.JobData) error {
panic("implement me")
}
func (jrds *jrdsMock) GetUpdatableJobData(jobId string, jobData *jrds.JobUpdatableData) error {
panic("implement me")
}
func (jrds *jrdsMock) GetRunbookData(runbookVersionId string, runbookData *jrds.RunbookData) error {
panic("implement me")
}
func (jrds *jrdsMock) AcknowledgeJobAction(sandboxId string, messageMetadata jrds.MessageMetadatas) error {
panic("implement me")
}
func (jrds *jrdsMock) SetJobStatus(sandboxId string, jobId string, status int, isTermial bool, exception *string) error {
panic("implement me")
}
func (jrds *jrdsMock) SetJobStream(jobId string, runbookVersionId string, text string, streamType string, sequence int) error {
panic("implement me")
}
func (jrds *jrdsMock) SetLog(eventId int, activityId string, logType int, args ...string) error {
panic("implement me")
}
func (jrds *jrdsMock) UnloadJob(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime time.Time, executionTimeInSeconds int) error {
return jrds.unloadJob_f(subscriptionId, sandboxId, jobId, isTest, startTime, executionTimeInSeconds)
}
func Test_CleanCompletedJobs_DoesNotCleanRunningJobs(t *testing.T) {
// create sandbox
sbx := NewSandbox(sandboxId, nil)
//create job
job := NewJob(sandboxId, jrds.JobData{JobId: &jobId}, nil)
sbx.jobs[jobId] = &job
stopTrackingCompletedJobs(&sbx)
if sbx.jobs[jobId] != &job {
t.Fatal("unexpected error : job is not tracked by sandbox")
}
}
func Test_CleanCompletedJobs_CleansCompletedJobs(t *testing.T) {
// create jrdsmock
unloadCalled := false
jrdsMock := jrdsMock{}
jrdsMock.unloadJob_f = func(subscriptionId string, sandboxId string, jobId string, isTest bool, startTime time.Time, executionTimeInSeconds int) error {
unloadCalled = true
return nil
}
// create job
job := NewJob(sandboxId, jrds.JobData{JobId: &jobId, SubscriptionId: &subscriptionId}, &jrdsMock)
job.Completed = true
job.StartTime = time.Now()
// create sandbox
sbx := NewSandbox(sandboxId, &jrdsMock)
sbx.jobs[jobId] = &job
stopTrackingCompletedJobs(&sbx)
j := sbx.jobs[jobId]
if j != nil {
t.Fatal("unexpected error : job is still tracked by sandbox")
}
if !unloadCalled {
t.Fatal("unexpected error : job was not unloaded")
}
}

Просмотреть файл

@ -3,35 +3,60 @@ package sandbox
import (
"fmt"
"github.com/Azure/azure-automation-go-worker/internal/configuration"
"github.com/Azure/azure-automation-go-worker/internal/tracer"
"github.com/Azure/azure-automation-go-worker/pkg/executil"
"os"
"path/filepath"
"strings"
)
const (
sandboxWorkingDirectoryName = "sandboxes"
)
type Sandbox struct {
Id string
workingDirectory string
workingDirectoryPermission os.FileMode
Id string
workingDirectory string
command *executil.Command
isAlive bool
isRunning *bool
faulted *bool
commandHandler executil.CommandHandler
command executil.AsyncCommand
commandHandler executil.AsyncCommandHandler
}
var NewSandbox = func(sandboxId string) Sandbox {
const permission = 644
return Sandbox{Id: sandboxId,
command: nil,
workingDirectory: filepath.Join(configuration.GetWorkingDirectory(), sandboxId),
workingDirectoryPermission: permission,
commandHandler: executil.GetCommandHandler(),
isAlive: false,
isRunning := false
faulted := false
return Sandbox{
Id: sandboxId,
workingDirectory: filepath.Join(configuration.GetWorkingDirectory(), sandboxWorkingDirectoryName, sandboxId),
isRunning: &isRunning,
faulted: &faulted,
commandHandler: executil.GetAsyncCommandHandler(),
}
}
func (s *Sandbox) CreateBaseDirectory() error {
err := os.MkdirAll(s.workingDirectory, s.workingDirectoryPermission) // TODO: change sb permission
const permission = 0750
err := os.MkdirAll(s.workingDirectory, permission) // TODO: change sb permission
if err != nil {
return err
}
return nil
}
func (sandbox *Sandbox) Start() error {
// start sandbox
command, err := getSandboxCommand(tracer.LogSandboxStdout, tracer.LogSandboxStderr, sandbox.Id, sandbox.workingDirectory)
if err != nil {
return nil
}
sandbox.isRunning = &command.IsRunning
sandbox.faulted = &command.IsSuccessful
err = sandbox.commandHandler.ExecuteAsync(command)
if err != nil {
return err
}
@ -40,10 +65,17 @@ func (s *Sandbox) CreateBaseDirectory() error {
}
func (s *Sandbox) Cleanup() error {
// TODO: do not cleanup if sandbox crashed
if s.command == nil {
return fmt.Errorf("sandbox not started")
if *s.isRunning {
return fmt.Errorf("sandbox is running")
}
tracer.LogWorkerSandboxProcessExited(s.Id, 0, 0)
// do not clean if sandbox faulted
if *s.faulted {
return nil
}
err := os.RemoveAll(s.workingDirectory)
if err != nil {
return err
@ -52,34 +84,32 @@ func (s *Sandbox) Cleanup() error {
return nil
}
func (s *Sandbox) Start() {
s.command = getSandboxCommand(s.Id, s.workingDirectory) // TODO: start sandbox command; this is a blocking call will need to become async
s.isAlive = true
s.commandHandler.Execute(s.command)
s.isAlive = false
}
func (s *Sandbox) GetOutput() (string, error) {
if s.command == nil {
return "", fmt.Errorf("sandbox process not started")
}
return s.command.Stdout.String(), nil // TODO: this will need to be refactored so we can get output async
}
func (s *Sandbox) GetErrorOutput() (string, error) {
if s.command == nil {
return "", fmt.Errorf("sandbox process not started")
}
return s.command.Stderr.String(), nil // TODO: this will need to be refactored so we can get output async
}
func (s *Sandbox) IsAlive() bool {
return s.isAlive
return *s.isRunning
}
var getSandboxCommand = func(sandboxId string, workingDirectory string) *executil.Command {
cmd := executil.NewCommand(configuration.GetSandboxExecutablePath(), sandboxId)
return &cmd
var getSandboxCommand = func(stdout func(str string), stderr func(str string), sandboxId string, workingDirectory string) (*executil.AsyncCommand, error) {
environ, err := getSandboxProcessEnvrion(workingDirectory, os.Environ(), configuration.GetConfiguration())
if err != nil {
return nil, err
}
cmd := executil.NewAsyncCommand(stdout, stderr, workingDirectory, environ, configuration.GetSandboxExecutablePath(), sandboxId)
return &cmd, nil
}
var getSandboxProcessEnvrion = func(workingDirectory string, environ []string, config configuration.Configuration) ([]string, error) {
config.WorkerWorkingDirectory = workingDirectory
config.Component = configuration.Component_sandbox
serialized, err := configuration.SerializeConfiguration(&config)
if err != nil {
return []string{}, err
}
for i, v := range environ {
if strings.Contains(v, configuration.EnvironmentConfigurationKey) {
environ[i] = fmt.Sprintf("%v=%v", configuration.EnvironmentConfigurationKey, string(serialized))
break
}
}
return environ, nil
}

Просмотреть файл

@ -75,28 +75,20 @@ var createAndStartSandbox = func(sandbox *sandbox.Sandbox) error {
return err
}
sandbox.Start()
err = sandbox.Start()
if err != nil {
return err
}
go monitorSandbox(sandbox)
return nil
}
var monitorSandbox = func(sandbox *sandbox.Sandbox) {
for sandbox.IsAlive() {
time.Sleep(time.Millisecond * 300) // TODO: temporary until async output is implemented
time.Sleep(time.Millisecond * 100) // TODO: temporary until async output is implemented
}
stdout, err := sandbox.GetOutput()
if err != nil {
fmt.Println("error getting sandbox sdout")
}
stderr, err := sandbox.GetErrorOutput()
if err != nil {
fmt.Println("error getting sandbox stderr")
}
fmt.Printf("%v : stdout [%v]\n", sandbox.Id, stdout)
fmt.Printf("%v : stderr [%v]\n", sandbox.Id, stderr)
fmt.Printf("Done monitoring sandbox %v \n", sandbox.Id)
sandbox.Cleanup()
}

Просмотреть файл

@ -13,8 +13,7 @@
"bypass_certificate_verification" : "",
"enforce_runbook_signature_validation" : "",
"gpg_public_keyring_path" : "",
"state_directory_path" : "",
"jrds_polling_frequency" : "",
"jrds_polling_frequency" : 10,
"proxy_configuration_path" : "",
"vm_id" : "",

Просмотреть файл

@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package executil
import (
"io"
"os/exec"
)
type AsyncCommand struct {
Name string
Arguments []string
ExitCode int
IsRunning bool
IsSuccessful bool
CommandError error
cmd *exec.Cmd
workingDirectory string
environment []string
stdout_f func(str string)
stderr_f func(str string)
stdoutPipe io.Reader
stderrPipe io.Reader
}
func NewAsyncCommand(stdout func(str string), stderr func(str string), workingDirectory string, environment []string, name string, arguments ...string) AsyncCommand {
command := AsyncCommand{Name: name,
Arguments: arguments,
stdout_f: stdout,
stderr_f: stderr,
workingDirectory: workingDirectory,
environment: environment,
IsSuccessful: false,
IsRunning: false}
return command
}

Просмотреть файл

@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
package executil
import (
"bufio"
"io"
"os/exec"
"syscall"
)
type AsyncHandler interface {
ExecuteAsync(command *AsyncCommand)
}
type AsyncCommandHandler struct {
}
func (h AsyncCommandHandler) ExecuteAsync(cmd *AsyncCommand) error {
return executeAsyncCommand(cmd)
}
func GetAsyncCommandHandler() AsyncCommandHandler {
return AsyncCommandHandler{}
}
func executeAsyncCommand(command *AsyncCommand) error {
cmd := exec.Command(command.Name, command.Arguments...)
cmd.Env = command.environment
cmd.Dir = command.workingDirectory
command.stdoutPipe, _ = cmd.StdoutPipe()
command.stderrPipe, _ = cmd.StderrPipe()
command.cmd = cmd
err := command.cmd.Start()
if err != nil {
return err
}
command.IsRunning = true
go startAndMonitorCommand(command)
return nil
}
func startAndMonitorCommand(command *AsyncCommand) {
// scan stdout
if command.stdoutPipe != nil && command.stdout_f != nil {
NewScanner(command.stdout_f, command.stdoutPipe)
}
// scan stderr
if command.stderrPipe != nil && command.stderr_f != nil {
NewScanner(command.stderr_f, command.stderrPipe)
}
// wait for command to complete
err := command.cmd.Wait()
command.IsRunning = false
// set command error and exit code
exitError, _ := err.(*exec.ExitError)
if err != nil && exitError == nil {
command.CommandError = err
return
}
if exitError != nil {
waitStatus := exitError.Sys().(syscall.WaitStatus)
command.ExitCode = waitStatus.ExitStatus()
} else {
command.ExitCode = 0
}
command.IsSuccessful = true
}
func NewScanner(print func(str string), reader io.Reader) *bufio.Scanner {
scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
print(scanner.Text())
}
return scanner
}