go-asyncjob/job_definition.go

114 строки
2.8 KiB
Go

package asyncjob
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-asyncjob/graph"
)
// Interface for a job definition
type JobDefinitionMeta interface {
GetName() string
GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
Seal()
Sealed() bool
Visualize() (string, error)
// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error
getRootStep() StepDefinitionMeta
}
// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
type JobDefinition[T any] struct {
name string
sealed bool
steps map[string]StepDefinitionMeta
stepsDag *graph.Graph[StepDefinitionMeta]
rootStep *StepDefinition[T]
}
// Create new JobDefinition
//
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph(connectStepDefinition),
}
rootStep := newStepDefinition[T](name, stepTypeRoot)
j.rootStep = rootStep
j.steps[j.rootStep.GetName()] = j.rootStep
j.stepsDag.AddNode(j.rootStep)
return j
}
// Start execution of the job definition.
//
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}
ji := newJobInstance(jd, input, jobOptions...)
ji.start(ctx)
return ji
}
func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta {
return jd.rootStep
}
func (jd *JobDefinition[T]) GetName() string {
return jd.name
}
func (jd *JobDefinition[T]) Seal() {
if jd.sealed {
return
}
jd.sealed = true
}
func (jd *JobDefinition[T]) Sealed() bool {
return jd.sealed
}
// GetStep returns the stepDefinition by name
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) {
stepMeta, ok := jd.steps[stepName]
return stepMeta, ok
}
// AddStep adds a step to the job definition, with optional preceding steps
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error {
jd.steps[step.GetName()] = step
jd.stepsDag.AddNode(step)
for _, precedingStep := range precedingSteps {
if err := jd.stepsDag.Connect(precedingStep, step); err != nil {
if errors.Is(err, graph.ErrConnectNotExistingNode) {
return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName()))
}
return err
}
}
return nil
}
// Visualize the job definition in graphviz dot format
func (jd *JobDefinition[T]) Visualize() (string, error) {
return jd.stepsDag.ToDotGraph()
}