Fix to Hive and Spark functions
This commit is contained in:
AlanWeaver 2017-06-14 16:11:52 +01:00
Родитель 2dd1cd8631
Коммит 74906b414e
11 изменённых файлов: 104 добавлений и 597 удалений

Просмотреть файл

@ -258,7 +258,7 @@ azureCreateHDI <- function(azureActiveContext, resourceGroup, location,
#' @family HDInsight functions
#' @export
azureResizeHDI <- function(azureActiveContext, clustername,
role = c("worker", "head", "edge"),
role = c("workernode", "headnode", "edgenode"),
size = 2, mode = c("Sync", "Async"), subscriptionID,
resourceGroup, verbose = FALSE) {
@ -267,7 +267,7 @@ azureResizeHDI <- function(azureActiveContext, clustername,
assert_that(is_resource_group(resourceGroup))
assert_that(is_clustername(clustername))
assert_that(is.integer(size))
assert_that(is.integer(as.integer(size)))
role <- match.arg(role)
mode <- match.arg(mode)

Просмотреть файл

@ -30,14 +30,18 @@ azureHiveStatus <- function(azureActiveContext, clustername, hdiAdmin,
if (!length(HP)) {
stop("Error: No Valid hdiPassword provided")
}
verbosity <- set_verbosity(verbose)
azureActiveContext$hdiAdmin <- HA
azureActiveContext$hdiPassword <- HP
azureActiveContext$clustername <- CN
cat(HA)
cat(HP)
uri <- paste0("https://", CN, ".azurehdinsight.net/templeton/v1/status")
r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
cat(uri)
r <- GET(uri, add_headers(.headers = c(`Content-type` = "application/json")),
authenticate(HA, HP), verbosity)
if (status_code(r) != 200 && status_code(r) != 201) {
stop(paste0("Error: Return code(", status_code(r), ")"))
@ -130,9 +134,9 @@ azureHiveSQL <- function(azureActiveContext, CMD, clustername, hdiAdmin,
if (DUR < 5)
DUR <- DUR + 1
if (df$status$state == "PREP")
message("P")
message("P",appendLF = FALSE)
if (df$status$state == "RUNNING")
message("R")
message("R",appendLF = FALSE)
# print(df$status$state)
r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
@ -142,9 +146,9 @@ azureHiveSQL <- function(azureActiveContext, CMD, clustername, hdiAdmin,
df <- fromJSON(rl)
}
if (df$status$state == "SUCCEEDED")
message("S")
message("S",appendLF = FALSE)
if (df$status$state == "FAILED")
message("F")
message("F",appendLF = FALSE)
STATE <- df$status$state
message("Finished Running statement: ", Sys.time())

Просмотреть файл

@ -276,13 +276,17 @@ azureSparkCMD <- function(azureActiveContext, CMD, clustername, hdiAdmin,
sep = "")
# print(URL)
message(paste("CMD Running: ", Sys.time()))
message("Running(R), Completed(C)")
message("Running(R) Waiting(W) Completed(C)")
while (df$state == "running") {
while (df$state == "running" || df$state == "waiting") {
Sys.sleep(DUR)
if (DUR < 5)
DUR <- DUR + 1
message("R")
if (df$state == "running")
message("R",appendLF = FALSE)
if (df$state == "waiting")
message("W",appendLF = FALSE)
r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
authenticate(HA, HP))
rl <- content(r, "text", encoding = "UTF-8")
@ -290,7 +294,7 @@ azureSparkCMD <- function(azureActiveContext, CMD, clustername, hdiAdmin,
df <- fromJSON(rl)
}
message("C")
message("C",appendLF = FALSE)
message("Finished Running statement: ", Sys.time())
RET <- df$output$data[1]
# rownames(RET) <- 'Return Value'

Просмотреть файл

@ -8,8 +8,8 @@
To get started with this package, see the vignettes:
* [Tutorial](http://htmlpreview.github.io/?https://github.com/Microsoft/AzureSMR/blob/master/vignettes/tutorial.html)
* [Getting Authenticated](http://htmlpreview.github.io/?https://github.com/Microsoft/AzureSMR/blob/master/vignettes/Authentication.html)
* [Tutorial](http://htmlpreview.github.io/?https://github.com/Microsoft/AzureSMR/blob/master/inst/doc/tutorial.html)
* [Getting Authenticated](http://htmlpreview.github.io/?https://github.com/Microsoft/AzureSMR/blob/master/inst/doc/Authentication.html)
To access the package help, just type `?AzureSMR` into your code editor.

Просмотреть файл

@ -0,0 +1,5 @@
\dontrun{
library(AzureSMR)
azureDeleteHDI(asc, clustername = "azuresmrclustername")
}

Просмотреть файл

@ -1,4 +0,0 @@
## ---- eval = FALSE-------------------------------------------------------
# sc <- createAzureContext(tenantID = "{TID}", clientID = "{CID}", authKey= "{KEY}")
# rgs <- azureListRG(sc)
# rgs

Просмотреть файл

@ -47,7 +47,7 @@ To apply access control azToken Resource Group
16. Identify the resource group you will associate with this application.
17. Choose the Users menu item from the Resource scope.
17. Choose the Access Control(IAM) menu item from the Resource scope.
18. In the resulting scope click the `+ Add` button.
@ -62,7 +62,7 @@ Alternatively you can access control azToken Subscription Level
16. Identify the Subscription you will associate with this application.
17. Choose the Users(access) menu item.
17. Choose the Access Control(IAM) menu item.
18. In the resulting scope click the + Add button.
@ -71,7 +71,6 @@ Alternatively you can access control azToken Subscription Level
20. Select the resulting list item for that App then click Select in that scope then OK in the "Add access" scope. The user will be added to the list.
That is all. You can test this by trying:
```{r, eval = FALSE}

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

Просмотреть файл

@ -1,140 +0,0 @@
## ---- eval=FALSE---------------------------------------------------------
# # Install devtools
# if(!require("devtools")) install.packages("devtools")
# devtools::install_github("Microsoft/AzureSMR")
# library(AzureSMR)
## ---- eval=FALSE---------------------------------------------------------
# sc <- createAzureContext(tenantID = "{TID}", clientID = "{CID}", authKey= "{KEY}")
# sc
## ---- eval=FALSE---------------------------------------------------------
# azureListSubscriptions(sc)
#
## ---- eval=FALSE---------------------------------------------------------
# # list resource groups
# azureListRG(sc)
#
# # list all resources
# azureListAllResources(sc)
#
# azureListAllResources(sc, location = "northeurope")
#
# azureListAllResources(sc, type = "Microsoft.Sql/servers", location = "northeurope")
#
# azureListAllResources(sc, resourceGroup = "Analytics")
#
# azureCreateResourceGroup(sc, resourceGroup = "testme", location = "northeurope")
#
# azureDeleteResourceGroup(sc, resourceGroup = "testme")
#
# azureListRG(sc)$name
#
## ---- eval=FALSE---------------------------------------------------------
# azureListVM(sc, resourceGroup = "AWHDIRG")
#
# ## Name Location Type OS State Admin
# ## 1 DSVM1 northeurope Microsoft.Compute/virtualMachines Linux Succeeded alanwe
#
# azureStartVM(sc, vmName = "DSVM1")
# azureStopVM(sc, vmName = "DSVM1")
## ---- eval=FALSE---------------------------------------------------------
# sKey <- AzureSAGetKey(sc, resourceGroup = "Analytics", storageAccount = "analyticsfiles")
## ---- eval=FALSE---------------------------------------------------------
# azListContainers(sc, storageAccount = "analyticsfiles", containers = "Test")
## ---- eval=FALSE---------------------------------------------------------
# azureListStorageBlobs(sc, storageAccount = "analyticsfiles", container = "test")
## ---- eval=FALSE---------------------------------------------------------
# azurePutBlob(sc, StorageAccount = "analyticsfiles", container = "test",
# contents = "Hello World",
# blob = "HELLO")
## ---- eval=FALSE---------------------------------------------------------
# azureGetBlob(sc, storageAccount = "analyticsfiles", container = "test",
# blob="HELLO",
# type="text")
## ---- eval=FALSE---------------------------------------------------------
# azureListHDI(sc)
# azureListHDI(sc, resourceGroup ="Analytics")
#
## ---- eval=FALSE---------------------------------------------------------
# azureResizeHDI(sc, resourceGroup = "Analytics", clusterName = "{HDIClusterName}",
# Role="workernode",Size=2)
#
# ## AzureResizeHDI: Request Submitted: 2016-06-23 18:50:57
# ## Resizing(R), Succeeded(S)
# ## RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR
# ## RRRRRRRRRRRRRRRRRRS
# ## Finished Resizing Sucessfully: 2016-06-23 19:04:43
# ## Finished: 2016-06-23 19:04:43
# ## ## Information
# ## " headnode ( 2 * Standard_D3_v2 ) workernode ( 5 * Standard_D3_v2 ) zookeepernode ( 3 * Medium ) edgenode0 ( 1 * Standard_D4_v2 )"
## ---- eval=FALSE---------------------------------------------------------
# azureDeployTemplate(sc, resourceGroup = "Analytics", deplName = "Deploy1",
# templateURL = "{TEMPLATEURL}", paramURL = "{PARAMURL}")
#
# ## AzureDeployTemplate: Request Submitted: 2016-06-23 18:50:57
# ## Resizing(R), Succeeded(S)
# ## RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR
# ## RRRRRRRRRRRRRRRRRRS
# ## Finished Deployed Sucessfully: 2016-06-23 19:04:43
# ## Finished: 2016-06-23 19:04:43
## ---- eval=FALSE---------------------------------------------------------
# azureHiveStatus(sc, clusterName = "{hdicluster}",
# hdiAdmin = "admin",
# hdiPassword = "********")
# AzureHiveSQL(sc,
# CMD = "select * from airports",
# Path = "wasb://{container}@{hdicluster}.blob.core.windows.net/")
#
# stdout <- AzureGetBlob(sc, Container = "test", Blob = "stdout")
#
# read.delim(text=stdout, header=TRUE, fill=TRUE)
#
## ---- eval=FALSE---------------------------------------------------------
# azureSparkNewSession(sc, clusterName = "{hdicluster}",
# hdiAdmin = "admin",
# hdiPassword = "********",
# kind = "pyspark")
## ---- eval=FALSE---------------------------------------------------------
# azureSparkListSessions(sc, clusterName = "{hdicluster}")
## ---- eval=FALSE---------------------------------------------------------
# # SAMPLE PYSPARK SCRIPT TO CALCULATE PI
# pythonCmd <- '
# from pyspark import SparkContext
# from operator import add
# import sys
# from random import random
# partitions = 1
# n = 20000000 * partitions
# def f(_):
# x = random() * 2 - 1
# y = random() * 2 - 1
# return 1 if x ** 2 + y ** 2 < 1 else 0
#
# count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
# Pi = (4.0 * count / n)
# print("Pi is roughly %f" % Pi)'
#
# azureSparkCMD(sc, cmd = pythonCmd, sessionID = "5")
#
# ## [1] "Pi is roughly 3.140285"
## ---- eval=FALSE---------------------------------------------------------
# azureSparkCMD(sc, clusterName = "{hdicluster}", cmd = "print Pi", sessionID="5")
#
# #[1] "3.1422"

Просмотреть файл

@ -53,8 +53,9 @@ sc <- createAzureContext(tenantID = "{TID}", clientID = "{CID}", authKey= "{KEY}
sc
```
To get an authorisation token use `azureAuthenticate()`. Note this token will time our after a period and therefore you need to run it again occasionally. TIP: Use AzureAuthenticate before a long running task.
If you provide authentication parameters to createAzureContext() the function will automatically authenticate.
To manually get an authorisation token use `azureAuthenticate()`.
Note this token will time out after a period and therefore you need to run it again occasionally. TIP: Use azureAuthenticate before a long running task.
The `azureListSubscriptions()` function lists all the available subscriptions. If you only have one it sets the default Subscription in the `azureActiveContext` to that subscription ID.
@ -76,28 +77,31 @@ azureListAllResources(sc, location = "northeurope")
azureListAllResources(sc, type = "Microsoft.Sql/servers", location = "northeurope")
azureListAllResources(sc, resourceGroup = "Analytics")
azureCreateResourceGroup(sc, resourceGroup = "testme", location = "northeurope")
azureDeleteResourceGroup(sc, resourceGroup = "testme")
azureCreateStorageAccount(sc,storageAccount="testmystorage1",resourceGroup = "testme")
azureListRG(sc)$name
azureListAllResources(sc, resourceGroup = "testme")
# When finished, to delete a Resource Group use azureDeleteResourceGroup
azureDeleteResourceGroup(sc, resourceGroup = "testme")
```
## Manage Virtual Machines
Use these functions to list, start and stop Virtual Machines (see templates for Creation).
Use these functions to list, start and stop existing Virtual Machines (see templates for Creation).
To Create VMs please refer to Resource Templates below.
```{r, eval=FALSE}
azureListVM(sc, resourceGroup = "AWHDIRG")
## List VMs in a ResourceGroup
azureListVM(sc, resourceGroup = "testme")
## Name Location Type OS State Admin
## 1 DSVM1 northeurope Microsoft.Compute/virtualMachines Linux Succeeded alanwe
## 1 DSVM1 northeurope Microsoft.Compute/virtualMachines Linux Succeeded
azureStartVM(sc, vmName = "DSVM1")
azureStopVM(sc, vmName = "DSVM1")
@ -109,34 +113,41 @@ In order to access Storage Blobs you need to have a key. Use `azureSAGetKey()` t
```{r, eval=FALSE}
sKey <- azureSAGetKey(sc, resourceGroup = "Analytics", storageAccount = "analyticsfiles")
sKey <- azureSAGetKey(sc, resourceGroup = "testme", storageAccount = "testmystorage1")
```
To create containers in a storage account use `azureCreateStorageContainer()`
```{r, eval=FALSE}
azureCreateStorageContainer(sc,"opendata",storageAccount = "testmystorage1", resourceGroup = "testme")
```
To list containers in a storage account use `azureListContainers()`
```{r, eval=FALSE}
azureListContainers(sc, storageAccount = "analyticsfiles", containers = "Test")
```
To list blobs in a container use `azureListStorageBlobs()`
```{r, eval=FALSE}
azureListStorageBlobs(sc, storageAccount = "analyticsfiles", container = "test")
azureListStorageContainers(sc, storageAccount = "testmystorage1", resourceGroup = "testme")
```
To write a blob use `azurePutBlob()`
```{r, eval=FALSE}
azurePutBlob(sc, StorageAccount = "analyticsfiles", container = "test",
azurePutBlob(sc, storageAccount = "testmystorage1", container = "opendata",
contents = "Hello World",
blob = "HELLO")
```
To list blobs in a container use `azureListStorageBlobs()`
```{r, eval=FALSE}
azureListStorageBlobs(sc, storageAccount = "testmystorage1", container = "opendata")
```
To read a blob in a container use `azureGetBlob()`
```{r, eval=FALSE}
azureGetBlob(sc, storageAccount = "analyticsfiles", container = "test",
azureGetBlob(sc, storageAccount = "testmystorage1", container = "opendata",
blob="HELLO",
type="text")
```
@ -144,25 +155,35 @@ azureGetBlob(sc, storageAccount = "analyticsfiles", container = "test",
## Manage HDInsight Clusters
You can use `AzureSMR` to manage Azure HDInsight clusters. To create clusters use Resource Templates (See below).
You can use `AzureSMR` to manage Azure HDInsight clusters. To create clusters use `azureCreateHDI()` or, for advanced configurations, use Resource Templates (see below).
Also see functions for submitting Hive and Spark jobs.
```{r, eval=FALSE}
azureCreateHDI(sc,
resourceGroup = "testme",
clustername = "smrhdi", # only low case letters, digit, and dash.
storageAccount = "testmystorage1",
adminUser = "hdiadmin",
adminPassword = "AzureSMR_password123",
sshUser = "hdisshuser",
sshPassword = "AzureSMR_password123",
kind = "rserver")
```
Use `azureListHDI()` to list available Clusters.
```{r, eval=FALSE}
azureListHDI(sc)
azureListHDI(sc, resourceGroup ="Analytics")
azureListHDI(sc, resourceGroup ="testme")
```
Use `azureResizeHDI()` to resize a cluster
```{r, eval=FALSE}
azureResizeHDI(sc, resourceGroup = "Analytics", clusterName = "{HDIClusterName}",
Role="workernode",Size=2)
azureResizeHDI(sc, resourceGroup = "testme", clustername = "smrhdi", role="workernode",size=3)
## AzureResizeHDI: Request Submitted: 2016-06-23 18:50:57
## azureResizeHDI: Request Submitted: 2016-06-23 18:50:57
## Resizing(R), Succeeded(S)
## RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR
## RRRRRRRRRRRRRRRRRRS
@ -182,7 +203,7 @@ To create a resource using a template in AzureSM use AzureDeployTemplate. The Te
azureDeployTemplate(sc, resourceGroup = "Analytics", deplName = "Deploy1",
templateURL = "{TEMPLATEURL}", paramURL = "{PARAMURL}")
## AzureDeployTemplate: Request Submitted: 2016-06-23 18:50:57
## azureDeployTemplate: Request Submitted: 2016-06-23 18:50:57
## Resizing(R), Succeeded(S)
## RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR
## RRRRRRRRRRRRRRRRRRS
@ -196,12 +217,14 @@ ADMIN TIP: If a deployment fails. Go to the Azure Portal and look azToken Actvit
These functions facilitate the use of hive jobs on a HDInsight Cluster
```{r, eval=FALSE}
azureHiveStatus(sc, clusterName = "{hdicluster}",
hdiAdmin = "admin",
hdiPassword = "********")
azureHiveStatus(sc, clusterName = "smrhdi",
hdiAdmin = "hdiadmin",
hdiPassword = "AzureSMR_password123")
azureHiveSQL(sc,
CMD = "select * from airports",
Path = "wasb://{container}@{hdicluster}.blob.core.windows.net/")
CMD = "select * from hivesampletable",
path = "wasb://opendata@testmystorage1.blob.core.windows.net/")
azureListStorageBlobs(sc, storageAccount = "testmystorage1", container = "opendata")
stdout <- azureGetBlob(sc, Container = "test", Blob = "stdout")
@ -217,19 +240,20 @@ read.delim(text=stdout, header=TRUE, fill=TRUE)
To Create a new Spark Session (Via Livy) use `azureSparkNewSession()`
```{r, eval=FALSE}
azureSparkNewSession(sc, clusterName = "{hdicluster}",
hdiAdmin = "admin",
hdiPassword = "********",
azureSparkNewSession(sc, clustername = "smrhdi",
hdiAdmin = "hdiadmin",
hdiPassword = "AzureSMR_password123",
kind = "pyspark")
```
To view the status of sessions use `azureSparkListSessions()`.
Wait for status to be Idle
```{r, eval=FALSE}
azureSparkListSessions(sc, clusterName = "{hdicluster}")
azureSparkListSessions(sc, clustername = "smrhdi")
```
To send a command to the Spark Session use `azureSparkCMD()`. In this case it submits a Python routine
To send a command to the Spark Session use `azureSparkCMD()`. In this case it submits a Python routine. Ensure you preserve indents for Python.
```{r, eval=FALSE}
# SAMPLE PYSPARK SCRIPT TO CALCULATE PI
@ -249,7 +273,7 @@ count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
Pi = (4.0 * count / n)
print("Pi is roughly %f" % Pi)'
azureSparkCMD(sc, cmd = pythonCmd, sessionID = "5")
azureSparkCMD(sc, CMD = pythonCmd, sessionID = "0")
## [1] "Pi is roughly 3.140285"
```
@ -257,8 +281,20 @@ azureSparkCMD(sc, cmd = pythonCmd, sessionID = "5")
Check Session variables are retained
```{r, eval=FALSE}
azureSparkCMD(sc, clusterName = "{hdicluster}", cmd = "print Pi", sessionID="5")
azureSparkCMD(sc, clustername = "smrhdi", CMD = "print Pi", sessionID="0")
#[1] "3.1422"
```
You can also run SparkR sessions
```{r, eval=FALSE}
azureSparkNewSession(sc, clustername = "smrhdi",
hdiAdmin = "hdiadmin",
hdiPassword = "AzureSMR_password123",
kind = "sparkr")
azureSparkCMD(sc, clustername = "smrhdi", CMD = "HW<-'hello R'", sessionID="2")
azureSparkCMD(sc, clustername = "smrhdi", CMD = "cat(HW)", sessionID="2")
```

Различия файлов скрыты, потому что одна или несколько строк слишком длинны