Save log and recording path in task details

This commit is contained in:
Troy Dai 2018-03-19 16:42:10 -07:00
Родитель 15d7da0809
Коммит 1ca14c1bc4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 9608535492BEDAC8
5 изменённых файлов: 118 добавлений и 43 удалений

Просмотреть файл

@ -17,4 +17,6 @@ const (
KeyAgentVersion = "a01.reserved.agentver"
KeyRunID = "a01.reserved.runid"
KeyJobName = "a01.reserved.jobname"
KeyTaskLogPath = "a01.reserved.tasklogpath"
KeyTaskRecordPath = "a01.reserved.taskrecordpath"
)

Просмотреть файл

@ -36,6 +36,11 @@ const (
ConfigKeySecretTaskBroker = "secret.taskbroker"
)
// Defines well-known keys in a product specific secret
const (
ProductSecretKeyLogPathTemplate = "log.path.template"
)
// GetCurrentNamespace returns the namespace this Pod belongs to. If it fails
// to resolve the name, it uses the fallback name.
func GetCurrentNamespace(fallback string) string {

Просмотреть файл

@ -14,3 +14,10 @@ const (
const (
PathKubeNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
)
// Defines the path template for logs
const (
// PathTemplateTaskLog defines the relative path of a task's log file in a file share.
// It is <run_id>/task_<task_id>.log
PathTemplateTaskLog = "%d/task_%d.log"
)

Просмотреть файл

@ -3,40 +3,31 @@ package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path"
"strconv"
"strings"
"github.com/Azure/adx-automation-agent/kubeutils"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/models"
"github.com/Azure/adx-automation-agent/schedule"
)
var (
taskBroker = schedule.CreateInClusterTaskBroker()
jobName = os.Getenv(common.EnvJobName)
podName = os.Getenv(common.EnvPodName)
runID = strings.Split(jobName, "-")[1] // the job name MUST follows the <product>-<runID>-<random ID>
version = "Unknown"
sourceCommit = "Unknown"
taskBroker = schedule.CreateInClusterTaskBroker()
jobName = os.Getenv(common.EnvJobName)
podName = os.Getenv(common.EnvPodName)
runID = strings.Split(jobName, "-")[1] // the job name MUST follows the <product>-<runID>-<random ID>
productName = strings.Split(jobName, "-")[0] // the job name MUST follows the <product>-<runID>-<random ID>
logPathTemplate = ""
version = "Unknown"
sourceCommit = "Unknown"
)
func ckEndpoint() {
url := fmt.Sprintf("http://%s/api/healthy", common.DNSNameTaskStore)
resp, err := http.Get(url)
if err != nil {
log.Fatalf("Fail to get response from %s. Error %s.\n", url, err)
}
if resp.StatusCode != http.StatusOK {
log.Fatalf("The endpoint is not healthy. Status code: %d.\n", resp.StatusCode)
}
}
func ckEnvironment() {
required := []string{common.EnvKeyInternalCommunicationKey, common.EnvJobName}
@ -62,26 +53,6 @@ func preparePod() {
log.Printf("Preparing Pod: \n%s\n", string(output))
}
func saveTaskLog(taskID int, output []byte) error {
stat, err := os.Stat(common.PathMountArtifacts)
if err == nil && stat.IsDir() {
runLogFolder := path.Join(common.PathMountArtifacts, runID)
os.Mkdir(runLogFolder, os.ModeDir)
taskLogFile := path.Join(runLogFolder, fmt.Sprintf("task_%d.log", taskID))
err = ioutil.WriteFile(taskLogFile, output, 0644)
if err != nil {
return fmt.Errorf("Fail to save task log. Reason: unable to write file. Exception: %s", err)
}
return nil
}
// the mount directory doesn't exist, output the log to stdout and let the pod logs handle it.
log.Println("Storage volume is not mount for logging. Print the task output to the stdout instead.")
log.Println("\n" + string(output))
return nil
}
func afterTask(taskResult *models.TaskResult) error {
_, err := os.Stat(common.PathScriptAfterTest)
if err != nil && os.IsNotExist(err) {
@ -115,11 +86,16 @@ func main() {
common.LogInfo(fmt.Sprintf("Run ID: %s", runID))
ckEnvironment()
ckEndpoint()
queue, ch, err := taskBroker.QueueDeclare(jobName)
common.ExitOnError(err, "Failed to connect to the task broker.")
if bLogPathTempalte, exists := kubeutils.TryGetSecretInBytes(
productName,
common.ProductSecretKeyLogPathTemplate); exists {
logPathTemplate = string(bLogPathTempalte)
}
preparePod()
for {
@ -152,15 +128,34 @@ func main() {
if err != nil {
common.LogError(fmt.Sprintf("Failed to commit a new task: %s.", err.Error()))
} else {
err = saveTaskLog(taskResult.ID, output)
taskLogPath, err := taskResult.SaveTaskLog(output)
if err != nil {
common.LogError(fmt.Sprintf("Failed to save task log a new task: %s.", err.Error()))
common.LogError(err.Error())
}
err = afterTask(taskResult)
if err != nil {
common.LogError(fmt.Sprintf("Failed in after task: %s.", err.Error()))
}
if len(logPathTemplate) > 0 {
taskResult.ResultDetails[common.KeyTaskLogPath] = strings.Replace(
logPathTemplate,
"{}",
taskLogPath,
1)
taskResult.ResultDetails[common.KeyTaskRecordPath] = strings.Replace(
logPathTemplate,
"{}",
path.Join(strconv.Itoa(taskResult.RunID), fmt.Sprintf("recording_%d.yaml", taskResult.ID)),
1)
_, err := taskResult.CommitChanges()
if err != nil {
common.LogError(err.Error())
}
}
}
err = delivery.Ack(false)

Просмотреть файл

@ -4,8 +4,13 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"strconv"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/httputils"
)
@ -59,3 +64,64 @@ func (task *TaskResult) CommitNew() (*TaskResult, error) {
return &result, nil
}
// CommitChanges save a commited task's updated properties to the database
func (task *TaskResult) CommitChanges() (*TaskResult, error) {
body, err := json.Marshal(task)
if err != nil {
return nil, fmt.Errorf("unable to marshal JSON: %s", err.Error())
}
path := fmt.Sprintf("task/%d", task.ID)
req, err := httputils.CreateRequest(http.MethodPost, path, body)
if err != nil {
return nil, fmt.Errorf("unable to create request: %s", err.Error())
}
httpClient := http.Client{CheckRedirect: nil}
resp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http error: %s", err.Error())
}
defer resp.Body.Close()
respContent, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unable to read response body: %s", err.Error())
}
if resp.StatusCode >= 300 {
return nil, fmt.Errorf("HTTP Status %d: %s", resp.StatusCode, string(respContent))
}
var result TaskResult
err = json.Unmarshal(respContent, &result)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal JSON: %s", err.Error())
}
return &result, nil
}
// SaveTaskLog the task execution log to the mounted artifacts folder
func (task *TaskResult) SaveTaskLog(output []byte) (string, error) {
stat, err := os.Stat(common.PathMountArtifacts)
if err == nil && stat.IsDir() {
runLogFolder := path.Join(common.PathMountArtifacts, strconv.Itoa(task.RunID))
os.Mkdir(runLogFolder, os.ModeDir)
taskLogFileName := fmt.Sprintf("task_%d.log", task.ID)
taskLogFilePath := path.Join(runLogFolder, taskLogFileName)
err = ioutil.WriteFile(taskLogFilePath, output, 0644)
if err != nil {
return "", fmt.Errorf("Fail to save task log. Reason: unable to write file. Exception: %s", err.Error())
}
return path.Join(strconv.Itoa(task.RunID), taskLogFileName), nil
}
// the mount directory doesn't exist, output the log to stdout and let the pod logs handle it.
log.Println("Storage volume is not mount for logging. Print the task output to the stdout instead.")
log.Println("\n" + string(output))
return "", nil
}