Major update: Introduce message broker

Introduce RabbitMQ as the message broker to dispatch tasks. This is a
major overhaul of the internal architecture. As a result, both a01droid
and a01dispatch underwent major changes.

After the database lock is eliminated from the message broker equation,
fewer HTTP requests to the task store are needed. Tasks are not added
until they are actually executed. The controller job also stops
querying the tasks for run status updates.

Much of the logic originally in main.go is moved closer to the data
model or into other utility packages.
This commit is contained in:
Troy Dai 2018-03-11 22:46:20 -07:00
Родитель 021bfc006d
Коммит 0f73504a33
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 3F32D042717286B1
14 изменённых файлов: 616 добавлений и 429 удалений

Просмотреть файл

@ -15,4 +15,5 @@ const (
KeyFromFailure = "a01.reserved.fromrunfailure"
KeyProduct = "a01.reserved.product"
KeyAgentVersion = "a01.reserved.agentver"
KeyRunID = "a01.reserved.runid"
)

Просмотреть файл

@ -15,4 +15,7 @@ const (
// EnvNodeName stores the current node name
EnvNodeName = "ENV_NODE_NAME"
// EnvJobName stores the parent job name if a pod is created in a job
EnvJobName = "ENV_JOB_NAME"
)

25
common/errorhandling.go Normal file
Просмотреть файл

@ -0,0 +1,25 @@
package common
import (
"fmt"
"log"
)
// ExitOnError terminates the current program when err is not nil. The given
// message is logged in front of the error text before exiting; a nil err is a
// no-op.
func ExitOnError(err error, message string) {
	if err != nil {
		// log.Fatalf logs the formatted line and then exits the process.
		log.Fatalf("%s: %s", message, err)
	}
}
// PanicOnError panics when err is not nil. The panic value is an error whose
// text is "<message>: <err>"; the same text is written to the standard logger
// first. A nil err is a no-op.
//
// Unlike ExitOnError, the panic unwinds the stack, so deferred functions of
// the callers still run and the failure can be recovered.
func PanicOnError(err error, message string) {
	if err == nil {
		return
	}

	failure := fmt.Errorf("%s: %s", message, err)
	// Log without exiting: a log.Fatalf here would terminate the process
	// before the panic below could ever be raised.
	log.Print(failure)
	panic(failure)
}

18
common/logging.go Normal file
Просмотреть файл

@ -0,0 +1,18 @@
package common
import "log"
// LogInfo emits message through the standard logger, tagged with the INFO
// level prefix.
func LogInfo(message string) {
	const layout = "INFO: %s"
	log.Printf(layout, message)
}
// LogWarning emits message through the standard logger, tagged with the WARN
// level prefix.
func LogWarning(message string) {
	const layout = "WARN: %s"
	log.Printf(layout, message)
}
// LogError emits message through the standard logger, tagged with the ERRO
// level prefix. It only logs; it neither exits nor panics.
func LogError(message string) {
	const layout = "ERRO: %s"
	log.Printf(layout, message)
}

Просмотреть файл

