This commit is contained in:
Anthony Howe 2018-12-13 09:26:57 -05:00 коммит произвёл GitHub
Родитель e243653016
Коммит 48b4b2c573
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 373 добавлений и 2 удалений

Просмотреть файл

@ -0,0 +1,251 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"sync"
"time"
"github.com/Azure/Avere/src/go/pkg/azure"
"github.com/Azure/Avere/src/go/pkg/cli"
"github.com/Azure/Avere/src/go/pkg/log"
"github.com/Azure/Avere/src/go/pkg/random"
"github.com/google/uuid"
)
// BlobUploader handles all the blob uploads
type BlobUploader struct {
Context context.Context
BlobContainer *azure.BlobContainer
BlobSizeBytes int64
BlobCount int
ThreadCount int
BlobsUploaded int
FailureCount int
BytesUploaded int64
JobRunTime time.Duration
uploadBytesChannel chan int64
failureChannel chan struct{}
successChannel chan int64
}
// InitializeBlobUploader initializes the blob uploader
func InitializeBlobUploader(
ctx context.Context,
storageAccount string,
storageAccountKey string,
blobContainerName string,
blobSizeBytes int64,
blobCount int,
threadCount int) (*BlobUploader, error) {
blobContainer, err := azure.InitializeBlobContainer(ctx, storageAccount, storageAccountKey, blobContainerName)
if err != nil {
return nil, err
}
return &BlobUploader{
Context: ctx,
BlobContainer: blobContainer,
BlobSizeBytes: blobSizeBytes,
BlobCount: blobCount,
ThreadCount: threadCount,
uploadBytesChannel: make(chan int64),
successChannel: make(chan int64),
failureChannel: make(chan struct{}),
}, nil
}
// Run starts the upload workers
func (b *BlobUploader) Run(syncWaitGroup *sync.WaitGroup) {
start := time.Now()
defer func() { b.JobRunTime = time.Now().Sub(start) }()
log.Info.Printf("started BlobUploader.Run()\n")
defer syncWaitGroup.Done()
var cancel context.CancelFunc
b.Context, cancel = context.WithCancel(b.Context)
// start the ready queue listener and its workers
// this uses the example from here: https://github.com/Azure/azure-storage-queue-go/blob/master/2017-07-29/azqueue/zt_examples_test.go
for i := 0; i < b.ThreadCount; i++ {
syncWaitGroup.Add(1)
go b.StartBlobUploader(syncWaitGroup)
}
// dispatch jobs to the workers
dispatchedCount := 0
for dispatchedCount < b.BlobCount {
select {
case <-b.Context.Done():
return
case b.uploadBytesChannel <- b.BlobSizeBytes:
dispatchedCount++
case msg := <-b.uploadBytesChannel:
b.BlobsUploaded++
b.BytesUploaded += msg
case <-b.failureChannel:
b.FailureCount++
}
}
// wait for completion
for {
select {
case msg := <-b.uploadBytesChannel:
b.BlobsUploaded++
b.BytesUploaded += msg
case <-b.failureChannel:
b.FailureCount++
}
if (b.BlobsUploaded + b.FailureCount) == b.BlobCount {
cancel()
return
}
}
}
// StartBlobUploader starts the blob uploader
func (b *BlobUploader) StartBlobUploader(syncWaitGroup *sync.WaitGroup) {
defer syncWaitGroup.Done()
log.Info.Printf("[StartBlobUploader")
defer log.Info.Printf("completed StartBlobUploader]")
for {
// handle the messages
select {
case <-b.Context.Done():
return
case msg := <-b.uploadBytesChannel:
b.uploadBlob(msg)
}
}
}
// PrintStats prints out the statistics
func (b *BlobUploader) PrintStats() {
log.Info.Printf("Blobs Uploaded: %d / %d (%f failure rate)", b.BlobsUploaded, b.BlobCount, float32(b.FailureCount)/float32(b.BlobCount))
log.Info.Printf("MB/s Upload Rate: %f", float64(b.BytesUploaded)/(b.JobRunTime.Seconds()*float64(1024*1024)))
}
// defines the blob contents
type BlobContents struct {
Name string
PaddedString string
}
func (b *BlobUploader) uploadBlob(bytes int64) {
blobContents := &BlobContents{
Name: uuid.New().String(),
}
// learn the size of the current object
data, err := json.Marshal(blobContents)
if err != nil {
log.Error.Printf("error encountered marshalling blob %v", err)
b.failureChannel <- struct{}{}
return
}
// pad and re-martial to match the bytes
padLength := bytes - int64(len(data))
if padLength > 0 {
blobContents.PaddedString = random.RandStringRunes(int(padLength))
data, err = json.Marshal(blobContents)
if err != nil {
log.Error.Printf("error encountered marshalling blob %v", err)
b.failureChannel <- struct{}{}
return
}
}
if err := b.BlobContainer.UploadBlob(blobContents.Name, data); err != nil {
log.Error.Printf("failed to upload blob %v", err)
} else {
select {
case <-b.Context.Done():
return
case b.uploadBytesChannel <- bytes:
}
}
}
func usage(errs ...error) {
for _, err := range errs {
fmt.Fprintf(os.Stderr, "error: %s\n\n", err.Error())
}
fmt.Fprintf(os.Stderr, "usage: %s [OPTIONS]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " write the job config file and posts to the queue\n")
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "required env vars:\n")
fmt.Fprintf(os.Stderr, "\t%s - azure storage account\n", azure.AZURE_STORAGE_ACCOUNT)
fmt.Fprintf(os.Stderr, "\t%s - azure storage account key\n", azure.AZURE_STORAGE_ACCOUNT_KEY)
fmt.Fprintf(os.Stderr, "\n")
fmt.Fprintf(os.Stderr, "options:\n")
flag.PrintDefaults()
}
func verifyEnvVars() bool {
available := true
available = available && cli.VerifyEnvVar(azure.AZURE_STORAGE_ACCOUNT)
available = available && cli.VerifyEnvVar(azure.AZURE_STORAGE_ACCOUNT_KEY)
return available
}
// GetContainerName generates a container based on time
func GetContainerName(blobCount int) string {
t := time.Now()
return fmt.Sprintf("job-%02d-%02d-%02d-%02d%02d%02d-%d", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), blobCount)
}
func initializeApplicationVariables(ctx context.Context) (*BlobUploader, error) {
var blobFileSizeKB = flag.Int("blobFileSizeKB", 8*1024, "the blob file size in KB")
var blobCount = flag.Int("blobCount", 12, "the count of threads")
var threadCount = flag.Int("threadCount", 12, "the count of threads")
flag.Parse()
if envVarsAvailable := verifyEnvVars(); !envVarsAvailable {
usage()
os.Exit(1)
}
containerName := GetContainerName(*blobCount)
azure.FatalValidateContainerName(containerName)
storageAccount := cli.GetEnv(azure.AZURE_STORAGE_ACCOUNT)
storageKey := cli.GetEnv(azure.AZURE_STORAGE_ACCOUNT_KEY)
return InitializeBlobUploader(
ctx,
storageAccount,
storageKey,
containerName,
int64(*blobFileSizeKB)*int64(1024),
*blobCount,
*threadCount)
}
func main() {
// setup the shared context
ctx := context.Background()
syncWaitGroup := sync.WaitGroup{}
// initialize and start the orchestrator
blobUploader, err := initializeApplicationVariables(ctx)
if err != nil {
log.Error.Printf("error creating blob uploader: %v", err)
os.Exit(1)
}
syncWaitGroup.Add(1)
go blobUploader.Run(&syncWaitGroup)
log.Info.Printf("Waiting for all processes to finish")
syncWaitGroup.Wait()
blobUploader.PrintStats()
log.Info.Printf("finished")
}

