diff --git a/src/go/cmd/cachewarmer/README.md b/src/go/cmd/cachewarmer/README.md index 1b90e7fb..814ffb6f 100644 --- a/src/go/cmd/cachewarmer/README.md +++ b/src/go/cmd/cachewarmer/README.md @@ -104,8 +104,8 @@ On the controller or jumpbox, execute the following steps ```bash export BOOTSTRAP_PATH=/nfs/node0 +export STORAGE_ACCOUNT_RESOURCE_GROUP= export STORAGE_ACCOUNT= -export STORAGE_KEY='' export QUEUE_PREFIX= export BOOTSTRAP_EXPORT_PATH=/nfs1data @@ -136,6 +136,7 @@ export STORAGE_ACCOUNT= export STORAGE_KEY='' export QUEUE_PREFIX= ``` + 2. Run the following script: ```bash bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh @@ -146,5 +147,5 @@ bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh To submit a job, run a command similar to the following command, where the warm target variables are the Avere junction to warm: ```bash -sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountName "STORAGEACCOUNTREPLACE" -storageKey "STORAGEKEYREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island" +sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountResourceGroup "STORAGERGREPLACE" -storageAccountName "STORAGEACCOUNTREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island" ``` diff --git a/src/go/cmd/cachewarmer/cachewarmer-jobsubmitter/main.go b/src/go/cmd/cachewarmer/cachewarmer-jobsubmitter/main.go index 272c6998..aef80057 100644 --- a/src/go/cmd/cachewarmer/cachewarmer-jobsubmitter/main.go +++ b/src/go/cmd/cachewarmer/cachewarmer-jobsubmitter/main.go @@ -42,8 +42,8 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ var maxFileSizeBytes = flag.Int64("maxFileSizeBytes", 0, "the maximum file size in bytes to warm.") + var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group") var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue") - var storageKey = flag.String("storageKey", "", "the storage key to access the queue") var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically") var blockUntilWarm = flag.Bool("blockUntilWarm", false, "the job submitter will not return until there are no more jobs") @@ -72,14 +72,14 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ os.Exit(1) } - if len(*storageAccount) == 0 { - fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n") + if len(*storageAccountResourceGroup) == 0 { + fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n") usage() os.Exit(1) } - if len(*storageKey) == 0 { - fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n") + if len(*storageAccount) == 0 { + fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n") usage() os.Exit(1) } @@ -96,6 +96,12 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ os.Exit(1) } + primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err) + os.Exit(1) + } + warmJobPath := cachewarmer.InitializeWarmPathJob( *warmTargetMountAddresses, *warmTargetExportPath, @@ -107,7 +113,7 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues( ctx, *storageAccount, - *storageKey, + primaryKey, *queueNamePrefix) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err) diff --git a/src/go/cmd/cachewarmer/cachewarmer-manager/main.go b/src/go/cmd/cachewarmer/cachewarmer-manager/main.go index 1410a76c..a979c957 100644 --- a/src/go/cmd/cachewarmer/cachewarmer-manager/main.go +++ b/src/go/cmd/cachewarmer/cachewarmer-manager/main.go @@ -36,9 +36,10 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa var bootstrapMountAddress = flag.String("bootstrapMountAddress", "", "the mount address that hosts the worker bootstrap script") var bootstrapExportPath = flag.String("bootstrapExportPath", "", "the export path that hosts the worker bootstrap script") var bootstrapScriptPath = flag.String("bootstrapScriptPath", "", "the path to the worker bootstrap script") + var workerCount = flag.Int64("workerCount", 12, "the worker count to warm the cache") + var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group") var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue") - var storageKey = flag.String("storageKey", "", "the storage key to access the queue") var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically") var vmssUserName = flag.String("vmssUserName", "", "the username for the vmss vms") @@ -70,14 +71,14 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa os.Exit(1) } - if len(*storageAccount) == 0 { - fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n") + if len(*storageAccountResourceGroup) == 0 { + fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n") usage() os.Exit(1) } - if len(*storageKey) == 0 { - fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n") + if len(*storageAccount) == 0 { + fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n") usage() os.Exit(1) } @@ -112,10 +113,16 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa os.Exit(1) } + primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err) + os.Exit(1) + } + cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues( ctx, *storageAccount, - *storageKey, + primaryKey, *queueNamePrefix) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err) @@ -124,6 +131,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa return cachewarmer.InitializeWarmPathManager( azureClients, + *workerCount, cacheWarmerQueues, *bootstrapMountAddress, *bootstrapExportPath, @@ -133,7 +141,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa *vmssSshPublicKey, *vmssSubnetName, *storageAccount, - *storageKey, + primaryKey, *queueNamePrefix, ) } diff --git a/src/go/cmd/cachewarmer/deploymentartifacts/bootstrap/systemd/cachewarmer-manager.service b/src/go/cmd/cachewarmer/deploymentartifacts/bootstrap/systemd/cachewarmer-manager.service index 715c4795..44ff69a3 100644 --- a/src/go/cmd/cachewarmer/deploymentartifacts/bootstrap/systemd/cachewarmer-manager.service +++ b/src/go/cmd/cachewarmer/deploymentartifacts/bootstrap/systemd/cachewarmer-manager.service @@ -12,7 +12,7 @@ LimitNOFILE=16384 Restart=always RestartSec=2 -ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountName "STORAGE_ACCOUNT_REPLACE" -storageKey "STORAGE_KEY_REPLACE" -queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE +ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountResourceGroup "STORAGE_RG_REPLACE" -storageAccountName "STORAGE_ACCOUNT_REPLACE"-queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE # make sure log directory exists and owned by syslog PermissionsStartOnly=true diff --git a/src/go/pkg/cachewarmer/const.go b/src/go/pkg/cachewarmer/const.go index 9a8e650d..f46b00df 100644 --- a/src/go/pkg/cachewarmer/const.go +++ b/src/go/pkg/cachewarmer/const.go @@ -37,21 +37,25 @@ const ( MountRetrySleepSeconds = 10 // this size is the most common, and will stand up the fastest - VMSSNodeSize = "Standard_D2s_v3" - VmssName = "cwvmss" - NodesPerNFSMountAddress = 2 + VMSSNodeSize = "Standard_D2s_v3" + VmssName = "cwvmss" + + /* by default Ubuntu doesn't install NFS and we need a distro with NFS installed by default for airgapped environments MarketPlacePublisher = "Canonical" MarketPlaceOffer = "UbuntuServer" MarketPlaceSku = "18.04-LTS" - /*PlanName = "" - PlanPublisherName = "" - PlanProductName = "" + PlanName = "" + PlanPublisherName = "" + PlanProductName = "" + */ + + // the controller will work in an airgapped environment MarketPlacePublisher = "microsoft-avere" MarketPlaceOffer = "vfxt" MarketPlaceSku = "avere-vfxt-controller" PlanName = "avere-vfxt-controller" PlanPublisherName = "microsoft-avere" - PlanProductName = "vfxt"*/ + PlanProductName = "vfxt" tick = time.Duration(1) * time.Millisecond // 1ms timeBetweenJobCheck = time.Duration(2) * time.Second // 2 seconds between checking for jobs @@ -66,4 +70,6 @@ const ( WorkerMultiplier = 2 MinimumJobsBeforeRefill = 100 + + SubscriptionIdEnvVar = "AZURE_SUBSCRIPTION_ID" ) diff --git a/src/go/pkg/cachewarmer/queue.go b/src/go/pkg/cachewarmer/queue.go index c594c698..26ced641 100644 --- a/src/go/pkg/cachewarmer/queue.go +++ b/src/go/pkg/cachewarmer/queue.go @@ -5,10 +5,51 @@ package cachewarmer import ( "context" "fmt" + "os" "github.com/Azure/Avere/src/go/pkg/azure" + "github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/storage/mgmt/storage" + "github.com/Azure/go-autorest/autorest/azure/auth" ) +func GetSubscriptionID() (string, error) { + // try environment + if v := os.Getenv(SubscriptionIdEnvVar); v != "" { + return v, nil + } + + // try az cli file + if fileSettings, err := auth.GetSettingsFromFile(); err == nil && fileSettings.GetSubscriptionID() != "" { + return fileSettings.GetSubscriptionID(), nil + } + + // try metadata + if computeMetadata, err := GetComputeMetadata(); err == nil { + return computeMetadata.SubscriptionId, nil + } + + return "", fmt.Errorf("unable to get subscription from env var '%s', az cli login file, or instance meta data. Set the environment variable '%s' or run 'az login' to resolve", SubscriptionIdEnvVar, SubscriptionIdEnvVar) +} + +func GetPrimaryStorageKey(ctx context.Context, resourceGroup string, accountName string) (string, error) { + subscriptionId, err := GetSubscriptionID() + if err != nil { + return "", err + } + authorizer, err := auth.NewAuthorizerFromEnvironment() + if err != nil { + return "", fmt.Errorf("ERROR: authorizer from environment failed: %s", err) + } + accountsClient := storage.NewAccountsClient(subscriptionId) + accountsClient.Authorizer = authorizer + response, err := accountsClient.ListKeys(ctx, resourceGroup, accountName) + if err != nil { + return "", err + } + + return *(((*response.Keys)[0]).Value), nil +} + type CacheWarmerQueues struct { jobQueue *azure.Queue workQueue *azure.Queue diff --git a/src/go/pkg/cachewarmer/vmss.go b/src/go/pkg/cachewarmer/vmss.go index 32faadb7..48a86bb6 100644 --- a/src/go/pkg/cachewarmer/vmss.go +++ b/src/go/pkg/cachewarmer/vmss.go @@ -211,6 +211,9 @@ func createCacheWarmerVmssModel( publisher string, offer string, sku string, + planName string, + planPublisher string, + planProduct string, priority compute.VirtualMachinePriorityTypes, evictionPolicy compute.VirtualMachineEvictionPolicyTypes, subnetId string, @@ -236,6 +239,16 @@ func createCacheWarmerVmssModel( } } + var computePlan *compute.Plan + computePlan = nil + if len(planName) == 0 || len(planPublisher) == 0 || len(planProduct) == 0 { + computePlan = &compute.Plan{ + Name: to.StringPtr(planName), + Publisher: to.StringPtr(planPublisher), + Product: to.StringPtr(planProduct), + } + } + // create the vmss model return compute.VirtualMachineScaleSet{ Name: to.StringPtr(vmssName), @@ -244,6 +257,7 @@ func createCacheWarmerVmssModel( Name: to.StringPtr(vmssSKU), Capacity: to.Int64Ptr(nodeCount), }, + Plan: computePlan, VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{ Overprovision: to.BoolPtr(false), UpgradePolicy: &compute.UpgradePolicy{ diff --git a/src/go/pkg/cachewarmer/warmpathmanager.go b/src/go/pkg/cachewarmer/warmpathmanager.go index 42acae46..e155efa1 100644 --- a/src/go/pkg/cachewarmer/warmpathmanager.go +++ b/src/go/pkg/cachewarmer/warmpathmanager.go @@ -21,6 +21,7 @@ import ( // WarmPathManager contains the information for the manager type WarmPathManager struct { AzureClients *AzureClients + WorkerCount int64 Queues *CacheWarmerQueues bootstrapMountAddress string bootstrapExportPath string @@ -37,6 +38,7 @@ type WarmPathManager struct { // InitializeWarmPathManager initializes the job submitter structure func InitializeWarmPathManager( azureClients *AzureClients, + workerCount int64, queues *CacheWarmerQueues, bootstrapMountAddress string, bootstrapExportPath string, @@ -50,6 +52,7 @@ func InitializeWarmPathManager( queueNamePrefix string) *WarmPathManager { return &WarmPathManager{ AzureClients: azureClients, + WorkerCount: workerCount, Queues: queues, bootstrapMountAddress: bootstrapMountAddress, bootstrapExportPath: bootstrapExportPath, @@ -310,8 +313,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn if workerJob == nil { continue } - mountCount := len(workerJob.WarmTargetMountAddresses) - m.EnsureVmssRunning(ctx, mountCount) + m.EnsureVmssRunning(ctx) lastJobSeen = time.Now() } lastReadQueueSuccess = time.Now() @@ -320,7 +322,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn } } -func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int) { +func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context) { vmssExists, err := VmssExists(ctx, m.AzureClients, VmssName) if err != nil { log.Error.Printf("checking VMSS existence failed with error %v", err) @@ -357,26 +359,27 @@ func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int) return } - vmssCount := int64(mountCount * NodesPerNFSMountAddress) - cacheWarmerVmss := createCacheWarmerVmssModel( VmssName, // vmssName string, m.AzureClients.LocalMetadata.Location, // location string, VMSSNodeSize, // vmssSKU string, - vmssCount, // nodeCount int64, + m.WorkerCount, // nodeCount int64, m.vmssUserName, // userName string, m.vmssPassword, // password string, m.vmssSshPublicKey, // sshKeyData string, MarketPlacePublisher, // publisher string, MarketPlaceOffer, // offer string, MarketPlaceSku, // sku string, + PlanName, // planName string, + PlanPublisherName, // planPublisherName string, + PlanProductName, // planProductName string, compute.Spot, // priority compute.VirtualMachinePriorityTypes, compute.Delete, // evictionPolicy compute.VirtualMachineEvictionPolicyTypes vmssSubnetId, // subnetId string customData, ) - log.Info.Printf("create VMSS with %d workers", vmssCount) + log.Info.Printf("create VMSS with %d workers", m.WorkerCount) if _, err := CreateVmss(ctx, m.AzureClients, cacheWarmerVmss); err != nil { log.Error.Printf("error creating vmss: %v", err) return