initial commit of moving all content here

This commit is contained in:
Qinyuan Wan 2020-02-13 14:57:42 -08:00
Parent e7c88229f8
Commit 3e528207aa
21 changed files: 2547 additions and 2 deletions

17
.gitignore vendored Normal file

@@ -0,0 +1,17 @@
# ignore the cpp build dir
cpp/build/
# ignore dotnet build dirs
dotnet/bin/
dotnet/obj/
# ignore java output dirs
java/target/
# ignore scala output dirs
scala/project/
scala/target/
# ignore jar output files
*.jar

182
README.md

@@ -1,2 +1,180 @@
# asa-in-spark
This repository holds the library for running an Azure Stream Analytics (ASA) job query in Spark.
# Table of contents
<!--ts-->
* [Introduction](#introduction)
* [How it works?](#how-it-works)
* [Quick Start](#quick-start)
* [How to generate the jar?](#how-to-generate-the-jar)
* [Supported ASA Queries](#supported-asa-queries)
* [Dependencies](#dependencies)
* [Troubleshooting](#troubleshooting)
* [Few helpful docker commands](#few-helpful-docker-commands)
* [Example](#example)
<!--te-->
## Introduction
ASASpark is a proof of concept (POC) that makes the ASA query engine work with Apache Spark. Queries are written in the ASA Query Language.
## How it works?
A Spark SQL job running on a Spark cluster in Databricks uses ASASpark-x.x.x.jar as a library to interpret a query written in the ASA query language and run it on a Spark dataset.
Before you write your own query, check out the [Supported ASA Queries](#supported-asa-queries) section.
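For reference, the library's entry point is `com.microsoft.AzureStreamAnalytics.execute`, which takes an ASA query string together with a prepared Spark DataFrame and returns the query result as a new DataFrame; inside the query, the DataFrame is addressed as the stream named `input`. Below is a minimal sketch of the call shape (the helper name `runAsaQuery` is illustrative only, not part of the library); the [Example](#example) section shows the complete notebook.
```scala
import org.apache.spark.sql.DataFrame
import com.microsoft.AzureStreamAnalytics

// Minimal sketch: `df` is assumed to already be repartitioned by the grouping key
// and sorted by event time, exactly as prepared in the Example section.
def runAsaQuery(df: DataFrame): DataFrame =
  AzureStreamAnalytics.execute(
    "SELECT category, System.Timestamp AS ts, COUNT(*) AS n " +
      "FROM input TIMESTAMP BY DATEADD(second, ts, '2019-11-20T00:00:00Z') " +
      "GROUP BY TumblingWindow(second, 2), category",
    df)
```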
## Quick Start
1. Get the ASA-x.x.x.jar in either of the following ways:
   a. Using the source code: follow the steps in the [How to generate the jar?](#how-to-generate-the-jar) section (only for Microsoft users)
   b. Download the prebuilt jar from [here](Todo: add link)
2. Configure Azure Databricks and a Spark cluster:
   a. [Create an Azure Databricks workspace](https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal#create-an-azure-databricks-workspace)
   b. [Create a Spark cluster in Databricks](https://docs.microsoft.com/en-us/azure/azure-databricks/quickstart-create-databricks-workspace-portal#create-a-spark-cluster-in-databricks)
   c. Once the cluster is deployed, upload the ASA-x.x.x.jar as a library:
      - Click `<cluster_name> -> Libraries -> Install New` to open a modal/pop-up.
      - Select `Library Source` as `Upload`, `Library Type` as `Jar`, then click `Drop JAR here` and select the jar from your local filesystem.
      - Once uploaded, click `Install` to close the modal/pop-up and verify that the `Status` is `Installed`.
3. Run the example in the [Example](#example) section
## How to generate the jar?
1. Clone the repo
```
$ git clone https://msdata.visualstudio.com/DefaultCollection/Azure%20Stream%20Analytics/_git/ASASpark
```
2. Install Docker
   a. [For Linux (Ubuntu) on Windows installed from the Microsoft Store](https://medium.com/@sebagomez/installing-the-docker-client-on-ubuntus-windows-subsystem-for-linux-612b392a44c4)
   b. [For others, use the official documentation](https://docs.docker.com/install/)
3. At the root of the repository, build the Docker image, which will contain the required jar
```
$ docker build -t asa-spark:3.1-bionic .
# Note: open the URL in a browser and enter the code when prompted for auth by the Azure Artifact Credential Provider
```
4. Start a Docker container from the recently built image
```
$ docker run --name asa-spark -d -it asa-spark:3.1-bionic
```
5. Copy the required jar from the container to the local filesystem
```
$ docker cp asa-spark:/repos/ASASpark/java/target/ASA-0.0.1.jar .
```
6. Cleanup: stop the running container, remove it and delete the image
```
$ docker stop asa-spark
$ docker rm asa-spark
$ docker rmi asa-spark:3.1-bionic
```
## Supported ASA Queries
All other ASA queries are supported, except for the following:
1. Nested data types: of [the complete list of data types](https://docs.microsoft.com/en-us/stream-analytics-query/data-types-azure-stream-analytics) supported in ASA, Array and Record are not yet supported. An unsupported-data-type exception is thrown when any input or output uses one of these types.
2. Join queries (multiple inputs) are not yet supported.
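To make these limits concrete, the sketch below contrasts a supported query shape (a single input with flat column types and windowed aggregation, as used in the [Example](#example) section) with an unsupported one (a join over two inputs). The query strings are illustrative only; `inputA` and `inputB` are hypothetical stream names.
```scala
// Supported: one input stream, flat column types, windowed aggregation
// (same shape as the query in the Example section).
val supportedQuery =
  """SELECT category, System.Timestamp AS ts, COUNT(*) AS n
    |FROM input TIMESTAMP BY DATEADD(second, ts, '2019-11-20T00:00:00Z')
    |GROUP BY TumblingWindow(second, 2), category""".stripMargin

// Not yet supported: a JOIN requires multiple inputs (hypothetical inputA/inputB),
// so a query like the one below cannot currently be passed to AzureStreamAnalytics.execute.
// SELECT a.category FROM inputA a JOIN inputB b
//   ON a.category = b.category AND DATEDIFF(second, a, b) BETWEEN 0 AND 5
```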
## Dependencies
Docker is the only dependency for the developer; the Docker image takes care of all the other dependencies listed below:
1. g++
2. Java 8
   a. cmake uses `create_javah`, which relies on `javah`; `javah` is deprecated from Java 9 onwards
   b. Debian 10 dropped Java 8 from its official repository while Ubuntu Bionic did not, so Bionic is used as the Docker base OS.
3. [cmake](https://cmake.org/)
4. [maven](https://maven.apache.org/)
5. [dotnet](https://docs.microsoft.com/en-us/dotnet/core/install/linux-package-manager-ubuntu-1804)
6. [Azure Artifact Credential Provider](https://github.com/microsoft/artifacts-credprovider) to exchange credentials for downloading private artifacts
7. [sbt](https://www.scala-sbt.org/)
## Troubleshooting
1. Build the Docker image in debug mode. This configures the environment for development and does not attempt to create the jar
```
$ docker build --build-arg mode=debug -t asa-spark-debug:3.1-bionic .
```
2. Start the container from the debug image and connect to it to open a bash shell
```
$ docker run --name asa-spark-debug -it asa-spark-debug:3.1-bionic /bin/bash
```
3. You are now inside the Docker container at the repository root. Develop, debug, or troubleshoot, and when you are ready, run `./build.sh` to generate a new jar to verify.
4. Follow steps in [Quick Start](#quick-start) to run this jar on Azure Databricks.
## Few helpful docker commands
> ```bash
># Build docker image using dockerfile present at the current location and tag it as asa-spark:3.1-bionic
>$ docker build -t asa-spark:3.1-bionic .
>
># List docker images
>$ docker images
>
># Start a new container(asa-spark) using an existing docker image(asa-spark:3.1-bionic) and detach
>$ docker run --name asa-spark -d -it asa-spark:3.1-bionic
>
># Copy a file from a running container named asa-spark to the current directory on the local filesystem
>$ docker cp asa-spark:/repos/ASASpark/java/target/ASA-0.0.1.jar .
>
># Connect to already running container named asa-spark and open a bash shell
>$ docker exec -it asa-spark bash
>
># Stop a running container named asa-spark
>$ docker stop asa-spark
>
># Remove a container named asa-spark
>$ docker rm asa-spark
>
># Delete docker image with tag asa-spark:3.1-bionic
>$ docker rmi asa-spark:3.1-bionic
>```
## Example
Notebook code demonstrating how to call the ASASpark-x.x.x.jar APIs to run a query. The code uses a CSV dataset that is first uploaded to the Databricks FileStore.
1. Upload the dataset
   a. Go to Azure Databricks and click `Data -> Databases -> default -> Add Data`.
   b. Select `Data source` as `Upload Files`, click `Drop files to upload`, and select the `scala/test/resources/dataset1.csv` file from this repo. Wait for the upload to complete.
2. Create a notebook
   a. Create a new notebook by clicking `Azure Databricks -> New Notebook`
   b. Set `Name` to `ASASparkExample`, `Language` to `Scala`, and `Cluster` to the cluster created in [Quick Start](#quick-start) step 2
3. Add the following Scala code to the newly created notebook and click `Run all`
```scala
import org.scalatest._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.Map
import com.microsoft.AzureStreamAnalytics

var spark = SparkSession.builder()
  .master("local[2]") // 2 ... number of threads
  .appName("ASA")
  .config("spark.sql.shuffle.partitions", value = 1)
  .config("spark.ui.enabled", value = false)
  .config("spark.sql.crossJoin.enabled", value = true)
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

val df = spark.sqlContext
  .read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "MM-dd-yyyy hh mm ss")
  .load("/FileStore/tables/dataset1.csv")
  .repartition(col("category"))
  .sortWithinPartitions(col("ts"))
  .select(col("ts").cast(LongType), col("category"), col("value"))

val newDf = AzureStreamAnalytics.execute(
  "SELECT category, System.Timestamp AS ts, COUNT(*) AS n FROM input TIMESTAMP BY DATEADD(second, ts, '2019-11-20T00:00:00Z') GROUP BY TumblingWindow(second, 2), category",
  df)

newDf.show
```
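Once the cell above has run, `newDf` holds the columns projected by the query (`category`, `ts`, `n`), aggregated per category over 2-second tumbling windows. An optional way to inspect the result, reusing only the `newDf` value created above:
```scala
// Optional inspection of the result DataFrame produced by the query above.
newDf.printSchema()                          // expect the projected columns: category, ts, n
newDf.orderBy("category", "ts").show(false)  // one row per (category, 2-second window)
```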

18
azure-pipelines.yml Normal file

@@ -0,0 +1,18 @@
jobs:
- template: build.yml
  parameters:
    name: macOS
    pool:
      vmImage: 'macOS-10.13'
- template: build.yml
  parameters:
    name: Linux
    pool:
      vmImage: 'Ubuntu-16.04'
# - template: build.yml
#   parameters:
#     name: Windows
#     pool:
#       vmImage: 'vs2017-win2016'

54
build.sh Normal file

@@ -0,0 +1,54 @@
#!/bin/bash
if [ "$1" == "debug" ]; then
echo "[INFO] Not generating ASA-x.x.x.jar as debug mode is enabled and user wants to troubleshoot the build process"
exit 0
fi
echo "Cleaning existing build files..."
rm -rf cpp/build dotnet/bin dotnet/obj java/target || true
echo "Older build deleted"
echo "Downloading maven dependencies..."
cd java
mvn compile
cd ..
echo "Creating cpp build..."
mkdir -p cpp/build
cd cpp/build
cmake ..
cmake --build .
cd ../..
echo "cpp build created successfully"
echo """
====================================================================================================================================
Make sure azure credentials were provided successfully while building the docker image.
If not then stop this build.sh script and run
$ cd dotnet && dotnet restore --interactive
Open the URL on a browser and enter the code when prompted to authenticate
After the authentication, rerun the build.sh
For more information:
Azure Credential provider was installed while building the docker image to interactively acquire credentials for Azure Artifacts
https://github.com/microsoft/artifacts-credprovider
====================================================================================================================================
"""
echo "Creating dotnet build ..."
# This is where it hits case-sensitive dir name errors. Weird, but it works if triggered a second time
cd dotnet && (dotnet build || dotnet build) && cd ..
echo "Creating ASA-x.x.x.jar ..."
cd java
mvn package
# At this point jar should be created
ls -lh target/*.jar
echo "Jar created successfully"
cd ..
echo "Running sbt test..."
# Run test
cd scala && sbt test

391
cpp/ASANative.cpp Normal file

@@ -0,0 +1,391 @@
// #include "CoreCLRHost.hpp"
#include "utils.hpp"
#include "build/com_microsoft_ASANative.h"
#include <stdexcept>
#include <sstream>
#include <wchar.h>
#include <iostream>
#include <stdio.h>
#ifdef _WIN32
#include <windows.h>
#else
#include <dlfcn.h>
#endif
#if defined(__APPLE__)
std::string coreClrDll = "libcoreclr.dylib";
#else
std::string coreClrDll = "libcoreclr.so";
#endif
// TODO: remove me
#if not defined PATH_MAX
#include <stdio.h>
#define PATH_MAX FILENAME_MAX
#endif
// some JNI helper
class StringGuard {
JNIEnv* _env;
jstring _source;
const char* _cstr;
public:
StringGuard(JNIEnv *env, jstring source) : _env(env), _source(source), _cstr(nullptr) {
_cstr = _env->GetStringUTFChars(source, 0);
}
~StringGuard() {
if (_cstr) {
_env->ReleaseStringUTFChars(_source, _cstr);
_env->DeleteLocalRef(_source);
}
}
const char* c_str() { return _cstr; }
};
class StringUniGuard {
JNIEnv* _env;
jstring _source;
const jchar* _cstr;
jsize _length;
public:
StringUniGuard(JNIEnv *env, jstring source) : _env(env), _source(source), _cstr(nullptr) {
_cstr = _env->GetStringChars(source, nullptr);
_length = _env->GetStringLength(source);
}
~StringUniGuard() {
if (_cstr) {
_env->ReleaseStringChars(_source, _cstr);
_env->DeleteLocalRef(_source);
}
}
jsize length() { return _length; }
const jchar* c_str() { return _cstr; }
};
typedef void* (csharp_createASAHost_t)(const char16_t* sqlPtr, int sqlLen);
typedef void (csharp_deleteASAHost_t)(void* ptr);
typedef void (csharp_getOutputSchema_t)(void* ptr, const char16_t* sqlPtr, int sqlLen);
typedef void (csharp_registerArrowMemory_t)(void* ptr, jbyte* outputBuffer, int outputBufferLength, jbyte* inputBuffer, int inputBufferLength);
typedef void (csharp_runDotnetSpark_t)(const char16_t* localCsvNamePtr, int localCsvNameLen);
typedef long (csharp_pushRecord_t)(void* ptr);
typedef int (csharp_pushComplete_t)(void* ptr);
typedef void (csharp_nextOutputRecord_t)(void* ptr);
typedef void (csharp_stringFree_t)(const char16_t*);
class ASAHostException : public std::runtime_error {
public:
ASAHostException(const char* what) : std::runtime_error(what) {
}
};
class ASAHost;
class CLRHost {
void* hostHandle;
unsigned int domainId;
coreclrShutdownFunction* coreclr_shutdown_f;
coreclrCreateDelegateFunction* coreclr_create_delegate_f;
void* libraryPtr;
void* resolveFunctionPtr(const char* name) {
// std::cout << "resolveFunctionPtr " << name << std::endl;
void* f =
#ifdef _WIN32
GetProcAddress((HINSTANCE)libraryPtr , name);
#else
dlsym(libraryPtr, name);
#endif
if (!f) {
std::stringstream msg;
msg << "resolveFunctionPtr was unable to resolve " << name;
throw ASAHostException(msg.str().c_str());
}
return f;
}
public:
CLRHost() : hostHandle(NULL), domainId(0), libraryPtr(nullptr) {
}
void freeCLR() {
int status = coreclr_shutdown_f(hostHandle, domainId);
if(status < 0) {
std::stringstream msg;
msg << "coreclr_shutdown status: 0x" << std::hex << status;
throw std::runtime_error(msg.str());
}
}
~CLRHost() {
if (libraryPtr) {
#ifdef _WIN32
FreeLibrary((HINSTANCE)libraryPtr);
#else
dlclose(libraryPtr);
#endif
}
}
void initialize(std::string tempDir) {
std::string currentExeAbsolutePath = tempDir;
std::string clrFilesAbsolutePath = tempDir + "/clr";
std::string managedAssemblyAbsoluteDir = tempDir + "/binding";
std::string coreClrDllPath = clrFilesAbsolutePath + "/" + coreClrDll;
if(coreClrDllPath.size() >= PATH_MAX)
throw std::invalid_argument("Path to libcoreclr.so too long!");
// std::cout << "native.load library: " << coreClrDllPath << std::endl;
// get library handle
libraryPtr =
#ifdef _WIN32
LoadLibrary(coreClrDllPath.c_str() );
#else
dlopen(coreClrDllPath.c_str(), RTLD_NOW | RTLD_LOCAL );
#endif
if (!libraryPtr) {
std::stringstream msg;
msg << "Cannot find " << coreClrDll << "Path that was searched: "
<< coreClrDllPath;
throw ASAHostException(msg.str().c_str());
}
// std::cout << "native.load library succcess"<< std::endl;
std::string nativeDllSearchDirs = managedAssemblyAbsoluteDir + ":" + clrFilesAbsolutePath;
std::string tpaList;
AddFilesFromDirectoryToTpaList(clrFilesAbsolutePath, tpaList);
// std::cout << "TRUSTED_PLATFORM_ASSEMBLIES " << tpaList << std::endl;
// std::cout << "APP_PATHS: " << managedAssemblyAbsoluteDir << std::endl;
auto coreclr_initialize = (coreclrInitializeFunction*)resolveFunctionPtr("coreclr_initialize");
coreclr_shutdown_f = (coreclrShutdownFunction*)resolveFunctionPtr("coreclr_shutdown");
coreclr_create_delegate_f = (coreclrCreateDelegateFunction*)resolveFunctionPtr("coreclr_create_delegate");
const char *propertyKeys[] = {
"TRUSTED_PLATFORM_ASSEMBLIES",
"APP_PATHS",
"APP_NI_PATHS",
"NATIVE_DLL_SEARCH_DIRECTORIES",
"AppDomainCompatSwitch"
};
const char *propertyValues[] = {
tpaList.c_str(),
managedAssemblyAbsoluteDir.c_str(),
managedAssemblyAbsoluteDir.c_str(),
nativeDllSearchDirs.c_str(),
"UseLatestBehaviorWhenTFMNotSpecified"
};
// initialize coreclr
int status = coreclr_initialize(
currentExeAbsolutePath.c_str(),
"simpleCoreCLRHost",
sizeof(propertyKeys) / sizeof(propertyKeys[0]),
propertyKeys,
propertyValues,
&hostHandle,
&domainId
);
if (status < 0) {
std::stringstream msg;
msg << "coreclr_initialize status: 0x" << std::hex << status;
throw std::runtime_error(msg.str());
}
}
template<typename T>
T* getCSharpEntryPoint(const char* entrypointtype, const char* entrypointname) {
T* csharp_ptr;
// create delegate to our entry point
int status = coreclr_create_delegate_f(
hostHandle,
domainId,
"Managed", // Assembly spec
entrypointtype, // class name
entrypointname, // method name
reinterpret_cast<void**>(&csharp_ptr)
);
if (status < 0) {
std::stringstream msg;
msg << "getCSharpEntryPoint failed. status: 0x" << std::hex << status;
throw std::runtime_error(msg.str());
}
return csharp_ptr;
}
friend class ASAHost;
};
// main dispatcher class
class ASAHost {
CLRHost* _clrHost;
csharp_createASAHost_t* createASAHost;
csharp_deleteASAHost_t* deleteASAHost;
csharp_pushRecord_t* pushRecord_f;
csharp_pushComplete_t* pushComplete_f;
csharp_nextOutputRecord_t* nextOutputRecord_f;
csharp_stringFree_t* stringFree_f;
csharp_getOutputSchema_t* getOutputSchema_f;
csharp_registerArrowMemory_t* registerArrowMemory_f;
void* csharp_ASAHost;
public:
ASAHost(CLRHost* clrHost) : _clrHost(clrHost), createASAHost(nullptr), deleteASAHost(nullptr) {
}
void initializeCLR(std::string tempDir,
const jchar* sql, jsize sqlLen) {
// boilerplate code from: https://docs.microsoft.com/en-us/dotnet/core/tutorials/netcore-hosting
createASAHost = _clrHost->getCSharpEntryPoint<csharp_createASAHost_t>("ASAHost", "createASAHost");
deleteASAHost = _clrHost->getCSharpEntryPoint<csharp_deleteASAHost_t>("ASAHost", "deleteASAHost");
// forward get/set
stringFree_f = _clrHost->getCSharpEntryPoint<csharp_stringFree_t>("ASAHost", "stringFree");
pushComplete_f = _clrHost->getCSharpEntryPoint<csharp_pushComplete_t>("ASAHost", "pushComplete");
pushRecord_f = _clrHost->getCSharpEntryPoint<csharp_pushRecord_t>("ASAHost", "pushRecord");
nextOutputRecord_f = _clrHost->getCSharpEntryPoint<csharp_nextOutputRecord_t>("ASAHost", "nextOutputRecord");
getOutputSchema_f = _clrHost->getCSharpEntryPoint<csharp_getOutputSchema_t>("ASAHost", "getOutputSchema");
registerArrowMemory_f = _clrHost->getCSharpEntryPoint<csharp_registerArrowMemory_t>("ASAHost", "registerArrowMemory");
csharp_ASAHost = (*createASAHost)((char16_t*)sql, sqlLen);
}
void freeCLR() {
}
int pushRecord() { return (*pushRecord_f)(csharp_ASAHost); }
int pushComplete() { return (*pushComplete_f)(csharp_ASAHost); }
void nextOutputRecord() { (*nextOutputRecord_f)(csharp_ASAHost); }
void registerArrowMemory(jbyte* outputBuffer, int outputBufferLength, jbyte* inputBuffer, int inputBufferLength)
{(*registerArrowMemory_f)(csharp_ASAHost, outputBuffer, outputBufferLength, inputBuffer, inputBufferLength); }
void getOutputSchema(const char16_t* sqlPtr, int sqlLen) {(*getOutputSchema_f)(csharp_ASAHost, sqlPtr, sqlLen);}
};
JNIEXPORT jlong JNICALL Java_com_microsoft_ASANative_startCLR
(JNIEnv *env, jclass, jstring tempDir) {
try
{
StringGuard tempDirStr(env, tempDir);
auto clr = new CLRHost();
clr->initialize(tempDirStr.c_str());
return (jlong)clr;
}
catch(std::exception& e)
{
env->ThrowNew(env->FindClass("java/lang/Exception"), e.what());
// the return value is ignored by the JVM because a Java exception is now pending
return 0;
}
}
JNIEXPORT jlong JNICALL Java_com_microsoft_ASANative_startASA
(JNIEnv *env, jclass cls, jlong clrPtr, jstring tempDir, jstring sql) {
ASAHost* host = nullptr;
try
{
StringGuard tempDirStr(env, tempDir);
StringUniGuard sqlStr(env, sql);
host = new ASAHost((CLRHost*)clrPtr);
host->initializeCLR(
tempDirStr.c_str(),
sqlStr.c_str(), sqlStr.length());
return (jlong)host;
}
catch(std::exception& e)
{
if (host)
delete host;
// forward exception
env->ThrowNew(env->FindClass("java/lang/Exception"), e.what());
// the return value is ignored by the JVM because a Java exception is now pending
return 0;
}
}
JNIEXPORT void JNICALL Java_com_microsoft_ASANative_getOutputSchema
(JNIEnv *env, jclass cls, jlong ptr, jstring sql) {
try {
StringUniGuard sqlStr(env, sql);
((ASAHost*)ptr)->getOutputSchema((char16_t*)sqlStr.c_str(), sqlStr.length());
}
catch(std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); }
}
JNIEXPORT void JNICALL Java_com_microsoft_ASANative_registerArrowMemory
(JNIEnv *env, jclass cls, jlong ptr, jobject outputBuffer, jint outputBufferLength,
jobject inputBuffer, jint inputBufferLength)
{
try {
jbyte* bbuf_out;
jbyte* bbuf_in;
bbuf_out = (jbyte*)(env->GetDirectBufferAddress(outputBuffer));
bbuf_in = (jbyte*)(env->GetDirectBufferAddress(inputBuffer));
((ASAHost*)ptr)->registerArrowMemory(bbuf_out, outputBufferLength, bbuf_in, inputBufferLength);
}
catch (std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); }
}
JNIEXPORT jint JNICALL Java_com_microsoft_ASANative_pushRecord
(JNIEnv *env, jclass, jlong ptr) {
try { return ((ASAHost*)ptr)->pushRecord(); }
catch(std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); return 0; }
}
JNIEXPORT jint JNICALL Java_com_microsoft_ASANative_pushComplete
(JNIEnv *env, jclass, jlong ptr) {
try { return ((ASAHost*)ptr)->pushComplete(); }
catch(std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); return 0; }
}
JNIEXPORT void JNICALL Java_com_microsoft_ASANative_nextOutputRecord
(JNIEnv *env, jclass, jlong ptr) {
try { ((ASAHost*)ptr)->nextOutputRecord(); }
catch(std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); }
}
JNIEXPORT void JNICALL Java_com_microsoft_ASANative_stopASA
(JNIEnv *env, jclass, jlong ptr) {
try {
ASAHost* host = (ASAHost*)ptr;
host->freeCLR();
delete host;
}
catch(std::exception& e)
{ env->ThrowNew(env->FindClass("java/lang/Exception"), e.what()); }
}

29
cpp/CMakeLists.txt Normal file

@@ -0,0 +1,29 @@
cmake_minimum_required (VERSION 3.8)
project (cpp)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# 64bit compile
SET(CMAKE_CXX_FLAGS "-m64")
SET(CMAKE_C_FLAGS "-m64")
SET(CMAKE_EXE_LINKER_FLAGS "-m64")
SET(CMAKE_MODULE_LINKER_FLAGS "-m64")
SET(CMAKE_SHARED_LINKER_FLAGS "-m64")
set(CMAKE_POSITION_INDEPENDENT_CODE ON) # fPIC
find_package(Java REQUIRED)
find_package(JNI REQUIRED)
include(UseJava)
message (STATUS "JNI_INCLUDE_DIRS=${JNI_INCLUDE_DIRS}")
message (STATUS "JNI_LIBRARIES=${JNI_LIBRARIES}")
create_javah(TARGET javaheaders
CLASSES com.microsoft.ASANative
CLASSPATH ../../java/target/classes)
add_library(asa SHARED ASANative.cpp utils.hpp)
target_link_libraries(asa stdc++fs)
target_include_directories(asa PUBLIC ${JNI_INCLUDE_DIRS})

50
cpp/utils.hpp Normal file

@@ -0,0 +1,50 @@
//#pragma once
#include <cstdlib>
#include <set>
#include <string>
#include <cstring>
// Prototype of the coreclr_initialize function from the libcoreclr.so
typedef int (coreclrInitializeFunction)(
const char* exePath,
const char* appDomainFriendlyName,
int propertyCount,
const char** propertyKeys,
const char** propertyValues,
void** hostHandle,
unsigned int* domainId);
// Prototype of the coreclr_shutdown function from the libcoreclr.so
typedef int (coreclrShutdownFunction)(
void* hostHandle,
unsigned int domainId);
// Prototype of the coreclr_execute_assembly function from the libcoreclr.so
typedef int (coreclrCreateDelegateFunction)(
void* hostHandle,
unsigned int domainId,
const char* entryPointAssemblyName,
const char* entryPointTypeName,
const char* entryPointMethodName,
void** delegate);
//#if not defined ( __GNUC__ ) || __GNUC__ < 5 || ( __GNUC__ == 5 && __GNUC_MINOR__ < 3 )
// #error THIS SOFTWARE CURRENTLY BUILDS ONLY ON GCC 5.3 OR NEWER!
//#endif
#include <experimental/filesystem>
namespace SCCH_fs = std::experimental::filesystem;
void AddFilesFromDirectoryToTpaList( std::string directory, std::string& tpaList ) {
for ( auto& dirent : SCCH_fs::directory_iterator(directory) ) {
std::string path = dirent.path();
if ( ! path.compare(path.length() - 4, 4, ".dll") ) {
tpaList.append(path + ":");
}
}
}

55
dockerfile Normal file

@@ -0,0 +1,55 @@
# Use Microsoft Docker Image containing dotnet built from Ubuntu
# https://hub.docker.com/_/microsoft-dotnet-core-sdk
FROM mcr.microsoft.com/dotnet/core/sdk:3.1-bionic
RUN apt-get update \
&& apt-get install -y --no-install-recommends software-properties-common
RUN add-apt-repository -y ppa:openjdk-r/ppa
# Install g++
# Install java8 (CMakeList.txt has a dependency on java 8)
RUN apt-get update \
&& apt-get install -y build-essential \
openjdk-8-jdk \
cmake \
maven \
&& rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME '/usr/lib/jvm/java-8-openjdk-amd64/'
# Install sbt
RUN echo "deb https://dl.bintray.com/sbt/debian /" | tee -a /etc/apt/sources.list.d/sbt.list
RUN curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | apt-key add
RUN apt-get update && apt-get install -y sbt && rm -rf /var/lib/apt/lists/*
# create repo dir to store ASASpark source code
# Note that this is also referred to in scala/built.sbt
ENV WORKDIR '/repos/ASASpark'
RUN mkdir -p $WORKDIR
# Set working dir as repo root
WORKDIR $WORKDIR
# move code from local disk to container
COPY cpp cpp
COPY dotnet dotnet
COPY java java
COPY scala scala
COPY build.sh build.sh
# Install Azure Artifact Credential provider to interactively acquire credentials for Azure Artifacts
# https://github.com/microsoft/artifacts-credprovider
RUN cd dotnet && ./installcredprovider.sh
# Open the URL in a browser and enter the code when prompted for auth
RUN cd dotnet && dotnet restore --interactive
# Argument to control the purpose of creating the docker image
# 'build' (default mode for those interested in just creating the ASA-x.x.x.jar out of this src code repository) - this will create the ASA-x.x.x.jar while building docker image
# 'debug' (for developers to troubleshoot and work on the source code) - this will not create the jar while building the docker image using this dockerfile. This will just configure the environment and rely on developers to start and connect to the docker container to further develop/troubleshoot
ARG mode=build
# Either create the jar (publish) or do not create (rely on developer to enter into the container and develop/troubleshoot)
RUN ./build.sh $mode
CMD ["/bin/bash"]

356
dotnet/Managed.cs Normal file

@@ -0,0 +1,356 @@
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Microsoft.Streaming.QueryRuntime.Abstraction.RuntimeTypesContracts;
using Microsoft.EventProcessing.TranslatorTransform;
using Microsoft.EventProcessing.SteamR.Sql;
using Microsoft.EventProcessing.SqlQueryRunner;
using Microsoft.EventProcessing.Compiler;
using Microsoft.EventProcessing.RuntimeTypes;
using Microsoft.EventProcessing.SteamR.Sql.Runtime;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
using System.IO;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Apache.Arrow.Memory;
using Apache.Arrow;
public class ASAHost{
private readonly Subject<IRecord> input = new Subject<IRecord>();
private readonly Dictionary<string, IObservable<IRecord>> outputs;
private Queue<IRecord> outputRecords = new Queue<IRecord>();
private IRecordSchema schema;
private Schema outputArrowSchema;
private NativeMemoryAllocator memoryAllocator;
private IRecord outputCurrent;
private static readonly DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private const int MillisecToTickRatio = 1000;
private const int MicrosecToMillisecRatio = 10;
private UnmanagedMemoryStream outputBuffer;
private UnmanagedMemoryStream inputBuffer;
private ArrowStreamReader reader;
private ArrowStreamWriter writer;
public ASAHost(string sql) {
//Console.WriteLine("The sql query is: " + sql);
// Console.WriteLine(inputSchema);
this.outputs = SqlQueryRunner.Query(
sql,
new CompilerConfig() {SqlCompatibility = new SqlCompatibility()},
ClrFramework.NetStandard20,
QueryHelper.BinLocations[ClrFramework.NetStandard20],
new Dictionary<string, Subject<IRecord>>() { { "input", this.input } });
if (this.outputs.Count != 1)
throw new ArgumentException("Query: '" + sql + "' returned 0 or more than 1 output: " + this.outputs.Count);
// TODO: disposable
this.outputs.First().Value.Subscribe(r => this.outputRecords.Enqueue(r));
//System.ObservableExtensions.Subscribe(this.outputs.First().Value, r => this.outputRecords.Enqueue(r));
this.memoryAllocator = new NativeMemoryAllocator(alignment: 64);
// <typename> <nullable: T|F> <fieldname>
// bigint T timestamp,int F value
//this.schema = DeserializeSchema(inputSchema);
}
public void nextOutputRecord_i() {
this.outputCurrent = this.outputRecords.Dequeue();
}
public int pushRecord_i() {
// reset the position of the buffer and do a full batch read;
outputBuffer.Position = 0;
//Console.WriteLine("New push Record is called in c#");
RecordBatch currentBatch = reader.ReadNextRecordBatch();
//Console.WriteLine("Newly read recordBatch length: " + currentBatch.Length);
for (int i = 0; i < currentBatch.Length; i ++)
{
var newInputRow = new object[this.schema.Ordinals.Count];
for (int j = 0; j < this.schema.Ordinals.Count; j ++)
{
var array_j = currentBatch.Arrays.ElementAt(j);
switch (array_j.Data.DataType.TypeId)
{
case ArrowTypeId.Int32:
newInputRow[j] = (Int64)(((Int32Array)array_j).Values[i]);
break;
case ArrowTypeId.Int64:
newInputRow[j] = ((Int64Array)array_j).Values[i];
break;
case ArrowTypeId.Float:
newInputRow[j] = (Double)(((FloatArray)array_j).Values[i]);
break;
case ArrowTypeId.Double:
newInputRow[j] = ((DoubleArray)array_j).Values[i];
break;
case ArrowTypeId.String:
newInputRow[j] = ((StringArray)array_j).GetString(i);
break;
case ArrowTypeId.Timestamp:
TimestampArray tArray = (TimestampArray)array_j;
var type = (TimestampType) tArray.Data.DataType;
double timeStampMilli = tArray.Values[i] / MillisecToTickRatio;
DateTime dtDateTime = epoch.AddMilliseconds(timeStampMilli);
//Console.WriteLine("tArray data Type: " + type.Unit + " Tick Value: " + timeStampMilli + " time: " + dtDateTime);
newInputRow[j] = dtDateTime;
break;
case ArrowTypeId.Binary:
newInputRow[j] = ((BinaryArray)array_j).GetBytes(i).ToArray();
break;
case ArrowTypeId.Boolean:
newInputRow[j] = ((BooleanArray)array_j).GetBoolean(i);
break;
default:
throw new Exception("Unsupported Arrow array type: " + array_j.Data.DataType.TypeId);
}
}
this.input.OnNext(Record.Create(this.schema, newInputRow));
}
//Write outputs if there is any
if (this.outputRecords.Count > 0)
{
List<IRecord> rows = new List<IRecord>();
while (this.outputRecords.Count > 0)
{
rows.Add(this.outputRecords.Dequeue());
}
var recordBatch = createOutputRecordBatch(rows);
WriteRecordBatch(recordBatch);
return recordBatch.Length;
}
return 0;
}
private RecordBatch createOutputRecordBatch(List<IRecord> rows)
{
var recordBatchBuilder = new RecordBatch.Builder(memoryAllocator);
for(int i = 0; i < this.outputArrowSchema.Fields.Count; i ++)
{
var field = this.outputArrowSchema.GetFieldByIndex(i);
switch (field.DataType.TypeId)
{
case ArrowTypeId.Int64:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.Int64(
array => array.AppendRange(rows.Select(row => Convert.ToInt64(row[i])))));
break;
case ArrowTypeId.Double:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.Double(
array => array.AppendRange(rows.Select(row => Convert.ToDouble(row[i])))));
break;
case ArrowTypeId.String:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.String(
array => array.AppendRange(rows.Select(row => Convert.ToString(row[i])))));
break;
case ArrowTypeId.Timestamp:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.Int64(
array => array.AppendRange(rows.Select(row => (((DateTime)row[i]).Ticks - epoch.Ticks) / MicrosecToMillisecRatio))));
break;
case ArrowTypeId.Binary:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.Binary(
array => array.AppendRange(rows.Select(row => (byte[])(row[i])))));
break;
case ArrowTypeId.Boolean:
recordBatchBuilder.Append(field.Name, field.IsNullable, col => col.Boolean(
array => array.AppendRange(rows.Select(row => Convert.ToBoolean(row[i])))));
break;
default: throw new Exception("Unsupported Arrow type of output arrow schema: " + field.DataType.TypeId);
}
}
return recordBatchBuilder.Build();
}
private void WriteRecordBatch(RecordBatch batch)
{
inputBuffer.Position = 0;
writer.WriteRecordBatchAsync(batch).GetAwaiter().GetResult();
}
public int pushComplete_i()
{
this.input.OnCompleted();
if (this.outputRecords.Count > 0)
{
List<IRecord> rows = new List<IRecord>();
while (this.outputRecords.Count > 0)
{
rows.Add(this.outputRecords.Dequeue());
}
var recordBatch = createOutputRecordBatch(rows);
WriteRecordBatch(recordBatch);
return recordBatch.Length;
}
return 0;
}
public static IntPtr createASAHost(IntPtr sqlPtr, int sqlLength) {
string sql = Marshal.PtrToStringUni(sqlPtr, sqlLength);
// System.Console.WriteLine("C#.createASAHost: " + sqlLength + ": " + sql);
GCHandle gch = GCHandle.Alloc(new ASAHost(sql));
return GCHandle.ToIntPtr(gch);
}
public static void registerArrowMemory(IntPtr ptr, IntPtr outputBuffer,
int outputBufferLength, IntPtr inputBuffer, int inputBufferLength) {
Console.WriteLine("Registering shared memory===============================================================================");
((ASAHost)GCHandle.FromIntPtr(ptr).Target).registerArrowMemoryInner(outputBuffer, outputBufferLength,
inputBuffer, inputBufferLength);
}
public void registerArrowMemoryInner(IntPtr outputBuffer, int outputBufferLength, IntPtr inputBuffer, int inputBufferLength)
{
unsafe{
this.outputBuffer = new UnmanagedMemoryStream((byte*) outputBuffer.ToPointer(), outputBufferLength);
this.inputBuffer = new UnmanagedMemoryStream((byte*) inputBuffer.ToPointer(), inputBufferLength, inputBufferLength, FileAccess.Write);
}
}
private static IRecordSchema ArrowSchemaToASARecordSchema(Schema arrowSchema)
{
var asaFields = arrowSchema.Fields
.Select(kv => RecordSchema.Property(kv.Key, ArrowTypeToASAType(kv.Value.DataType)))
.ToArray();
return RecordSchema.CreateStrict(asaFields);
}
private static ITypeSchema ArrowTypeToASAType(IArrowType arrowType)
{
switch(arrowType.TypeId)
{
case ArrowTypeId.Int32:
case ArrowTypeId.Int64:
return PrimitiveSchema.BigintSchema;
case ArrowTypeId.Float:
case ArrowTypeId.Double:
return PrimitiveSchema.DoubleSchema;
case ArrowTypeId.String:
return PrimitiveSchema.StringSchema;
case ArrowTypeId.Timestamp:
return PrimitiveSchema.DateTimeSchema;
case ArrowTypeId.Binary:
return PrimitiveSchema.BinarySchema;
case ArrowTypeId.Boolean:
return PrimitiveSchema.BitSchema;
default: throw new Exception("Unsupported Arrow type: " + arrowType.TypeId);
}
}
private static IArrowType ASATypeToArrowType(ITypeSchema asaTypeSchema)
{
if (asaTypeSchema == PrimitiveSchema.BigintSchema)
return Int64Type.Default;
if (asaTypeSchema == PrimitiveSchema.DoubleSchema)
return DoubleType.Default;
if (asaTypeSchema == PrimitiveSchema.StringSchema)
return StringType.Default;
if (asaTypeSchema == PrimitiveSchema.DateTimeSchema)
return new TimestampType(TimeUnit.Microsecond, "UTC");
if (asaTypeSchema == PrimitiveSchema.BinarySchema)
return BinaryType.Default;
if (asaTypeSchema == PrimitiveSchema.BitSchema)
return BooleanType.Default;
if (asaTypeSchema == PrimitiveSchema.ObjectSchema)
throw new Exception("Unsupport ASA type Object type.");
throw new Exception("Unsupport ASA type: " + asaTypeSchema);
}
public static void getOutputSchema(IntPtr ptr, IntPtr sqlPtr, int sqlLen)
{
string sql = Marshal.PtrToStringUni(sqlPtr, sqlLen);
((ASAHost)GCHandle.FromIntPtr(ptr).Target).getOutputSchemaInner(sql);
}
public void getOutputSchemaInner(string sql) {
reader = new ArrowStreamReader(this.outputBuffer, leaveOpen: false);
// read one batch to get the arrow schema first
reader.ReadNextRecordBatch();
//The batch should be null since no record should be outputted at this point
//Assert.IsNull(nullBatch);
//Console.WriteLine("Arrow input Schema: " + reader.Schema);
this.schema = ArrowSchemaToASARecordSchema(reader.Schema);
//Console.WriteLine("input ASA schema: " + this.schema);
var result =
SqlCompiler.Compile(
sql,
new QueryBindings(
new Dictionary<string, InputDescription> { { "input", new InputDescription(this.schema, InputType.Stream) } }) );
var step = result.Steps.First();
Schema.Builder builder = new Schema.Builder();
foreach (KeyValuePair<string, int> kv in step.Output.PayloadSchema.Ordinals.OrderBy(kv => kv.Value))
{
builder = builder.Field(f => f.Name(kv.Key).DataType(ASATypeToArrowType(step.Output.PayloadSchema[kv.Value].Schema)).Nullable(false));
}
this.outputArrowSchema = builder.Build();
//Console.WriteLine("Arrow output Schema: " + this.outputArrowSchema.Fields.Count);
this.writer = new ArrowStreamWriter(this.inputBuffer, this.outputArrowSchema);
//Write empty batch to send the schema to Java side
var emptyRecordBatch = createOutputRecordBatch(new List<IRecord>());
//writer.WriteRecordBatchAsync(new RecordBatch(this.outputArrowSchema, new IArrowArray[this.outputArrowSchema.Fields.Count] ,0)).GetAwaiter().GetResult();
WriteRecordBatch(emptyRecordBatch);
}
public static void stringFree(IntPtr strPtr) {
Marshal.FreeHGlobal(strPtr);
}
//////////////////////////////////////////////////////////////////////////////
public static void nextOutputRecord(IntPtr ptr) {
((ASAHost)GCHandle.FromIntPtr(ptr).Target).nextOutputRecord_i();
}
public static int pushRecord(IntPtr ptr) {
return ((ASAHost)GCHandle.FromIntPtr(ptr).Target).pushRecord_i();
}
public static int pushComplete(IntPtr ptr) {
return ((ASAHost)GCHandle.FromIntPtr(ptr).Target).pushComplete_i();
}
public static void deleteASAHost(IntPtr ptr) {
var gch = GCHandle.FromIntPtr(ptr);
// ((ASAHost)gch.Target).close(); // TODO
gch.Free();
}
}

44
dotnet/Managed.csproj Normal file

@@ -0,0 +1,44 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>library</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PlatformTarget>x64</PlatformTarget>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<GenerateBindingRedirectsOutputType>true</GenerateBindingRedirectsOutputType>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
<CopyLocal>true</CopyLocal>
</PropertyGroup>
<!-- <ItemGroup><Reference Include="DataModel.Serialization"><HintPath>asa/DataModel.Serialization.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="FSharp.Core.Extensions"><HintPath>asa/FSharp.Core.Extensions.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="JavaScriptHost"><HintPath>asa/JavaScriptHost.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Martingales"><HintPath>asa/Martingales.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.Compiler.Operators"><HintPath>asa/Microsoft.EventProcessing.Compiler.Operators.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.HostedRuntimes"><HintPath>asa/Microsoft.EventProcessing.HostedRuntimes.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.RuntimeTypes"><HintPath>asa/Microsoft.EventProcessing.RuntimeTypes.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.SqlQueryRunner"><HintPath>asa/Microsoft.EventProcessing.SqlQueryRunner.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.SteamR.Globalization"><HintPath>asa/Microsoft.EventProcessing.SteamR.Globalization.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.SteamR.Sql.Runtime"><HintPath>asa/Microsoft.EventProcessing.SteamR.Sql.Runtime.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.SteamR.Sql"><HintPath>asa/Microsoft.EventProcessing.SteamR.Sql.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.EventProcessing.SteamR"><HintPath>asa/Microsoft.EventProcessing.SteamR.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.StreamProcessing.NRT"><HintPath>asa/Microsoft.StreamProcessing.NRT.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.Streaming.Globalization"><HintPath>asa/Microsoft.Streaming.Globalization.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.Streaming.MachineLearning.Common"><HintPath>asa/Microsoft.Streaming.MachineLearning.Common.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="Microsoft.Streaming.QueryRuntimeAbstraction"><HintPath>asa/Microsoft.Streaming.QueryRuntimeAbstraction.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="SteamRSDKServerSide"><HintPath>asa/SteamRSDKServerSide.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="System.Linq.CompilerServices"><HintPath>asa/System.Linq.CompilerServices.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="System.Linq.Expressions.Bonsai.Serialization"><HintPath>asa/System.Linq.Expressions.Bonsai.Serialization.dll</HintPath></Reference></ItemGroup>
<ItemGroup><Reference Include="System.Linq.Expressions.Bonsai"><HintPath>asa/System.Linq.Expressions.Bonsai.dll</HintPath></Reference></ItemGroup>
-->
<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.CSharp.Scripting" Version="3.1.0" />
<PackageReference Include="System.Reactive.Core" Version="4.0.0" />
<PackageReference Include="System.Reactive.Interfaces" Version="4.0.0" />
<PackageReference Include="System.Reactive.Linq" Version="4.0.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.0.0-preview.5" />
<PackageReference Include="Trill" Version="2019.9.4.2" />
<PackageReference Include="Microsoft.Azure.StreamInsight.LocalQueryRuntime" Version="1.2.64719.1" />
<PackageReference Include="Apache.Arrow" Version="0.14.1" />
<PackageReference Include="FSharp.Core" Version="4.5.0.0" />
</ItemGroup>
</Project>

33
dotnet/installcredprovider.sh

@@ -0,0 +1,33 @@
#!/usr/bin/env bash
# DESCRIPTION: A simple shell script designed to fetch the latest version
# of the artifacts credential provider plugin for dotnet and
# install it into $HOME/.nuget/plugins.
# SEE: https://github.com/Microsoft/artifacts-credprovider/blob/master/README.md
REPO="Microsoft/artifacts-credprovider"
FILE="Microsoft.NuGet.CredentialProvider.tar.gz"
VERSION="latest"
# URL pattern documented at https://help.github.com/en/articles/linking-to-releases as of 2019-03-29
URI="https://github.com/$REPO/releases/$VERSION/download/$FILE"
NUGET_PLUGIN_DIR="$HOME/.nuget/plugins"
# Ensure plugin directory exists
if [ ! -d "${NUGET_PLUGIN_DIR}" ]; then
echo "INFO: Creating the nuget plugin directory (i.e. ${NUGET_PLUGIN_DIR}). "
if ! mkdir -p "${NUGET_PLUGIN_DIR}"; then
echo "ERROR: Unable to create nuget plugins directory (i.e. ${NUGET_PLUGIN_DIR})."
exit 1
fi
fi
echo "Downloading from $URI"
# Extract netcore from the .tar.gz into the plugin directory
#Fetch the file
curl -H "Accept: application/octet-stream" \
-s \
-S \
-L \
"$URI" | tar xz -C "$HOME/.nuget/" "plugins/netcore"
echo "INFO: credential provider netcore plugin extracted to $HOME/.nuget/"

14
dotnet/nuget.config Normal file

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<!-- When <clear /> is present, previously defined sources are ignored -->
<!-- Remove this tag or un-comment the nuget.org source below to restore packages from nuget.org -->
<!-- For more info, see https://docs.nuget.org/consume/nuget-config-file -->
<clear />
<add key="SQLIS_ASI" value="https://msdata.pkgs.visualstudio.com/DefaultCollection/_packaging/SQLIS_ASI/nuget/v3/index.json" />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
</packageSources>
<activePackageSource>
<add key="All" value="(Aggregate source)" />
</activePackageSource>
</configuration>

198
java/pom.xml Normal file

@@ -0,0 +1,198 @@
<project>
<groupId>asainspark</groupId>
<artifactId>ASA</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>ASA</name>
<packaging>jar</packaging>
<version>0.0.1</version>
<repositories>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/releases</url>
</repository>
<repository>
<id>central2</id>
<url>http://central.maven.org/maven2/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<scope>provided</scope>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<scope>provided</scope>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>0.14.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<version>0.14.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.3</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.2</version>
<!-- The configuration of the plugin -->
<configuration>
<!-- TODO: remove -->
<!-- Specifies the configuration file of the assembly plugin -->
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
</plugin>
<!-- copy CLR -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>copy-clr</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/extra-resources/clr</outputDirectory>
<resources>
<resource>
<directory>/usr/share/dotnet/shared/Microsoft.NETCore.App/3.1.1</directory>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-native</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<addDefaultExcludes>false</addDefaultExcludes>
<outputDirectory>${basedir}/target/extra-resources/binding</outputDirectory>
<resources>
<resource>
<directory>${basedir}/../cpp/build</directory>
<includes>
<include>libasa.so</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-deps</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/extra-resources/binding</outputDirectory>
<resources>
<resource>
<directory>${user.home}/.nuget/packages/trill/2019.9.4.2/lib/netstandard2.0</directory>
</resource>
<resource>
<directory>${user.home}/.nuget/packages/system.reactive.core/4.0.0/lib/netstandard2.0</directory>
</resource>
<resource>
<directory>${basedir}/../dotnet/bin/Debug/netcoreapp2.2</directory>
</resource>
<resource>
<directory>${basedir}/../dotnet/deps</directory>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-codegen</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/extra-resources/binding/CodeGenRefs/netstandard2.0</outputDirectory>
<resources>
<resource>
<directory>${user.home}/.nuget/packages/microsoft.azure.streaminsight.localqueryruntime/1.2.64719.1/lib/netstandard2.0/CodeGenRefs</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- Includes the runtime dependencies -->
<plugin>
<groupId>org.dstovall</groupId>
<artifactId>onejar-maven-plugin</artifactId>
<version>1.4.4</version>
<executions>
<execution>
<goals>
<goal>one-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<resources>
<resource>
<directory>${basedir}/target/extra-resources</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</build>
<!-- One-Jar is in the googlecode repository -->
<pluginRepositories>
<pluginRepository>
<id>onejar-maven-plugin.googlecode.com</id>
<url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
</pluginRepository>
</pluginRepositories>
</project>


@@ -0,0 +1,154 @@
package com.microsoft;
import java.nio.file.*;
import java.net.URL;
import java.io.File;
import java.util.jar.*;
import java.util.*;
import java.nio.ByteBuffer;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
// avoid dependencies for javah
public class ASANative {
public static final String CLR_DIRECTORY = "clr/";
public static final String DLL_BINDING_DIRECTORY = "binding/";
public static final String ASANATIVE_TEMP_DIRECTORY = "asanative";
public static final String LIBASA_SO_FILE = "libasa.so";
static final Logger logger = Logger.getLogger(ASANative.class);
private static void ExtractFolder(Path temp) throws Exception {
File jarFile = new File(ASANative.class.getProtectionDomain().getCodeSource().getLocation().getPath());
JarFile jar = new JarFile(jarFile);
Enumeration<JarEntry> entries = jar.entries();
while(entries.hasMoreElements()) {
final String name = entries.nextElement().getName();
if (name.endsWith(FileSystems.getDefault().getSeparator()))
continue;
if (name.startsWith(CLR_DIRECTORY) || name.startsWith(DLL_BINDING_DIRECTORY)) {
Path parent = Paths.get(name).getParent();
if (parent != null)
Files.createDirectories(temp.resolve(parent));
Files.copy(ASANative.class.getResourceAsStream(FileSystems.getDefault().getSeparator() + name), temp.resolve(name));
}
}
jar.close();
}
private static Path tempDirectory;
private static long clrPtr;
static
{
try {
tempDirectory = Files.createTempDirectory(ASANATIVE_TEMP_DIRECTORY);
logger.info("TEMP DIR: " + tempDirectory);
ExtractFolder(tempDirectory);
System.load(tempDirectory.resolve(DLL_BINDING_DIRECTORY + LIBASA_SO_FILE).toString());
clrPtr = startCLR(tempDirectory.toString());
} catch (Exception e) {
//TODO: handle exception
logger.debug("Unable to extract: " + e.getMessage());
}
}
//ASANative destructor: clean up temp directory created in constructor
public static void deleteTemp()
{
try {
deleteDirectory(tempDirectory.toFile());
}
catch (Exception e)
{
logger.debug("Unable to clean up temp directory: " + e.getMessage());
}
}
private static boolean deleteDirectory(File toDelete)
{
File[] allContents = toDelete.listFiles();
if (allContents != null)
{
for (File file : allContents)
{
deleteDirectory(file);
}
}
return toDelete.delete();
}
public static native long startCLR(String tempDir);
public static long startASA(String sql)
{
try
{
return startASA(clrPtr, tempDirectory.toString(), sql);
}
catch (Exception e)
{
deleteTemp();
throw e;
}
}
public static void stopAndClean(long ptr)
{
stopASA(ptr);
deleteTemp();
}
private static native long startASA(long clrPtr, String tempDir, String sql);
private static native void stopASA(long ptr);
public static void getOutputSchema(String sql, long clrPtr)
{
try
{
getOutputSchema(clrPtr, sql);
}
catch (Exception e)
{
deleteTemp();
throw e;
}
}
private static native void getOutputSchema(long clrPtr, String sql);
public static void registerArrowMemory(ByteBuffer outputBuffer, int outputBufferLength,
ByteBuffer inputBuffer, int inputBufferLength, long ptr)
{
try
{
registerArrowMemory(ptr, outputBuffer, outputBufferLength, inputBuffer, inputBufferLength);
}
catch (Exception e)
{
deleteTemp();
throw e;
}
}
private static native void registerArrowMemory(long ptr, ByteBuffer outputBuffer, int outputBufferLength,
ByteBuffer inputBuffer, int inputBufferLength);
// TODO: this should be column batched to avoid massive amount of calls.
// re-use memory on the java-side and pin on C++ for memory copy?
public static native void nextOutputRecord(long ptr);
// returns rows discovered in output.
public static native int pushRecord(long ptr);
// no more data to come
public static native int pushComplete(long ptr);
}


@@ -0,0 +1,477 @@
package com.microsoft;
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.*;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.sql.vectorized.*;
/**
* A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
* supported.
*/
public final class ArrowColVector extends ColumnVector {
private final ArrowVectorAccessor accessor;
private ArrowColVector[] childColumns;
@Override
public boolean hasNull() {
return accessor.getNullCount() > 0;
}
@Override
public int numNulls() {
return accessor.getNullCount();
}
@Override
public void close() {
if (childColumns != null) {
for (int i = 0; i < childColumns.length; i++) {
childColumns[i].close();
childColumns[i] = null;
}
childColumns = null;
}
accessor.close();
}
@Override
public boolean isNullAt(int rowId) {
return accessor.isNullAt(rowId);
}
@Override
public boolean getBoolean(int rowId) {
return accessor.getBoolean(rowId);
}
@Override
public byte getByte(int rowId) {
return accessor.getByte(rowId);
}
@Override
public short getShort(int rowId) {
return accessor.getShort(rowId);
}
@Override
public int getInt(int rowId) {
return accessor.getInt(rowId);
}
@Override
public long getLong(int rowId) {
return accessor.getLong(rowId);
}
@Override
public float getFloat(int rowId) {
return accessor.getFloat(rowId);
}
@Override
public double getDouble(int rowId) {
return accessor.getDouble(rowId);
}
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (isNullAt(rowId)) return null;
return accessor.getDecimal(rowId, precision, scale);
}
@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) return null;
return accessor.getUTF8String(rowId);
}
@Override
public byte[] getBinary(int rowId) {
if (isNullAt(rowId)) return null;
return accessor.getBinary(rowId);
}
@Override
public ColumnarArray getArray(int rowId) {
if (isNullAt(rowId)) return null;
return accessor.getArray(rowId);
}
@Override
public ColumnarMap getMap(int rowId) {
throw new UnsupportedOperationException();
}
@Override
public ArrowColVector getChild(int ordinal) { return childColumns[ordinal]; }
public ArrowColVector(ValueVector vector) {
super(ArrowUtils.fromArrowField(vector.getField()));
if (vector instanceof BitVector) {
accessor = new BooleanAccessor((BitVector) vector);
} else if (vector instanceof TinyIntVector) {
accessor = new ByteAccessor((TinyIntVector) vector);
} else if (vector instanceof SmallIntVector) {
accessor = new ShortAccessor((SmallIntVector) vector);
} else if (vector instanceof IntVector) {
accessor = new IntAccessor((IntVector) vector);
} else if (vector instanceof BigIntVector) {
accessor = new LongAccessor((BigIntVector) vector);
} else if (vector instanceof Float4Vector) {
accessor = new FloatAccessor((Float4Vector) vector);
} else if (vector instanceof Float8Vector) {
accessor = new DoubleAccessor((Float8Vector) vector);
} else if (vector instanceof DecimalVector) {
accessor = new DecimalAccessor((DecimalVector) vector);
} else if (vector instanceof VarCharVector) {
accessor = new StringAccessor((VarCharVector) vector);
} else if (vector instanceof VarBinaryVector) {
accessor = new BinaryAccessor((VarBinaryVector) vector);
} else if (vector instanceof DateDayVector) {
accessor = new DateAccessor((DateDayVector) vector);
} else if (vector instanceof TimeStampMicroTZVector) {
accessor = new TimestampTZAccessor((TimeStampMicroTZVector) vector);
} else if (vector instanceof TimeStampMicroVector) {
accessor = new TimestampAccessor((TimeStampMicroVector) vector);
} else if (vector instanceof ListVector) {
ListVector listVector = (ListVector) vector;
accessor = new ArrayAccessor(listVector);
} else if (vector instanceof StructVector) {
StructVector structVector = (StructVector) vector;
accessor = new StructAccessor(structVector);
childColumns = new ArrowColVector[structVector.size()];
for (int i = 0; i < childColumns.length; ++i) {
childColumns[i] = new ArrowColVector(structVector.getVectorById(i));
}
} else {
throw new UnsupportedOperationException();
}
}
private abstract static class ArrowVectorAccessor {
private final ValueVector vector;
ArrowVectorAccessor(ValueVector vector) {
this.vector = vector;
}
// TODO: should be final after removing ArrayAccessor workaround
boolean isNullAt(int rowId) {
return vector.isNull(rowId);
}
final int getNullCount() {
return vector.getNullCount();
}
final void close() {
vector.close();
}
boolean getBoolean(int rowId) {
throw new UnsupportedOperationException();
}
byte getByte(int rowId) {
throw new UnsupportedOperationException();
}
short getShort(int rowId) {
throw new UnsupportedOperationException();
}
int getInt(int rowId) {
throw new UnsupportedOperationException();
}
long getLong(int rowId) {
throw new UnsupportedOperationException();
}
float getFloat(int rowId) {
throw new UnsupportedOperationException();
}
double getDouble(int rowId) {
throw new UnsupportedOperationException();
}
Decimal getDecimal(int rowId, int precision, int scale) {
throw new UnsupportedOperationException();
}
UTF8String getUTF8String(int rowId) {
throw new UnsupportedOperationException();
}
byte[] getBinary(int rowId) {
throw new UnsupportedOperationException();
}
ColumnarArray getArray(int rowId) {
throw new UnsupportedOperationException();
}
}
private static class BooleanAccessor extends ArrowVectorAccessor {
private final BitVector accessor;
BooleanAccessor(BitVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final boolean getBoolean(int rowId) {
return accessor.get(rowId) == 1;
}
}
private static class ByteAccessor extends ArrowVectorAccessor {
private final TinyIntVector accessor;
ByteAccessor(TinyIntVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final byte getByte(int rowId) {
return accessor.get(rowId);
}
}
private static class ShortAccessor extends ArrowVectorAccessor {
private final SmallIntVector accessor;
ShortAccessor(SmallIntVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final short getShort(int rowId) {
return accessor.get(rowId);
}
}
private static class IntAccessor extends ArrowVectorAccessor {
private final IntVector accessor;
IntAccessor(IntVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final int getInt(int rowId) {
return accessor.get(rowId);
}
}
private static class LongAccessor extends ArrowVectorAccessor {
private final BigIntVector accessor;
LongAccessor(BigIntVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final long getLong(int rowId) {
return accessor.get(rowId);
}
}
private static class FloatAccessor extends ArrowVectorAccessor {
private final Float4Vector accessor;
FloatAccessor(Float4Vector vector) {
super(vector);
this.accessor = vector;
}
@Override
final float getFloat(int rowId) {
return accessor.get(rowId);
}
}
private static class DoubleAccessor extends ArrowVectorAccessor {
private final Float8Vector accessor;
DoubleAccessor(Float8Vector vector) {
super(vector);
this.accessor = vector;
}
@Override
final double getDouble(int rowId) {
return accessor.get(rowId);
}
}
private static class DecimalAccessor extends ArrowVectorAccessor {
private final DecimalVector accessor;
DecimalAccessor(DecimalVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final Decimal getDecimal(int rowId, int precision, int scale) {
if (isNullAt(rowId)) return null;
return Decimal.apply(accessor.getObject(rowId), precision, scale);
}
}
private static class StringAccessor extends ArrowVectorAccessor {
private final VarCharVector accessor;
private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
StringAccessor(VarCharVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final UTF8String getUTF8String(int rowId) {
accessor.get(rowId, stringResult);
if (stringResult.isSet == 0) {
return null;
} else {
return UTF8String.fromAddress(null,
stringResult.buffer.memoryAddress() + stringResult.start,
stringResult.end - stringResult.start);
}
}
}
private static class BinaryAccessor extends ArrowVectorAccessor {
private final VarBinaryVector accessor;
BinaryAccessor(VarBinaryVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final byte[] getBinary(int rowId) {
return accessor.getObject(rowId);
}
}
private static class DateAccessor extends ArrowVectorAccessor {
private final DateDayVector accessor;
DateAccessor(DateDayVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final int getInt(int rowId) {
return accessor.get(rowId);
}
}
private static class TimestampTZAccessor extends ArrowVectorAccessor {
private final TimeStampMicroTZVector accessor;
TimestampTZAccessor(TimeStampMicroTZVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final long getLong(int rowId) {
return accessor.get(rowId);
}
}
private static class TimestampAccessor extends ArrowVectorAccessor {
private final TimeStampMicroVector accessor;
TimestampAccessor(TimeStampMicroVector vector) {
super(vector);
this.accessor = vector;
}
@Override
final long getLong(int rowId) {
return accessor.get(rowId);
}
}
private static class ArrayAccessor extends ArrowVectorAccessor {
private final ListVector accessor;
private final ArrowColVector arrayData;
ArrayAccessor(ListVector vector) {
super(vector);
this.accessor = vector;
this.arrayData = new ArrowColVector(vector.getDataVector());
}
@Override
final boolean isNullAt(int rowId) {
// TODO: Workaround if vector has all non-null values, see ARROW-1948
if (accessor.getValueCount() > 0 && accessor.getValidityBuffer().capacity() == 0) {
return false;
} else {
return super.isNullAt(rowId);
}
}
@Override
final ColumnarArray getArray(int rowId) {
ArrowBuf offsets = accessor.getOffsetBuffer();
int index = rowId * ListVector.OFFSET_WIDTH;
int start = offsets.getInt(index);
int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
return new ColumnarArray(arrayData, start, end - start);
}
}
/**
* Any call to a "get" method will throw an UnsupportedOperationException.
*
* Accessing struct values in an ArrowColVector does not go through this accessor; it uses the
* getStruct() method defined in the parent class instead. Any call to a "get" method in this
* class is therefore a bug in the code.
*/
private static class StructAccessor extends ArrowVectorAccessor {
StructAccessor(StructVector vector) {
super(vector);
}
}
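// Hedged illustration (not part of the original file): struct columns are read through the
// parent ColumnVector API rather than through StructAccessor, for example:
//   ColumnarRow row = structColumn.getStruct(rowId);          // whole struct as a row
//   int firstField = structColumn.getChild(0).getInt(rowId);  // single field via getChild()
// where `structColumn` is a hypothetical ArrowColVector backed by a StructVector.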
}

@@ -0,0 +1,317 @@
package com.microsoft;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.TimeZone;
import java.sql.Timestamp;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.io.*;
import java.lang.Long;
import scala.collection.JavaConverters;
import org.joda.time.DateTimeZone;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.arrow.vector.types.*;
import org.apache.arrow.vector.types.pojo.*;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.arrow.memory.*;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.spark.sql.vectorized.*;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import scala.collection.JavaConversions;
public class AzureStreamAnalytics
{
static final Logger logger = Logger.getLogger(AzureStreamAnalytics.class);
private static final int INTEROP_BUFFER_SIZE = 1024 * 1024; // 1 MB shared interop buffer
private static final int ARROW_BATCH_SIZE = 100; // number of rows to transfer from the JVM to .NET through Arrow in each batch
private static Iterator<Row> processRows(
String sql,
StructType schema,
Iterator<Row> iter) throws IOException
{
final long ptr = ASANative.startASA(sql);
// register the shared byte buffers' memory addresses with ASANative
ByteBuffer outputBuffer = ByteBuffer.allocateDirect(INTEROP_BUFFER_SIZE);
ByteBuffer inputBuffer = ByteBuffer.allocateDirect(INTEROP_BUFFER_SIZE);
ASANative.registerArrowMemory(outputBuffer, outputBuffer.capacity(),
inputBuffer, inputBuffer.capacity(), ptr);
Schema arrowSchema = ArrowUtils.toArrowSchema(schema, DateTimeZone.UTC.getID());
// Set up the Arrow writer
RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
ByteBufferBackedOutputStream outputStream = new ByteBufferBackedOutputStream(outputBuffer);
BufferAllocator writeAllocator = rootAllocator.newChildAllocator("Arrow writer to C# world", 0, Long.MAX_VALUE);
VectorSchemaRoot outputRoot = VectorSchemaRoot.create(arrowSchema, writeAllocator);
ArrowWriter arrowWriter = ArrowWriter.create(outputRoot);
ArrowStreamWriter writer = new ArrowStreamWriter(outputRoot, null, outputStream);
BufferAllocator readAllocator = rootAllocator.newChildAllocator("Arrow reader from C# world", 0, Long.MAX_VALUE);
ArrowStreamReader reader = new ArrowStreamReader(new ByteBufferBackedInputStream(inputBuffer), readAllocator);
VectorSchemaRoot readRoot = null;
try
{
// Write the input schema to the shared memory
WriteChannel writeChannel = new WriteChannel(Channels.newChannel(outputStream));
MessageSerializer.serialize(writeChannel, arrowSchema);
// ASA needs to be started on the driver to obtain the output schema.
// The output schema is also encoded in the shared Arrow memory as the message header,
// so we read it back from the shared memory.
ASANative.getOutputSchema(sql, ptr);
readRoot = reader.getVectorSchemaRoot();
writer.start();
}
catch (IOException e)
{
logger.debug(e.getMessage());
throw e;
}
ExpressionEncoder<Row> inputEncoder = RowEncoder.apply(schema);
// Get the schema from the Arrow batch header and set up a Spark encoder based on it
StructType encodeSchema = ArrowUtils.fromArrowSchema(readRoot.getSchema());
logger.info("Encode Schema: " + encodeSchema.toString());
List<Attribute> attributes = JavaConversions.asJavaCollection(encodeSchema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
ExpressionEncoder<Row> encoder = RowEncoder.apply(encodeSchema).resolveAndBind(
JavaConverters.asScalaIteratorConverter(attributes.iterator()).asScala().toSeq(),
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$.MODULE$);
return new Iterator<Row>() {
private int rowsToFetch = 0;
private Iterator<InternalRow> internalRowIterator = null;
private boolean completePushed = false;
public boolean hasNext() {
// there are still rows that need fetching from the native side
if (this.rowsToFetch > 0)
return true;
// are there any more input rows?
if (!iter.hasNext()) {
if (this.completePushed) {
ASANative.stopAndClean(ptr);
//stop the reader as well
return false;
}
// signal data end
this.rowsToFetch = ASANative.pushComplete(ptr);
this.completePushed = true;
}
else {
do {
int currentBatchRowCount = 0;
do {
// get next row
Row r = iter.next();
//arrowWriter.write(InternalRow.fromSeq(r.toSeq()));
arrowWriter.write(inputEncoder.toRow(r));
currentBatchRowCount++;
// TODO: round-robin for multiple inputs
} while (iter.hasNext() && currentBatchRowCount < ARROW_BATCH_SIZE);
try
{
//Write Batch
outputBuffer.rewind();
arrowWriter.finish();
writer.writeBatch();
this.rowsToFetch = ASANative.pushRecord(ptr);
//C# side finished reading the buffer so let's rewind the shared buffer
arrowWriter.reset();
if (this.rowsToFetch == 0 && !iter.hasNext()) {
this.rowsToFetch = ASANative.pushComplete(ptr);
this.completePushed = true;
writer.end();
outputRoot.close();
writeAllocator.close();
}
}
catch (IOException e)
{
logger.debug(e.getMessage());
throw new RuntimeException(e);
}
} while (this.rowsToFetch == 0 && iter.hasNext());
}
//We reach here if rowsToFetch > 0 or !iter.hasNext()
return this.rowsToFetch > 0;
}
public Row next() {
this.rowsToFetch--;
if (internalRowIterator == null || !internalRowIterator.hasNext())
{
//Ask native to load next batch in buffer
inputBuffer.rewind();
try
{
VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
//load the next batch
boolean loadedBatch = reader.loadNextBatch();
if (loadedBatch && readRoot.getRowCount() > 0)
{
ColumnVector[] list = new ColumnVector[readRoot.getFieldVectors().size()];
for (int i = 0; i < readRoot.getFieldVectors().size(); i ++)
{
list[i] = new ArrowColVector(readRoot.getFieldVectors().get(i));
}
ColumnarBatch columnarBatch = new ColumnarBatch(list);
columnarBatch.setNumRows(readRoot.getRowCount());
internalRowIterator = columnarBatch.rowIterator();
}
else
{
throw new UnsupportedOperationException("Could load batch but there is more rows to Fetch!");
}
}
catch (IOException e)
{
logger.debug(e.getMessage());
// rethrow: without a freshly loaded batch, internalRowIterator below would be null or exhausted
throw new RuntimeException(e);
}
}
return encoder.fromRow(internalRowIterator.next());
}
};
}
// This is the main entry point for executing an ASA query.
// It resolves the input and output schemas and calls mapPartitions to process each partition.
public static Dataset<Row> execute(String query, Dataset<Row> input)
{
logger.setLevel(Level.INFO);
StructType sparkSchema = input.schema();
final long ptr = ASANative.startASA(query);
// register the shared byte buffers' memory addresses with ASANative
ByteBuffer outputBuffer = ByteBuffer.allocateDirect(INTEROP_BUFFER_SIZE);
ByteBuffer inputBuffer = ByteBuffer.allocateDirect(INTEROP_BUFFER_SIZE);
ASANative.registerArrowMemory(outputBuffer, outputBuffer.capacity(),
inputBuffer, inputBuffer.capacity(), ptr);
Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, DateTimeZone.UTC.getID());
// Set up the Arrow writer
RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
ByteBufferBackedOutputStream outputStream = new ByteBufferBackedOutputStream(outputBuffer);
BufferAllocator writeAllocator = rootAllocator.newChildAllocator("Arrow writer to C# world", 0, Long.MAX_VALUE);
VectorSchemaRoot outputRoot = VectorSchemaRoot.create(arrowSchema, writeAllocator);
ArrowWriter arrowWriter = ArrowWriter.create(outputRoot);
ArrowStreamWriter writer = new ArrowStreamWriter(outputRoot, null, outputStream);
BufferAllocator readAllocator = rootAllocator.newChildAllocator("Arrow reader from C# world", 0, Long.MAX_VALUE);
ArrowStreamReader reader = new ArrowStreamReader(new ByteBufferBackedInputStream(inputBuffer), readAllocator);
VectorSchemaRoot readRoot = null;
try
{
// Write the input schema to the shared memory
WriteChannel writeChannel = new WriteChannel(Channels.newChannel(outputStream));
MessageSerializer.serialize(writeChannel, arrowSchema);
// ASA needs to be started on the driver to obtain the output schema.
// The output schema is also encoded in the shared Arrow memory as the message header,
// so we read it back from the shared memory.
ASANative.getOutputSchema(query, ptr);
readRoot = reader.getVectorSchemaRoot();
}
catch (IOException e)
{
logger.debug(e.getMessage());
throw new RuntimeException(e);
}
logger.info("Read Schema: " + readRoot.getSchema().toString());
StructType outputSchema = ArrowUtils.fromArrowSchema(readRoot.getSchema());
logger.info("ASA InputSchema: " + sparkSchema + " OutputSchema: "+ outputSchema);
Encoder<Row> encoder = RowEncoder.apply(outputSchema);
//Use Apache Arrow
return input.mapPartitions(
iter -> processRows(query,
sparkSchema,
iter),
encoder);
}
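// Hedged usage sketch (mirrors the ASASpec test below; `inputDf` is a hypothetical Dataset<Row>
// with columns ts, category and value):
//   Dataset<Row> out = AzureStreamAnalytics.execute(
//       "SELECT category, System.Timestamp AS ts, COUNT(*) AS n "
//           + "FROM input TIMESTAMP BY DATEADD(second, ts, '2019-11-20T00:00:00Z') "
//           + "GROUP BY TumblingWindow(second, 2), category",
//       inputDf);
//   out.show();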
// Helper classes that wrap an OutputStream/InputStream around a direct ByteBuffer
public static class ByteBufferBackedOutputStream extends OutputStream{
public ByteBuffer buf;
ByteBufferBackedOutputStream(ByteBuffer buf) {
this.buf = buf;
}
public synchronized void write(int b) throws IOException {
buf.put((byte) b);
}
public synchronized void write(byte[] bytes, int off, int len) throws IOException {
buf.put(bytes, off, len);
}
}
public static class ByteBufferBackedInputStream extends InputStream{
public ByteBuffer buf;
ByteBufferBackedInputStream(ByteBuffer buf) {
this.buf = buf;
}
public synchronized int read() throws IOException {
if (!buf.hasRemaining()) {
return -1;
}
return buf.get();
}
public synchronized int read(byte[] bytes, int off, int len) throws IOException {
len = Math.min(len, buf.remaining());
buf.get(bytes, off, len);
return len;
}
}
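// Hedged sketch of how these adapters are used above (buffer names come from this file):
//   OutputStream toNative   = new ByteBufferBackedOutputStream(outputBuffer); // Arrow writer target
//   InputStream  fromNative = new ByteBufferBackedInputStream(inputBuffer);   // Arrow reader source
// Both sides rewind() the direct buffers between batches; the streams never grow or reallocate,
// so each Arrow batch has to fit within INTEROP_BUFFER_SIZE.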
}

@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

12
scala/build.sbt Normal file
@@ -0,0 +1,12 @@
name := "asa"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.3.1",
"org.apache.spark" %% "spark-mllib" % "2.3.1",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"eisber.asa" % "asa" % "0.0.1" from "file:/repos/ASASpark/java/target/ASA-0.0.1.jar"
)
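// Hedged note (not part of the original build definition): with this layout the Scala tests can
// typically be run from the scala/ directory with `sbt test`, assuming the ASA jar has already
// been built and copied to /repos/ASASpark/java/target/ASA-0.0.1.jar as referenced above.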

@@ -0,0 +1,39 @@
package eisber.asa
import org.apache.spark.sql.DataFrame
// import eisber.Sample1
import scala.collection.Map
class ASA {
def Run(query:String, inputs:Map[String, DataFrame]) = {
println(query)
for ((name, df) <- inputs) {
println(name)
df.show
}
// TODO: remove scala code?
// should be able to invoke the code directly from ADB
// TODO: Scala dataframe pass to Java dataframe
// df.mapPartitions((Iterable[Row] =>
// val asa = new Sample1()
// asa.setInputSchema()
// asa.getOutputSchema()
// asa.feedRow()
// asa.getRow()
// make it sync?
// ))
// create Sample1 Java object
// in constructor load .NET
// create methods to start new row
// void setInputSchema(string[] names, string?[] types)
// (string[], string[] types) getOutputSchema()
// void feedRow(object[] values);
// void setOutputCallback(func<object[]> x) //
// Sample1.run
}
}
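// Hedged usage sketch (mirrors the commented-out call in ASASpec; `df` is a hypothetical DataFrame):
//   new ASA().Run("SELECT * FROM df", Map("df" -> df))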

@@ -0,0 +1,5 @@
ts,category,value
3,sports,5
1,news,4
2,news,1
4,news,2

@@ -0,0 +1,96 @@
package eisber.asa
import org.scalatest._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.collection.Map
import com.microsoft.AzureStreamAnalytics
class ASASpec extends FlatSpec {
"asa" should "run query" in {
var spark = SparkSession.builder()
.master("local[2]") // 2 ... number of threads
.appName("ASA")
.config("spark.sql.shuffle.partitions", value = 1)
.config("spark.ui.enabled", value = false)
.config("spark.sql.crossJoin.enabled", value = true)
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val df = spark.sqlContext
.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("timestampFormat", "MM-dd-yyyy hh mm ss")
.load(getClass.getResource("/dataset1.csv").getPath)
.repartition(col("category"))
.sortWithinPartitions(col("ts"))
.select(col("ts").cast(LongType), col("category"), col("value"))
df.show
// new ASA().run("SELECT * FROM df", Map("df" -> df))
// new Sample1().intMethod(1)
// ASAExecutor.
// TODO: policy
// invoked per physical partitions
val newDf = AzureStreamAnalytics.execute(
"SELECT category, System.Timestamp AS ts, COUNT(*) AS n FROM input TIMESTAMP BY DATEADD(second, ts, '2019-11-20T00:00:00Z') GROUP BY TumblingWindow(second, 2), category",
df)
// ASAExecutor_run(sql, {'i1': df1, 'i2': df2})
// df.join(df2, 'left').filter().select()
// spark.sql(' ... ') , saveAsTable(), createTemporaryView()
// - schema input/output
// - additional type support for fields: long, int, datetime [input? output?]
// struct?
// array
// - packaging (include CLR (dotnet core) in .jar)
// nuget?
// github repo?
// github.com/Microsoft/ASA4J ?
// github.com/Azure/ASA4J
// 2min video what this does. the scenario, power point
// Spark multiple nodes, good amount of data to show case
// Spark Summit? April 20
newDf.show
}
/*"asa spark" should "run dotent spark" in {
var spark = SparkSession.builder()
.master("local[2]") // 2 ... number of threads
.appName("ASA")
.config("spark.sql.shuffle.partitions", value = 1)
.config("spark.ui.enabled", value = false)
.config("spark.sql.crossJoin.enabled", value = true)
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val df = spark.sqlContext
.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(getClass.getResource("/dataset1.csv").getPath)
.repartition(col("category")) // numPartitions:
.sortWithinPartitions(col("ts"))
.select(col("ts").cast(LongType), col("category"), col("value"))
df.show;
val newDf = AzureStreamAnalytics.executeDotnet(
"SELECT category, System.Timestamp AS ts, COUNT(*) AS n FROM input TIMESTAMP BY DATEADD(second, ts, '1970-01-01T00:00:00Z') GROUP BY TumblingWindow(second, 2), category",
df)
}*/
}