asyncjob helps organize code as a graph of blocks instead of a sequence of blocks.

AsyncJob

AsyncJob aims to help you organize code as a dependency graph (DAG) instead of a sequential chain.

Concepts

JobDefinition is a graph that describes code blocks and their connections.

  • You can use AddStep, StepAfter, and StepAfterBoth to organize steps in a JobDefinition.
  • A JobDefinition can, and should, be built and sealed at package init time.
  • A JobDefinition has a generically typed input.
  • Calling Start with the input instantiates a JobInstance, and its steps begin to execute.
  • A JobDefinition can be visualized with Graphviz, which makes it easier for humans to understand.
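
To make the build-then-seal idea concrete, here is a minimal standard-library sketch. It is not the asyncjob implementation: the `definition` type and its `addStep` and `seal` methods are hypothetical names that only mimic the behavior described above (steps declare what they run after, and a sealed definition rejects further changes).

```go
package main

import (
	"errors"
	"fmt"
)

// definition is a hypothetical, simplified stand-in for a job definition:
// it only records step names and the steps each one depends on.
type definition struct {
	name   string
	deps   map[string][]string
	sealed bool
}

func newDefinition(name string) *definition {
	return &definition{name: name, deps: map[string][]string{}}
}

// addStep registers a step and the steps it must run after.
// Requiring dependencies to exist already keeps the graph acyclic.
func (d *definition) addStep(step string, after ...string) error {
	if d.sealed {
		return errors.New("definition is sealed")
	}
	if _, ok := d.deps[step]; ok {
		return fmt.Errorf("step %q already exists", step)
	}
	for _, dep := range after {
		if _, ok := d.deps[dep]; !ok {
			return fmt.Errorf("step %q depends on unknown step %q", step, dep)
		}
	}
	d.deps[step] = after
	return nil
}

// seal freezes the definition so it can be shared safely by many instances.
func (d *definition) seal() { d.sealed = true }

func main() {
	d := newDefinition("sqlSummaryJob")
	_ = d.addStep("GetConnection")
	_ = d.addStep("GetTableClient1", "GetConnection")
	d.seal()
	// Adding a step after sealing is rejected.
	fmt.Println(d.addStep("QueryTable1", "GetTableClient1"))
}
```

Because a step may only depend on steps that were added before it, this sketch cannot express a cycle, which is one simple way to keep the graph a DAG.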

JobInstance is an instance of a JobDefinition, created by calling the .Start() method on the JobDefinition.

  • All steps in the definition are copied to the JobInstance.
  • Each step is executed once its preceding steps are done.
  • A JobInstance can be visualized as well; the instance visualization contains detailed information (start time, duration) for each step.

StepDefinition is an individual code block that can be executed and has inputs and an output.

  • A StepDefinition describes its preceding steps.
  • A StepDefinition contains generic params.
  • Ideally, every step method should come from the JobInput (the generic type on JobDefinition) or be a static method, to avoid shared state between jobs.
  • The output of a step can be fed into the next step as input; the types are checked by Go generics.

StepInstance is an instance of a StepDefinition.

  • A step is wrapped in an AsyncTask.
  • A step is started once all of its dependencies have finished.
  • An execution policy can be applied: {Retry, ContextEnrichment}.

Usage

Build and run an asyncjob


// SqlSummaryAsyncJobDefinition is the job definition for the SqlSummaryJobLib.
// A JobDefinition fits naturally in an init() function.
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]

func init() {
	var err error
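	// BuildJobWithResult is not defined in this README; it is assumed to add the
	// same steps as BuildJob and return the job wrapped via asyncjob.JobWithResult.
	SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(map[string]asyncjob.RetryPolicy{})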
	if err != nil {
		panic(err)
	}

	SqlSummaryAsyncJobDefinition.Seal()
}

func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) {
	job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob")

	connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetConnection: %w", err)
	}

	checkAuthTask, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step CheckAuth: %w", err)
	}

	table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetTableClient1: %w", err)
	}

	qery1ResultTsk, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step QueryTable1: %w", err)
	}

	table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetTableClient2: %w", err)
	}

	qery2ResultTsk, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step QueryTable2: %w", err)
	}

	summaryTsk, err := asyncjob.StepAfterBoth(job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithRetry(retryPolicies["Summarize"]), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step Summarize: %w", err)
	}

	_, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step EmailNotification: %w", err)
	}
	return job, nil
}
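
	// execute the job (this fragment belongs inside a function with ctx in scope)
	jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
	jobInstance2 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})

	// ...

	// context.WithTimeout returns (ctx, cancel), so it cannot be passed to Wait directly
	waitCtx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel1()
	jobInstance1.Wait(waitCtx1)

	waitCtx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel2()
	jobInstance2.Wait(waitCtx2)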

Visualize a job

	// visualize the job
	dotGraph := job.Visualize()
	fmt.Println(dotGraph)

Visualized job graph, shown as DOT output from a completed job instance:

digraph {
	newrank = "true"
		"QueryTable2" [label="QueryTable2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254054-08:00\nDuration: 13.207µs" fillcolor=green] 
		"QueryTable1" [label="QueryTable1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254098-08:00\nDuration: 11.394µs" fillcolor=green] 
		"EmailNotification" [label="EmailNotification" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254143-08:00\nDuration: 11.757µs" fillcolor=green] 
		"sqlSummaryJob" [label="sqlSummaryJob" shape=triangle style=filled tooltip="State: completed\nStartAt: 0001-01-01T00:00:00Z\nDuration: 0s" fillcolor=green] 
		"GetConnection" [label="GetConnection" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253844-08:00\nDuration: 154.825µs" fillcolor=green] 
		"GetTableClient2" [label="GetTableClient2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254017-08:00\nDuration: 25.793µs" fillcolor=green] 
		"GetTableClient1" [label="GetTableClient1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254076-08:00\nDuration: 12.459µs" fillcolor=green] 
		"Summarize" [label="Summarize" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254121-08:00\nDuration: 7.88µs" fillcolor=green] 
		"CheckAuth" [label="CheckAuth" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253818-08:00\nDuration: 18.52µs" fillcolor=green] 

		"CheckAuth" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"CheckAuth" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"GetTableClient2" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"GetTableClient1" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"QueryTable1" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"QueryTable2" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"Summarize" -> "EmailNotification" [style=bold tooltip="Time: 2022-12-12T12:00:32.254143-08:00" color=green] 
		"sqlSummaryJob" -> "CheckAuth" [style=bold tooltip="Time: 2022-12-12T12:00:32.253818-08:00" color=green] 
		"sqlSummaryJob" -> "GetConnection" [style=bold tooltip="Time: 2022-12-12T12:00:32.253844-08:00" color=green] 
		"GetConnection" -> "GetTableClient2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254017-08:00" color=green] 
		"GetConnection" -> "GetTableClient1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254076-08:00" color=green] 
}

Collect the result from a job

You can make a job aware of the result of a given step, then collect that step's strongly typed result from the job instance.

var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]
SqlSummaryAsyncJobDefinition = asyncjob.JobWithResult(job /*from previous section*/, summaryTsk)

jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
result, err := jobInstance1.Result(ctx)

Overhead?

  • A goroutine is created for each step in your JobDefinition when you call .Start().
  • Each step also holds a small amount of memory for state tracking.
  • The user function is instrumented with state tracking and panic handling.

Here is a simple visualization of how it actually looks:
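
The kind of instrumentation described above can be sketched in a few lines. This is an assumption-laden illustration rather than the library's code: `stepState` and `instrument` are hypothetical names, and the "failed" state label is invented for the example (only "completed" appears in the visualization output in this README).

```go
package main

import (
	"fmt"
	"time"
)

// stepState is a hypothetical per-step record, similar in spirit to the
// state, start time, and duration shown in the job visualization.
type stepState struct {
	State    string
	StartAt  time.Time
	Duration time.Duration
	Err      error
}

// instrument runs fn, records timing, and converts a panic into an error so a
// failing step cannot crash the whole process.
func instrument(fn func() error) (st stepState) {
	st.StartAt = time.Now()
	defer func() {
		st.Duration = time.Since(st.StartAt)
		if r := recover(); r != nil {
			st.Err = fmt.Errorf("step panicked: %v", r)
		}
		if st.Err != nil {
			st.State = "failed"
		} else {
			st.State = "completed"
		}
	}()
	st.Err = fn()
	return st
}

func main() {
	// One goroutine per step, each reporting its recorded state.
	results := make(chan stepState, 2)
	go func() { results <- instrument(func() error { return nil }) }()
	go func() { results <- instrument(func() error { panic("boom") }) }()
	for i := 0; i < 2; i++ {
		st := <-results
		fmt.Println(st.State, st.Err)
	}
}
```

The per-step cost in this sketch is one goroutine, one small struct, and one deferred function call, which matches the rough shape of the overhead listed above.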

gantt
    title       asyncjob.Start()
    dateFormat  HH:mm

    section GetConnection
    WaitPrecedingTasks            :des11, 00:00,0ms
    userFunction                  :des12, after des11, 20ms

    section GetTableClient1
    WaitPrecedingTasks            :des21, 00:00,20ms
    userFunction                  :des22, after des21, 15ms

    section GetTableClient2
    WaitPrecedingTasks            :des31, 00:00,20ms
    userFunction                  :des32, after des31, 21ms

    section QueryTable1
    WaitPrecedingTasks            :des41, 00:00,35ms
    userFunction                  :des42, after des41, 24ms

    section QueryTable2
    WaitPrecedingTasks            :des51, 00:00,41ms
    userFunction                  :des52, after des51, 30ms

    section QueryResultSummarize
    WaitPrecedingTasks            :des61, 00:00, 71ms
    userFunction                  :des62, after des61, 10ms