First, let's begin with our imports:
``` haskell
module Main where
import Control.Lens (to)
import Control.Monad (replicateM_, forever)
import Data.Random.Source.PureMT (newPureMT)
import Deli (Channel, Deli, JobTiming(..))
import qualified Deli
import Deli.Printer (printResults)
import System.Random
import qualified Deli.Random
import qualified Deli.Random
Simple Queues
Next, let's see what happens if we add more workers:
``` haskell
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
:: Deli.HasJobTiming jobType
=> Int
-> Channel jobType
-> Deli jobType ()
variableWorkers num queue =
replicateM_ num $
Deli.fork $ forever $ do
always zero. We won't be able to beat this performance.
A more complex example
Now, let's say we have an pareto distribution, with some requests
generally being quick, and others generally taking much longer. Let's
compare two implementations, one simply with twenty workers, and another
with two separate queues, partitioned by request type (using a total
still of twenty workers).
Now let's create our two systems whose performance we want to compare.
``` haskell
:: Channel JobTiming
-> Deli JobTiming ()
twentyWorkers = variableWorkers 20
:: Channel JobTiming
-> Deli JobTiming ()
partitionedQueues jobChannel = do
-- We'll read work from the main queue, and then partition
-- it into either the slow or fast queue.
-- First, we create the two partitions, each with a buffer of 16.
-- Instead, we could pass in Nothing for an unbounded queue.
slowChannel <- Deli.newChannel (Just 16)
fastChannel <- Deli.newChannel (Just 16)
-- Each of our two workers will implement work stealing. The algorithm
-- is as follows. First, check if your primary queue has work, if so,
-- perform it. If not, check to see if the other queue has work, if so,
-- per form it. If not, wait until your primary queue does have work.
-- Spawn the slow workers
replicateM_ 4 $
Deli.fork $
forever $ do
mSlowJob <- Deli.readChannelNonblocking slowChannel
case mSlowJob of
Just job ->
Deli.runJob job
Nothing -> do
mFastJob <- Deli.readChannelNonblocking fastChannel
case mFastJob of
Nothing ->
Deli.readChannel slowChannel >>= Deli.runJob
Just fastJob ->
Deli.runJob fastJob
-- Spawn the fast workers
replicateM_ 16 $
Deli.fork $
forever $ do
mFastJob <- Deli.readChannelNonblocking fastChannel
case mFastJob of
Just job ->
Deli.runJob job
Nothing -> do
mSlowJob <- Deli.readChannelNonblocking slowChannel
case mSlowJob of
Nothing ->
Deli.readChannel fastChannel >>= Deli.runJob
Just slowJob ->
Deli.runJob slowJob
-- Loop forever, reading items, and putting them in the
-- appropriate queue
forever $ do
item <- Deli.readChannel jobChannel
-- If a job's duration is greater than 500 milliseconds,
-- put it into the slow queue.
-- In the real world, you'd likely have to predict the service
-- time based on the parameters of the request, and in practice,
-- that technique works remarkably well.
if _jobDuration item > 0.5
then Deli.writeChannel slowChannel item
else Deli.writeChannel fastChannel item
We've set up our two implementations, now let's generate some example
requests, and compare results.
Instead of using a cycled list for our input data, we'll make things a
bit more realistic, and use a poisson process for arrival times, and a
pareto distribution for service times.
``` haskell
paretoExample :: IO ()
paretoExample = do
simulationGen <- newStdGen
inputGen <- newPureMT
-- Generate a poisson process of arrivals, with a mean of 650 arrivals
-- per second
let arrivals = Deli.Random.arrivalTimePoissonDistribution 650
-- Generate a Pareto distribution of service times, with a mean service
-- time of 3 milliseconds (0.03 seconds) (alpha is set to 1.16 inside this
-- function)
serviceTimes = Deli.Random.durationParetoDistribution 0.03
jobs = take 200000 $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
twentyWorkersRes = Deli.simulate simulationGen jobs twentyWorkers
partitionedRes = Deli.simulate simulationGen jobs partitionedQueues
putStrLn "## Pareto example ##"
putStrLn "## twentyWorkers ##"
printResults twentyWorkersRes
putStrLn "## partitionedQueues ##"
printResults partitionedRes
where newline = putStrLn "\n"
Interestingly enough, our more complex implementation is able to beat
the simple twenty workers. Intuitively, this is because with a Pareto
distribution, the occasional really slow job causes head of line
blocking. By separating out into a slow and fast queue (with work
stealing), slow items will only block other slow items, and when there
are no slow items, all workers can be utilized (via work stealing) to
process fast jobs.
Note in particular how much better the work stealing algorithm does at
the 95th and 99th percentile of sojourn time.
``` haskell
main :: IO ()
main = do
main = do
where newline = putStrLn "\n"
That's currently it for this tutorial, but we'll be looking to expand it
in the future.