Просмотреть файл

@ -0,0 +1,120 @@
package azure
import (
"context"
"fmt"
"net/url"
"os"
"regexp"
"github.com/Azure/Avere/src/go/pkg/log"
"github.com/Azure/azure-storage-blob-go/azblob"
)
const (
productionBlobURLTemplate = "https://%s.blob.core.windows.net"
)
// BlobContainer represents a blob container, this can be used to read/write blockblobs, appendblobs, or page blobs
// The implementation has been influenced by https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/zt_examples_test.go
// RESTAPI: https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api
// AZBLOB: https://godoc.org/github.com/Azure/azure-storage-blob-go/azblob#pkg-examples
type BlobContainer struct {
ContainerURL azblob.ContainerURL
Context context.Context
}
// FatalValidateContainerName exits the program if the containername is not valid
func FatalValidateContainerName(containerName string) {
isValid, errorMessage := ValidateContainerName(containerName)
if !isValid {
log.Error.Printf(errorMessage)
os.Exit(1)
}
}
// ValidateContainerName validates container name according to https://docs.microsoft.com/en-us/rest/api/storageservices/create-container
func ValidateContainerName(containerName string) (bool, string) {
matched, err := regexp.MatchString("^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$", containerName)
if err != nil {
errorMessage := fmt.Sprintf("error while parsing container Name '%s' to server: %v", containerName, err)
return false, errorMessage
}
if !matched {
errorMessage := fmt.Sprintf("'%s' is not a valid queue name. Blob container needs to be 3-63 lowercase alphanumeric characters where all but the first and last character may be dash (https://docs.microsoft.com/en-us/rest/api/storageservices/create-container)", containerName)
return false, errorMessage
}
return true, ""
}
// InitializeBlob creates a Blob to represent the Azure Storage Queue
func InitializeBlobContainer(ctx context.Context, storageAccount string, storageAccountKey string, containerName string) (*BlobContainer, error) {
credential, err := azblob.NewSharedKeyCredential(storageAccount, storageAccountKey)
if err != nil {
log.Error.Printf("encountered error while creating new shared key credential %v", err)
return nil, err
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
u, _ := url.Parse(fmt.Sprintf(productionBlobURLTemplate, storageAccount))
serviceURL := azblob.NewServiceURL(*u, p)
// Create a URL that references the blob container in the Azure Storage account.
containerURL := serviceURL.NewContainerURL(containerName)
// create the container if it does not already exist
if containerCreateResponse, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone); err != nil {
if serr, ok := err.(azblob.StorageError); !ok || serr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
log.Error.Printf("error encountered: %v", serr.ServiceCode())
return nil, err
}
} else if containerCreateResponse.StatusCode() == 201 {
log.Info.Printf("successfully created blob container '%s'", containerName)
}
return &BlobContainer{
ContainerURL: containerURL,
Context: ctx,
}, nil
}
// UploadBlob uploads the blob to the container
func (b *BlobContainer) UploadBlob(blobname string, data []byte) error {
blobURL := b.ContainerURL.NewBlockBlobURL(blobname)
if _, err := azblob.UploadBufferToBlockBlob(b.Context, data, blobURL, azblob.UploadToBlockBlobOptions{}); err != nil {
log.Error.Printf("encountered error uploading blob '%s': '%v'", blobname, err)
return err
}
return nil
}
// DownloadBlob downloads the bytes of the blob
func (b *BlobContainer) DownloadBlob(blobname string) ([]byte, error) {
blobURL := b.ContainerURL.NewBlobURL(blobname)
blobProperties, err := blobURL.GetProperties(b.Context, azblob.BlobAccessConditions{})
if err != nil {
log.Error.Printf("encountered error getting blob properties for '%s': '%v'", blobname, err)
return nil, err
}
data := make([]byte, 0, blobProperties.ContentLength())
if err := azblob.DownloadBlobToBuffer(b.Context, blobURL, 0, 0, data, azblob.DownloadFromBlobOptions{}); err != nil {
log.Error.Printf("encountered error downloading blob '%s': '%v'", blobname, err)
return nil, err
}
return data, nil
}
// DeleteBlob deletes the blob
func (b *BlobContainer) DeleteBlob(blobname string) error {
blobURL := b.ContainerURL.NewBlobURL(blobname)
if _, err := blobURL.Delete(b.Context, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}); err != nil {
log.Error.Printf("encountered error deleting blob '%s': '%v'", blobname, err)
return err
}
return nil
}

Просмотреть файл

@ -71,7 +71,7 @@ func (j *JobConfigFile) WriteJobConfigFile(writer *file.ReaderWriter, filepath s
}
// pad and re-martial to match the bytes
padLength := (KB * 384) - len(data)
padLength := (KB * fileSize) - len(data)
if padLength > 0 {
j.PaddedString = random.RandStringRunes(padLength)
data, err = json.Marshal(j)

Просмотреть файл

@ -72,7 +72,7 @@ func (w *WorkFileWriter) WriteStartFiles(writer *file.ReaderWriter, filepath str
}
// pad and re-martial to match the bytes
padLength := (KB * 384) - len(data)
padLength := (KB * fileSize) - len(data)
if padLength > 0 {
w.PaddedString = random.RandStringRunes(padLength)
data, err = json.Marshal(w)