Mirror of https://github.com/Azure/Avere.git

cachemanager fixes (#1277)
Parent: 3d3fb8d6ed
Commit: d5d49cc5ef
@@ -104,8 +104,8 @@ On the controller or jumpbox, execute the following steps
 ```bash
 export BOOTSTRAP_PATH=/nfs/node0
 
+export STORAGE_ACCOUNT_RESOURCE_GROUP=
 export STORAGE_ACCOUNT=
-export STORAGE_KEY=''
 export QUEUE_PREFIX=
 
 export BOOTSTRAP_EXPORT_PATH=/nfs1data
@@ -136,6 +136,7 @@ export STORAGE_ACCOUNT=
 export STORAGE_KEY=''
+export QUEUE_PREFIX=
 ```
 
 2. Run the following script:
 ```bash
 bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh
@@ -146,5 +147,5 @@ bash /nfs/node0/bootstrap/bootstrap.cachewarmer-worker.sh
 To submit a job, run a command similar to the following command, where the warm target variables are the Avere junction to warm:
 
 ```bash
-sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountName "STORAGEACCOUNTREPLACE" -storageKey "STORAGEKEYREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island"
+sudo /usr/local/bin/cachewarmer-jobsubmitter -enableDebugging -storageAccountResourceGroup "STORAGERGREPLACE" -storageAccountName "STORAGEACCOUNTREPLACE" -queueNamePrefix "QUEUEPREFIXREPLACE" -warmTargetExportPath "/nfs1data" -warmTargetMountAddresses "10.0.1.11,10.0.1.12,10.0.1.13" -warmTargetPath "/island"
 ```
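
With this change the submitter no longer accepts a storage key on the command line; it looks the key up at runtime from the account's resource group (see the `GetPrimaryStorageKey` helper added below), so the shell running it needs ARM credentials. Below is a minimal sketch of that lookup, mirroring the SDK calls the new helper makes; the service principal environment variables, resource group `my-rg`, and account `mystorageaccount` are illustrative assumptions, and the exact `ListKeys` signature depends on the vendored SDK version.

```go
// Sketch: fetch a storage account's primary key the way the updated
// cachewarmer binaries do, instead of passing -storageKey on the CLI.
// Assumes AZURE_SUBSCRIPTION_ID, AZURE_TENANT_ID, AZURE_CLIENT_ID, and
// AZURE_CLIENT_SECRET are set; the names below are placeholders.
package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/storage/mgmt/storage"
    "github.com/Azure/go-autorest/autorest/azure/auth"
)

func main() {
    authorizer, err := auth.NewAuthorizerFromEnvironment()
    if err != nil {
        log.Fatalf("authorizer from environment failed: %s", err)
    }
    accountsClient := storage.NewAccountsClient(os.Getenv("AZURE_SUBSCRIPTION_ID"))
    accountsClient.Authorizer = authorizer
    // list the account keys via ARM and take the primary (first) one
    response, err := accountsClient.ListKeys(context.Background(), "my-rg", "mystorageaccount")
    if err != nil {
        log.Fatalf("unable to get storage account key: %s", err)
    }
    fmt.Println("primary key:", *(*response.Keys)[0].Value)
}
```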
@@ -42,8 +42,8 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
 
     var maxFileSizeBytes = flag.Int64("maxFileSizeBytes", 0, "the maximum file size in bytes to warm.")
 
+    var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group")
     var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue")
-    var storageKey = flag.String("storageKey", "", "the storage key to access the queue")
     var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically")
 
     var blockUntilWarm = flag.Bool("blockUntilWarm", false, "the job submitter will not return until there are no more jobs")
@@ -72,14 +72,14 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
         os.Exit(1)
     }
 
-    if len(*storageAccount) == 0 {
-        fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
+    if len(*storageAccountResourceGroup) == 0 {
+        fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n")
         usage()
         os.Exit(1)
     }
 
-    if len(*storageKey) == 0 {
-        fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n")
+    if len(*storageAccount) == 0 {
+        fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
         usage()
         os.Exit(1)
     }
@@ -96,6 +96,12 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
         os.Exit(1)
     }
 
+    primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount)
+    if err != nil {
+        fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err)
+        os.Exit(1)
+    }
+
     warmJobPath := cachewarmer.InitializeWarmPathJob(
         *warmTargetMountAddresses,
         *warmTargetExportPath,
@@ -107,7 +113,7 @@ func initializeApplicationVariables(ctx context.Context) (*cachewarmer.WarmPathJ
     cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues(
         ctx,
         *storageAccount,
-        *storageKey,
+        primaryKey,
         *queueNamePrefix)
     if err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err)
@@ -36,9 +36,10 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
     var bootstrapMountAddress = flag.String("bootstrapMountAddress", "", "the mount address that hosts the worker bootstrap script")
     var bootstrapExportPath = flag.String("bootstrapExportPath", "", "the export path that hosts the worker bootstrap script")
     var bootstrapScriptPath = flag.String("bootstrapScriptPath", "", "the path to the worker bootstrap script")
+    var workerCount = flag.Int64("workerCount", 12, "the worker count to warm the cache")
 
+    var storageAccountResourceGroup = flag.String("storageAccountResourceGroup", "", "the storage account resource group")
     var storageAccount = flag.String("storageAccountName", "", "the storage account name to host the queue")
-    var storageKey = flag.String("storageKey", "", "the storage key to access the queue")
     var queueNamePrefix = flag.String("queueNamePrefix", "", "the queue name to be used for organizing the work. The queues will be created automatically")
 
     var vmssUserName = flag.String("vmssUserName", "", "the username for the vmss vms")
@@ -70,14 +71,14 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
         os.Exit(1)
     }
 
-    if len(*storageAccount) == 0 {
-        fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
+    if len(*storageAccountResourceGroup) == 0 {
+        fmt.Fprintf(os.Stderr, "ERROR: storageAccountResourceGroup is not specified\n")
         usage()
         os.Exit(1)
     }
 
-    if len(*storageKey) == 0 {
-        fmt.Fprintf(os.Stderr, "ERROR: storageKey is not specified\n")
+    if len(*storageAccount) == 0 {
+        fmt.Fprintf(os.Stderr, "ERROR: storageAccount is not specified\n")
         usage()
         os.Exit(1)
     }
@@ -112,10 +113,16 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
         os.Exit(1)
     }
 
+    primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, *storageAccountResourceGroup, *storageAccount)
+    if err != nil {
+        fmt.Fprintf(os.Stderr, "ERROR: unable to get storage account key: %s", err)
+        os.Exit(1)
+    }
+
     cacheWarmerQueues, err := cachewarmer.InitializeCacheWarmerQueues(
         ctx,
         *storageAccount,
-        *storageKey,
+        primaryKey,
         *queueNamePrefix)
     if err != nil {
         fmt.Fprintf(os.Stderr, "ERROR: error initializing queue %v\n", err)
@@ -124,6 +131,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
 
     return cachewarmer.InitializeWarmPathManager(
         azureClients,
+        *workerCount,
         cacheWarmerQueues,
         *bootstrapMountAddress,
         *bootstrapExportPath,
@@ -133,7 +141,7 @@ func initializeApplicationVariables(ctx context.Context) *cachewarmer.WarmPathMa
         *vmssSshPublicKey,
         *vmssSubnetName,
         *storageAccount,
-        *storageKey,
+        primaryKey,
         *queueNamePrefix,
     )
 }
@@ -12,7 +12,7 @@ LimitNOFILE=16384
 Restart=always
 RestartSec=2
 
-ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountName "STORAGE_ACCOUNT_REPLACE" -storageKey "STORAGE_KEY_REPLACE" -queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE
+ExecStart=/usr/local/bin/cachewarmer-manager -storageAccountResourceGroup "STORAGE_RG_REPLACE" -storageAccountName "STORAGE_ACCOUNT_REPLACE" -queueNamePrefix "QUEUE_PREFIX_REPLACE" -bootstrapExportPath "BOOTSTRAP_EXPORT_PATH_REPLACE" -bootstrapMountAddress "BOOTSTRAP_MOUNT_ADDRESS_REPLACE" -bootstrapScriptPath "BOOTSTRAP_SCRIPT_PATH_REPLACE" -vmssUserName "VMSS_USERNAME_REPLACE" VMSS_SSH_PUBLIC_KEY_REPLACE VMSS_PASSWORD_REPLACE VMSS_SUBNET_NAME_REPLACE
 
 # make sure log directory exists and owned by syslog
 PermissionsStartOnly=true
@@ -37,21 +37,25 @@ const (
     MountRetrySleepSeconds = 10
 
     // this size is the most common, and will stand up the fastest
-    VMSSNodeSize            = "Standard_D2s_v3"
-    VmssName                = "cwvmss"
-    NodesPerNFSMountAddress = 2
+    VMSSNodeSize = "Standard_D2s_v3"
+    VmssName     = "cwvmss"
 
+    /* by default Ubuntu doesn't install NFS and we need a distro with NFS installed by default for airgapped environments
     MarketPlacePublisher = "Canonical"
     MarketPlaceOffer     = "UbuntuServer"
     MarketPlaceSku       = "18.04-LTS"
-    /*PlanName            = ""
-    PlanPublisherName    = ""
-    PlanProductName      = ""
+    PlanName             = ""
+    PlanPublisherName    = ""
+    PlanProductName      = ""
+    */
 
     // the controller will work in an airgapped environment
     MarketPlacePublisher = "microsoft-avere"
     MarketPlaceOffer     = "vfxt"
     MarketPlaceSku       = "avere-vfxt-controller"
     PlanName          = "avere-vfxt-controller"
     PlanPublisherName = "microsoft-avere"
-    PlanProductName   = "vfxt"*/
+    PlanProductName   = "vfxt"
 
     tick                = time.Duration(1) * time.Millisecond // 1ms
     timeBetweenJobCheck = time.Duration(2) * time.Second      // 2 seconds between checking for jobs
@@ -66,4 +70,6 @@ const (
 
     WorkerMultiplier        = 2
     MinimumJobsBeforeRefill = 100
+
+    SubscriptionIdEnvVar = "AZURE_SUBSCRIPTION_ID"
 )
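
The active worker image switches from Canonical Ubuntu to the microsoft-avere vfxt controller image so NFS is present out of the box in airgapped environments; because that is a third-party marketplace image, it must also be deployed with a purchase plan, which is why the Plan* values become real constants and plan parameters are threaded through vmss.go below. A sketch of how the image and plan constants pair up in the compute model; the `profiles/latest` compute import path here is an assumption:

```go
package main

import (
    "fmt"

    // assumed compute profile; the repo may pin a different one
    "github.com/Azure/azure-sdk-for-go/profiles/latest/compute/mgmt/compute"
    "github.com/Azure/go-autorest/autorest/to"
)

func main() {
    image := compute.ImageReference{
        Publisher: to.StringPtr("microsoft-avere"),       // MarketPlacePublisher
        Offer:     to.StringPtr("vfxt"),                  // MarketPlaceOffer
        Sku:       to.StringPtr("avere-vfxt-controller"), // MarketPlaceSku
    }
    // marketplace images sold with a plan must carry it on the VMSS model
    plan := compute.Plan{
        Name:      to.StringPtr("avere-vfxt-controller"), // PlanName
        Publisher: to.StringPtr("microsoft-avere"),       // PlanPublisherName
        Product:   to.StringPtr("vfxt"),                  // PlanProductName
    }
    fmt.Printf("image %s:%s:%s with plan %s\n", *image.Publisher, *image.Offer, *image.Sku, *plan.Name)
}
```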
@@ -5,10 +5,51 @@ package cachewarmer
 import (
     "context"
+    "fmt"
+    "os"
 
     "github.com/Azure/Avere/src/go/pkg/azure"
+    "github.com/Azure/azure-sdk-for-go/profiles/2020-09-01/storage/mgmt/storage"
+    "github.com/Azure/go-autorest/autorest/azure/auth"
 )
 
+func GetSubscriptionID() (string, error) {
+    // try environment
+    if v := os.Getenv(SubscriptionIdEnvVar); v != "" {
+        return v, nil
+    }
+
+    // try az cli file
+    if fileSettings, err := auth.GetSettingsFromFile(); err == nil && fileSettings.GetSubscriptionID() != "" {
+        return fileSettings.GetSubscriptionID(), nil
+    }
+
+    // try metadata
+    if computeMetadata, err := GetComputeMetadata(); err == nil {
+        return computeMetadata.SubscriptionId, nil
+    }
+
+    return "", fmt.Errorf("unable to get subscription from env var '%s', az cli login file, or instance meta data. Set the environment variable '%s' or run 'az login' to resolve", SubscriptionIdEnvVar, SubscriptionIdEnvVar)
+}
+
+func GetPrimaryStorageKey(ctx context.Context, resourceGroup string, accountName string) (string, error) {
+    subscriptionId, err := GetSubscriptionID()
+    if err != nil {
+        return "", err
+    }
+    authorizer, err := auth.NewAuthorizerFromEnvironment()
+    if err != nil {
+        return "", fmt.Errorf("ERROR: authorizer from environment failed: %s", err)
+    }
+    accountsClient := storage.NewAccountsClient(subscriptionId)
+    accountsClient.Authorizer = authorizer
+    response, err := accountsClient.ListKeys(ctx, resourceGroup, accountName)
+    if err != nil {
+        return "", err
+    }
+
+    return *(((*response.Keys)[0]).Value), nil
+}
+
 type CacheWarmerQueues struct {
     jobQueue  *azure.Queue
     workQueue *azure.Queue
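
The new helpers resolve the subscription in a fallback chain (the AZURE_SUBSCRIPTION_ID environment variable, then the az CLI settings file, then instance metadata) before asking ARM for the account's primary key. A sketch of how the manager and job submitter wire them together, matching the main.go hunks above; the `pkg/cachewarmer` import path and the literal resource group, account, and queue prefix are assumptions for illustration:

```go
package main

import (
    "context"
    "log"

    // assumed import path for the package shown in this diff
    "github.com/Azure/Avere/src/go/pkg/cachewarmer"
)

func main() {
    ctx := context.Background()

    // derive the key from ARM instead of receiving it as a -storageKey flag
    primaryKey, err := cachewarmer.GetPrimaryStorageKey(ctx, "my-rg", "mystorageaccount")
    if err != nil {
        log.Fatalf("unable to get storage account key: %s", err)
    }

    // the queues are still created with a key, just a freshly fetched one
    queues, err := cachewarmer.InitializeCacheWarmerQueues(ctx, "mystorageaccount", primaryKey, "cachewarmer")
    if err != nil {
        log.Fatalf("error initializing queue: %v", err)
    }
    _ = queues
}
```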
@@ -211,6 +211,9 @@ func createCacheWarmerVmssModel(
     publisher string,
     offer string,
     sku string,
+    planName string,
+    planPublisher string,
+    planProduct string,
     priority compute.VirtualMachinePriorityTypes,
     evictionPolicy compute.VirtualMachineEvictionPolicyTypes,
     subnetId string,
@@ -236,6 +239,16 @@ func createCacheWarmerVmssModel(
         }
     }
 
+    var computePlan *compute.Plan
+    computePlan = nil
+    if len(planName) > 0 && len(planPublisher) > 0 && len(planProduct) > 0 {
+        computePlan = &compute.Plan{
+            Name:      to.StringPtr(planName),
+            Publisher: to.StringPtr(planPublisher),
+            Product:   to.StringPtr(planProduct),
+        }
+    }
+
     // create the vmss model
     return compute.VirtualMachineScaleSet{
         Name: to.StringPtr(vmssName),
@@ -244,6 +257,7 @@ func createCacheWarmerVmssModel(
             Name:     to.StringPtr(vmssSKU),
             Capacity: to.Int64Ptr(nodeCount),
         },
+        Plan: computePlan,
         VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{
             Overprovision: to.BoolPtr(false),
             UpgradePolicy: &compute.UpgradePolicy{
@@ -21,6 +21,7 @@ import (
 // WarmPathManager contains the information for the manager
 type WarmPathManager struct {
     AzureClients          *AzureClients
+    WorkerCount           int64
     Queues                *CacheWarmerQueues
     bootstrapMountAddress string
     bootstrapExportPath   string
@@ -37,6 +38,7 @@ type WarmPathManager struct {
 // InitializeWarmPathManager initializes the job submitter structure
 func InitializeWarmPathManager(
     azureClients *AzureClients,
+    workerCount int64,
     queues *CacheWarmerQueues,
     bootstrapMountAddress string,
     bootstrapExportPath string,
@@ -50,6 +52,7 @@ func InitializeWarmPathManager(
     queueNamePrefix string) *WarmPathManager {
     return &WarmPathManager{
         AzureClients:          azureClients,
+        WorkerCount:           workerCount,
         Queues:                queues,
         bootstrapMountAddress: bootstrapMountAddress,
         bootstrapExportPath:   bootstrapExportPath,
@@ -310,8 +313,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn
             if workerJob == nil {
                 continue
             }
-            mountCount := len(workerJob.WarmTargetMountAddresses)
-            m.EnsureVmssRunning(ctx, mountCount)
+            m.EnsureVmssRunning(ctx)
             lastJobSeen = time.Now()
         }
         lastReadQueueSuccess = time.Now()
@@ -320,7 +322,7 @@ func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *syn
     }
 }
 
-func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int) {
+func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context) {
     vmssExists, err := VmssExists(ctx, m.AzureClients, VmssName)
     if err != nil {
         log.Error.Printf("checking VMSS existence failed with error %v", err)
@@ -357,26 +359,27 @@ func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context, mountCount int)
         return
     }
 
-    vmssCount := int64(mountCount * NodesPerNFSMountAddress)
-
     cacheWarmerVmss := createCacheWarmerVmssModel(
         VmssName,                              // vmssName string,
        m.AzureClients.LocalMetadata.Location, // location string,
         VMSSNodeSize,                          // vmssSKU string,
-        vmssCount,                             // nodeCount int64,
+        m.WorkerCount,                         // nodeCount int64,
         m.vmssUserName,                        // userName string,
         m.vmssPassword,                        // password string,
         m.vmssSshPublicKey,                    // sshKeyData string,
         MarketPlacePublisher,                  // publisher string,
         MarketPlaceOffer,                      // offer string,
         MarketPlaceSku,                        // sku string,
+        PlanName,                              // planName string,
+        PlanPublisherName,                     // planPublisherName string,
+        PlanProductName,                       // planProductName string,
         compute.Spot,                          // priority compute.VirtualMachinePriorityTypes,
         compute.Delete,                        // evictionPolicy compute.VirtualMachineEvictionPolicyTypes
         vmssSubnetId,                          // subnetId string
         customData,
     )
 
-    log.Info.Printf("create VMSS with %d workers", vmssCount)
+    log.Info.Printf("create VMSS with %d workers", m.WorkerCount)
     if _, err := CreateVmss(ctx, m.AzureClients, cacheWarmerVmss); err != nil {
         log.Error.Printf("error creating vmss: %v", err)
         return
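
With `NodesPerNFSMountAddress` removed, VMSS capacity no longer scales with each job's mount-address count; it is now the operator-chosen `-workerCount` (default 12) carried on `WarmPathManager`. A toy comparison using the three example mount addresses from the docs above; the numbers are illustrative only:

```go
package main

import "fmt"

func main() {
    mountAddresses := []string{"10.0.1.11", "10.0.1.12", "10.0.1.13"}
    oldCapacity := len(mountAddresses) * 2 // old sizing: NodesPerNFSMountAddress = 2 per address
    newCapacity := int64(12)               // new sizing: the -workerCount flag, default 12
    fmt.Printf("old VMSS capacity: %d, new VMSS capacity: %d\n", oldCapacity, newCapacity)
}
```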