diff --git a/common/data.go b/common/data.go index a8257d0..762c7fa 100644 --- a/common/data.go +++ b/common/data.go @@ -17,4 +17,6 @@ const ( KeyAgentVersion = "a01.reserved.agentver" KeyRunID = "a01.reserved.runid" KeyJobName = "a01.reserved.jobname" + KeyTaskLogPath = "a01.reserved.tasklogpath" + KeyTaskRecordPath = "a01.reserved.taskrecordpath" ) diff --git a/common/names.go b/common/names.go index cdae8c0..16ea2af 100644 --- a/common/names.go +++ b/common/names.go @@ -36,6 +36,11 @@ const ( ConfigKeySecretTaskBroker = "secret.taskbroker" ) +// Defines well-known keys in a product specific secret +const ( + ProductSecretKeyLogPathTemplate = "log.path.template" +) + // GetCurrentNamespace returns the namespace this Pod belongs to. If it fails // to resolve the name, it uses the fallback name. func GetCurrentNamespace(fallback string) string { diff --git a/common/paths.go b/common/paths.go index 90b5895..6ed04a1 100644 --- a/common/paths.go +++ b/common/paths.go @@ -14,3 +14,10 @@ const ( const ( PathKubeNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" ) + +// Defines the path template for logs +const ( + // PathTemplateTaskLog defines the relative path of a task's log file in a file share. + // It is /task_.log + PathTemplateTaskLog = "%d/task_%d.log" +) diff --git a/droid/main.go b/droid/main.go index 0b299e6..c5f4841 100644 --- a/droid/main.go +++ b/droid/main.go @@ -3,40 +3,31 @@ package main import ( "encoding/json" "fmt" - "io/ioutil" "log" - "net/http" "os" "os/exec" "path" + "strconv" "strings" + "github.com/Azure/adx-automation-agent/kubeutils" + "github.com/Azure/adx-automation-agent/common" "github.com/Azure/adx-automation-agent/models" "github.com/Azure/adx-automation-agent/schedule" ) var ( - taskBroker = schedule.CreateInClusterTaskBroker() - jobName = os.Getenv(common.EnvJobName) - podName = os.Getenv(common.EnvPodName) - runID = strings.Split(jobName, "-")[1] // the job name MUST follows the -- - version = "Unknown" - sourceCommit = "Unknown" + taskBroker = schedule.CreateInClusterTaskBroker() + jobName = os.Getenv(common.EnvJobName) + podName = os.Getenv(common.EnvPodName) + runID = strings.Split(jobName, "-")[1] // the job name MUST follows the -- + productName = strings.Split(jobName, "-")[0] // the job name MUST follows the -- + logPathTemplate = "" + version = "Unknown" + sourceCommit = "Unknown" ) -func ckEndpoint() { - url := fmt.Sprintf("http://%s/api/healthy", common.DNSNameTaskStore) - resp, err := http.Get(url) - if err != nil { - log.Fatalf("Fail to get response from %s. Error %s.\n", url, err) - } - - if resp.StatusCode != http.StatusOK { - log.Fatalf("The endpoint is not healthy. Status code: %d.\n", resp.StatusCode) - } -} - func ckEnvironment() { required := []string{common.EnvKeyInternalCommunicationKey, common.EnvJobName} @@ -62,26 +53,6 @@ func preparePod() { log.Printf("Preparing Pod: \n%s\n", string(output)) } -func saveTaskLog(taskID int, output []byte) error { - stat, err := os.Stat(common.PathMountArtifacts) - if err == nil && stat.IsDir() { - runLogFolder := path.Join(common.PathMountArtifacts, runID) - os.Mkdir(runLogFolder, os.ModeDir) - - taskLogFile := path.Join(runLogFolder, fmt.Sprintf("task_%d.log", taskID)) - err = ioutil.WriteFile(taskLogFile, output, 0644) - if err != nil { - return fmt.Errorf("Fail to save task log. Reason: unable to write file. Exception: %s", err) - } - return nil - } - - // the mount directory doesn't exist, output the log to stdout and let the pod logs handle it. - log.Println("Storage volume is not mount for logging. Print the task output to the stdout instead.") - log.Println("\n" + string(output)) - return nil -} - func afterTask(taskResult *models.TaskResult) error { _, err := os.Stat(common.PathScriptAfterTest) if err != nil && os.IsNotExist(err) { @@ -115,11 +86,16 @@ func main() { common.LogInfo(fmt.Sprintf("Run ID: %s", runID)) ckEnvironment() - ckEndpoint() queue, ch, err := taskBroker.QueueDeclare(jobName) common.ExitOnError(err, "Failed to connect to the task broker.") + if bLogPathTempalte, exists := kubeutils.TryGetSecretInBytes( + productName, + common.ProductSecretKeyLogPathTemplate); exists { + logPathTemplate = string(bLogPathTempalte) + } + preparePod() for { @@ -152,15 +128,34 @@ func main() { if err != nil { common.LogError(fmt.Sprintf("Failed to commit a new task: %s.", err.Error())) } else { - err = saveTaskLog(taskResult.ID, output) + taskLogPath, err := taskResult.SaveTaskLog(output) if err != nil { - common.LogError(fmt.Sprintf("Failed to save task log a new task: %s.", err.Error())) + common.LogError(err.Error()) } err = afterTask(taskResult) if err != nil { common.LogError(fmt.Sprintf("Failed in after task: %s.", err.Error())) } + + if len(logPathTemplate) > 0 { + taskResult.ResultDetails[common.KeyTaskLogPath] = strings.Replace( + logPathTemplate, + "{}", + taskLogPath, + 1) + + taskResult.ResultDetails[common.KeyTaskRecordPath] = strings.Replace( + logPathTemplate, + "{}", + path.Join(strconv.Itoa(taskResult.RunID), fmt.Sprintf("recording_%d.yaml", taskResult.ID)), + 1) + + _, err := taskResult.CommitChanges() + if err != nil { + common.LogError(err.Error()) + } + } } err = delivery.Ack(false) diff --git a/models/taskresult.go b/models/taskresult.go index 3029188..27b6aaf 100644 --- a/models/taskresult.go +++ b/models/taskresult.go @@ -4,8 +4,13 @@ import ( "encoding/json" "fmt" "io/ioutil" + "log" "net/http" + "os" + "path" + "strconv" + "github.com/Azure/adx-automation-agent/common" "github.com/Azure/adx-automation-agent/httputils" ) @@ -59,3 +64,64 @@ func (task *TaskResult) CommitNew() (*TaskResult, error) { return &result, nil } + +// CommitChanges save a commited task's updated properties to the database +func (task *TaskResult) CommitChanges() (*TaskResult, error) { + body, err := json.Marshal(task) + if err != nil { + return nil, fmt.Errorf("unable to marshal JSON: %s", err.Error()) + } + + path := fmt.Sprintf("task/%d", task.ID) + req, err := httputils.CreateRequest(http.MethodPost, path, body) + if err != nil { + return nil, fmt.Errorf("unable to create request: %s", err.Error()) + } + + httpClient := http.Client{CheckRedirect: nil} + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("http error: %s", err.Error()) + } + + defer resp.Body.Close() + respContent, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unable to read response body: %s", err.Error()) + } + + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("HTTP Status %d: %s", resp.StatusCode, string(respContent)) + } + + var result TaskResult + err = json.Unmarshal(respContent, &result) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal JSON: %s", err.Error()) + } + + return &result, nil +} + +// SaveTaskLog the task execution log to the mounted artifacts folder +func (task *TaskResult) SaveTaskLog(output []byte) (string, error) { + stat, err := os.Stat(common.PathMountArtifacts) + if err == nil && stat.IsDir() { + runLogFolder := path.Join(common.PathMountArtifacts, strconv.Itoa(task.RunID)) + os.Mkdir(runLogFolder, os.ModeDir) + + taskLogFileName := fmt.Sprintf("task_%d.log", task.ID) + taskLogFilePath := path.Join(runLogFolder, taskLogFileName) + err = ioutil.WriteFile(taskLogFilePath, output, 0644) + if err != nil { + return "", fmt.Errorf("Fail to save task log. Reason: unable to write file. Exception: %s", err.Error()) + } + + return path.Join(strconv.Itoa(task.RunID), taskLogFileName), nil + } + + // the mount directory doesn't exist, output the log to stdout and let the pod logs handle it. + log.Println("Storage volume is not mount for logging. Print the task output to the stdout instead.") + log.Println("\n" + string(output)) + return "", nil +}