@ -10,11 +10,12 @@ const (
StorageVolumeNameTools = "tools-storage"
DNSNameTaskStore = "store-internal-svc"
DNSNameEmailService = "email-internal-svc"
DNSNameTaskBroker = "taskbroker-internal-svc"
SecretNameAgents = "agent-secrets"
)
// GetCurrentNamespace returns the namespace this Pod belongs to. If it fails to resolve the name, it uses the fallback
// name.
// GetCurrentNamespace returns the namespace this Pod belongs to. If it fails
// to resolve the name, it uses the fallback name.
func GetCurrentNamespace(fallback string) string {
if content, err := ioutil.ReadFile(PathKubeNamespace); err == nil {
return string(content)

Просмотреть файл

@ -1,15 +1,12 @@
package main
import (
"bytes"
"crypto/rand"
"encoding/base32"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"regexp"
@ -17,18 +14,21 @@ import (
"strings"
"time"
"github.com/streadway/amqp"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/httputils"
"github.com/Azure/adx-automation-agent/kubeutils"
"github.com/Azure/adx-automation-agent/models"
"github.com/Azure/adx-automation-agent/reportutils"
"github.com/Azure/adx-automation-agent/schedule"
)
var (
httpClient = &http.Client{CheckRedirect: nil}
taskBroker = schedule.CreateInClusterTaskBroker()
namespace = common.GetCurrentNamespace("a01-prod")
droidMetadata = models.ReadDroidMetadata(common.PathMetadataYml)
clientset = kubeutils.TryCreateKubeClientset()
@ -37,8 +37,8 @@ var (
)
func main() {
info(fmt.Sprintf("A01 Droid Dispatcher.\nVersion: %s.\nCommit: %s.\n", version, sourceCommit))
info(fmt.Sprintf("Pod name: %s", os.Getenv(common.EnvPodName)))
common.LogInfo(fmt.Sprintf("A01 Droid Dispatcher.\nVersion: %s.\nCommit: %s.\n", version, sourceCommit))
common.LogInfo(fmt.Sprintf("Pod name: %s", os.Getenv(common.EnvPodName)))
var pRunID *int
pRunID = flag.Int("run", -1, "The run ID")
@ -48,142 +48,95 @@ func main() {
log.Fatal("Missing runID")
}
run := getRun(*pRunID)
// query the run and then update the product name in the details
run, err := models.QueryRun(*pRunID)
common.ExitOnError(err, "fail to query the run")
err := postTasks(run, queryTests(run))
run.Details[common.KeyProduct] = droidMetadata.Product
run, err = run.Patch()
common.ExitOnError(err, "fail to update the run")
printRunInfo(run)
// generate a job name. the name will be used through out the remaining
// session to identify the group of operations and resources
jobName := fmt.Sprintf("%s-%d-%s", droidMetadata.Product, run.ID, getRandomString())
// publish tasks to the task broker which will establish a worker queue
err = publishTasks(run, jobName, queryTests(run))
common.ExitOnError(err, "Fail to publish tasks to the task broker.")
defer taskBroker.Close()
// creates a kubernete job to manage test droid
jobDef, err := createTaskJob(run, jobName)
if err != nil {
log.Fatal(err.Error())
}
jobDef, err := createTaskJob(run)
if err != nil {
log.Fatal(err.Error())
}
// Ignore this error for now. This API's latest version seems to sending inaccurate
// error
// ignore this error for now. This API's latest version seems to sending
// inaccurate error
job, _ := clientset.BatchV1().Jobs(namespace).Create(jobDef)
job, err = clientset.BatchV1().Jobs(namespace).Get(jobDef.Name, metav1.GetOptions{})
if err != nil {
log.Fatal(err.Error())
}
info(fmt.Sprintf("Job %s started", job.Name))
// begin monitoring the job status till the end
monitor(run, job)
}
func info(message string) {
log.Printf("INFO: %s", message)
}
func monitor(run *models.Run, job *batchv1.Job) {
common.LogInfo("Begin monitoring task execution ...")
ch, err := taskBroker.GetChannel()
common.PanicOnError(err, "Fail to establish channel to the task broker during monitoring.")
podListOpt := metav1.ListOptions{LabelSelector: fmt.Sprintf("job-name=%s", job.Name)}
for {
content, err := sendRequest(http.MethodGet, fmt.Sprintf("run/%d/tasks", run.ID), nil, "Fail to query tests. Reason %s. Exception %s.")
time.Sleep(time.Second * 10)
queue, err := ch.QueueInspect(job.ObjectMeta.Name)
if err != nil {
log.Println(err.Error())
common.LogWarning(fmt.Errorf("Fail to insepct the queue %s: %s", job.ObjectMeta.Name, err).Error())
continue
}
common.LogInfo(fmt.Sprintf("Messages %d => Consumers %d.", queue.Messages, queue.Consumers))
if queue.Messages != 0 {
// there are tasks to be run
continue
}
var tasks []models.Task
if err := json.Unmarshal(content, &tasks); err != nil {
log.Println(err.Error())
continue
}
statuses := make(map[string]int)
for _, task := range tasks {
statuses[task.Status]++
}
statusInfo := make([]string, 0, len(statuses))
for name, count := range statuses {
statusInfo = append(statusInfo, fmt.Sprintf("%s=%d", name, count))
}
info(strings.Join(statusInfo, "|"))
lostTask := make([]int, 0, 10) // those tests where pod crashes during execution therfore entering limbo
podList, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: "job-name = " + job.Name})
// the number of the message in the queue is zero. make sure all the
// pods in this job have finished
podList, err := clientset.CoreV1().Pods(namespace).List(podListOpt)
if err != nil {
log.Println(err.Error())
common.LogWarning(fmt.Errorf("Fail to list pod of %s: %s", job.Name, err).Error())
continue
}
for _, task := range tasks {
if task.Status != "schedules" {
continue
}
if agent, ok := task.ResultDetails["agent"]; ok {
podName := strings.Split(agent.(string), "@")[0]
if len(podName) > 0 {
for _, pod := range podList.Items {
if pod.ObjectMeta.Name == podName {
if pod.Status.Phase != corev1.PodRunning {
lostTask = append(lostTask, task.ID)
}
}
}
}
runningPods := 0
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodRunning {
runningPods++
}
}
if _, ok := statuses["initialized"]; !ok {
if _, ok := statuses["scheduled"]; !ok {
info(fmt.Sprintf("Run %d is finished", run.ID))
report(run)
os.Exit(0)
} else if statuses["scheduled"]-len(lostTask) == 0 {
info(fmt.Sprintf("Run %d is finished despite %d lost tasks.", run.ID, len(lostTask)))
report(run)
os.Exit(0)
}
if runningPods != 0 {
common.LogInfo(fmt.Sprintf("%d pod are still running.", runningPods))
continue
}
time.Sleep(time.Second * 30)
// zero task in the queue and all pod stop.
break
}
}
func report(run *models.Run) {
info("Sending report...")
if email, ok := run.Settings[common.KeyUserEmail]; ok {
content := make(map[string]string)
content["run_id"] = strconv.Itoa(run.ID)
content["receivers"] = email.(string)
body, err := json.Marshal(content)
if err != nil {
info("Fail to marshal JSON during request sending email.")
return
}
info(string(body))
req, err := http.NewRequest(
http.MethodPost,
fmt.Sprintf("http://%s/report", common.DNSNameEmailService),
bytes.NewBuffer(body))
if err != nil {
info("Fail to create request to requesting email.")
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
if err != nil {
info("Fail to send request to email service.")
return
}
if resp.StatusCode != http.StatusOK {
info("The request may have failed.")
}
} else {
info("Skip sending report")
}
reportutils.Report(run)
}
func queryTests(run *models.Run) []models.TaskSetting {
info(fmt.Sprintf("Expecting script %s.", common.PathScriptGetIndex))
common.LogInfo(fmt.Sprintf("Expecting script %s.", common.PathScriptGetIndex))
content, err := exec.Command(common.PathScriptGetIndex).Output()
if err != nil {
panic(err.Error())
@ -196,7 +149,7 @@ func queryTests(run *models.Run) []models.TaskSetting {
}
if query, ok := run.Settings[common.KeyTestQuery]; ok {
info(fmt.Sprintf("Query string is '%s'", query))
common.LogInfo(fmt.Sprintf("Query string is '%s'", query))
result := make([]models.TaskSetting, 0, len(input))
for _, test := range input {
matched, regerr := regexp.MatchString(query.(string), test.Classifier["identifier"])
@ -211,101 +164,64 @@ func queryTests(run *models.Run) []models.TaskSetting {
return input
}
func sendRequest(method string, path string, body interface{}, templateError string) ([]byte, error) {
var content []byte
if body != nil {
var err error
content, err = json.Marshal(body)
if err != nil {
return nil, fmt.Errorf(templateError, "unable to marshal the body in JSON.")
}
}
req, err := httputils.CreateRequest(method, path, content)
if err != nil {
return nil, fmt.Errorf(templateError, "unable to create request", err)
}
resp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf(templateError, "http request failure", err)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf(templateError, "unable to read respond body", err)
}
if resp.StatusCode >= 300 {
reason := fmt.Sprintf("status code: %d.", resp.StatusCode) + " Body %s"
return nil, fmt.Errorf(templateError, fmt.Sprintf(reason, string(b)), "N/A")
}
return b, nil
}
// Read run data and update the run details accordingly. exit the program as fatal if failed.
func getRun(runID int) (result *models.Run) {
templateError := fmt.Sprintf("Fail to get the run %d.", runID) + " Reason %s. Exception %s."
content, err := sendRequest(http.MethodGet, fmt.Sprintf("run/%d", runID), nil, templateError)
if err != nil {
log.Fatalf(fmt.Errorf(templateError, "http failure", err).Error())
}
var run models.Run
err = json.Unmarshal(content, &run)
if err != nil {
log.Fatalf(fmt.Errorf(templateError, "json unmarshal failure", err).Error())
}
info(fmt.Sprintf("Find run %d: %s.", run.ID, run.Name))
func printRunInfo(run *models.Run) {
common.LogInfo(fmt.Sprintf("Find run %d: %s.", run.ID, run.Name))
if run.Details != nil {
info(" Details")
common.LogInfo(" Details")
for key, value := range run.Details {
info(fmt.Sprintf(" %s = %s", key, value))
common.LogInfo(fmt.Sprintf(" %s = %s", key, value))
}
}
if run.Settings != nil {
info(" Settings")
common.LogInfo(" Settings")
for key, value := range run.Settings {
info(fmt.Sprintf(" %s = %s", key, value))
common.LogInfo(fmt.Sprintf(" %s = %s", key, value))
}
}
}
func publishTasks(run *models.Run, jobName string, settings []models.TaskSetting) (err error) {
common.LogInfo(fmt.Sprintf("To schedule %d tests.", len(settings)))
_, ch, err := taskBroker.QueueDeclare(jobName)
if err != nil {
// TODO: update run's status in DB to failed
common.ExitOnError(err, "Fail to declare queue in task broker.")
}
common.LogInfo(fmt.Sprintf("Declared queue %s. Begin publishing tasks ...", jobName))
for _, setting := range settings {
body, err := json.Marshal(setting)
if err != nil {
common.LogWarning(fmt.Sprintf("Fail to marshal task %s setting in JSON. Error %s. The task is skipped.", setting, err.Error()))
continue
}
err = ch.Publish(
"", // default exchange
jobName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
})
if err != nil {
common.LogWarning(fmt.Sprintf("Fail to publish task %s. Error %s. The task is skipped.", setting, err.Error()))
}
}
info(fmt.Sprintf("Update run product in details to %s.", droidMetadata.Product))
run.Details[common.KeyProduct] = droidMetadata.Product
_, err = sendRequest(http.MethodPatch, fmt.Sprintf("run/%d", runID), run, templateError)
if err != nil {
log.Fatal(err.Error())
}
common.LogInfo("Finish publish tasks")
return &run
return nil
}
func postTasks(run *models.Run, settings []models.TaskSetting) (err error) {
info(fmt.Sprintf("To schedule %d tests.", len(settings)))
err = nil
tasks := make([]models.Task, len(settings))
///////////////////////////////////////////////////////////////////////////////////////////////////
// Kubernete JOB
for idx, setting := range settings {
var task models.Task
task.Name = fmt.Sprintf("Test: %s", setting.Classifier["identifier"])
task.Settings = setting
task.Annotation = run.Settings[common.KeyImageName].(string)
tasks[idx] = task
}
templateError := fmt.Sprintf("Fail to create task for run %d.", run.ID) + " Reason %s. Exception %s."
info("Posting tasks ...")
_, err = sendRequest(http.MethodPost, fmt.Sprintf("run/%d/tasks", run.ID), tasks, templateError)
info("Finish posting tasks ...")
return
}
func createTaskJob(run *models.Run) (job *batchv1.Job, err error) {
func createTaskJob(run *models.Run, jobName string) (job *batchv1.Job, err error) {
client, err := kubeutils.CreateKubeClientset()
if err != nil {
return nil, err
@ -314,20 +230,19 @@ func createTaskJob(run *models.Run) (job *batchv1.Job, err error) {
parallelism := int32(run.Settings[common.KeyInitParallelism].(float64))
var backoff int32 = 5
name := fmt.Sprintf("%s-%d-%s", droidMetadata.Product, run.ID, getRandomString())
definition := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: jobName,
},
Spec: batchv1.JobSpec{
Parallelism: &parallelism,
BackoffLimit: &backoff,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: jobName,
},
Spec: corev1.PodSpec{
Containers: getContainerSpecs(run),
Containers: getContainerSpecs(run, jobName),
ImagePullSecrets: getImagePullSource(run),
Volumes: getVolumes(run),
RestartPolicy: corev1.RestartPolicyNever,
@ -382,12 +297,12 @@ func getImagePullSource(run *models.Run) []corev1.LocalObjectReference {
return []corev1.LocalObjectReference{corev1.LocalObjectReference{Name: run.Settings[common.KeyImagePullSecret].(string)}}
}
func getContainerSpecs(run *models.Run) (containers []corev1.Container) {
func getContainerSpecs(run *models.Run, jobName string) (containers []corev1.Container) {
c := corev1.Container{
Name: "main",
Image: run.Settings[common.KeyImageName].(string),
Env: getEnvironmentVariableDef(run),
Command: []string{common.PathMountTools + "/a01droid", "-run", strconv.Itoa(run.ID)},
Env: getEnvironmentVariableDef(run, jobName),
Command: []string{common.PathMountTools + "/a01droid"},
}
volumeMounts := []corev1.VolumeMount{
@ -406,7 +321,7 @@ func getContainerSpecs(run *models.Run) (containers []corev1.Container) {
return []corev1.Container{c}
}
func getEnvironmentVariableDef(run *models.Run) []corev1.EnvVar {
func getEnvironmentVariableDef(run *models.Run, jobName string) []corev1.EnvVar {
result := []corev1.EnvVar{
corev1.EnvVar{
Name: common.EnvPodName,
@ -414,6 +329,7 @@ func getEnvironmentVariableDef(run *models.Run) []corev1.EnvVar {
corev1.EnvVar{
Name: common.EnvNodeName,
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "spec.nodeName"}}},
corev1.EnvVar{Name: common.EnvJobName, Value: jobName},
corev1.EnvVar{
Name: common.EnvKeyInternalCommunicationKey,
ValueFrom: &corev1.EnvVarSource{

Просмотреть файл

@ -2,7 +2,6 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
@ -11,15 +10,17 @@ import (
"os/exec"
"path"
"strings"
"time"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/httputils"
"github.com/Azure/adx-automation-agent/models"
"github.com/Azure/adx-automation-agent/schedule"
)
var (
httpClient = &http.Client{CheckRedirect: nil}
taskBroker = schedule.CreateInClusterTaskBroker()
jobName = os.Getenv(common.EnvJobName)
podName = os.Getenv(common.EnvPodName)
runID = strings.Split(jobName, "-")[1] // the job name MUST follows the <product>-<runID>-<random ID>
version = "Unknown"
sourceCommit = "Unknown"
)
@ -37,7 +38,7 @@ func ckEndpoint() {
}
func ckEnvironment() {
required := []string{common.EnvKeyInternalCommunicationKey}
required := []string{common.EnvKeyInternalCommunicationKey, common.EnvJobName}
for _, r := range required {
_, exists := os.LookupEnv(r)
@ -61,128 +62,7 @@ func preparePod() {
log.Printf("Preparing Pod: \n%s\n", string(output))
}
// checkoutTask finds a new task to run and updates in which pod it will run (this pod!)
func checkoutTask(runID string) (id int, err error) {
templateError := fmt.Sprintf("Fail to checkout task from run %s.", runID) + " Reason: %s. Exception: %s."
request, err := httputils.CreateRequest(http.MethodPost, fmt.Sprintf("run/%s/checkout", runID), nil)
if err != nil {
return 0, fmt.Errorf(templateError, "unable to create new request", err)
}
resp, err := httpClient.Do(request)
if err != nil {
return 0, fmt.Errorf(templateError, "http request failed.", err)
}
if resp.StatusCode == http.StatusOK {
// continue
} else if resp.StatusCode == http.StatusNoContent {
log.Println("No more tasks. This droid's work is done.")
os.Exit(0)
} else {
reason := fmt.Sprintf("status code: %d.", resp.StatusCode) + " Body %s"
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf(templateError, fmt.Sprintf(reason, "fail to read."), err)
}
return 0, fmt.Errorf(templateError, fmt.Sprintf(reason, string(b)), "N/A")
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf(templateError, "unable to read response body", err)
}
var task models.Task
err = json.Unmarshal(b, &task)
if err != nil {
return 0, fmt.Errorf(templateError, "unable to parse body in JSON", err)
}
// update task
if task.ResultDetails == nil {
task.ResultDetails = make(map[string]interface{})
}
task.ResultDetails["agent"] = fmt.Sprintf("%s@%s", os.Getenv("ENV_POD_NAME"), os.Getenv("ENV_NODE_NAME"))
err = patchTask(task)
if err != nil {
return 0, fmt.Errorf(templateError, "unable to update the task", err)
}
log.Printf("Checked out task %d.\n", task.ID)
return task.ID, nil
}
func patchTask(task models.Task) error {
templateError := fmt.Sprintf("Fail to path task %d.", task.ID) + " Reason: %s. Exception: %s."
path := fmt.Sprintf("task/%d", task.ID)
// Marshal the task
content, err := json.Marshal(task)
if err != nil {
return fmt.Errorf(templateError, "unable to marshal body in JSON", err)
}
req, err := httputils.CreateRequest(http.MethodPatch, path, content)
if err != nil {
return fmt.Errorf(templateError, "unable to create new request", err)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf(templateError, "http request failed.", err)
}
if resp.StatusCode >= 300 {
reason := fmt.Sprintf("status code: %d.", resp.StatusCode) + " Body %s"
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf(templateError, fmt.Sprintf(reason, "fail to read."), err)
}
return fmt.Errorf(templateError, fmt.Sprintf(reason, string(b)), "N/A")
}
return nil
}
func getTask(taskID int) (task models.Task, err error) {
templateError := fmt.Sprintf("Fail to get task %d.", taskID) + " Reason: %s. Exception: %s."
path := fmt.Sprintf("task/%d", taskID)
request, err := httputils.CreateRequest(http.MethodGet, path, nil)
if err != nil {
return task, fmt.Errorf(templateError, "unable to create new request", err)
}
resp, err := httpClient.Do(request)
if err != nil {
return task, fmt.Errorf(templateError, "http request failed.", err)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return task, fmt.Errorf(templateError, "unable to read response body.", err)
}
if resp.StatusCode >= 300 {
reason := fmt.Sprintf("status code: %d. Body %s", resp.StatusCode, string(b))
return task, fmt.Errorf(templateError, reason, "N/A")
}
err = json.Unmarshal(b, &task)
if err != nil {
return task, fmt.Errorf(templateError, "unable to parse body in JSON", err)
}
return
}
func saveTaskLog(runID string, taskID int, output []byte) error {
func saveTaskLog(taskID int, output []byte) error {
stat, err := os.Stat(common.PathMountArtifacts)
if err == nil && stat.IsDir() {
runLogFolder := path.Join(common.PathMountArtifacts, runID)
@ -202,108 +82,92 @@ func saveTaskLog(runID string, taskID int, output []byte) error {
return nil
}
func afterTask(taskID int) error {
templateError := "Fail to exectue after task action. Reason: %s. Exception: %s."
func afterTask(taskResult *models.TaskResult) error {
_, err := os.Stat(common.PathScriptAfterTest)
if err != nil && os.IsNotExist(err) {
// Missing after task execuable is not considerred an error.
log.Printf("Skip the after task action because the executable %s doesn't exist.", common.PathScriptAfterTest)
return nil
}
task, err := getTask(taskID)
log.Printf("Executing after task %s.", common.PathScriptAfterTest)
taskInBytes, err := json.Marshal(taskResult)
if err != nil {
return fmt.Errorf(templateError, "unable to get the task", err)
return fmt.Errorf("unable to encode task to JSON: %s", err.Error())
}
taskInBytes, err := json.Marshal(task)
output, err := exec.Command(
common.PathScriptAfterTest,
common.PathMountArtifacts,
string(taskInBytes),
).CombinedOutput()
if err != nil {
return fmt.Errorf(templateError, "unable to encode task to JSON", err)
return fmt.Errorf("execution failed: %s", err.Error())
}
output, err := exec.Command(common.PathScriptAfterTest, common.PathMountArtifacts, string(taskInBytes)).CombinedOutput()
if err != nil {
return fmt.Errorf(templateError, "task executable failure", err)
}
log.Printf("After task executed.\n%s\n", string(output))
return nil
}
func runTask(taskID int, runID string) error {
templateError := fmt.Sprintf("Fail to run task %d.", taskID) + " Reason: %s. Exception: %s."
task, err := getTask(taskID)
if err != nil {
return fmt.Errorf(templateError, "unable to get task", err)
}
execution := strings.Fields(task.Settings.Execution["command"])
var cmd *exec.Cmd
if len(execution) < 2 {
cmd = exec.Command(execution[0])
} else {
cmd = exec.Command(execution[0], execution[1:]...)
}
begin := time.Now()
output, err := cmd.CombinedOutput()
duration := time.Now().Sub(begin) / time.Millisecond
if err == nil {
task.Result = "Passed"
} else {
task.Result = "Failed"
}
task.Status = "completed"
if task.ResultDetails == nil {
task.ResultDetails = make(map[string]interface{})
}
task.ResultDetails["duration"] = int(duration)
err = patchTask(task)
if err != nil {
return fmt.Errorf(templateError, "unable to update task", err)
}
err = saveTaskLog(runID, taskID, output)
if err != nil {
log.Println(err.Error())
}
log.Printf("[%s] Task %s", task.Result, task.Name)
common.LogInfo(fmt.Sprintf("After task executed. %s.", string(output)))
return nil
}
func main() {
pRunID := flag.String("run", "", "The run ID")
flag.Parse()
if pRunID == nil || len(*pRunID) == 0 {
log.Fatal("Missing runID")
}
log.Printf("A01 Droid Engine.\nVersion: %s.\nCommit: %s.\n", version, sourceCommit)
common.LogInfo(fmt.Sprintf("A01 Droid Engine.\nVersion: %s.\nCommit: %s.\n", version, sourceCommit))
common.LogInfo(fmt.Sprintf("Run ID: %s", runID))
ckEnvironment()
ckEndpoint()
queue, ch, err := taskBroker.QueueDeclare(jobName)
common.ExitOnError(err, "Failed to connect to the task broker.")
preparePod()
for {
taskID, err := checkoutTask(*pRunID)
if err != nil {
log.Fatalf(err.Error())
delivery, ok, err := ch.Get(queue.Name, false /* autoAck*/)
common.ExitOnError(err, "Failed to get a delivery.")
if !ok {
common.LogInfo("No more task in the queue. Exiting successfully.")
break
}
err = runTask(taskID, *pRunID)
var output []byte
var taskResult *models.TaskResult
var setting models.TaskSetting
err = json.Unmarshal(delivery.Body, &setting)
if err != nil {
log.Fatalf(err.Error())
errorMsg := fmt.Sprintf("Failed to unmarshel a delivery's body in JSON: %s", err.Error())
common.LogError(errorMsg)
taskResult = setting.CreateIncompletedTask(podName, runID, errorMsg)
} else {
common.LogInfo(fmt.Sprintf("Run task %s", setting.GetIdentifier()))
result, duration, executeOutput := setting.Execute()
taskResult = setting.CreateCompletedTask(result, duration, podName, runID)
output = executeOutput
}
err = afterTask(taskID)
taskResult, err = taskResult.CommitNew()
if err != nil {
// after task action's failure is not fatal.
log.Println(err.Error())
common.LogError(fmt.Sprintf("Failed to commit a new task: %s.", err.Error()))
} else {
err = saveTaskLog(taskResult.ID, output)
if err != nil {
common.LogError(fmt.Sprintf("Failed to save task log a new task: %s.", err.Error()))
}
err = afterTask(taskResult)
if err != nil {
common.LogError(fmt.Sprintf("Failed in after task: %s.", err.Error()))
}
}
err = delivery.Ack(false)
if err != nil {
common.LogError(fmt.Sprintf("Failed to ack delivery: %s", err.Error()))
} else {
common.LogInfo("ACK")
}
}
}

Просмотреть файл

@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
@ -21,7 +22,6 @@ func getEndpointFromEnv() string {
// CreateRequest returns a new HTTP request
func CreateRequest(method string, path string, body []byte) (request *http.Request, err error) {
templateError := fmt.Sprintf("Fail to create request [%s %s].", method, path) + " Reason %s. Exception %s."
authorizationHeader := os.Getenv(common.EnvKeyInternalCommunicationKey)
endpoint := getEndpointFromEnv()
@ -32,7 +32,7 @@ func CreateRequest(method string, path string, body []byte) (request *http.Reque
request, err = http.NewRequest(method, fmt.Sprintf("%s/%s", endpoint, path), buffer)
if err != nil {
return nil, fmt.Errorf(templateError, "unable to create request", err)
return nil, fmt.Errorf("unable to create request: %s", err.Error())
}
request.Header.Set("Authorization", authorizationHeader)
@ -42,3 +42,24 @@ func CreateRequest(method string, path string, body []byte) (request *http.Reque
return
}
// SendRequest performs the given request and returns the full response body.
//
// An error is returned when the round trip fails, when the body cannot be
// read, or when the response status code is 300 or above; in the last case
// the error text carries the status code and the response body.
func SendRequest(request *http.Request) ([]byte, error) {
	client := &http.Client{}

	response, err := client.Do(request)
	if err != nil {
		return nil, fmt.Errorf("http error: %s", err.Error())
	}
	defer response.Body.Close()

	// The body is read before the status check so it can be included in the
	// error message of a failed request.
	payload, readErr := ioutil.ReadAll(response.Body)
	switch {
	case readErr != nil:
		return nil, fmt.Errorf("unable to read response body: %s", readErr.Error())
	case response.StatusCode >= 300:
		return nil, fmt.Errorf("HTTP Status %d: %s", response.StatusCode, string(payload))
	}

	return payload, nil
}

Просмотреть файл

@ -1,6 +1,13 @@
package models
import "github.com/Azure/adx-automation-agent/common"
import (
"encoding/json"
"fmt"
"net/http"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/httputils"
)
// Run is the data structure of A01 run
type Run struct {
@ -20,3 +27,50 @@ func (run *Run) GetSecretName(metadata *DroidMetadata) string {
return metadata.Product
}
// Patch submits the run as a PATCH request to "run/<ID>" and returns the run
// decoded from the response body. The receiver itself is not modified; callers
// should continue with the returned value.
func (run *Run) Patch() (*Run, error) {
	payload, err := json.Marshal(run)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal in JSON: %s", err.Error())
	}

	endpoint := fmt.Sprintf("run/%d", run.ID)
	request, err := httputils.CreateRequest(http.MethodPatch, endpoint, payload)
	if err != nil {
		return nil, err
	}

	reply, err := httputils.SendRequest(request)
	if err != nil {
		return nil, err
	}

	updated := &Run{}
	if decodeErr := json.Unmarshal(reply, updated); decodeErr != nil {
		return nil, fmt.Errorf("unable to unmarshal JSON: %s", decodeErr.Error())
	}
	return updated, nil
}
// QueryRun fetches the run identified by runID with a GET request to
// "run/<runID>" and decodes the JSON response into a Run.
func QueryRun(runID int) (*Run, error) {
	endpoint := fmt.Sprintf("run/%d", runID)
	request, err := httputils.CreateRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}

	reply, err := httputils.SendRequest(request)
	if err != nil {
		return nil, err
	}

	found := &Run{}
	if decodeErr := json.Unmarshal(reply, found); decodeErr != nil {
		return nil, fmt.Errorf("unable to unmarshal JSON: %s", decodeErr.Error())
	}
	return found, nil
}

Просмотреть файл

@ -1,22 +0,0 @@
package models
// Task is the data model of a task in A01 system
type Task struct {
Annotation string `json:"annotation,omitempty"`
Duration int `json:"duration,omitempty"`
ID int `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Result string `json:"result,omitempty"`
ResultDetails map[string]interface{} `json:"result_details,omitempty"`
RunID int `json:"run_id,omitempty"`
Settings TaskSetting `json:"settings,omitempty"`
Status string `json:"status,omitempty"`
}
// TaskSetting is the setting data model of A01Task
type TaskSetting struct {
Version string `json:"ver,omitempty"`
Execution map[string]string `json:"execution,omitempty"`
Classifier map[string]string `json:"classifier,omitempty"`
Miscellanea map[string]string `json:"misc,omitempty"`
}

61
models/taskresult.go Normal file
Просмотреть файл

@ -0,0 +1,61 @@
package models
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"github.com/Azure/adx-automation-agent/httputils"
)
// TaskResult is the data model of a task in A01 system. It is exchanged as
// JSON using the field names in the struct tags.
type TaskResult struct {
	Annotation string `json:"annotation,omitempty"`
	// Duration is the execution time; TaskSetting.Execute measures it in
	// milliseconds.
	Duration int `json:"duration,omitempty"`
	// ID identifies the task result.
	// NOTE(review): presumably assigned by the task store on commit — confirm.
	ID int `json:"id,omitempty"`
	// Name is the display name; the TaskSetting constructors build it as
	// "Test: <identifier>".
	Name string `json:"name,omitempty"`
	// Result is the outcome: "Passed" or "Failed" from TaskSetting.Execute,
	// or "Error" for an incompleted task.
	Result string `json:"result,omitempty"`
	// ResultDetails carries free-form details; the TaskSetting constructors
	// store the pod name under "agent" and, for incompleted tasks, the
	// message under "error".
	ResultDetails map[string]interface{} `json:"result_details,omitempty"`
	// RunID is the run this task belongs to; CommitNew uses it to build the
	// request path.
	RunID int `json:"run_id,omitempty"`
	// Settings is the task setting this result was produced from.
	Settings TaskSetting `json:"settings,omitempty"`
	// Status is set to "Completed" or "Error" by the TaskSetting constructors.
	Status string `json:"status,omitempty"`
}
// CommitNew saves an uncommitted task by POSTing it as JSON to
// "run/<RunID>/task" and returns the task decoded from the response body.
//
// An error is returned when encoding, the HTTP round trip, reading the
// response, or decoding fails, and when the response status code is 300 or
// above.
func (task *TaskResult) CommitNew() (*TaskResult, error) {
	payload, err := json.Marshal(task)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal JSON: %s", err.Error())
	}

	endpoint := fmt.Sprintf("run/%d/task", task.RunID)
	request, err := httputils.CreateRequest(http.MethodPost, endpoint, payload)
	if err != nil {
		return nil, fmt.Errorf("unable to create request: %s", err.Error())
	}

	client := &http.Client{}
	response, err := client.Do(request)
	if err != nil {
		return nil, fmt.Errorf("http error: %s", err.Error())
	}
	defer response.Body.Close()

	// Read the body first so a failing status can report it.
	reply, readErr := ioutil.ReadAll(response.Body)
	switch {
	case readErr != nil:
		return nil, fmt.Errorf("unable to read response body: %s", readErr.Error())
	case response.StatusCode >= 300:
		return nil, fmt.Errorf("HTTP Status %d: %s", response.StatusCode, string(reply))
	}

	committed := &TaskResult{}
	if decodeErr := json.Unmarshal(reply, committed); decodeErr != nil {
		return nil, fmt.Errorf("unable to unmarshal JSON: %s", decodeErr.Error())
	}
	return committed, nil
}

86
models/tasksetting.go Normal file
Просмотреть файл

@ -0,0 +1,86 @@
package models
import (
"fmt"
"os/exec"
"strconv"
"strings"
"time"
)
// TaskSetting is the setting data model of an A01 task: three string-to-string
// maps plus a version tag. Every field carries omitempty, so an empty version
// and empty maps are left out of the encoded JSON.
type TaskSetting struct {
// Version is encoded under the JSON key "ver".
Version string `json:"ver,omitempty"`
// Execution describes how to run the task; GetCommand reads its "command" entry.
Execution map[string]string `json:"execution,omitempty"`
// Classifier describes what the task is; GetIdentifier reads its "identifier" entry.
Classifier map[string]string `json:"classifier,omitempty"`
// Miscellanea is encoded under the JSON key "misc"; no method visible here reads it.
Miscellanea map[string]string `json:"misc,omitempty"`
}
// GetIdentifier returns the value stored under the "identifier" key of the
// Classifier map, or the empty string when the key (or the map) is missing.
func (setting *TaskSetting) GetIdentifier() string {
	if identifier, found := setting.Classifier["identifier"]; found {
		return identifier
	}
	return ""
}
// GetCommand splits the "command" entry of the Execution map on whitespace
// and returns the pieces: the program name followed by its arguments. No
// shell-style quoting is interpreted. A missing or blank entry yields an
// empty slice.
func (setting *TaskSetting) GetCommand() []string {
	commandLine := setting.Execution["command"]
	return strings.Fields(commandLine)
}
// Execute runs the command returned by GetCommand, waits for it to finish and
// reports how it went.
//
// It returns:
//   - result: "Passed" when the command ran and exited without error,
//     "Failed" otherwise (including when it could not be started);
//   - duration: the wall-clock time of the run in milliseconds;
//   - output: the combined standard output and standard error of the command.
//
// A setting without a command is reported as "Failed" with a zero duration
// and an explanatory output instead of being executed.
func (setting *TaskSetting) Execute() (result string, duration int, output []byte) {
	execution := setting.GetCommand()
	if len(execution) == 0 {
		// Indexing execution[0] below would panic; there is nothing to run.
		return "Failed", 0, []byte("no command to execute")
	}

	// A trailing empty argument list is fine, so one call covers commands
	// with and without arguments.
	cmd := exec.Command(execution[0], execution[1:]...)

	begin := time.Now()
	output, err := cmd.CombinedOutput()
	duration = int(time.Since(begin) / time.Millisecond)

	if err != nil {
		return "Failed", duration, output
	}
	return "Passed", duration, output
}
// CreateCompletedTask builds an uncommitted TaskResult describing a task that
// ran to completion. The result and duration are recorded as given, podName is
// stored under the "agent" key of the result details, and the status is
// "Completed". The task carries a copy of this setting.
//
// runID is parsed as a decimal integer; the conversion error is discarded, so
// a runID that is not a number produces a task with run ID 0.
func (setting *TaskSetting) CreateCompletedTask(result string, duration int, podName string, runID string) *TaskResult {
	parsedRunID, _ := strconv.Atoi(runID)

	completed := new(TaskResult)
	completed.Name = fmt.Sprintf("Test: %s", setting.GetIdentifier())
	completed.Duration = duration
	completed.Result = result
	completed.ResultDetails = map[string]interface{}{"agent": podName}
	completed.RunID = parsedRunID
	completed.Settings = *setting
	completed.Status = "Completed"
	return completed
}
// CreateIncompletedTask builds an uncommitted TaskResult describing a task
// that could not be completed. Both the result and the status are "Error";
// podName and errorMsg are stored under the "agent" and "error" keys of the
// result details. The task carries a copy of this setting.
//
// runID is parsed as a decimal integer; the conversion error is discarded, so
// a runID that is not a number produces a task with run ID 0.
func (setting *TaskSetting) CreateIncompletedTask(podName string, runID string, errorMsg string) *TaskResult {
	parsedRunID, _ := strconv.Atoi(runID)

	details := make(map[string]interface{})
	details["agent"] = podName
	details["error"] = errorMsg

	incompleted := new(TaskResult)
	incompleted.Name = fmt.Sprintf("Test: %s", setting.GetIdentifier())
	incompleted.Result = "Error"
	incompleted.ResultDetails = details
	incompleted.RunID = parsedRunID
	incompleted.Settings = *setting
	incompleted.Status = "Error"
	return incompleted
}

53
reportutils/report.go Normal file
Просмотреть файл

@ -0,0 +1,53 @@
package reportutils
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/Azure/adx-automation-agent/common"
"github.com/Azure/adx-automation-agent/models"
)
// httpClient is the HTTP client shared by every call to Report.
var httpClient = &http.Client{}

// Report asks the email service to send the report of the given run.
//
// The receivers are read from the run setting stored under
// common.KeyUserEmail. When that setting is absent, or is not a string, no
// request is made. Otherwise a JSON body holding "run_id" and "receivers" is
// posted to http://<common.DNSNameEmailService>/report.
//
// Report returns nothing: every failure, including a reply whose status is
// not 200 OK, is only logged through common.LogInfo.
func Report(run *models.Run) {
	common.LogInfo("Sending report...")

	email, ok := run.Settings[common.KeyUserEmail]
	if !ok {
		common.LogInfo("Skip sending report")
		return
	}

	// A setting of any other type would make a bare type assertion panic.
	receivers, ok := email.(string)
	if !ok {
		common.LogInfo("Skip sending report: the user email setting is not a string.")
		return
	}

	content := make(map[string]string)
	content["run_id"] = strconv.Itoa(run.ID)
	content["receivers"] = receivers

	body, err := json.Marshal(content)
	if err != nil {
		common.LogInfo("Fail to marshal JSON during request sending email.")
		return
	}
	common.LogInfo(string(body))

	req, err := http.NewRequest(
		http.MethodPost,
		fmt.Sprintf("http://%s/report", common.DNSNameEmailService),
		bytes.NewBuffer(body))
	if err != nil {
		common.LogInfo("Fail to create request to requesting email.")
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := httpClient.Do(req)
	if err != nil {
		common.LogInfo("Fail to send request to email service.")
		return
	}
	// The body must be closed even though it is not read; otherwise the
	// underlying connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		common.LogInfo("The request may have failed.")
	}
}

106
schedule/taskbroker.go Normal file
Просмотреть файл

@ -0,0 +1,106 @@
package schedule
import (
"fmt"
"github.com/Azure/adx-automation-agent/common"
"github.com/streadway/amqp"
)
// TaskBroker represents an instance of the message broker used in the A01
// system. It owns one AMQP connection and one channel on it, both opened on
// demand by GetChannel, and remembers the queues declared through it so that
// Close can delete them.
type TaskBroker struct {
// ConnectionName is the AMQP URL handed to amqp.Dial, e.g. "amqp://localhost:5672".
ConnectionName string
// connected is neither read nor written by the methods visible here -- TODO confirm it is still needed.
connected bool
// channel is the channel returned by GetChannel; nil until the first successful call.
channel *amqp.Channel
// connection is the connection the channel was opened on; nil until GetChannel dials.
connection *amqp.Connection
// declaredQueues lists the names of the queues declared via QueueDeclare.
declaredQueues []string
}
// GetChannel returns the channel of this task broker. If a channel hasn't been
// established yet, a connection is dialed (unless one already exists), a
// channel is opened on it and its prefetch count is set to 1 so that a
// consumer holds at most one unacknowledged message at a time.
//
// When opening or configuring the channel fails, the broker is closed and its
// connection is forgotten, so that the next call dials a fresh connection
// instead of reusing the closed one. The error is returned with a nil channel.
func (broker *TaskBroker) GetChannel() (*amqp.Channel, error) {
	if broker.channel != nil {
		return broker.channel, nil
	}

	if broker.connection == nil {
		conn, err := amqp.Dial(broker.ConnectionName)
		if err != nil {
			return nil, err
		}
		broker.connection = conn
	}

	ch, err := broker.connection.Channel()
	if err != nil {
		broker.Close()
		// The connection is closed now; drop it so a retry can reconnect.
		broker.connection = nil
		return nil, err
	}

	// ensure fair fetch
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		// Closing the connection also tears down the channel just opened.
		broker.Close()
		broker.connection = nil
		return nil, err
	}

	broker.channel = ch
	return broker.channel, nil
}
// QueueDeclare declares a durable, auto-delete queue with the given name. It
// returns the queue as well as the channel associated with this connection. If
// a channel has not been established, a new one is created through GetChannel.
//
// The name of a successfully declared queue is remembered so that Close can
// delete it. When the declaration fails nothing is remembered and the error is
// returned; when no channel could be obtained the returned channel is nil.
func (broker *TaskBroker) QueueDeclare(name string) (queue amqp.Queue, ch *amqp.Channel, err error) {
	ch, err = broker.GetChannel()
	if err != nil {
		return amqp.Queue{}, nil, err
	}

	queue, err = ch.QueueDeclare(
		name,  // queue name
		true,  // durable
		true,  // auto-delete: delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		// The queue was not declared, so there is nothing for Close to
		// delete later; recording it would store an empty name.
		return queue, ch, err
	}

	broker.declaredQueues = append(broker.declaredQueues, queue.Name)
	return queue, ch, nil
}
// Close deletes the queues declared through this broker, then closes the
// channel and the connection, in that order. Errors from the broker are
// ignored: closing is best effort.
//
// Afterwards the broker holds no channel, connection or queue names, so Close
// may be called again safely and GetChannel will reconnect from scratch.
func (broker *TaskBroker) Close() {
	if broker.channel != nil {
		for _, queueName := range broker.declaredQueues {
			// ifUnused=false, ifEmpty=false, noWait=true
			broker.channel.QueueDelete(queueName, false, false, true)
		}
		broker.channel.Close()
		broker.channel = nil
	}
	// Without a channel the queues cannot be deleted; forget them either way
	// so a later Close does not try to delete them again.
	broker.declaredQueues = nil

	if broker.connection != nil {
		broker.connection.Close()
		broker.connection = nil
	}
}
// CreateLocalTaskBroker returns a TaskBroker for local testing. It points at
// a message broker listening on port 5672 of localhost; nothing is dialed
// until the broker is first used.
func CreateLocalTaskBroker() *TaskBroker {
	broker := new(TaskBroker)
	broker.ConnectionName = "amqp://localhost:5672"
	return broker
}
// CreateInClusterTaskBroker returns a TaskBroker for use inside the cluster.
// It points at port 5672 of the host named by common.DNSNameTaskBroker;
// nothing is dialed until the broker is first used.
func CreateInClusterTaskBroker() *TaskBroker {
	address := fmt.Sprintf("amqp://%s:5672", common.DNSNameTaskBroker)
	broker := new(TaskBroker)
	broker.ConnectionName = address
	return broker
}