Add initial work to run spark job and print it out

Make spark work non-blocking
Sarah Bird 2018-08-28 20:43:46 -05:00
Parent 1949466aa2
Commit f78d229104
5 changed files with 149 additions and 14 deletions

4
environment.yaml Normal file

@@ -0,0 +1,4 @@
name: overscripted_explorer
dependencies:
- pyspark
- bokeh


@@ -1,22 +1,35 @@
Steps to set up AWS Server
* RHEL 7
* m5d.large
Server dependencies
$ sudo yum update
$ sudo reboot
$ sudo yum install wget tmux vim git bzip2 java-1.8.0-openjdk
Get miniconda
$ wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
$ sh Miniconda3-latest-Linux-x86_64.sh
Add github ssh keys to box (won't be necessary when repos are public)
$ git clone git@github.com:mozilla/overscripted-explorer.git
Get data (these instructions are for AWS; get the data however you like)
$ pip install awscli
$ mkdir .aws
$ vim credentials
Set up the ephemeral drive and put the data on it
$ lsblk # to see location of drive
$ sudo mkfs.ext4 -E nodiscard /dev/nvme0n1
$ sudo mkdir /mnt/Data
$ sudo mount -o discard /dev/nvme0n1 /mnt/Data
$ sudo chown ec2-user:ec2-user /mnt/Data
$ cd /mnt/Data
$ aws s3 cp s3://safe-ucosp-2017/safe_dataset/v1/clean.parquet . --recursive
Make sure the environment variable `DATA_DIR` is set; the app reads `clean.parquet` from inside it (see `text_search/main.py`). A quick sanity check is sketched at the end of this section.
Run server in a tmux session (ahem - this isn't prod!)
$ cd overscripted-explorer
$ tmux new -s bokeh
$ conda env create -f environment.yaml
$ source activate overscripted_explorer
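
As a quick sanity check before starting the app, something like the following (a hypothetical helper, not part of this commit) confirms that `DATA_DIR` matches what `text_search/main.py` expects:

```python
# check_data_dir.py: hypothetical helper, not part of this commit
import os

data_dir = os.environ.get("DATA_DIR")
assert data_dir, "DATA_DIR is not set"

# main.py builds its path as os.path.join(DATA_DIR, 'clean.parquet')
data_file = os.path.join(data_dir, "clean.parquet")
print(data_file, "exists" if os.path.exists(data_file) else "is missing")
```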

91
text_search/main.py Normal file

@@ -0,0 +1,91 @@
import os
from time import time

from bokeh.io import curdoc
from bokeh.layouts import layout, widgetbox
from bokeh.models import (
    Button,
    Div,
    PreText,
    Select,
    Slider,
    TextInput,
)
from jinja2 import Template
from pyspark import SparkContext, SQLContext

from pooler import unblock_with_finish

###
# Initial setup
###
DATA_DIR = os.environ.get('DATA_DIR')
APP_DIR = os.path.dirname(__file__)
DATA_FILE = os.path.join(DATA_DIR, 'clean.parquet')

sc = SparkContext("local", "Text Search")
spark = SQLContext(sc)

with open(os.path.join(APP_DIR, 'results.jinja'), 'r') as f:
    results_template = Template(f.read())

###
# Setup bokeh objects
###
column_to_look_in = Select(
    title="Column to look in",
    options=["script_url", "location", "argument_0", "value_1000"],
    value="script_url",
)
text_to_find = TextInput(title="Text to search for", value="google-analytics")
sample_frac = Slider(title="% of dataset to use", start=1, end=100, step=1, value=5)
apply_button = Button(label="Run")
save_button = Button(label="Save Results")

widgets = widgetbox(
    column_to_look_in,
    text_to_find,
    sample_frac,
    apply_button,
    save_button,
    width=300,
)

results_head = Div(text="<h2>Results</h2>")
results = PreText(text="", width=700)
spark_info = Div(text="spark info")

# Layout and add to doc
curdoc().add_root(layout([
    [widgets, [results_head, results, spark_info]]
]))


###
# Setup callbacks
###
def periodic_task():
    # Runs on the main thread every second, so the page keeps updating
    # while the Spark work happens in its own thread.
    t = time()
    spark_info.text = f'time: {t}'


def update_results():
    # Scheduled back onto the main thread via add_next_tick_callback,
    # so it is safe to update bokeh models here.
    result_text = results_template.render(
        count=f'{count:,}',
    )
    results.text = "\n".join([results.text, result_text])


@unblock_with_finish(update_results)
def get_new_data():
    # Runs in a worker thread: read the parquet data, take a random sample,
    # and count the rows where the chosen column contains the search text.
    global count
    df = spark.read.parquet(DATA_FILE)
    frac = sample_frac.value / 100
    sample = df.sample(False, frac)
    rows = sample.where(df[column_to_look_in.value].contains(text_to_find.value))
    count = rows.count()


apply_button.on_click(get_new_data)  # noqa
curdoc().add_periodic_callback(periodic_task, 1000)
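
For context, the Spark query that `get_new_data` wraps can also be run on its own outside Bokeh. A minimal standalone sketch (not part of this commit), reusing the same PySpark calls and assuming `DATA_DIR` is set as in the setup notes:

```python
import os

from pyspark import SparkContext, SQLContext

# Same inputs main.py uses: DATA_DIR must contain clean.parquet
data_file = os.path.join(os.environ["DATA_DIR"], "clean.parquet")

sc = SparkContext("local", "Text Search (standalone)")
spark = SQLContext(sc)

df = spark.read.parquet(data_file)
sample = df.sample(False, 0.05)  # 5% sample, matching the slider's default
rows = sample.where(df["script_url"].contains("google-analytics"))
print(f"{rows.count():,} matching rows")
```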

25
text_search/pooler.py Normal file

@@ -0,0 +1,25 @@
""" Put processes in a seperate thread
This beauty is used to be able to decorate functions with pyspark work, so that
the work can be done in another thread and we can update the user on the pyspark progress.
"""
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from bokeh.io import curdoc
# It's important to define these out here
DOC = curdoc()
EXECUTOR = ThreadPoolExecutor(max_workers=4)
def unblock_with_finish(to_finish):
def unblock(to_pool):
def wrapper(*args, **kwargs):
EXECUTOR.submit(
partial(to_pool, *args, **kwargs)
).add_done_callback(
lambda x: DOC.add_next_tick_callback(to_finish)
)
return wrapper
return unblock
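
For reference, a minimal sketch of how this decorator is meant to be used from a Bokeh app, mirroring the pattern in `text_search/main.py` (`slow_job`, `show_done`, and `status` are hypothetical names, and the file would need to run under `bokeh serve`):

```python
from bokeh.io import curdoc
from bokeh.models import Div

from pooler import unblock_with_finish

status = Div(text="idle")
curdoc().add_root(status)


def show_done():
    # Scheduled back onto the document's main thread by the decorator
    status.text = "done"


@unblock_with_finish(show_done)
def slow_job():
    # Runs in the ThreadPoolExecutor, so the Bokeh session stays responsive
    total = sum(range(10_000_000))
    print(total)


slow_job()  # returns immediately; show_done() fires once the work finishes
```

The key point is that the worker thread never touches Bokeh models directly; only `add_next_tick_callback` crosses back to the document, which is Bokeh's supported way to update a session from another thread.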

2
text_search/results.jinja Normal file

@@ -0,0 +1,2 @@
---------------------------------
Count: {{ count }}
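
For reference, `update_results` in `text_search/main.py` fills this template with a thousands-separated count; a standalone sketch of the same rendering:

```python
from jinja2 import Template

with open("text_search/results.jinja") as f:
    results_template = Template(f.read())

# main.py passes count=f'{count:,}'
print(results_template.render(count=f"{1234567:,}"))
# ---------------------------------
# Count: 1,234,567
```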