Merged PR 465: Experiment and Run task

Created the Kubeflow experiment and run task, and updated parts of the upload task and pipelines.
- All validations implemented and tested
- Creating a new run and/or a new experiment works
- CI/CD pipelines updated for test coverage and for deploying both tasks
- Additional validations and UI changes made to the upload task
- README updated

Related work items: #1649, #1655
This commit is contained in:
Ryan Zuber 2020-05-26 18:33:16 +00:00, committed by Sushant Divate
Parent 89e38bff5d
Commit ffec13bc88
28 changed files: 1784 additions and 326 deletions

@ -0,0 +1,29 @@
## Kubeflow Task Build and Release Pipeline
This pipeline was created to run tests and to package and publish the extensions in this repository. For more information on the tasks used here, see https://github.com/microsoft/azure-pipelines-tasks/tree/master/Tasks or https://github.com/microsoft/azure-devops-extension-tasks/tree/master/BuildTasks.
### Code Quality Stage Tasks
- **TFX Installer:** Ensures that the tfx CLI is installed on the agent, allowing you to query, package, and publish your extensions.
- **NPM:** Runs twice, installing the Node dependencies for each task. The NPM task can run a variety of npm commands; to use custom scripts, add them to the package.json file.
- **Bash:** Configured to compile the TypeScript sources for both tasks. This task can run anything that can be done in a Bash shell.
- **NPM:** Runs twice, executing the unit tests for each task.
- **Publish Test Results:** Publishes the test results regardless of whether they pass or fail. For this to work, your test script must report results in a supported format, such as a JUnit XML file.
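For example, a mocha-based test script could produce such a file with a JUnit reporter. This is only a sketch: it assumes the mocha-junit-reporter package and reuses the UploadTestResults.xml file name that this pipeline's Publish Test Results step looks for.

```json
{
  "scripts": {
    "test": "mocha Tests/ --reporter mocha-junit-reporter --reporter-options mochaFile=./UploadTestResults.xml"
  }
}
```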
### Build Manifest Files
- **TFX Installer:** Ensures that the tfx cli is installed on the agent.
- **NPM:** Runs twice, installing the Node dependencies for each task.
- **Bash:** Configured to compile the TypeScript sources for both tasks.
- **Query Azure DevOps Extension Version:** Retrieves the current version of the run task from the marketplace. This task is used to get the version of whichever extension you need in the Visual Studio Marketplace: https://marketplace.visualstudio.com/.
- **Package Azure DevOps Extension:** Increments the version and packages the run task into a .vsix file. This task can bump your extension's version and packages the extension manifest and files into a single .vsix file.
- **Query Azure DevOps Extension Version:** Retrieves the current version of the upload task from the marketplace.
- **Package Azure DevOps Extension:** Increments the version, and packages the upload task into a .vsix file.
- **Copy Files:** Copies both packaged extensions into the Build.ArtifactStagingDirectory. This task copies files from anywhere in the repository to any other location, including predefined variable directories. All predefined pipeline variables can be found here: https://docs.microsoft.com/en-us/azure/devops/pipelines/build/variables?view=azure-devops&tabs=yaml.
- **Publish Build Artifacts:** Publishes both extension artifacts together as ExtensionFiles. This task publishes files as artifacts that can be used by other pipelines, other pipeline stages, or downloaded manually. Multiple files can be published together into a single folder created by this task.
### Publish Extensions
- **TFX Installer:** Ensures that the tfx cli is installed on the agent.
- **Download Build Artifact:** Downloads the ExtensionFiles artifact. This task can fetch build artifacts from any other pipeline or pipeline stage and make them available to the pipeline stage in which it runs.
- **Publish Azure DevOps Extension:** Publishes the extensions on the marketplace with the incremented version. This task uses an extension manifest file to publish your extension on the Visual Studio Marketplace. The version of the new manifest file must be higher than the current version of the extension on the marketplace for this task to work.

@ -0,0 +1,185 @@
#Pipeline for building and publishing the Kubeflow tasks
trigger: none
# - master
pool:
vmImage: 'ubuntu-latest'
variables:
productionPublisher: 'CSE-DevOps'
uploadExtensionId: 'UploadKubeflowPipeline'
runExtensionId: 'CreateExperimentRun'
visibility: 'private'
stages:
- stage:
displayName: Code Quality
jobs:
- job:
steps:
- task: TfxInstaller@2
inputs:
version: 'v0.6.x'
- task: Npm@1
displayName: 'Installing Dependencies required by Kubeflow Upload Task'
inputs:
command: 'install'
workingDir: 'src/Tasks/Upload_Pipeline'
- task: Npm@1
displayName: 'Installing Dependencies required by Kubeflow Run Task'
inputs:
command: 'install'
workingDir: 'src/Tasks/Create_Run_Experiments'
- task: Bash@3
displayName: Compile Javascript
inputs:
targetType: 'inline'
script: |
cd src/Tasks/Upload_Pipeline
tsc
cd ..
cd Create_Run_Experiments
tsc
- task: Npm@1
displayName: Running Kubeflow Upload Unit Tests
inputs:
command: 'custom'
workingDir: 'src/Tasks/Upload_Pipeline'
customCommand: 'test'
- task: Npm@1
displayName: Running Kubeflow Run Unit Tests
inputs:
command: 'custom'
workingDir: 'src/Tasks/Create_Run_Experiments'
customCommand: 'test'
- task: PublishTestResults@2
displayName: "Publish unit test result"
inputs:
testResultsFormat: 'JUnit'
testResultsFiles: |
**/UploadTestResults.xml
**/RunTestResults.xml
mergeTestResults: true
- stage: Build_Manifest_Files
displayName: Build Manifest Files
jobs:
- job:
steps:
- task: TfxInstaller@2
inputs:
version: 'v0.6.x'
- task: Npm@1
displayName: 'Installing Dependencies required by Kubeflow Upload Task'
inputs:
command: 'install'
workingDir: 'src/Tasks/Upload_Pipeline'
- task: Npm@1
displayName: 'Installing Dependencies required by Kubeflow Run Task'
inputs:
command: 'install'
workingDir: 'src/Tasks/Create_Run_Experiments'
- task: Bash@3
displayName: Compile Javascript
inputs:
targetType: 'inline'
script: |
cd src/Tasks/Upload_Pipeline
tsc
cd ..
cd Create_Run_Experiments
tsc
- task: QueryAzureDevOpsExtensionVersion@2
displayName: CSE-DevOps Get Existing Kubeflow Run Patch Number
inputs:
connectTo: 'VsTeam'
connectedServiceName: 'KubeflowExtensionBuildAndPublish'
publisherId: '$(productionPublisher)'
extensionId: '$(runExtensionId)'
versionAction: 'Patch'
outputVariable: 'Run.Extension.Version'
- task: PackageAzureDevOpsExtension@2
displayName: CSE-DevOps Package Kubeflow Run Extension
inputs:
rootFolder: '$(System.DefaultWorkingDirectory)'
patternManifest: 'vss-extension-Run.json'
publisherId: '$(productionPublisher)'
extensionId: '$(runExtensionId)'
extensionName: 'Kubeflow Experiment and Run'
extensionVersion: '$(Run.Extension.Version)'
updateTasksVersion: true
updateTasksVersionType: 'patch'
extensionVisibility: 'private'
extensionPricing: 'free'
- task: QueryAzureDevOpsExtensionVersion@2
displayName: CSE-DevOps Get Existing Kubeflow Upload Patch Number
inputs:
connectTo: 'VsTeam'
connectedServiceName: 'KubeflowExtensionBuildAndPublish'
publisherId: '$(productionPublisher)'
extensionId: '$(uploadExtensionId)'
versionAction: 'Patch'
outputVariable: 'Upload.Extension.Version'
- task: PackageAzureDevOpsExtension@2
displayName: CSE-DevOps Package Kubeflow Upload Extension
inputs:
rootFolder: '$(System.DefaultWorkingDirectory)'
patternManifest: 'vss-extension-Upload.json'
publisherId: '$(productionPublisher)'
extensionId: '$(uploadExtensionId)'
extensionName: 'Kubeflow Upload Pipeline'
extensionVersion: '$(Upload.Extension.Version)'
updateTasksVersion: true
updateTasksVersionType: 'patch'
extensionVisibility: 'private'
extensionPricing: 'free'
- task: CopyFiles@2
displayName: 'Copy Files to: $(Build.ArtifactStagingDirectory)'
inputs:
Contents: '**/*.vsix'
TargetFolder: '$(Build.ArtifactStagingDirectory)'
- task: PublishBuildArtifacts@1
inputs:
PathtoPublish: '$(Build.ArtifactStagingDirectory)'
ArtifactName: 'ExtensionFiles'
publishLocation: 'Container'
- stage: Publish_Extensions
displayName: Publish Extensions
jobs:
- job:
steps:
- task: TfxInstaller@2
inputs:
version: 'v0.6.x'
- task: DownloadBuildArtifacts@0
inputs:
buildType: 'current'
downloadType: 'single'
artifactName: 'ExtensionFiles'
downloadPath: '$(System.DefaultWorkingDirectory)'
- task: PublishAzureDevOpsExtension@2
inputs:
connectTo: 'VsTeam'
connectedServiceName: 'KubeflowExtensionBuildAndPublish'
fileType: 'vsix'
vsixFile: '$(System.DefaultWorkingDirectory)/ExtensionFiles/CSE-DevOps.UploadKubeflowPipeline-*.vsix'
publisherId: 'CSE-DevOps'
extensionId: 'UploadKubeflowPipeline'
extensionName: 'Kubeflow Upload Pipeline'
updateTasksVersion: false
extensionVisibility: '$(visibility)'
extensionPricing: 'free'
- task: PublishAzureDevOpsExtension@2
inputs:
connectTo: 'VsTeam'
connectedServiceName: 'KubeflowExtensionBuildAndPublish'
fileType: 'vsix'
vsixFile: '$(System.DefaultWorkingDirectory)/ExtensionFiles/CSE-DevOps.CreateExperimentRun-*.vsix'
publisherId: 'CSE-DevOps'
extensionId: 'CreateExperimentRun'
extensionName: 'Kubeflow Experiment and Run'
updateTasksVersion: false
extensionVisibility: '$(visibility)'
extensionPricing: 'free'

@ -6,10 +6,20 @@ pool:
vmImage: 'ubuntu-latest'
steps:
- task: UploadKubeflowPipeline@0
- task: KubeflowUploadPipeline@0
inputs:
kubeflowEndpoint: 'http://52.149.247.172/'
kubeflowEndpoint: 'http://52.149.62.186/'
kubeflowPipelineTask: 'uploadNewVersion'
pipelineFilePath: '$(System.DefaultWorkingDirectory)/src/Tasks/Upload_Pipeline/Tests/pipeline.py.tar.gz'
existingPipelineName: 'testPipeline'
versionName: 'finalization'
existingPipelineName: 'azdoTestPipeline'
versionName: 'azdoTestV5'
- task: KubeflowExperimentRun@0
inputs:
kubeflowEndpoint: 'http://52.149.62.186/'
pipeline: 'azdoTestPipeline'
useDefaultVersion: false
pipelineVersion: 'azdoTestV5'
runName: 'newRun'
pipelineParams: '[{"name":"resource_group", "value":"kubeflow-integration-rg"}, {"name":"workspace", "value":"kubeflow-integration-aml"}]'
experiment: 'useExistingExperiment'
experimentName: 'azdoTestExperiment'

@ -1,124 +0,0 @@
#Pipeline for building the task and publishing the extension
trigger: none
# - master
pool:
vmImage: 'ubuntu-latest'
variables:
stagingPublisher: 'v-ryzube'
productionPublisher: 'CSE-DevOps'
extensionId: 'UploadKubeflowPipeline'
jobs:
- job: BuildAndPackage
displayName: Build and Package Extension
steps:
- task: UseNode@1
inputs:
version: '12.16.0'
checkLatest: false
- task: Npm@1
displayName: 'Installing Dependencies required by Upload Task'
inputs:
command: 'install'
workingDir: 'src/Tasks/Upload_Pipeline'
# - task: Npm@1
# displayName: 'Installing Dependencies required by Create Task'
# inputs:
# command: 'install'
# workingDir: 'src/Tasks/Create_Run_Experiments'
- task: Bash@3
inputs:
targetType: 'inline'
script: |
cd src/Tasks/Upload_Pipeline
tsc
# cd ..
# cd Create_Run_Experiments
# tsc
- task: Npm@1
displayName: Running Unit Tests
inputs:
command: 'custom'
workingDir: 'src/Tasks/Upload_Pipeline'
customCommand: 'test'
# - task: Npm@1
# displayName: Running Unit Tests
# inputs:
# command: 'custom'
# workingDir: 'src/Tasks/Create_Run_Experiments'
# customCommand: 'test'
- task: PublishTestResults@2
displayName: "Publish unit test result"
inputs:
testResultsFiles: "unit_test_report.xml"
testRunTitle: "Unit tests with Mocha"
mergeTestResults: false
searchFolder: "$(Build.SourcesDirectory)"
# - task: QueryAzureDevOpsExtensionVersion@2
# displayName: Private - Get Existing Patch Number Preview
# inputs:
# connectTo: 'VsTeam'
# connectedServiceName: 'KubeflowExtensionBuildAndPublish'
# publisherId: '$(stagingPublisher)'
# extensionId: '$(extensionId)'
# versionAction: 'Patch'
# outputVariable: 'Private.Extension.Version'
# - task: PackageAzureDevOpsExtension@2
# inputs:
# rootFolder: '$(System.DefaultWorkingDirectory)'
# publisherId: '$(stagingPublisher)'
# extensionId: '$(extensionId)'
# extensionName: 'Kubeflow Upload Pipeline'
# extensionVersion: '$(Private.Extension.Version)'
# updateTasksVersion: true
# updateTasksVersionType: 'patch'
# extensionVisibility: 'private'
# extensionPricing: 'free'
- task: QueryAzureDevOpsExtensionVersion@2
displayName: CSE-DevOps Get Existing Patch Number
inputs:
connectTo: 'VsTeam'
connectedServiceName: 'KubeflowExtensionBuildAndPublish'
publisherId: '$(productionPublisher)'
extensionId: '$(extensionId)'
versionAction: 'Patch'
outputVariable: 'Public.Extension.Version'
- task: PackageAzureDevOpsExtension@2
displayName: CSE-DevOps package extension
inputs:
rootFolder: '$(System.DefaultWorkingDirectory)'
publisherId: '$(productionPublisher)'
extensionId: '$(extensionId)'
extensionName: 'Kubeflow Upload Pipeline'
extensionVersion: '$(Public.Extension.Version)'
updateTasksVersion: true
updateTasksVersionType: 'patch'
extensionVisibility: 'private'
extensionPricing: 'free'
- task: CopyFiles@2
displayName: 'Copy Files to: $(Build.ArtifactStagingDirectory)'
inputs:
Contents: '**/*.vsix'
TargetFolder: '$(Build.ArtifactStagingDirectory)'
- task: PublishPipelineArtifact@1
inputs:
targetPath: '$(Build.ArtifactStagingDirectory)'
artifact: 'ExtensionFiles'
publishLocation: 'pipeline'

@ -1,10 +1,50 @@
# Kubeflow Pipeline Uploader
# Kubeflow Tasks for Azure Pipelines
<p>This task will allow you to upload an ML pipeline to your Kubeflow workspace. The task utilizes the Kubeflow API to validate user input and upload new pipelines to Kubeflow.</p>
## Setup
# Fields
<p>In order to use these extensions for working with Kubeflow, you will need to install them to your Azure DevOps organization from the Visual Studio Marketplace, which can be found here: https://marketplace.visualstudio.com/.
You will also need a Kubeflow workspace. More information on creating a workspace can be found here: https://www.kubeflow.org/docs/started/getting-started/.</p>
## Kubeflow Upload Pipeline
<p>This task will allow you to upload an ML pipeline to your Kubeflow workspace. The task utilizes the Kubeflow API to validate user input and upload new pipelines and versions to Kubeflow.</p>
### Fields
- **Kubeflow Endpoint:** This is the base url of your Kubeflow workspace in *http://your_URL_here/* format.
- **Bearer Token:** Used only if your Kubeflow workspace requires bearer authentication. If your workspace uses bearer authentication and you omit this field, the task will not be able to access your workspace.
- **Kubeflow Pipeline Task:** This field allows you to choose which task you would like to perform. Currently the only option is to upload a new Kubeflow pipeline.
- **Pipeline Path:** This is the path that your .tar.gz pipeline file is located in your repository. The maximum file size is 32MB.
- **Pipeline Name:** This is the name that you would like to give the new pipeline.
- **Kubeflow Pipeline Task:** This field allows you to choose which task you would like to perform. You can either upload a new pipeline or a new version of an existing pipeline.
- **Pipeline Path:** The path to your .tar.gz pipeline file within your repository. The maximum file size is 32MB.
#### New Pipeline Specific
- **New Pipeline Name:** The name that you would like to give the new pipeline, if you choose to make a new pipeline. Must be unique.
#### New Pipeline Version Specific
- **Existing Pipeline Name:** The name of the pipeline you would like to make a new version of.
- **Version Name:** The name that you would like to give the new pipeline version, if you choose to make a new version. Must be unique.
### Outputs
- **kf_pipeline_id:** The ID of the pipeline, whether newly created or pre-existing.
- **kf_pipeline_name:** The name of the pipeline, whether newly created or pre-existing.
- **kf_version_id:** The ID of the newly created pipeline version.
- **kf_version_name:** The name of the newly created pipeline version.
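As a hedged sketch (not taken from this repository), a later step in the same job could consume these outputs, assuming the task exposes them as ordinary pipeline variables; the endpoint URL, pipeline name, and version name below are placeholders:

```yaml
- task: KubeflowUploadPipeline@0
  inputs:
    kubeflowEndpoint: 'http://your_URL_here/'
    kubeflowPipelineTask: 'uploadNewVersion'
    pipelineFilePath: '$(System.DefaultWorkingDirectory)/pipeline.py.tar.gz'
    existingPipelineName: 'myPipeline'
    versionName: 'v2'
- script: |
    echo "Pipeline ID: $(kf_pipeline_id)"
    echo "Version ID: $(kf_version_id)"
  displayName: 'Show Kubeflow upload outputs'
```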
## Kubeflow Experiment and Run
<p>This task is used for creating and monitoring a new run. It also allows you to create a new experiment, if you do not want to use an existing one. This task utilizes the Kubeflow API to validate user input and create new runs and experiments on Kubeflow.</p>
### Fields
- **Kubeflow Endpoint:** The base url of your Kubeflow workspace in *http://your_URL_here/* format.
- **Bearer Token:** Used only if your Kubeflow workspace requires bearer authentication. If your workspace uses bearer authentication and you omit this field, the task will not be able to access your workspace.
- **Pipeline:** The name of the pipeline that you would like to use for your run.
- **Pipeline Version:** The name of the version of the pipeline that you would like to use for your run.
- **Run Name:** The name of your new run. Does not have to be unique.
- **Pipeline Params:** The parameters you would like to pass to the pipeline, in JSON object array format: *[{"name":"resource_group", "value":"kubeflow-integration-rg"}, {"name":"workspace", "value":"kubeflow-integration-aml"}]*. Both resource_group and workspace are required; other fields can be added.
- **Description:** Optional. A description for your run.
- **Wait for run to complete:** If checked, this field will allow the task to monitor the run until completion. It will update the status every 15 seconds.
- **Experiment:** This field allows you to either create a new experiment or use an existing experiment.
- **Experiment Name:** The name of the experiment you would like your run to use. If the experiment field is set to create a new experiment, this name will need to be unique.
- **Description:** The optional description of a new experiment. Does not apply to existing experiments.
### Outputs
- **kf_pipeline_id:** The ID of the pipeline being used.
- **kf_version_id:** The ID of the pipeline version being used.
- **kf_experiment_id:** The ID of the experiment, whether newly created or pre-existing.
- **kf_run_id:** The ID of the new run.
- **kf_run_status:** The end status of the run. Only available if you choose to wait for the run to complete.
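The Pipeline Params check above can be sketched as follows; this is a minimal sketch mirroring the task's own validation logic (an empty string is accepted, anything else must parse as JSON):

```typescript
// Minimal sketch of the task's pipeline-parameter validation:
// an empty string is allowed; otherwise the input must be valid JSON.
function validatePipelineParams(params: string): boolean {
    if (params === '') {
        return true;
    }
    try {
        JSON.parse(params);
        return true;
    } catch {
        return false;
    }
}

const params = '[{"name":"resource_group", "value":"kubeflow-integration-rg"},'
    + ' {"name":"workspace", "value":"kubeflow-integration-aml"}]';
console.log(validatePipelineParams(params)); // true
console.log(validatePipelineParams('not valid parameters')); // false
```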

Binary data: images/uploadIcon.png (binary file not shown)
Before: 7.2 KiB
After: 5.2 KiB

@ -0,0 +1,81 @@
import path = require("path");
import fs = require("fs");
import task = require("azure-pipelines-task-lib/task");
import * as rest from "typed-rest-client";
import{request, OutgoingHttpHeaders} from "http";
import FormData from "form-data";
import { IAllExperiment } from "../operations/interfaces";
export class ExperimentMock {
public endpointUrl: string;
public name: string;
public description: string;
public getAllExperimentsEndpoint: string;
public restAPIClient: rest.RestClient;
private bearerToken: string;
constructor(endpointUrl: string, name: string, description?: string, bearerToken?: string) {
this.endpointUrl = endpointUrl;
this.name = name;
this.description = description!;
this.getAllExperimentsEndpoint = 'pipeline/apis/v1beta1/experiments';
this.restAPIClient = new rest.RestClient('agent');
this.bearerToken = bearerToken!;
}
public async validateEndpointUrl() {
try {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
return false;
}
catch(error) {
return false;
}
}
public async validateName() {
try {
var url = `${this.endpointUrl}${this.getAllExperimentsEndpoint}`;
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllExperiment>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.experiments != undefined){
for(var exp of webRequest.result.experiments) {
if(exp.name == this.name) {
return false;
}
}
return true;
}
else {
return true;
}
}
else {
return false;
}
}
catch(error) {
return false;
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {
return false;
}
if(!await this.validateName()) {
return false;
}
return true;
}
catch(error) {
return false;
}
}
}

@ -0,0 +1,45 @@
import { ExperimentMock } from "./ExperimentMock";
import 'mocha';
var assert = require('assert');
var fs = require('fs');
var Exp = new ExperimentMock('http://52.149.62.186/', 'RandomUnusedName');
describe('Run all validations pass', async function() {
it('should return true saying validations passed', async function() {
assert.equal(await Exp.runValidations(), true);
Promise.resolve();
});
});
describe('Validate experiment name', async function() {
it('should return true saying that the name is unique', async function() {
assert.equal(await Exp.validateName(), true);
Promise.resolve();
});
it('should return false saying that the name is not unique', async function() {
Exp.name = 'testExperiment';
assert.equal(await Exp.validateName(), false);
Promise.resolve();
});
});
describe('Validate endpoint url', async function() {
it('should return true saying that the url exists', async function() {
assert.equal(await Exp.validateEndpointUrl(), true);
Promise.resolve();
});
it('should return false saying that the url does not exist', async function() {
Exp.endpointUrl = 'not a valid endpoint';
assert.equal(await Exp.validateEndpointUrl(), false);
Promise.resolve();
});
});
describe('Run all validations fail', async function() {
it('should return false saying validations failed', async function() {
assert.equal(await Exp.runValidations(), false);
Promise.resolve();
});
});

@ -0,0 +1,229 @@
import path = require("path");
import fs = require("fs");
import task = require("azure-pipelines-task-lib/task");
import { IExecSyncResult } from "azure-pipelines-task-lib/toolrunner";
import * as rest from "typed-rest-client";
import * as HttpC from "typed-rest-client/HttpClient";
import{request, OutgoingHttpHeaders} from "http";
import FormData from "form-data";
import { IAllPipeline, IAllPipelineVersion, IAllExperiment } from "../operations/interfaces";
export class RunMock {
public endpointUrl: string;
public runName: string;
public pipeline: string;
public pipelineVersion: string;
public pipelineParams: string;
public description: string;
// public waitForRunToFinish: boolean;
public experiment: string;
public experimentName: string;
public runType: string;
public getAllRunsEndpoint: string;
public getAllPipelinesEndpoint: string;
public getAllVersionsEndpoint: string;
public getAllExperimentsEndpoint: string;
public pipelineID: string;
public pipelineVersionID: string;
public experimentID: string;
public runID: string;
public restAPIClient: rest.RestClient;
private bearerToken: string;
constructor(endpointUrl: string, runName: string, pipeline: string, pipelineVersion: string, pipelineParams: string,
experiment: string, experimentName: string, description?: string, bearerToken?: string) {
this.endpointUrl = endpointUrl;
this.runName = runName;
this.pipeline = pipeline;
this.pipelineVersion = pipelineVersion;
this.pipelineParams = pipelineParams;
this.experiment = experiment;
this.experimentName = experimentName;
this.description = description!;
this.bearerToken = bearerToken!;
this.runType = 'One-Off';
this.getAllRunsEndpoint = 'pipeline/apis/v1beta1/runs';
this.getAllPipelinesEndpoint = 'pipeline/apis/v1beta1/pipelines';
this.getAllVersionsEndpoint = 'pipeline/apis/v1beta1/pipeline_versions';
this.getAllExperimentsEndpoint = 'pipeline/apis/v1beta1/experiments';
this.pipelineID = '';
this.pipelineVersionID = '';
this.experimentID = '';
this.runID = '';
this.restAPIClient = new rest.RestClient('agent');
}
public async validateEndpointUrl() {
try {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
return false;
}
catch(error) {
return false;
}
}
public async validatePipeline() {
try {
var pipelineID = await this.getPipelineID();
if(pipelineID != 'Not a valid id.' && pipelineID != '') {
this.pipelineID = pipelineID;
return true;
}
else{
return false;
}
}
catch(error) {
return false;
}
}
public async validatePipelineVersion() {
try {
var versionID = await this.getPipelineVersionID();
if(versionID != 'Not a valid id.' && versionID != '') {
this.pipelineVersionID = versionID;
return true;
}
else {
return false;
}
}
catch(error) {
return false;
}
}
public async validateExperimentName() {
try {
if(this.experiment == 'createNewExperiment') {
this.experimentID = await this.getExperimentID();
return true;
}
else {
var experimentID = await this.getExperimentID();
if(experimentID != 'Not a valid id.' && experimentID != '') {
this.experimentID = experimentID;
return true;
}
else {
return false;
}
}
}
catch(error) {
return false;
}
}
public async validatePipelineParams() {
try {
if(this.pipelineParams == '') {
return true;
}
JSON.parse(this.pipelineParams);
return true;
}
catch(error) {
return false;
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {
return false;
}
if(!await this.validatePipeline()) {
return false;
}
if(!await this.validatePipelineVersion()) {
return false;
}
if(!await this.validateExperimentName()) {
return false;
}
if(!await this.validatePipelineParams()) {
return false;
}
return true;
}
catch(error) {
return false;
}
}
public async getPipelineID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.pipeline}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines[0].id != undefined) {
return webRequest.result.pipelines[0].id;
}
return 'Not a valid id.';
}
return 'Not a valid id.';
}
catch(error) {
return 'Not a valid id.';
}
}
public async getPipelineVersionID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllVersionsEndpoint}?resource_key.type=PIPELINE&resource_key.id=${this.pipelineID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.pipelineVersion}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipelineVersion>(url, options)!;
if(webRequest.result != null) {
var versions = webRequest.result.versions;
if(versions != undefined) {
for(var i = 0; i < versions.length; i++) {
if(versions[i].name == this.pipelineVersion) {
return versions[i].id;
}
}
return 'Not a valid id.';
}
return 'Not a valid id.';
}
return 'Not a valid id.';
}
catch(error) {
return 'Not a valid id.';
}
}
public async getExperimentID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllExperimentsEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.experimentName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllExperiment>(url, options)!;
if(webRequest.result != null) {
var experiments = webRequest.result.experiments;
if(experiments[0].id != undefined) {
return experiments[0].id;
}
return 'Not a valid id.';
}
return 'Not a valid id.';
}
catch(error) {
return 'Not a valid id.';
}
}
}

@ -0,0 +1,89 @@
import { RunMock } from "./RunMock";
import 'mocha';
var assert = require('assert');
var fs = require('fs');
var Run = new RunMock('http://52.149.62.186/', 'newRun', 'testPipeline', 'testPipeline',
'[{"name":"resource_group", "value":"kubeflow-integration-rg"}, {"name":"workspace", "value":"kubeflow-integration-aml"}]',
'useExistingExperiment', 'testExperiment');
describe('All validations pass', async function() {
it('should return true saying that validations have passed', async function() {
assert.equal(await Run.runValidations(), true);
Promise.resolve();
});
});
describe('Validate existing pipeline name', async function() {
it('should fail saying that the pipeline name does not exist', async function() {
Run.pipeline = 'not an existing pipeline';
assert.equal(await Run.validatePipeline(), false);
Promise.resolve();
});
it('should pass saying that the pipeline name exists', async function() {
Run.pipeline = 'testPipeline';
assert.equal(await Run.validatePipeline(), true);
Promise.resolve();
});
});
describe('Validate existing pipeline version name', async function() {
it('should pass saying that the pipeline version name exists', async function() {
assert.equal(await Run.validatePipelineVersion(), true);
Promise.resolve();
});
it('should fail saying that the pipeline version name does not exist', async function() {
Run.pipelineVersion = 'versionDoesNotExist';
assert.equal(await Run.validatePipelineVersion(), false);
Promise.resolve();
});
});
describe('Validate existing experiment name', async function() {
it('should pass saying that the experiment name exists', async function() {
assert.equal(await Run.validateExperimentName(), true);
Promise.resolve();
});
it('should fail saying that the experiment name does not exist', async function() {
Run.experimentName = 'not an existing experiment';
assert.equal(await Run.validateExperimentName(), false);
Promise.resolve();
});
});
describe('Validate pipeline parameters', async function() {
it('should pass saying that the pipeline parameters are valid', async function() {
assert.equal(await Run.validatePipelineParams(), true);
Promise.resolve();
});
it('should pass saying that the empty pipeline parameters are valid', async function() {
Run.pipelineParams = '';
assert.equal(await Run.validatePipelineParams(), true);
Promise.resolve();
});
it('should fail saying that the pipeline parameters are not valid', async function() {
Run.pipelineParams = 'these are not valid pipeline parameters at all, but they are of proper length';
assert.equal(await Run.validatePipelineParams(), false);
Promise.resolve();
});
});
describe('Validate endpoint url', async function() {
it('should pass saying that the endpoint url is valid', async function() {
assert.equal(await Run.validateEndpointUrl(), true);
Promise.resolve();
});
it('should fail saying that the endpoint url is not valid', async function() {
Run.endpointUrl = 'not a valid endpoint';
assert.equal(await Run.validateEndpointUrl(), false);
Promise.resolve();
});
});
describe('All validations fail', async function() {
it('should return false saying that validations have failed', async function() {
assert.equal(await Run.runValidations(), false);
Promise.resolve();
});
});

Binary data: src/Tasks/Create_Run_Experiments/icon.png (new file, binary file not shown)
After: 5.2 KiB

@ -0,0 +1,28 @@
import path = require("path");
import fs = require("fs");
import task = require("azure-pipelines-task-lib/task");
import { Experiment } from "./operations/Experiment";
import { Run } from "./operations/Run";
async function run() {
try {
var RUN = new Run();
if(RUN.experiment == 'createNewExperiment') {
var EXP = new Experiment();
if(await EXP.runValidations()) {
await EXP.createExperiment();
}
}
if(await RUN.runValidations()) {
await RUN.createRun();
if(RUN.waitForRunToFinish == true) {
await RUN.monitorRun();
}
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
run();

@ -0,0 +1,128 @@
import path = require("path");
import fs = require("fs");
import task = require("azure-pipelines-task-lib/task");
import { IExecSyncResult } from "azure-pipelines-task-lib/toolrunner";
import * as rest from "typed-rest-client";
import { request, OutgoingHttpHeaders } from "http";
import FormData from "form-data";
import { IAllExperiment } from "./interfaces"
export class Experiment {
public endpointUrl: string;
public name: string;
public description: string;
public getAllExperimentsEndpoint: string;
public restAPIClient: rest.RestClient;
private bearerToken: string;
constructor() {
this.endpointUrl = task.getInput('kubeflowEndpoint', true)!;
this.name = task.getInput('experimentName', true)!;
this.description = task.getInput('experimentDescription', false)!;
this.getAllExperimentsEndpoint = 'pipeline/apis/v1beta1/experiments';
this.restAPIClient = new rest.RestClient('agent');
this.bearerToken = task.getInput('bearerToken', false)!;
}
public async validateEndpointUrl() {
try {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
return false;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async validateName() {
try {
var url = `${this.endpointUrl}${this.getAllExperimentsEndpoint}`;
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllExperiment>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.experiments != undefined){
for(var exp of webRequest.result.experiments) {
if(exp.name == this.name) {
return false;
}
}
return true;
}
else {
return true;
}
}
else {
throw new Error('Request did not go through. Make sure your Url is valid, and that you have the correct bearer token, if needed.');
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {
throw new Error('Endpoint Url must be a valid Url.')
}
if(!await this.validateName()) {
throw new Error('Experiment name field is either empty, or experiment name is already in use.');
}
return true;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
// The payload for posting a new experiment is a JSON string of the form: {name: string, description: string}
public async createExperiment() {
try {
if(this.description == undefined || this.description == null) {
var form: string = JSON.stringify({"name": this.name});
}
else {
var form: string = JSON.stringify({"name": this.name, "description": this.description});
}
var reqHost = this.endpointUrl.substring(7, this.endpointUrl.length - 1); // strip the leading 'http://' and the trailing '/'
var reqHeaders = {
'authorization': `Bearer ${this.bearerToken}`,
'content-type': 'application/json'
}
await this.postRequest(reqHost, form, reqHeaders);
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async postRequest(reqHost: string, form: string, reqHeaders: OutgoingHttpHeaders) {
var req = request(
{
host: reqHost,
path: `/${this.getAllExperimentsEndpoint}`,
method: 'POST',
headers: reqHeaders,
},
response => {
try {
response.on('data', d => {
process.stdout.write(d);
})
console.log(`Response returned with status code ${response.statusCode}: ${response.statusMessage}`);
}
catch(error) {
task.setResult(task.TaskResult.Failed, `${error.message} Make sure that your endpoint is correct, and that you are using the correct bearer token, if necessary.`);
}
}
);
req.write(form);
req.end();
}
}
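The createExperiment/postRequest pair above reduces to building a small JSON body and POSTing it to the experiments endpoint. A minimal sketch of the payload construction, with the optional description handled the same way (buildExperimentPayload is an illustrative helper name, not part of the task):

```typescript
// Illustrative helper mirroring the payload built in createExperiment();
// the name buildExperimentPayload is an assumption for demonstration only.
function buildExperimentPayload(name: string, description?: string): string {
  // The body is {"name": ...} with an optional "description" field.
  return description == null
    ? JSON.stringify({ name })
    : JSON.stringify({ name, description });
}

console.log(buildExperimentPayload("exp-1"));             // {"name":"exp-1"}
console.log(buildExperimentPayload("exp-1", "demo"));     // {"name":"exp-1","description":"demo"}
```

Using JSON.stringify rather than hand-assembled strings also keeps quotes inside the name or description from producing invalid JSON.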


@@ -0,0 +1,387 @@
import path = require("path");
import fs = require("fs");
import task = require("azure-pipelines-task-lib/task");
import { IExecSyncResult } from "azure-pipelines-task-lib/toolrunner";
import * as rest from "typed-rest-client";
import { request, OutgoingHttpHeaders } from "http";
import FormData from "form-data";
import { IAllRun, IAllPipeline, IAllExperiment, IAllPipelineVersion, ISingleRun } from "./interfaces"
export class Run {
public endpointUrl: string;
public runName: string;
public pipeline: string;
public useDefaultVersion: boolean;
public pipelineVersion: string;
public pipelineParams: string;
public description: string;
public waitForRunToFinish: boolean;
public experiment: string;
public experimentName: string;
public runType: string;
public getAllRunsEndpoint: string;
public getAllPipelinesEndpoint: string;
public getAllVersionsEndpoint: string;
public getAllExperimentsEndpoint: string;
public pipelineID: string;
public pipelineVersionID: string;
public experimentID: string;
public runID: string;
public restAPIClient: rest.RestClient;
private bearerToken: string;
constructor() {
this.endpointUrl = task.getInput('kubeflowEndpoint', true)!;
this.runName = task.getInput('runName', true)!;
this.pipeline = task.getInput('pipeline', true)!;
this.useDefaultVersion = task.getBoolInput('useDefaultVersion', true)!;
if(this.useDefaultVersion == true) {
this.pipelineVersion = this.pipeline;
}
else {
this.pipelineVersion = task.getInput('pipelineVersion', true)!;
}
this.pipelineParams = task.getInput('pipelineParams', false)!;
this.description = task.getInput('runDescription', false)!;
this.waitForRunToFinish = task.getBoolInput('waitForRunToFinish', true);
this.experiment = task.getInput('experiment', true)!;
this.experimentName = task.getInput('experimentName', true)!;
this.runType = 'One-Off';
this.getAllRunsEndpoint = 'pipeline/apis/v1beta1/runs';
this.getAllPipelinesEndpoint = 'pipeline/apis/v1beta1/pipelines';
this.getAllVersionsEndpoint = 'pipeline/apis/v1beta1/pipeline_versions';
this.getAllExperimentsEndpoint = 'pipeline/apis/v1beta1/experiments';
this.pipelineID = '';
this.pipelineVersionID = '';
this.experimentID = '';
this.runID = '';
this.restAPIClient = new rest.RestClient('agent');
this.bearerToken = task.getInput('bearerToken', false)!;
}
public async validateEndpointUrl() {
try {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
return false;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async validatePipeline() {
try {
var pipelineID = await this.getPipelineID();
if(pipelineID != 'Not a valid pipeline id.') {
this.pipelineID = pipelineID;
task.setVariable("kf_pipeline_id", this.pipelineID);
return true;
}
else{
return false;
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async validatePipelineVersion() {
try {
var versionID = await this.getPipelineVersionID();
if(versionID != 'Not a valid version id.') {
this.pipelineVersionID = versionID;
task.setVariable("kf_version_id", this.pipelineVersionID);
return true;
}
else {
return false;
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async validateExperimentName() {
try {
if(this.experiment == 'createNewExperiment') {
this.experimentID = await this.getExperimentID();
return true;
}
else {
var experimentID = await this.getExperimentID();
if(experimentID != 'Not a valid experiment id.') {
this.experimentID = experimentID;
task.setVariable("kf_experiment_id", this.experimentID);
return true;
}
else {
return false;
}
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async validatePipelineParams() {
try {
if(this.pipelineParams == '' || this.pipelineParams == undefined) {
return true;
}
JSON.parse(this.pipelineParams);
return true;
}
catch(error) {
task.setResult(task.TaskResult.Failed, `Pipeline Params is not a valid JSON object array. ${error.message}`);
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {
throw new Error('Endpoint Url must be a valid Url.');
}
if(!await this.validatePipeline()) {
throw new Error('Pipeline not found. Please make sure you are using an existing pipeline from your Kubeflow workspace.');
}
if(!await this.validatePipelineVersion()) {
throw new Error('Pipeline version not found. Please make sure you are using an existing pipeline version from your Kubeflow workspace.');
}
if(!await this.validateExperimentName()) {
throw new Error('Experiment not found. Please make sure you are using an existing experiment from your Kubeflow workspace.');
}
if(!await this.validatePipelineParams()) {
throw new Error('Pipeline Params must be a valid JSON array of parameter objects.');
}
return true;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
// The payload for posting a new run is a JSON string of the form: {name: string, description: string,
// pipeline_spec: {parameters: [{}]}, resource_references: [{key: {id: string, type: EXPERIMENT}, relationship: OWNER},
// {key: {id: string, type: PIPELINE_VERSION}, relationship: CREATOR}]}
public async createRun() {
try {
if(this.pipelineParams == '' || this.pipelineParams == undefined) {
var form = `{"name": "${this.runName}", "description": "${this.description}",
"pipeline_spec": {"parameters": []},
"resource_references": [{"key": {"id": "${this.experimentID}", "type": "EXPERIMENT"}, "relationship": "OWNER"},
{"key": {"id": "${this.pipelineVersionID}", "type": "PIPELINE_VERSION"}, "relationship": "CREATOR"}]}`;
}
else {
var form = `{"name": "${this.runName}", "description": "${this.description}",
"pipeline_spec": {"parameters": ${this.pipelineParams}},
"resource_references": [{"key": {"id": "${this.experimentID}", "type": "EXPERIMENT"}, "relationship": "OWNER"},
{"key": {"id": "${this.pipelineVersionID}", "type": "PIPELINE_VERSION"}, "relationship": "CREATOR"}]}`;
}
var reqHost = this.endpointUrl.substring(7, this.endpointUrl.length - 1); // strip the leading 'http://' and the trailing '/'
var reqHeaders = {
'authorization': `Bearer ${this.bearerToken}`,
'content-type': 'application/json'
}
await this.postRequest(reqHost, form, reqHeaders);
await this.wait(10000);
var runID = await this.getRunID();
if(runID != 'Not a valid run id.') {
this.runID = runID;
task.setVariable("kf_run_id", this.runID);
console.log(`The new run can be viewed at: ${this.endpointUrl}_/pipeline/#/runs/details/${this.runID}`);
console.log(`The new run's ID is: ${this.runID}`);
}
else {
throw new Error('Failed to retrieve ID of new run. Make sure you are using the correct endpoint, and that you are using the correct bearer token, if necessary.');
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async postRequest(reqHost: string, form: string, reqHeaders: OutgoingHttpHeaders) {
try {
var req = request(
{
host: reqHost,
path: `/${this.getAllRunsEndpoint}`,
method: 'POST',
headers: reqHeaders,
},
response => {
try {
response.on('data', d => {
process.stdout.write(d);
});
console.log(`Response returned with status code ${response.statusCode}: ${response.statusMessage}`);
}
catch(error) {
task.setResult(task.TaskResult.Failed, `${error.message} Make sure that your endpoint is correct, and that you are using the correct bearer token, if necessary.`);
}
}
);
req.write(form);
req.end();
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async monitorRun() {
try {
var status = '';
while(true) {
status = await this.getRunStatus();
var date = new Date();
console.log(`Time: ${date.toTimeString().split(' ')[0]} Status: ${status}`);
if(status == 'Succeeded') {
console.log('Succeeded');
task.setVariable('kf_run_status', status);
return;
}
else if(status == 'Failed') {
task.setVariable('kf_run_status', status);
throw new Error('Run has failed.');
}
await this.wait(15000);
}
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async wait(ms: number) {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
public async getRunID() {
try {
var url = `${this.endpointUrl}${this.getAllRunsEndpoint}?resource_key.type=PIPELINE_VERSION&resource_key.id=${this.pipelineVersionID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.runName}"}]}&sort_by=created_at desc`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllRun>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.runs[0].id != undefined) {
return webRequest.result.runs[0].id;
}
console.log('Run not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid run id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid run id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid run id.';
}
}
public async getRunStatus() {
try {
var url = `${this.endpointUrl}${this.getAllRunsEndpoint}/${this.runID}`;
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<ISingleRun>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.run.status != undefined) {
return webRequest.result.run.status;
}
return 'Not a valid status.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid status.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid status.';
}
}
public async getPipelineID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.pipeline}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines[0].id != undefined) {
return webRequest.result.pipelines[0].id;
}
console.log('Pipeline not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid pipeline id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid pipeline id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid pipeline id.';
}
}
public async getPipelineVersionID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllVersionsEndpoint}?resource_key.type=PIPELINE&resource_key.id=${this.pipelineID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.pipelineVersion}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipelineVersion>(url, options)!;
if(webRequest.result != null) {
var versions = webRequest.result.versions;
if(versions != undefined) {
for(var i = 0; i < versions.length; i++) {
if(versions[i].name == this.pipelineVersion) {
return versions[i].id;
}
}
console.log('Version not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid version id.';
}
console.log('Version not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid version id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid version id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid version id.';
}
}
public async getExperimentID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllExperimentsEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.experimentName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllExperiment>(url, options)!;
if(webRequest.result != null) {
var experiments = webRequest.result.experiments;
if(experiments[0].id != undefined) {
return experiments[0].id;
}
console.log('Experiment not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid experiment id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid experiment id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid experiment id.';
}
}
}
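The run-creation body assembled in createRun() splices pipelineParams into a template string. A hedged sketch of the same payload built with JSON.stringify instead (buildRunPayload and the sample IDs are illustrative, not part of the task), which avoids broken JSON when the run name or description contains quotes:

```typescript
// Illustrative alternative to the string template in createRun(); the helper
// name and the sample values below are assumptions for demonstration only.
function buildRunPayload(runName: string, description: string,
                         experimentID: string, pipelineVersionID: string,
                         pipelineParams?: string): string {
  // An empty or missing pipelineParams input maps to an empty parameter list.
  const parameters = pipelineParams ? JSON.parse(pipelineParams) : [];
  return JSON.stringify({
    name: runName,
    description: description,
    pipeline_spec: { parameters },
    resource_references: [
      { key: { id: experimentID, type: "EXPERIMENT" }, relationship: "OWNER" },
      { key: { id: pipelineVersionID, type: "PIPELINE_VERSION" }, relationship: "CREATOR" },
    ],
  });
}

const body = buildRunPayload(
  "nightly-run", "smoke test", "exp-123", "ver-456",
  '[{"name":"resource_group","value":"my-rg"}]');
console.log(JSON.parse(body).pipeline_spec.parameters.length); // 1
```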


@@ -0,0 +1,52 @@
export interface IExperiment {
id: string,
name: string,
description: string,
created_at: Date,
// resource_references: [{key: {}, relationship: string}];
}
export interface IAllExperiment {
experiments: IExperiment[]
}
export interface IRun {
id: string,
name: string,
description: string,
created_at: Date,
status: string;
}
export interface ISingleRun {
run: IRun;
}
export interface IAllRun {
runs: IRun[]
}
export interface IPipeline {
id: string;
created_at: Date;
name: string;
description: string;
parameters: [];
default_version: Object;
}
export interface IAllPipeline {
pipelines: IPipeline[];
}
export interface IPipelineVersion {
id: string;
created_at: Date;
name: string;
parameters: [];
resource_references: [{key: {}, relationship: string}];
}
export interface IAllPipelineVersion {
versions: IPipelineVersion[];
}
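The interfaces above type the JSON bodies returned by the Kubeflow REST API. A sketch of deserializing a sample experiments response against the IAllExperiment shape (the response body here is fabricated sample data):

```typescript
// Mirror of the IExperiment/IAllExperiment shapes defined in interfaces.ts;
// the raw response below is invented sample data for illustration.
interface IExperiment { id: string; name: string; description: string; created_at: Date; }
interface IAllExperiment { experiments: IExperiment[]; }

const raw = '{"experiments":[{"id":"e1","name":"exp-1","description":"","created_at":"2020-05-26T00:00:00Z"}]}';
const parsed: IAllExperiment = JSON.parse(raw);
console.log(parsed.experiments[0].name); // exp-1
```

Note that JSON.parse leaves created_at as a string at runtime; the Date annotation is a compile-time claim that is not checked against the actual payload.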


@@ -0,0 +1,29 @@
{
"name": "create_run_experiments",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "mocha ./Tests/RunTests.js ./Tests/ExperimentTests.js --reporter xunit --reporter-option output=RunTestResults.xml"
},
"repository": {
"type": "git",
"url": "https://dev.azure.com/csedevops/Kubeflow%20Integration/_git/kubeflow_azdo_task"
},
"author": "v-ryzube",
"license": "ISC",
"dependencies": {
"@types/request": "^2.48.4",
"azure-pipelines-task-lib": "^2.9.3",
"form-data": "^3.0.0",
"mocha": "^7.1.1",
"request": "^2.88.2",
"typed-rest-client": "^1.7.3"
},
"devDependencies": {
"@types/mocha": "^7.0.2",
"@types/node": "^13.11.1",
"@types/q": "^1.5.2",
"sync-request": "^6.1.0"
}
}


@@ -0,0 +1,148 @@
{
"id": "0f73780c-5793-4c97-a2e6-6835cb554c42",
"name": "KubeflowExperimentRun",
"friendlyName": "Kubeflow Experiment and Run",
"description": "Create an Experiment and Run with Kubeflow",
"helpMarkDown": "",
"category": "Utility",
"author": "CSE-DevOps",
"version": {
"Major": 0,
"Minor": 1,
"Patch": 0
},
"instanceNameFormat": "Create an experiment and a run",
"groups": [
{
"name": "Run",
"displayName": "Run",
"isExpanded": true,
"visibleRule": ""
},
{
"name": "Experiment",
"displayName": "Experiment",
"isExpanded": true,
"visibleRule": ""
}
],
"inputs": [
{
"name": "kubeflowEndpoint",
"type": "string",
"label": "Kubeflow Endpoint",
"defaultValue": "",
"required": true,
"helpMarkDown": "Enter the URL of the Kubeflow workspace you want to use. Make sure it is the home page URL, in the following format: http://yourURL/",
"groupName": "Run"
},
{
"name": "bearerToken",
"type": "string",
"label": "Bearer Token",
"defaultValue": "",
"required": false,
"helpMarkDown": "DO NOT USE RAW TOKEN! For help creating secret variables visit https://docs.microsoft.com/en-us/azure/devops/pipelines/process/variables?view=azure-devops&tabs=yaml%2Cbatch#secret-variables",
"groupName": "Run"
},
{
"name": "pipeline",
"type": "string",
"label": "Pipeline",
"defaultValue": "",
"required": true,
"helpMarkDown": "Choose the pipeline you would like to run.",
"groupName": "Run"
},
{
"name": "useDefaultVersion",
"type": "boolean",
"label": "Use Default Version",
"defaultValue": "false",
"helpMarkDown": "If checked, will use the original version created with the pipeline.",
"groupName": "Run",
"required": true
},
{
"name": "pipelineVersion",
"type": "string",
"label": "Pipeline Version",
"defaultValue": "",
"required": true,
"helpMarkDown": "Choose the pipeline version you would like to run.",
"groupName": "Run"
},
{
"name": "runName",
"type": "string",
"label": "Run Name",
"defaultValue": "",
"required": true,
"helpMarkDown": "The name of the new run.",
"groupName": "Run"
},
{
"name": "pipelineParams",
"type": "string",
"label": "Pipeline Params",
"defaultValue": "",
"required": false,
"helpMarkDown": "Input any parameters you would like to add in the following format: [{\"name\":\"VAR_NAME\", \"value\":\"VAR_VALUE\"}, {\"name\":\"VAR_NAME\", \"value\":\"VAR_VALUE\"}]",
"groupName": "Run"
},
{
"name": "runDescription",
"type": "string",
"label": "Description",
"defaultValue": "",
"required": false,
"helpMarkDown": "Description of the new run.",
"groupName": "Run"
},
{
"name": "waitForRunToFinish",
"type": "boolean",
"label": "Wait for run to complete",
"defaultValue": true,
"helpMarkDown": "If checked, the task will monitor the run's status and fail if the run fails.",
"groupName": "Run"
},
{
"name": "experiment",
"type": "radio",
"label": "Experiment",
"defaultValue": "createNewExperiment",
"required": true,
"helpMarkDown": "Choose to create a new experiment or use an existing one.",
"options": {
"createNewExperiment": "Create New Experiment",
"useExistingExperiment": "Use Existing Experiment"
},
"groupName": "Experiment"
},
{
"name": "experimentName",
"type": "string",
"label": "Experiment Name",
"defaultValue": "",
"required": true,
"helpMarkDown": "Name a new experiment, or choose an existing experiment.",
"groupName": "Experiment"
},
{
"name": "experimentDescription",
"type": "string",
"label": "Description",
"defaultValue": "",
"required": false,
"helpMarkDown": "Description of the new experiment.",
"visibleRule": "experiment=createNewExperiment",
"groupName": "Experiment"
}
],
"execution": {
"Node10": {
"target": "index.js"
}
}
}
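The pipelineParams input documented above is a JSON array of {"name", "value"} objects. A small sketch of producing a value that passes the task's validatePipelineParams() JSON check (the parameter names below are example values only):

```typescript
// Build the pipelineParams string in the documented input format; the
// parameter names (resource_group, workspace) are illustrative examples.
const params = [
  { name: "resource_group", value: "my-resource-group" },
  { name: "workspace", value: "my-aml-workspace" },
];
const pipelineParams = JSON.stringify(params);

// validatePipelineParams() accepts any string that JSON.parse can read:
console.log(pipelineParams);
console.log(Array.isArray(JSON.parse(pipelineParams))); // true
```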


@@ -0,0 +1,66 @@
{
"compilerOptions": {
/* Basic Options */
// "incremental": true, /* Enable incremental compilation */
"target": "es2020", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019' or 'ESNEXT'. */
"module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'. */
// "lib": [], /* Specify library files to be included in the compilation. */
// "allowJs": true, /* Allow javascript files to be compiled. */
// "checkJs": true, /* Report errors in .js files. */
// "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', or 'react'. */
// "declaration": true, /* Generates corresponding '.d.ts' file. */
// "declarationMap": true, /* Generates a sourcemap for each corresponding '.d.ts' file. */
// "sourceMap": true, /* Generates corresponding '.map' file. */
// "outFile": "./", /* Concatenate and emit output to single file. */
// "outDir": "./", /* Redirect output structure to the directory. */
// "rootDir": "./", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */
// "composite": true, /* Enable project compilation */
// "tsBuildInfoFile": "./", /* Specify file to store incremental compilation information */
// "removeComments": true, /* Do not emit comments to output. */
// "noEmit": true, /* Do not emit outputs. */
// "importHelpers": true, /* Import emit helpers from 'tslib'. */
// "downlevelIteration": true, /* Provide full support for iterables in 'for-of', spread, and destructuring when targeting 'ES5' or 'ES3'. */
// "isolatedModules": true, /* Transpile each file as a separate module (similar to 'ts.transpileModule'). */
/* Strict Type-Checking Options */
"strict": true, /* Enable all strict type-checking options. */
// "noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* Enable strict null checks. */
// "strictFunctionTypes": true, /* Enable strict checking of function types. */
// "strictBindCallApply": true, /* Enable strict 'bind', 'call', and 'apply' methods on functions. */
// "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */
// "noImplicitThis": true, /* Raise error on 'this' expressions with an implied 'any' type. */
// "alwaysStrict": true, /* Parse in strict mode and emit "use strict" for each source file. */
/* Additional Checks */
// "noUnusedLocals": true, /* Report errors on unused locals. */
// "noUnusedParameters": true, /* Report errors on unused parameters. */
// "noImplicitReturns": true, /* Report error when not all code paths in function return a value. */
// "noFallthroughCasesInSwitch": true, /* Report errors for fallthrough cases in switch statement. */
/* Module Resolution Options */
// "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */
// "baseUrl": "./", /* Base directory to resolve non-absolute module names. */
// "paths": {}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
// "typeRoots": [], /* List of folders to include type definitions from. */
// "types": [], /* Type declaration files to be included in compilation. */
// "allowSyntheticDefaultImports": true, /* Allow default imports from modules with no default export. This does not affect code emit, just typechecking. */
"esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */
// "preserveSymlinks": true, /* Do not resolve the real path of symlinks. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
/* Source Map Options */
// "sourceRoot": "", /* Specify the location where debugger should locate TypeScript files instead of source locations. */
// "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */
// "inlineSourceMap": true, /* Emit a single file with source maps instead of having a separate file. */
// "inlineSources": true, /* Emit the source alongside the sourcemaps within a single file; requires '--inlineSourceMap' or '--sourceMap' to be set. */
/* Experimental Options */
// "experimentalDecorators": true, /* Enables experimental support for ES7 decorators. */
// "emitDecoratorMetadata": true, /* Enables experimental support for emitting type metadata for decorators. */
/* Advanced Options */
"forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */
}
}


@@ -6,17 +6,19 @@ import * as rest from "typed-rest-client";
import * as HttpC from "typed-rest-client/HttpClient";
import{request, OutgoingHttpHeaders} from "http";
import FormData from "form-data";
import { IAllPipeline, IUploadPipeline } from "../operations/interfaces";
import { IAllPipeline, IUploadPipeline, IAllPipelineVersion } from "../operations/interfaces";
export class UploadPipelineMock implements IUploadPipeline {
public endpointUrl: string;
public getAllPipelinesEndpoint: string;
public getAllVersionsEndpoint: string;
private bearerToken: string;
public pipelineTask: string;
public pipelineFilePath: string;
public newPipelineName: string;
public existingPipelineName: string;
public versionName: string;
public pipelineID: string;
public restAPIClient: rest.RestClient;
public maxFileSizeBytes: number;
@@ -24,11 +26,13 @@ export class UploadPipelineMock implements IUploadPipeline {
newPipelineName: string, existingPipelineName: string, versionName: string, bearerToken?: string) {
this.endpointUrl = endpointUrl;
this.getAllPipelinesEndpoint = 'pipeline/apis/v1beta1/pipelines';
this.getAllVersionsEndpoint = 'pipeline/apis/v1beta1/pipeline_versions';
this.pipelineTask = pipelineTask;
this.pipelineFilePath = pipelineFilePath;
this.newPipelineName = newPipelineName;
this.existingPipelineName = existingPipelineName;
this.versionName = versionName;
this.pipelineID = '';
this.bearerToken = bearerToken!;
this.restAPIClient = new rest.RestClient('some-agent');
this.maxFileSizeBytes = 32000000;
@@ -36,13 +40,8 @@ export class UploadPipelineMock implements IUploadPipeline {
public async validateEndpointUrl() {
try {
if(this.bearerToken == undefined || this.bearerToken == null) {
var req = await this.restAPIClient.get(this.endpointUrl);
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
}
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
@@ -103,13 +102,8 @@ export class UploadPipelineMock implements IUploadPipeline {
public async validateNewPipelineName() {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}`;
if(this.bearerToken == undefined || this.bearerToken == null) {
var webRequest = await this.restAPIClient.get<IAllPipeline>(url)!;
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
}
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines != undefined){
for(var PL of webRequest.result.pipelines) {
@@ -135,17 +129,13 @@ export class UploadPipelineMock implements IUploadPipeline {
public async validateExistingPipelineName() {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}`;
if(this.bearerToken == undefined || this.bearerToken == null) {
var webRequest = await this.restAPIClient.get<IAllPipeline>(url)!;
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
}
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines != undefined){
for(var PL of webRequest.result.pipelines!) {
if(PL.name == this.existingPipelineName) {
this.pipelineID = PL.id;
return true;
}
}
@@ -164,6 +154,31 @@
}
}
public async validateNewVersionName() {
try{
var url = `${this.endpointUrl}${this.getAllVersionsEndpoint}?resource_key.type=PIPELINE&resource_key.id=${this.pipelineID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.versionName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipelineVersion>(url, options)!;
if(webRequest.result != null) {
var versions = webRequest.result.versions;
if(versions != undefined) {
for(var i = 0; i < versions.length; i++) {
if(versions[i].name == this.versionName) {
return false;
}
}
return true;
}
return false;
}
return false;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {return false;}
@@ -181,61 +196,4 @@
task.setResult(task.TaskResult.Failed, error.message);
}
}
// public async uploadNewPipeline() {
// try {
// var uploadFile = fs.createReadStream(this.pipelineFilePath);
// var form: FormData = new FormData();
// form.append('uploadfile', uploadFile);
// var reqHost = this.endpointUrl.substring(7, this.endpointUrl.length - 1);
// if(this.bearerToken == undefined || this.bearerToken == null) {
// await this.postRequest(form.getHeaders(), reqHost, form);
// }
// else {
// var reqHeaders = {
// 'content-type': 'multipart/form-data',
// 'authorization': `Bearer ${this.bearerToken}`
// }
// await this.postRequest(reqHeaders, reqHost, form);
// }
// }
// catch(error) {
// console.log(error.message);
// task.setResult(task.TaskResult.Failed, 'Failed to upload pipeline with the above error.');
// }
// }
// public async postRequest(reqHeaders: OutgoingHttpHeaders, reqHost: string, form: FormData) {
// var req = request(
// {
// host: reqHost,
// path: `/${this.getRequestPath}/upload?name=${this.newPipelineName}`,
// method: 'POST',
// headers: reqHeaders,
// },
// response => {
// try {
// response.on('data', d => {
// process.stdout.write(d);
// })
// console.log(`Response returned with status code ${response.statusCode}: ${response.statusMessage}`);
// }
// catch(error) {
// task.setResult(task.TaskResult.Failed, error.message);
// }
// }
// );
// form.pipe(req);
// }
// public async uploadNewPipelineVersion() {
// try {
// console.log('nothing here yet');
// }
// catch(error) {
// console.log(error.message);
// task.setResult(task.TaskResult.Failed, 'Failed to upload new pipeline version with the above error.');
// }
// }
}

View file

@@ -4,7 +4,7 @@ import { async } from "q";
var assert = require('assert');
var fs = require('fs');
var UP = new UploadPipelineMock('http://52.149.247.172/', 'uploadNew',
var UP = new UploadPipelineMock('http://52.149.62.186/', 'uploadNew',
'/home/vsts/work/1/s/src/Tasks/Upload_Pipeline/Tests/pipeline.py.tar.gz', //Azure DevOps Tests
// 'C:/users/v-ryzube/source/repos/kubeflow_azdo_task/src/Tasks/Upload_Pipeline/Tests/pipeline.py.tar.gz', //Local Tests
'newPLName', 'testPipeline', '12345');

Binary file not shown.

Binary data
src/Tasks/Upload_Pipeline/icon.png

Binary file not shown. (Size: 7.2 KiB before, 5.2 KiB after)

View file

@ -7,43 +7,43 @@ import * as HttpC from "typed-rest-client/HttpClient";
import{request, OutgoingHttpHeaders} from "http";
import FormData from "form-data";
import { IAllPipeline } from "./interfaces";
import { IUploadPipeline } from "./interfaces";
import { IUploadPipeline, IAllPipelineVersion } from "./interfaces";
import { IRequestOptions } from "typed-rest-client/Interfaces";
import { pipeline } from "stream";
export class UploadPipeline implements IUploadPipeline {
public endpointUrl: string;
public getAllPipelinesEndpoint: string;
public getAllVersionsEndpoint: string;
private bearerToken: string;
public pipelineTask: string;
public pipelineFilePath: string;
public newPipelineName: string;
public existingPipelineName: string;
public versionName: string;
public newPipelineName: string | undefined;
public existingPipelineName: string | undefined;
public versionName: string | undefined;
public pipelineID: string;
public restAPIClient: rest.RestClient;
public maxFileSizeBytes: number;
constructor() {
this.endpointUrl = task.getInput('kubeflowEndpoint', true)!;
this.getAllPipelinesEndpoint = 'pipeline/apis/v1beta1/pipelines';
this.getAllVersionsEndpoint = 'pipeline/apis/v1beta1/pipeline_versions';
this.bearerToken = task.getInput('bearerToken', false)!;
this.pipelineTask = task.getInput('kubeflowPipelineTask', true)!;
this.pipelineFilePath = task.getInput('pipelineFilePath', true)!;
this.newPipelineName = task.getInput('newPipelineName', true)!;
this.existingPipelineName = task.getInput('existingPipelineName', true)!;
this.versionName = task.getInput('versionName', true)!;
this.restAPIClient = new rest.RestClient('some-agent');
this.newPipelineName = task.getInput('newPipelineName', false)!;
this.existingPipelineName = task.getInput('existingPipelineName', false)!;
this.versionName = task.getInput('versionName', false)!;
this.pipelineID = '';
this.restAPIClient = new rest.RestClient('agent');
this.maxFileSizeBytes = 32000000;
}
public async validateEndpointUrl() {
try {
if(this.bearerToken == undefined || this.bearerToken == null) {
var req = await this.restAPIClient.get(this.endpointUrl);
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
}
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var req = await this.restAPIClient.get(this.endpointUrl, options);
if(req.statusCode == 200) {
return true;
}
@@ -90,14 +90,13 @@ export class UploadPipeline implements IUploadPipeline {
public async validateNewPipelineName() {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}`;
if(this.bearerToken == undefined || this.bearerToken == null) {
var webRequest = await this.restAPIClient.get<IAllPipeline>(url)!;
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(this.newPipelineName == undefined || this.newPipelineName == '') {
return false;
}
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.newPipelineName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines != undefined){
for(var PL of webRequest.result.pipelines) {
@@ -105,6 +104,7 @@ export class UploadPipeline implements IUploadPipeline {
return false;
}
}
task.setVariable('kf_pipeline_name', this.newPipelineName);
return true;
}
else {
@@ -122,18 +122,20 @@ export class UploadPipeline implements IUploadPipeline {
public async validateExistingPipelineName() {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}`;
if(this.bearerToken == undefined || this.bearerToken == null) {
var webRequest = await this.restAPIClient.get<IAllPipeline>(url)!;
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(this.existingPipelineName == undefined || this.existingPipelineName == '') {
return false;
}
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.existingPipelineName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
if(webRequest.result.pipelines != undefined){
for(var PL of webRequest.result.pipelines!) {
if(PL.name == this.existingPipelineName) {
this.pipelineID = PL.id;
task.setVariable('kf_pipeline_name', PL.name);
task.setVariable('kf_pipeline_id', PL.id);
return true;
}
}
@@ -152,6 +154,35 @@ export class UploadPipeline implements IUploadPipeline {
}
}
public async validateNewVersionName() {
try{
if(this.versionName == undefined || this.versionName == '') {
return false;
}
var url = `${this.endpointUrl}${this.getAllVersionsEndpoint}?resource_key.type=PIPELINE&resource_key.id=${this.pipelineID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.versionName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipelineVersion>(url, options)!;
if(webRequest.result != null) {
var versions = webRequest.result.versions;
if(versions != undefined) {
for(var i = 0; i < versions.length; i++) {
if(versions[i].name == this.versionName) {
return false;
}
}
task.setVariable('kf_version_name', this.versionName);
return true;
}
return false;
}
return false;
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
}
}
public async runValidations() {
try {
if(!await this.validateEndpointUrl()) {
@@ -172,6 +203,9 @@ export class UploadPipeline implements IUploadPipeline {
if(!await this.validateExistingPipelineName()) {
throw new Error('Pipeline name does not yet exist. You must enter an existing pipeline name or choose to upload a new pipeline.');
}
if(!await this.validateNewVersionName()) {
throw new Error('Version name already exists. You must enter a unique version name.');
}
}
return true;
}
@@ -180,6 +214,7 @@ export class UploadPipeline implements IUploadPipeline {
}
}
// To post a new pipeline you have to pipe a file payload as form data and add the name onto the url as a string
public async uploadNewPipeline() {
try {
var uploadFile = fs.createReadStream(this.pipelineFilePath);
@@ -187,20 +222,18 @@ export class UploadPipeline implements IUploadPipeline {
form.append('uploadfile', uploadFile);
var reqHost = this.endpointUrl.substring(7, this.endpointUrl.length - 1);
if(this.bearerToken == undefined || this.bearerToken == null) {
await this.newPLPostRequest(form.getHeaders(), reqHost, form);
}
else {
var reqHeaders = {
'content-type': 'multipart/form-data',
'authorization': `Bearer ${this.bearerToken}`
}
await this.newPLPostRequest(reqHeaders, reqHost, form);
var reqHeaders = form.getHeaders({'authorization': `Bearer ${this.bearerToken}`});
await this.newPLPostRequest(reqHeaders, reqHost, form);
await this.wait(5000);
var pipelineID = await this.getPipelineID(this.newPipelineName);
if(pipelineID == 'Not a valid pipeline id.') {
throw new Error('Existing pipeline not found. Check endpoint url. Either choose a new pipeline name or create a new version.');
}
console.log(`\nThe new pipeline's ID is: ${pipelineID}`);
console.log(`New pipeline can be viewed at: ${this.endpointUrl}_/pipeline/#/pipelines/details/${pipelineID}`);
}
catch(error) {
console.log(error.message);
task.setResult(task.TaskResult.Failed, 'Failed to upload pipeline with the above error.');
task.setResult(task.TaskResult.Failed, error.message);
}
}
@@ -208,7 +241,7 @@ export class UploadPipeline implements IUploadPipeline {
var req = request(
{
host: reqHost,
path: `/${this.getAllPipelinesEndpoint}/upload?name=${this.newPipelineName}`,
path: encodeURI(`/${this.getAllPipelinesEndpoint}/upload?name=${this.newPipelineName}`),
method: 'POST',
headers: reqHeaders,
},
@@ -220,63 +253,61 @@ export class UploadPipeline implements IUploadPipeline {
console.log(`Response returned with status code ${response.statusCode}: ${response.statusMessage}`);
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
task.setResult(task.TaskResult.Failed, `${error.message} Make sure that your endpoint is correct, and that you are using the correct bearer token, if necessary.`);
}
}
);
form.pipe(req);
}
// To post a new version you have to pipe a file payload as form data and add the name and pipeline id onto the url as a string
public async uploadNewPipelineVersion() {
try {
var uploadFile = fs.createReadStream(this.pipelineFilePath);
var form: FormData = new FormData();
form.append('uploadfile', uploadFile);
var reqHost = this.endpointUrl.substring(7, this.endpointUrl.length - 1);
var existingPLID = await this.getPipelineID();
if(existingPLID == 'Not a valid id.') {
var existingPLID = await this.getPipelineID(this.existingPipelineName);
if(existingPLID == 'Not a valid pipeline id.') {
throw new Error('Existing pipeline not found. Check endpoint url. Either choose an existing pipeline or create a new pipeline.');
}
if(this.bearerToken == undefined || this.bearerToken == null) {
await this.newVersionPostRequest(form.getHeaders(), reqHost, form, existingPLID);
var reqHeaders = form.getHeaders({'authorization': `Bearer ${this.bearerToken}`});
await this.newVersionPostRequest(reqHeaders, reqHost, form, existingPLID);
await this.wait(5000);
var versionID = await this.getPipelineVersionID(existingPLID);
if(versionID == 'Not a valid version id.') {
throw new Error('Existing version not found. Check endpoint url and bearer token.');
}
else {
var reqHeaders = {
'content-type': 'multipart/form-data',
'authorization': `Bearer ${this.bearerToken}`
}
await this.newVersionPostRequest(reqHeaders, reqHost, form, existingPLID);
}
}
catch(error) {
console.log(error.message);
task.setResult(task.TaskResult.Failed, 'Failed to upload new pipeline version with the above error.');
}
}
public async getPipelineID(): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.existingPipelineName}"}]}`;
url = encodeURI(url);
if(this.bearerToken == undefined || this.bearerToken == null) {
var webRequest = await this.restAPIClient.get<IAllPipeline>(url)!;
}
else {
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
}
if(webRequest.result != null) {
if(webRequest.result.pipelines[0].id != undefined) {
return webRequest.result.pipelines[0].id;
}
return 'Not a valid id.';
}
return 'Not a valid id.';
console.log(`\nThe new pipeline version's ID is: ${versionID}`);
console.log(`New pipeline version can be viewed at: ${this.endpointUrl}_/pipeline/#/pipelines/details/${this.pipelineID}/version/${versionID}`);
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid id.';
}
}
public async getPipelineID(pipelineName: string | undefined): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllPipelinesEndpoint}?filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${pipelineName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipeline>(url, options)!;
if(webRequest.result != null) {
var pipelines = webRequest.result.pipelines;
if(pipelines[0].id != undefined) {
task.setVariable('kf_pipeline_id', pipelines[0].id);
return pipelines[0].id;
}
console.log('Pipeline not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid pipeline id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid pipeline id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid pipeline id.';
}
}
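One thing worth noting about `getPipelineID` above: `pipelines[0].id` throws a `TypeError` when the filter matched nothing, before the `'Not a valid pipeline id.'` path can run. An illustrative guarded version (names here are assumptions, not the task's API):

```typescript
// Illustrative guard, not task code: check the list before indexing [0],
// so an empty filter result yields undefined instead of throwing.
interface PipelineRef { id: string; name: string; }

function firstPipelineId(pipelines: PipelineRef[] | undefined): string | undefined {
  return pipelines !== undefined && pipelines.length > 0 ? pipelines[0].id : undefined;
}

console.log(firstPipelineId([{ id: "abc-123", name: "demo" }])); // → abc-123
console.log(firstPipelineId([]));
```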
@@ -284,7 +315,7 @@ export class UploadPipeline implements IUploadPipeline {
var req = request(
{
host: reqHost,
path: `/${this.getAllPipelinesEndpoint}/upload_version?name=${this.versionName}&pipelineid=${existingPLID}`,
path: encodeURI(`/${this.getAllPipelinesEndpoint}/upload_version?name=${this.versionName}&pipelineid=${existingPLID}`),
method: 'POST',
headers: reqHeaders,
},
@@ -296,10 +327,45 @@ export class UploadPipeline implements IUploadPipeline {
console.log(`Response returned with status code ${response.statusCode}: ${response.statusMessage}`);
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
task.setResult(task.TaskResult.Failed, `${error.message} Make sure that your endpoint is correct, and that you are using the correct bearer token, if necessary.`);
}
}
);
form.pipe(req);
}
public async getPipelineVersionID(pipelineID: string): Promise<string> {
try {
var url = `${this.endpointUrl}${this.getAllVersionsEndpoint}?resource_key.type=PIPELINE&resource_key.id=${pipelineID}&filter={"predicates":[{"key":"name","op":"EQUALS","string_value":"${this.versionName}"}]}`;
url = encodeURI(url);
var options: rest.IRequestOptions = {additionalHeaders: {'authorization': `Bearer ${this.bearerToken}`}};
var webRequest = await this.restAPIClient.get<IAllPipelineVersion>(url, options)!;
if(webRequest.result != null) {
var versions = webRequest.result.versions;
if(versions != undefined) {
for(var i = 0; i < versions.length; i++) {
if(versions[i].name == this.versionName) {
task.setVariable('kf_version_id', versions[i].id);
return versions[i].id;
}
}
return webRequest.result.versions[0].id;
}
console.log('Version not found. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid version id.';
}
console.log('Request did not go through. Make sure your endpoint and/or bearer token are correct.');
return 'Not a valid version id.';
}
catch(error) {
task.setResult(task.TaskResult.Failed, error.message);
return 'Not a valid version id.';
}
}
public async wait(ms: number) {
await new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
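The `wait` helper added above is the promisified `setTimeout` the task awaits between uploading and querying for the new ID (5000 ms in the task; 50 ms below just for demonstration):

```typescript
// Promisified setTimeout, matching the wait() helper the task adds.
function wait(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function demo(): Promise<void> {
  const start = Date.now();
  await wait(50); // the task itself waits 5000 ms after an upload
  console.log(`waited ${Date.now() - start} ms`);
}

demo();
```

A fixed delay is a simple heuristic; if the API indexes uploads slowly, polling `getPipelineID` until it returns a valid ID would be more robust.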
}

View file

@@ -11,12 +11,24 @@ export interface IAllPipeline {
pipelines: IPipeline[];
}
export interface IPipelineVersion {
id: string;
created_at: Date;
name: string;
parameters: [];
resource_references: [{key: {}, relationship: string}];
}
export interface IAllPipelineVersion {
versions: IPipelineVersion[];
}
export interface IUploadPipeline {
endpointUrl: string;
getAllPipelinesEndpoint: string;
pipelineTask: string;
pipelineFilePath: string;
newPipelineName: string;
existingPipelineName: string;
versionName: string;
newPipelineName: string | undefined;
existingPipelineName: string | undefined;
versionName: string | undefined;
}
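The duplicate-name check that `validateNewVersionName` performs over the `IAllPipelineVersion` shape can be sketched as a pure function (the trimmed interface and function name below are illustrative, not part of the task's interfaces):

```typescript
// validateNewVersionName's duplicate check, reduced to a pure function
// over a minimal version shape. VersionRef is trimmed for illustration.
interface VersionRef { id: string; name: string; }

function isVersionNameFree(versions: VersionRef[] | undefined, name: string): boolean {
  if (versions === undefined) { return false; } // the task treats a missing list as invalid
  return !versions.some((v) => v.name === name);
}

console.log(isVersionNameFree([{ id: "1", name: "v1" }], "v2")); // true
console.log(isVersionNameFree([{ id: "1", name: "v1" }], "v1")); // false
```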

View file

@@ -4,7 +4,7 @@
"description": "Task for uploading a pipeline to the Kubeflow workspace",
"main": "index.js",
"scripts": {
"test": "mocha ./Tests/UploadPipelineTests.js"
"test": "mocha ./Tests/UploadPipelineTests.js --reporter xunit --reporter-option output=UploadTestResults.xml"
},
"repository": {
"type": "git",

View file

@@ -1,29 +1,22 @@
{
"id": "c351c0a5-577d-4777-806f-b4c50e6dc31b",
"name": "UploadKubeflowPipeline",
"name": "KubeflowUploadPipeline",
"friendlyName": "Kubeflow Upload Pipeline",
"description": "Upload pipeline to the Kubeflow workspace for use with machine learning",
"description": "Upload Kubeflow pipeline",
"helpMarkDown": "",
"category": "Utility",
"author": "CSE-DevOps",
"version": {
"Major": 0,
"Minor": 1,
"Patch": 2
"Patch": 19
},
"instanceNameFormat": "Upload pipeline to Kubeflow",
"groups": [
{
"name": "UploadNew",
"displayName": "Upload New Pipeline",
"isExpanded": true,
"visibleRule": ""
},
{
"name": "UploadNewVersion",
"displayName": "Upload New Pipeline Version",
"isExpanded": true,
"visibleRule": ""
"name": "UploadPipeline",
"displayName": "Upload Pipeline",
"isExpanded": true
}
],
"inputs": [
@@ -31,6 +24,7 @@
"name": "kubeflowEndpoint",
"type": "string",
"label": "Kubeflow Endpoint",
"group": "UploadPipeline",
"defaultValue": "",
"required": true,
"helpMarkDown": "Input the URL of the Kubeflow workspace you are trying to upload to. Make sure you put in the url for the homepage in the following format: http://yourURL/",
@@ -42,26 +36,29 @@
"name": "bearerToken",
"type": "string",
"label": "Bearer Token",
"group": "UploadPipeline",
"defaultValue": "",
"required": false,
"helpMarkDown": "DO NOT INPUT THE RAW TOKEN HERE! If your API connection requires a bearer token, create a secret pipeline variable and input the variable like this: $(secretVar). For help creating secret variables visit https://docs.microsoft.com/en-us/azure/devops/pipelines/process/variables?view=azure-devops&tabs=yaml%2Cbatch#secret-variables"
"helpMarkDown": "DO NOT USE RAW TOKEN! For help creating secret variables visit https://docs.microsoft.com/en-us/azure/devops/pipelines/process/variables?view=azure-devops&tabs=yaml%2Cbatch#secret-variables"
},
{
"name": "kubeflowPipelineTask",
"type": "pickList",
"label": "Kubeflow Pipeline Task",
"group": "UploadPipeline",
"defaultValue": "",
"required": true,
"helpMarkDown": "Choose the operation to perform.",
"options": {
"uploadNew": "Upload New Pipeline",
"uploadNewVersion": "Upload New Pipeline Version"
"uploadNew": "Upload Pipeline",
"uploadNewVersion": "Upload Pipeline Version"
}
},
{
"name": "pipelineFilePath",
"type": "string",
"type": "filePath",
"label": "Pipeline Path",
"group": "UploadPipeline",
"defaultValue": "",
"required": true,
"helpMarkDown": "File path of the pipeline file to be uploaded. Must be a compressed .tar.gz file."
@@ -70,8 +67,9 @@
"name": "newPipelineName",
"type": "string",
"label": "Pipeline Name",
"defaultValue": "name",
"required": true,
"group": "UploadPipeline",
"defaultValue": "",
"required": false,
"helpMarkDown": "The name you would like to give your pipeline.",
"visibleRule": "kubeflowPipelineTask = uploadNew"
},
@@ -79,8 +77,9 @@
"name": "existingPipelineName",
"type": "string",
"label": "Pipeline Name",
"defaultValue": "name",
"required": true,
"group": "UploadPipeline",
"defaultValue": "",
"required": false,
"helpMarkDown": "Choose an existing pipeline to make a new version of.",
"visibleRule": "kubeflowPipelineTask = uploadNewVersion"
},
@@ -88,8 +87,9 @@
"name": "versionName",
"type": "string",
"label": "Version Name",
"defaultValue": "version",
"required": true,
"group": "UploadPipeline",
"defaultValue": "",
"required": false,
"helpMarkDown": "The name of your new version of the pipeline.",
"visibleRule": "kubeflowPipelineTask = uploadNewVersion"
}

View file

@@ -1,9 +1,9 @@
{
"manifestVersion": 1.0,
"id": "CreateExperimentRun",
"name": "Create Kubeflow Experiment and Run",
"version": "0.0.0",
"publisher": "v-ryzube",
"name": "Kubeflow Experiment and Run",
"version": "0.1.0",
"publisher": "CSE-DevOps",
"description": "Create a new Experiment and new Run with Kubeflow",
"targets": [
{

View file

@@ -2,7 +2,7 @@
"manifestVersion": 1.0,
"id": "UploadKubeflowPipeline",
"name": "Kubeflow Upload Pipeline",
"version": "0.1.2",
"version": "0.1.19",
"publisher": "CSE-DevOps",
"description": "Simplifies uploading a pipeline to the Kubeflow workspace",
"targets": [