This commit is contained in:
JS 2017-12-07 15:32:15 -08:00 committed by GitHub
Parent c12ecebad2
Commit 7d05011697
1 changed file with 283 additions and 0 deletions


@@ -0,0 +1,283 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Accelerating Spark with GPU\n",
"\n",
"This notebook will walk through a quick tutorial of how to use Spark with GPUs (using Numba) to accelerate processes."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import numpy as np\n",
"from numba import cuda"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://10.0.0.4:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.2.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>spark://10.0.0.4:7077</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>pyspark-shell</code></dd>\n",
" </dl>\n",
" </div>\n",
" "
],
"text/plain": [
"<SparkContext master=spark://10.0.0.4:7077 appName=pyspark-shell>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Lets check that your Spark cluster has the correct setup. This includes using a GPU enabled VM as well as using one of the Nvidia Docker images. More info here: https://github.com/Azure/aztk/blob/master/docs/60-gpu.md"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Hurray! You're cluster is gpu enabled!\n"
]
}
],
"source": [
"if cuda.is_available():\n",
" print(\"Hurray! You're cluster is gpu enabled!\")\n",
"else:\n",
" print(\"Check that you are running on gpu enabled vms. You can visit this link to learn more: https://github.com/Azure/aztk/blob/master/docs/60-gpu.md\")"
]
},
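{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional extra check (a minimal sketch using Numba's standard CUDA API), `cuda.detect()` lists the CUDA devices visible on this node:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# prints a summary of each detected GPU and returns True if one is usable\n",
"cuda.detect()"
]
},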
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define cuda kernel"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from numba import jit\n",
"\n",
"@cuda.jit('(float32[:], float32[:])')\n",
"def func(inp, out):\n",
" i = cuda.grid(1)\n",
" if i < out.size:\n",
" out[i] = inp[i] ** 2"
]
},
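{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, `cuda.grid(1)` is shorthand for the usual manual thread-index computation. An equivalent kernel would look like this (a sketch; `func_manual` is a hypothetical name, not used below):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@cuda.jit('(float32[:], float32[:])')\n",
"def func_manual(inp, out):\n",
"    # same absolute thread index that cuda.grid(1) computes\n",
"    i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x\n",
"    if i < out.size:\n",
"        out[i] = inp[i] ** 2"
]
},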
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Wrap cuda kernel launching logic"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def gpu_work(xs):\n",
" inp = np.asarray(list(xs), dtype=np.float32)\n",
" out = np.zeros_like(inp)\n",
" block_size = 32 * 4\n",
" grid_size = (inp.size + block_size - 1)\n",
" func[grid_size, block_size](inp, out)\n",
" return out"
]
},
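{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before distributing the work, a quick local sanity check on the driver (with made-up input values) shows the wrapper in action:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# expected: [ 0.  1.  4.  9. 16. 25. 36. 49.]\n",
"print(gpu_work(range(8)))"
]
},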
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create Spark RDD partitions"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Partitions 12\n"
]
}
],
"source": [
"rdd = sc.parallelize(list(range(100)))\n",
"print(\"Partitions\", rdd.getNumPartitions())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Apply `gpu_work` on each partition"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[0.0, 1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0, 64.0, 81.0, 100.0, 121.0, 144.0, 169.0, 196.0, 225.0, 256.0, 289.0, 324.0, 361.0, 400.0, 441.0, 484.0, 529.0, 576.0, 625.0, 676.0, 729.0, 784.0, 841.0, 900.0, 961.0, 1024.0, 1089.0, 1156.0, 1225.0, 1296.0, 1369.0, 1444.0, 1521.0, 1600.0, 1681.0, 1764.0, 1849.0, 1936.0, 2025.0, 2116.0, 2209.0, 2304.0, 2401.0, 2500.0, 2601.0, 2704.0, 2809.0, 2916.0, 3025.0, 3136.0, 3249.0, 3364.0, 3481.0, 3600.0, 3721.0, 3844.0, 3969.0, 4096.0, 4225.0, 4356.0, 4489.0, 4624.0, 4761.0, 4900.0, 5041.0, 5184.0, 5329.0, 5476.0, 5625.0, 5776.0, 5929.0, 6084.0, 6241.0, 6400.0, 6561.0, 6724.0, 6889.0, 7056.0, 7225.0, 7396.0, 7569.0, 7744.0, 7921.0, 8100.0, 8281.0, 8464.0, 8649.0, 8836.0, 9025.0, 9216.0, 9409.0, 9604.0, 9801.0]\n"
]
}
],
"source": [
"rdd = rdd.mapPartitions(gpu_work)\n",
"print(rdd.collect())"
]
},
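{
"cell_type": "markdown",
"metadata": {},
"source": [
"A cheap correctness check (a sketch; `collect()` recomputes the RDD) compares the GPU results against the same computation in NumPy:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# should print True: GPU-squared values match NumPy's squares\n",
"print(np.allclose(rdd.collect(), np.arange(100, dtype=np.float32) ** 2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Numba can also compile a scalar function into a NumPy ufunc with `@vectorize`. With `target='cuda'`, each call copies the inputs to the GPU, runs a generated kernel, and copies the result back, so the timings below include transfer overhead. Let's compare the same function on CPU and GPU:"
]
},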
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numba import vectorize\n",
"import math\n",
"\n",
"@vectorize([\"float32(float32, float32)\", \"float64(float64, float64)\"], target='cpu')\n",
"def cpu_some_trig(x, y):\n",
" return math.cos(x) + math.sin(y)\n",
"\n",
"@vectorize([\"float32(float32, float32)\", \"float64(float64, float64)\"], target='cuda')\n",
"def cuda_some_trig(x, y):\n",
" return math.cos(x) + math.sin(y)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"nelem = 10 ** 6\n",
"xs = np.random.random(nelem).astype(np.float32)\n",
"ys = np.random.random(nelem).astype(np.float32)"
]
},
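{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first call to each ufunc triggers JIT compilation. Warming both up once keeps compile time out of the benchmark (a minimal sketch):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# one throwaway call per ufunc so %%timeit measures steady-state performance\n",
"_ = cpu_some_trig(xs[:10], ys[:10])\n",
"_ = cuda_some_trig(xs[:10], ys[:10])"
]
},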
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"15.4 ms ± 217 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"res = cpu_some_trig(xs, ys)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"4.54 ms ± 451 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"res = cuda_some_trig(xs, ys)"
]
},
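{
"cell_type": "markdown",
"metadata": {},
"source": [
"The two ufuncs should agree to within single-precision rounding (a quick sketch):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res_cpu = cpu_some_trig(xs, ys)\n",
"res_gpu = cuda_some_trig(xs, ys)\n",
"# elementwise agreement up to float32 tolerance\n",
"print(np.allclose(res_cpu, res_gpu, atol=1e-6))"
]
},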
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can observe roughly as 4X speed up with GPUs!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "python",
"name": "pyspark"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}