Reid Draper 2019-04-08 18:01:40 -05:00
Commit ee49be9743
18 changed files with 1834 additions and 0 deletions

27
.gitignore vendored Normal file

@@ -0,0 +1,27 @@
dist
dist-*
cabal-dev
*.o
*.hi
*.chi
*.chs.h
*.dyn_o
*.dyn_hi
.hpc
.hsenv
.cabal-sandbox/
cabal.sandbox.config
*.prof
*.aux
*.hp
*.eventlog
.stack-work/
cabal.project.local
.HTF/
.ghc.environment.*
# profiling
*.svg
# datasets
*.csv

76
CODE_OF_CONDUCT.md Normal file

@@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at opensource@github.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

29
LICENSE Normal file

@@ -0,0 +1,29 @@
BSD 3-Clause License
Copyright (c) 2019, GitHub
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

9
Makefile Normal file

@@ -0,0 +1,9 @@
.PHONY: all compile
all: compile docs/user-guide.md
compile:
@stack build
docs/user-guide.md: app/Tutorial.lhs
@./build-docs.sh

34
README.md Normal file

@@ -0,0 +1,34 @@
# deli
## What is Deli?
Deli is a performance modeling tool that lets you understand and experiment
with new designs several orders of magnitude faster than wall-clock time.
Specifically, Deli is designed to help you understand how long it takes for
'jobs' or 'requests' to complete in your system. Deli borrows concepts and
terminology from [queueing
theory](https://en.wikipedia.org/wiki/Queueing_theory), and is implemented as a
Haskell DSL, exposing a [Go](https://golang.org/)-like concurrency
and message-passing API.
Deli's documentation is divided into the following three sections, depending on
your interest.
## First time readers
If you're looking for an overview on Deli, and whether it may be appropriate
for your problem, head over to our [overview documentation](docs/overview.md).
## Using Deli
If you've decided you want to use Deli, or are already using it in a project,
then our [user guide](docs/user-guide.md) is a great resource.
## Contributing
If you'd like to contribute to Deli, start with our [contributing
documentation](docs/contributing.md).
## License
Deli is BSD3 licensed. More information is available in [LICENSE](LICENSE).

2
Setup.hs Normal file

@@ -0,0 +1,2 @@
import Distribution.Simple
main = defaultMain

287
app/Tutorial.lhs Normal file

@@ -0,0 +1,287 @@
Welcome to the Deli tutorial. Through a series of increasingly complex
examples, this tutorial will give you an idea of the power and usage of Deli.
This example is also a literate Haskell file, which means this document itself
compiles and is executable. You can run it yourself and see the output by
running:
```shell
$ stack build
$ stack run tutorial
```
First, let's begin with our imports:
\begin{code}
module Main where
import Control.Lens (to)
import Control.Monad (replicateM_, forever)
import Data.Random.Source.PureMT (newPureMT)
import Deli (Channel, Deli, JobTiming(..))
import Deli.Printer (printResults)
import System.Random
import qualified Deli
import qualified Deli.Random
\end{code}
Simple Queues
---
Next, let's create our first example, of a single queue and worker. Work will
be placed on the main queue, and our worker will read from it, and process each
item in serial:
\begin{code}
singleQueue
:: Channel JobTiming
-> Deli JobTiming ()
singleQueue queue =
forever $ do
job <- Deli.readChannel queue
Deli.runJob job
\end{code}
As you can see, describing a very simple system like this has little ceremony.
Next, let's set up the rest of the simulation, and run it.
\begin{code}
singleQueueExample :: IO ()
singleQueueExample = do
gen <- newStdGen
let durations = cycle [0.8, 0.9, 1.0, 1.1, 1.2]
times = [0,1..(100000-1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs singleQueue
printResults res
\end{code}
First we've created a new random number generator (the Deli type implements
`MonadRandom`, for convenient, reproducible random number generation). Next, we
create a dataset of our jobs, to be simulated. In this case, jobs will take one
of a set of durations (in seconds), with a mean of `1.0`. Then we set it up so
that jobs will be triggered from the outside world once each second.
Finally, we run the simulation, passing in our random number seed, set of jobs,
and our implemented system (`singleQueue`).
Running the simulation, we get two primary sets of statistics. We see the wait
time (how long our jobs had to wait in line before processing began), and
their sojourn time, which is the wait time plus the processing time.
In this case, we have a non-zero wait time, which means we are sometimes at
capacity, and are queueing up work. This is also reflected in the fact that the
sojourn 50th percentile is (albeit slightly) greater than one second.
You will see output similar to this:
```shell
Simulated wait (milliseconds):
simulated 99th: 294.99974998749934
simulated 95th: 274.9987499374968
simulated 75th: 181.24578114452865
simulated 50th: 87.4934373359334
Simulated sojourn (milliseconds):
simulated 99th: 1295.0000000000002
simulated 95th: 1275.0000000000002
simulated 75th: 1181.2495312382812
simulated 50th: 1087.497187429686
```
Next, let's see what happens if we add more workers:
\begin{code}
variableWorkers
:: Deli.HasJobTiming jobType
=> Int
-> Channel jobType
-> Deli jobType ()
variableWorkers num queue =
replicateM_ num $
Deli.fork $ forever $ do
job <- Deli.readChannel queue
Deli.runJob job
\end{code}
Here we've simply parameterized the number of workers. For each worker, we
spawn a thread (using the Deli DSL), and enter an infinite loop to read work
from the shared queue. This expands our exposure to the Deli API, as we've now
seen `fork`, `readChannel`, `runJob`, and `simulate`. Deli's core API exposes
familiar programming concepts to create queues, read and write to them, and
fork (lightweight) threads. This allows you to create a model of your system,
using similar constructs to the actual version. This is core to Deli.
\begin{code}
twoWorkerQueueExample :: IO ()
twoWorkerQueueExample = do
gen <- newStdGen
let durations = cycle [0.8, 0.9, 1.0, 1.1, 1.2]
times = [0,1..(100000-1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs (variableWorkers 2)
printResults res
\end{code}
Now we can run our same example, and pass in two workers. Running this, we see
that the system never reaches capacity, as the wait time is always zero. We
won't be able to beat this performance.
```
Simulated wait (milliseconds):
simulated 99th: 0.0
simulated 95th: 0.0
simulated 75th: 0.0
simulated 50th: 0.0
Simulated sojourn (milliseconds):
simulated 99th: 1197.5
simulated 95th: 1187.5
simulated 75th: 1125.0
simulated 50th: 1000.0
```
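A quick back-of-the-envelope check (a hedged sketch, separate from the tutorial code) explains why: with one job arriving per second and a mean service time of 1.0 seconds, utilization is the arrival rate times the mean service time, divided by the worker count.

```haskell
-- Hypothetical sanity check, not part of the Deli API: utilization of
-- the tutorial's workload for a given worker count, assuming one
-- arrival per second and a mean service time of 1.0 seconds (as
-- stated above).
utilization :: Double -> Double
utilization workers = arrivalRate * meanService / workers
  where
    arrivalRate = 1.0 -- jobs per second
    meanService = 1.0 -- seconds; the mean of the cycled durations

main :: IO ()
main = do
    print (utilization 1) -- a single worker is saturated
    print (utilization 2) -- two workers leave headroom
```

At utilization 1.0 a single worker has no slack, so any variance in service times queues up work; at 0.5, two workers absorb the variance and the wait time stays at zero.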
A more complex example
---
Now, let's say we have a Pareto distribution of service times, with some requests
generally being quick, and others generally taking much longer. Let's compare
two implementations: one simply with twenty workers, and another with two separate
queues, partitioned by request type (still using twenty workers in total).
Now let's create our two systems whose performance we want to compare.
\begin{code}
twentyWorkers
:: Channel JobTiming
-> Deli JobTiming ()
twentyWorkers = variableWorkers 20
partitionedQueues
:: Channel JobTiming
-> Deli JobTiming ()
partitionedQueues jobChannel = do
-- We'll read work from the main queue, and then partition
-- it into either the slow or fast queue.
-- First, we create the two partitions, each with a buffer of 16.
-- Instead, we could pass in Nothing for an unbounded queue.
slowChannel <- Deli.newChannel (Just 16)
fastChannel <- Deli.newChannel (Just 16)
-- Each of our two worker pools will implement work stealing. The
-- algorithm is as follows: first, check whether your primary queue has
-- work; if so, perform it. If not, check whether the other queue has
-- work; if so, perform it. Otherwise, block until your primary queue
-- has work.
-- Spawn the slow workers
replicateM_ 4 $
Deli.fork $
forever $ do
mSlowJob <- Deli.readChannelNonblocking slowChannel
case mSlowJob of
Just job ->
Deli.runJob job
Nothing -> do
mFastJob <- Deli.readChannelNonblocking fastChannel
case mFastJob of
Nothing ->
Deli.readChannel slowChannel >>= Deli.runJob
Just fastJob ->
Deli.runJob fastJob
-- Spawn the fast workers
replicateM_ 16 $
Deli.fork $
forever $ do
mFastJob <- Deli.readChannelNonblocking fastChannel
case mFastJob of
Just job ->
Deli.runJob job
Nothing -> do
mSlowJob <- Deli.readChannelNonblocking slowChannel
case mSlowJob of
Nothing ->
Deli.readChannel fastChannel >>= Deli.runJob
Just slowJob ->
Deli.runJob slowJob
-- Loop forever, reading items, and putting them in the
-- appropriate queue
forever $ do
item <- Deli.readChannel jobChannel
-- If a job's duration is greater than 500 milliseconds,
-- put it into the slow queue.
-- In the real world, you'd likely have to predict the service
-- time based on the parameters of the request, and in practice,
-- that technique works remarkably well.
if _jobDuration item > 0.5
then Deli.writeChannel slowChannel item
else Deli.writeChannel fastChannel item
\end{code}
We've set up our two implementations, now let's generate some example requests,
and compare results.
Instead of using a cycled list for our input data, we'll make things a bit more
realistic, and use a Poisson process for arrival times and a Pareto
distribution for service times.
\begin{code}
paretoExample :: IO ()
paretoExample = do
simulationGen <- newStdGen
inputGen <- newPureMT
-- Generate a Poisson process of arrivals, with a mean of 650 arrivals
-- per second
let arrivals = Deli.Random.arrivalTimePoissonDistribution 650
-- Generate a Pareto distribution of service times, with a mean service
-- time of 30 milliseconds (0.03 seconds); alpha is set to 1.16 inside
-- this function
serviceTimes = Deli.Random.durationParetoDistribution 0.03
jobs = take 200000 $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
twentyWorkersRes = Deli.simulate simulationGen jobs twentyWorkers
partitionedRes = Deli.simulate simulationGen jobs partitionedQueues
putStrLn "## Pareto example ##"
putStrLn "## twentyWorkers ##"
printResults twentyWorkersRes
newline
putStrLn "## partitionedQueues ##"
printResults partitionedRes
newline
where newline = putStrLn "\n"
\end{code}
Interestingly enough, our more complex implementation is able to beat the
simple twenty workers. Intuitively, this is because with a Pareto distribution,
the occasional really slow job causes head of line blocking. By separating out
into a slow and fast queue (with work stealing), slow items will only block
other slow items, and when there are no slow items, all workers can be utilized
(via work stealing) to process fast jobs.
Note in particular how much better the work stealing algorithm does at the 95th
and 99th percentile of sojourn time.
\begin{code}
main :: IO ()
main = do
putStrLn "## singleQueueExample ##"
singleQueueExample
newline
putStrLn "## twoWorkerQueueExample ##"
twoWorkerQueueExample
newline
paretoExample
newline
where newline = putStrLn "\n"
\end{code}
That's currently it for this tutorial, but we'll be looking to expand it in the
future.

5
build-docs.sh Executable file

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
docs="$(pandoc -f markdown+lhs -t markdown-fenced_code_attributes app/Tutorial.lhs)"
header="$(printf '%s\n%s\n%s' 'This file is generated, please edit [app/Tutorial.lhs](../app/Tutorial.lhs) instead.' '***' "$docs")"
printf "%s" "$header" > docs/user-guide.md

70
deli.cabal Normal file

@@ -0,0 +1,70 @@
name: deli
version: 0.1.0.0
-- synopsis:
-- description:
homepage: https://github.com/github/deli
license: BSD3
license-file: LICENSE
author: Reid Draper
maintainer: opensource+deli@github.com
copyright: 2019 GitHub
category: Simulation
build-type: Simple
extra-source-files: README.md
cabal-version: >=1.10
library
hs-source-dirs: src
exposed-modules: Control.Monad.Concurrent
, Deli
, Deli.Printer
, Deli.Random
build-depends: base >= 4.7 && < 5
, MonadRandom
, bytestring
, containers
, dlist
, lens
, mtl
, pqueue
, random
, random-fu
, random-source
, tdigest
, time
, transformers
default-language: Haskell2010
ghc-options: -Wall
executable tutorial
hs-source-dirs: app
main-is: Tutorial.lhs
ghc-options: -O1
build-depends: base
, bytestring
, containers
, deli
, mtl
, parallel
, lens
, monad-loops
, random
, random-fu
, random-source
, tdigest
, time
default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N -O1
test-suite deli-test
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: Spec.hs
build-depends: base
, deli
ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
source-repository head
type: git
location: https://github.com/github/deli

38
docs/contributing.md Normal file

@@ -0,0 +1,38 @@
## Contributing
[fork]: https://github.com/github/deli/fork
[pr]: https://github.com/github/deli/compare
[code-of-conduct]: /CODE_OF_CONDUCT.md
Hi there! We're thrilled that you'd like to contribute to this project. Your
help is essential for keeping it great.
Contributions to this project are
[released](https://help.github.com/articles/github-terms-of-service/#6-contributions-under-repository-license)
to the public under the [BSD3](/LICENSE).
Please note that this project is released with a [Contributor Code of
Conduct][code-of-conduct]. By participating in this project you agree to abide
by its terms.
## Submitting a pull request
0. [Fork][fork] and clone the repository
0. Configure and install the dependencies: `stack build`
0. Make sure the tests pass on your machine: `stack test`
0. Create a new branch: `git checkout -b my-branch-name`
0. Make your change, add tests, and make sure the tests still pass
0. Push to your fork and [submit a pull request][pr]
0. Pat yourself on the back and wait for your pull request to be reviewed and merged.
Here are a few things you can do that will increase the likelihood of your pull request being accepted:
- Write tests.
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests.
- Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html).
## Resources
- [How to Contribute to Open Source](https://opensource.guide/how-to-contribute/)
- [Using Pull Requests](https://help.github.com/articles/about-pull-requests/)
- [GitHub Help](https://help.github.com)

54
docs/overview.md Normal file

@@ -0,0 +1,54 @@
# Overview
Deli is a Haskell DSL which allows you to model a system, and then run
simulations to understand performance. Deli borrows from [queueing
theory](https://en.wikipedia.org/wiki/Queueing_theory), and allows us to model
and understand how work is performed with a limited set of resources. Deli can
be used to model everything from elevator scheduling and thread and disk
schedulers in operating systems to the design of checkout lines at a grocery
store.
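A checkout line, for instance, is just a channel of customers drained by a few forked cashiers. This sketch reuses only the API that the tutorial in this repository introduces (`fork`, `readChannel`, `runJob`); the three-cashier count is an arbitrary illustration:

```haskell
import Control.Monad (forever, replicateM_)
import Deli (Channel, Deli, JobTiming)
import qualified Deli

-- Three cashiers share one line; each customer's JobTiming carries an
-- arrival time and a service duration, and runJob simulates serving them.
checkoutLine :: Channel JobTiming -> Deli JobTiming ()
checkoutLine line =
    replicateM_ 3 $
        Deli.fork $ forever $ do
            customer <- Deli.readChannel line
            Deli.runJob customer
```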
To model your system, Deli gives you a concurrency and message passing API
similar to the Go programming language, and allows you to model input into your
system either statistically, or from a production log (like a CSV file).
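As a sketch of the log-driven path (plain Haskell here, not a Deli API), a two-column `start,duration` log can be parsed into pairs of seconds; the tutorial shows how such pairs become jobs via the `JobTiming` constructor:

```haskell
-- Hypothetical sketch: parse "start,duration" rows (both in seconds)
-- from a production log into pairs. Turning the pairs into Deli jobs
-- is then a zip with the JobTiming constructor, as in the tutorial.
parseRow :: String -> (Double, Double)
parseRow row =
    let (start, rest) = break (== ',') row
    in (read start, read (drop 1 rest))

main :: IO ()
main = do
    let logLines = ["0.0,0.8", "1.0,1.2", "2.0,1.0"]
    print (map parseRow logLines)
```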
Once you have modeled your system and its inputs, you run Deli, which runs as
quickly as it can, simulating wall-clock time, and then returns with statistics
about how your system performed.
Before digging in further, let's start with why you *should not* use Deli.
## Why shouldn't you use Deli?
* It may take longer to learn to use Deli than to fix your problem another way
* Using Deli requires some knowledge of the Haskell programming
language
* Using Deli doesn't obviate the need to understand basic statistics and the
distribution of your input data
* Deli is currently light on documentation; we hope that the community can assist here
## Why was this built?
Deli was built to explore improvements to GitHub's webhook infrastructure,
where the HTTP response time distribution can vary dramatically when a single
popular webhook target is unavailable.
The initial design goals were sketched out as follows:
* Easily test new algorithms
* Run several orders of magnitude faster than “real time”; ideally constrained
only by CPU, not by wall-clock time at all
* Run deterministically (given the same RNG seed/state)
## How does it work?
Deli is a Haskell library and DSL, implemented as a [discrete event
simulation](https://en.wikipedia.org/wiki/Discrete_event_simulation). It
presents a
[CSP](https://en.wikipedia.org/wiki/Communicating_sequential_processes)
concurrency API which is used to model systems.
## What next?
If you'd like to start using Deli, head over to our [user
guide](user-guide.md).

160
docs/user-guide.md Normal file

@@ -0,0 +1,160 @@
This file is generated, please edit [app/Tutorial.lhs](../app/Tutorial.lhs) instead.
***
Welcome to the Deli tutorial. Through a series of increasingly complex
examples, this tutorial will give you an idea of the power and usage of
Deli.
This example is also a literate Haskell file, which means this document
itself compiles and is executable. You can run it yourself and see the
output by running:
``` shell
$ stack build
$ stack run tutorial
```
First, let's begin with our imports:
``` haskell
module Main where
import Control.Monad (replicateM_, forever)
import Deli (Channel, Deli, JobTiming(..))
import qualified Deli
import Deli.Printer (printResults)
import System.Random
```
Simple Queues
-------------
Next, let's create our first example, of a single queue and worker. Work
will be placed on the main queue, and our worker will read from it, and
process each item in serial:
``` haskell
singleQueue
:: Channel JobTiming
-> Deli JobTiming ()
singleQueue queue =
forever $ do
job <- Deli.readChannel queue
Deli.runJob job
```
As you can see, describing a very simple system like this has little
ceremony. Next, let's set up the rest of the simulation, and run it.
``` haskell
singleQueueExample :: IO ()
singleQueueExample = do
gen <- newStdGen
let durations = cycle [0.8, 0.9, 1.0, 1.1, 1.2]
times = [0,1..(100000-1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs singleQueue
printResults res
```
First we've created a new random number generator (the Deli type
implements `MonadRandom`, for convenient, reproducible random number
generation). Next, we create a dataset of our jobs, to be simulated. In
this case, jobs will take one of a set of durations (in seconds), with a
mean of `1.0`. Then we set it up so that jobs will be triggered from the
outside world once each second.
Finally, we run the simulation, passing in our random number seed, set
of jobs, and our implemented system (`singleQueue`).
Running the simulation, we get two primary sets of statistics. We see
the wait time (how long our jobs had to wait in line before processing
began), and their sojourn time, which is the wait time plus the
processing time.
In this case, we have a non-zero wait time, which means we are sometimes
at capacity, and are queueing up work. This is also reflected in the
fact that the sojourn 50th percentile is (albeit slightly) greater
than one second.
You will see output similar to this:
``` shell
Simulated wait (milliseconds):
simulated 99th: 294.99974998749934
simulated 95th: 274.9987499374968
simulated 75th: 181.24578114452865
simulated 50th: 87.4934373359334
Simulated sojourn (milliseconds):
simulated 99th: 1295.0000000000002
simulated 95th: 1275.0000000000002
simulated 75th: 1181.2495312382812
simulated 50th: 1087.497187429686
```
Next, let's see what happens if we add more workers:
``` haskell
variableWorkers
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
variableWorkers num queue =
replicateM_ num $
Deli.fork $ forever $ do
job <- Deli.readChannel queue
Deli.runJob job
```
Here we've simply parameterized the number of workers. For each worker,
we spawn a thread (using the Deli DSL), and enter an infinite loop to
read work from the shared queue. This expands our exposure to the Deli
API, as we've now seen `fork`, `readChannel`, `runJob`, and `simulate`.
Deli's core API exposes familiar programming concepts to create queues,
read and write to them, and fork (lightweight) threads. This allows you
to create a model of your system, using similar constructs to the actual
version. This is core to Deli.
``` haskell
twoWorkerQueueExample :: IO ()
twoWorkerQueueExample = do
gen <- newStdGen
let durations = cycle [0.8, 0.9, 1.0, 1.1, 1.2]
times = [0,1..(100000-1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs (variableWorkers 2)
printResults res
```
Now we can run our same example, and pass in two workers. Running this,
we see that the system never reaches capacity, as the wait time is
always zero. We won't be able to beat this performance.
```
Simulated wait (milliseconds):
simulated 99th: 0.0
simulated 95th: 0.0
simulated 75th: 0.0
simulated 50th: 0.0
Simulated sojourn (milliseconds):
simulated 99th: 1197.5
simulated 95th: 1187.5
simulated 75th: 1125.0
simulated 50th: 1000.0
```
A more complex example
----------------------
``` haskell
main :: IO ()
main = do
putStrLn "## singleQueueExample ##"
singleQueueExample
newline
putStrLn "## twoWorkerQueueExample ##"
twoWorkerQueueExample
newline
where newline = putStrLn "\n"
```


@@ -0,0 +1,583 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Control.Monad.Concurrent
( Channel
, Time(..)
, Duration(..)
, ThreadId
, ConcurrentT
, addDuration
, microsecond
, millisecond
, millisecondsToDuration
, millisecondsToTime
, microsecondsToDuration
, microsecondsToTime
, subtractTime
, fork
, threadId
, sleep
, yield
, lazySchedule
, now
, newChannel
, writeChannel
, writeChannelNonblocking
, readChannel
, readChannelNonblocking
, runConcurrentT
) where
import Control.Lens (at, ix, makeLenses, to, use, (^?), (.=), (+=), (%=), (?~))
import Control.Monad.State.Strict
import Control.Monad.Reader (MonadReader, ReaderT, ask, local, runReaderT)
import Control.Monad.Trans.Cont (ContT, evalContT, resetT, shiftT)
import Data.Map.Strict
import Data.Maybe
import Data.PQueue.Min as PQueue
import Data.Sequence
import Data.Time.Clock (DiffTime, picosecondsToDiffTime)
data Queue a = Queue
{ _writeEnd :: [a]
, _readEnd :: [a]
}
emptyQueue :: Queue a
emptyQueue = Queue [] []
readQueue
:: Queue a
-> Maybe (a, Queue a)
readQueue (Queue writeEnd readEnd) =
case readEnd of
(h:tl) ->
let newQueue = Queue writeEnd tl
in Just (h, newQueue)
[] ->
if Prelude.null writeEnd
then Nothing
else readQueue (Queue [] (Prelude.reverse writeEnd))
writeQueue
:: Queue a
-> a
-> Queue a
writeQueue (Queue writeEnd readEnd) val =
Queue (val:writeEnd) readEnd
-- ** delimited continuations **
-- shift = escape
-- reset = capture
data Channel a = Channel
{ _chanId :: !Integer
, _chanSize :: !(Maybe Int)
}
deriving (Eq, Ord, Show)
data ChanAndWaiters chanState m = ChanAndWaiters
{ _contents :: !(Seq chanState)
, _readers :: Queue (ThreadId, IConcurrentT chanState m ())
, _writers :: Queue (ThreadId, IConcurrentT chanState m ())
}
newtype Time = Time DiffTime
deriving (Show, Eq, Ord, Num, Fractional, Enum)
newtype Duration = Duration DiffTime
deriving (Show, Eq, Ord, Num, Fractional, Real, Enum)
newtype ThreadId = ThreadId Integer
deriving (Show, Eq, Ord)
addDuration
:: Time
-> Duration
-> Time
addDuration (Time t) (Duration d) =
Time (t + d)
microsecond :: Duration
microsecond = Duration (picosecondsToDiffTime 1000000)
millisecond :: Duration
millisecond = microsecond * 1000
millisecondsToTime
:: Integer
-> Time
millisecondsToTime millis =
Time $ picosecondsToDiffTime (1000 * 1000 * 1000 * millis)
millisecondsToDuration
:: Integer
-> Duration
millisecondsToDuration millis =
Duration $ picosecondsToDiffTime (1000 * 1000 * 1000 * millis)
microsecondsToTime
:: Integer
-> Time
microsecondsToTime micros =
Time $ picosecondsToDiffTime (1000 * 1000 * micros)
microsecondsToDuration
:: Integer
-> Duration
microsecondsToDuration micros =
Duration $ picosecondsToDiffTime (1000 * 1000 * micros)
subtractTime
:: Time
-> Time
-> Duration
subtractTime (Time end) (Time start) =
Duration (end - start)
data PriorityCoroutine chanState m = PriorityCoroutine
{ _routine :: IConcurrentT chanState m ()
, _pId :: !ThreadId
, _priority :: !Time
}
instance Eq (PriorityCoroutine chanState m)
where (==) a b = (_priority a, _pId a) == (_priority b, _pId b)
instance Ord (PriorityCoroutine chanState m)
-- compare on (priority, threadId) so Ord agrees with Eq, and ties in
-- priority break deterministically by thread creation order
where compare a b = compare (_priority a, _pId a) (_priority b, _pId b)
type CoroutineQueue chanState m = MinQueue (PriorityCoroutine chanState m)
data ConcurrentState chanState m = ConcurrentState
{ _coroutines :: !(CoroutineQueue chanState m)
, _scheduledRoutines :: [(Time, IConcurrentT chanState m ())]
, _nextThreadIdent :: !ThreadId
, _channels :: !(Map (Channel chanState) (ChanAndWaiters chanState m))
, _nextChannelIdent :: !Integer
, _nowTime :: !Time
}
newtype IConcurrentT chanState m a =
IConcurrentT
{ runIConcurrentT' :: ContT () (ReaderT ThreadId (StateT (ConcurrentState chanState m) m)) a
} deriving (Functor, Monad, MonadIO, MonadReader ThreadId, MonadState (ConcurrentState chanState m))
instance Applicative (IConcurrentT chanState m) where
pure = IConcurrentT . pure
(IConcurrentT a) <*> (IConcurrentT b) = IConcurrentT (a <*> b)
(IConcurrentT a) *> (IConcurrentT b) = IConcurrentT $ a >>= const b
instance MonadTrans (IConcurrentT chanState) where
lift = IConcurrentT . lift . lift . lift
newtype ConcurrentT chanState m a =
ConcurrentT
{ runConcurrentT' :: IConcurrentT chanState m a
} deriving (Functor, Applicative, Monad, MonadIO)
instance MonadState s m => MonadState s (ConcurrentT chanState m) where
get = lift get
put = lift . put
state = lift . state
instance MonadTrans (ConcurrentT chanState) where
lift = ConcurrentT . IConcurrentT . lift . lift . lift
-- For some reason I had to put these together underneath the definition
-- of `IConcurrentT'
makeLenses ''ConcurrentState
makeLenses ''ChanAndWaiters
freshState
:: ConcurrentState chanState m
freshState = ConcurrentState
{ _coroutines = PQueue.empty
, _scheduledRoutines = []
-- 1 because we `runReaderT' with 0 for the main thread,
-- which is already created
, _nextThreadIdent = ThreadId 1
, _channels = Data.Map.Strict.empty
, _nextChannelIdent = 0
, _nowTime = 0
}
register
:: Monad m
=> (IConcurrentT chanState m () -> IConcurrentT chanState m ())
-> IConcurrentT chanState m ()
register callback =
IConcurrentT $ shiftT $ \k -> do
let routine = IConcurrentT (lift (k ()))
runIConcurrentT' (callback routine)
getCCs
:: Monad m
=> IConcurrentT chanState m (CoroutineQueue chanState m)
getCCs = use coroutines
putCCs
:: Monad m
=> CoroutineQueue chanState m
-> IConcurrentT chanState m ()
putCCs queue =
coroutines .= queue
updateNow
:: Monad m
=> Time
-> IConcurrentT chanState m ()
updateNow time =
nowTime .= time
dequeue
:: Monad m
=> IConcurrentT chanState m ()
dequeue = do
queue <- getCCs
scheduled <- use scheduledRoutines
let mMin = PQueue.minView queue
case (mMin, scheduled) of
(Nothing, []) ->
return ()
(Just (PriorityCoroutine nextCoroutine pId priority, modifiedQueue), []) -> do
putCCs modifiedQueue
updateNow priority
IConcurrentT (resetT (runIConcurrentT' (local (const pId) nextCoroutine)))
dequeue
(Nothing, (priority, nextCoroutine): tl) -> do
scheduledRoutines .= tl
updateNow priority
IConcurrentT (resetT (runIConcurrentT' nextCoroutine))
dequeue
(Just (PriorityCoroutine nextCoroutineQ pId priorityQ, modifiedQueue), (priorityL, nextCoroutineL): tl) ->
if priorityL <= priorityQ
then do
scheduledRoutines .= tl
updateNow priorityL
IConcurrentT (resetT (runIConcurrentT' (local (const pId) nextCoroutineL)))
dequeue
else do
putCCs modifiedQueue
updateNow priorityQ
IConcurrentT (resetT (runIConcurrentT' nextCoroutineQ))
dequeue
ischeduleDuration
:: Monad m
=> Duration
-> ThreadId
-> IConcurrentT chanState m ()
-> IConcurrentT chanState m ()
ischeduleDuration duration pId routine = do
currentNow <- inow
ischedule (addDuration currentNow duration) pId routine
sleep
:: Monad m
=> Duration
-> ConcurrentT chanState m ()
sleep = ConcurrentT . isleep
isleep
:: Monad m
=> Duration
-> IConcurrentT chanState m ()
isleep duration = do
myId <- ithreadId
register (ischeduleDuration duration myId)
yield
:: Monad m
=> ConcurrentT chanState m ()
yield = ConcurrentT iyield
iyield
:: Monad m
=> IConcurrentT chanState m ()
iyield =
-- rather than implementing a separate queue/seq for yield'ers, we actually
-- do want to advance our clock as we yield, simulating CPU cycles
isleep microsecond
lazySchedule
:: Monad m
=> [(Time, ConcurrentT chanState m ())]
-> ConcurrentT chanState m ()
lazySchedule scheduled =
ConcurrentT (ilazySchedule [(time, runConcurrentT' t) | (time, t) <- scheduled])
ilazySchedule
:: Monad m
=> [(Time, IConcurrentT chanState m ())]
-> IConcurrentT chanState m ()
ilazySchedule scheduled =
scheduledRoutines .= scheduled
ischedule
:: Monad m
=> Time
-> ThreadId
-> IConcurrentT chanState m ()
-> IConcurrentT chanState m ()
ischedule time pId routine = do
currentRoutines <- getCCs
currentNow <- inow
-- to prevent time from moving backward by scheduling something in the
-- past, we schedule it to the `max' of the current time, or the schedule
-- time. Effectively this immediately schedules the process if it were
-- to otherwise have been scheduled for the past.
let scheduleTime = max time currentNow
newRoutines = insertBehind (PriorityCoroutine routine pId scheduleTime) currentRoutines
putCCs newRoutines
now
:: Monad m
=> ConcurrentT chanState m Time
now = ConcurrentT inow
inow
:: Monad m
=> IConcurrentT chanState m Time
inow = use nowTime
fork
:: Monad m
=> ConcurrentT chanState m ()
-> ConcurrentT chanState m ()
fork (ConcurrentT f) =
ConcurrentT (ifork f)
ifork
:: Monad m
=> IConcurrentT chanState m ()
-> IConcurrentT chanState m ()
ifork routine = do
    tId@(ThreadId i) <- use nextThreadIdent
    nextThreadIdent .= ThreadId (i + 1)
    ischeduleDuration 0 tId routine
    -- suspend the parent and reschedule it at the same instant; since
    -- same-priority entries are inserted behind, the child runs first
    myId <- ithreadId
    register (ischeduleDuration 0 myId)
threadId
:: Monad m
=> ConcurrentT chanState m ThreadId
threadId = ConcurrentT ithreadId
ithreadId
:: Monad m
=> IConcurrentT chanState m ThreadId
ithreadId = ask
newChannel
:: Monad m
=> Maybe Int
-> ConcurrentT chanState m (Channel chanState)
newChannel = ConcurrentT . inewChannel
inewChannel
:: Monad m
=> Maybe Int
-> IConcurrentT chanState m (Channel chanState)
inewChannel mChanSize = do
-- grab the next channel identifier and then
-- immediately increment it for the next use
chanIdent <- use nextChannelIdent
nextChannelIdent += 1
let chan = Channel chanIdent mChanSize
emptySeq = Data.Sequence.empty
chanAndWaiters = ChanAndWaiters emptySeq emptyQueue emptyQueue
channels %= (at chan ?~ chanAndWaiters)
return chan
writeChannel
:: Monad m
=> Channel chanState
-> chanState
-> ConcurrentT chanState m ()
writeChannel chan item =
ConcurrentT (iwriteChannel chan item)
iwriteChannel
:: Monad m
=> Channel chanState
-> chanState
-> IConcurrentT chanState m ()
iwriteChannel chan@(Channel _ident mMaxSize) item = do
chanMap <- use channels
let chanContents = chanMap ^? (ix chan . contents)
chanCurrentSize = maybe 0 Data.Sequence.length chanContents
myId <- ithreadId
-- when there's already an element, we block and wait our turn to write
-- once the queue is empty/writable
case mMaxSize of
Just maxSize | chanCurrentSize >= maxSize ->
register $ \routine ->
channels . ix chan . writers %= flip writeQueue (myId, routine)
_ ->
return ()
-- now we've waited, if needed
-- write the value, and then notify any readers
-- our state may have changed, so get it again
-- write the value to the queue
channels . ix chan . contents %= (|> item)
chanMap2 <- use channels
let readerView = join $ (readQueue . _readers) <$> Data.Map.Strict.lookup chan chanMap2
case readerView of
-- there are no readers
Nothing ->
return ()
-- there is a reader, call the reader
Just ((readerId, nextReader), newReaders) -> do
channels . ix chan . readers .= newReaders
local (const readerId) nextReader
writeChannelNonblocking
:: Monad m
=> Channel chanState
-> chanState
-> ConcurrentT chanState m (Maybe chanState)
writeChannelNonblocking chan item =
ConcurrentT (iwriteChannelNonblocking chan item)
iwriteChannelNonblocking
:: Monad m
=> Channel chanState
-> chanState
-> IConcurrentT chanState m (Maybe chanState)
iwriteChannelNonblocking chan@(Channel _ident mMaxSize) item = do
chanMap <- use channels
myId <- ithreadId
let chanContents = chanMap ^? (ix chan . contents)
chanCurrentSize = maybe 0 Data.Sequence.length chanContents
-- when there's already an element, we block and wait our turn to write
-- once the queue is empty/writable
case mMaxSize of
Just maxSize | chanCurrentSize >= maxSize ->
return Nothing
_ -> do
-- write the value to the queue
channels . ix chan . contents %= (|> item)
chanMap2 <- use channels
let readerView = join $ (readQueue . _readers) <$> Data.Map.Strict.lookup chan chanMap2
case readerView of
-- there are no readers
Nothing ->
return (Just item)
-- there is a reader, call the reader
Just ((readerId, nextReader), newReaders) -> do
channels . ix chan . readers .= newReaders
            -- rather than running the reader inline with `local', schedule
            -- it and reschedule ourselves, so both threads resume at the
            -- current simulated time
            ischeduleDuration 0 readerId nextReader
            register (ischeduleDuration 0 myId)
return (Just item)
readChannel
:: Monad m
=> Channel chanState
-> ConcurrentT chanState m chanState
readChannel = ConcurrentT . ireadChannel
ireadChannel
:: Monad m
=> Channel chanState
-> IConcurrentT chanState m chanState
ireadChannel chan = do
chanMap <- use channels
let mChanContents = fromMaybe EmptyL $ chanMap ^? (ix chan . contents . to viewl)
myId <- ithreadId
case mChanContents of
EmptyL -> do
-- nothing to read, so we add ourselves to the queue
register $ \routine ->
channels . ix chan . readers %= flip writeQueue (myId, routine)
-- we can actually just recur here to read the value, since now
-- that we're running again, the queue will have a value for us to
-- read
ireadChannel chan
val :< newSeq -> do
-- write the new seq
channels . ix chan . contents .= newSeq
-- see if there are any writers
chanMap2 <- use channels
let writerView = join $ (readQueue . _writers) <$> Data.Map.Strict.lookup chan chanMap2
case writerView of
Nothing ->
return val
Just ((writerId, nextWriter), newWriters) -> do
channels . ix chan . writers .= newWriters
local (const writerId) nextWriter
return val
readChannelNonblocking
:: Monad m
=> Channel chanState
-> ConcurrentT chanState m (Maybe chanState)
readChannelNonblocking = ConcurrentT . ireadChannelNonblocking
ireadChannelNonblocking
:: Monad m
=> Channel chanState
-> IConcurrentT chanState m (Maybe chanState)
ireadChannelNonblocking chan = do
chanMap <- use channels
let mChanContents = fromMaybe EmptyL $ chanMap ^? (ix chan . contents . to viewl)
case mChanContents of
EmptyL -> return Nothing
val :< newSeq -> do
-- write the new seq
channels . ix chan . contents .= newSeq
-- see if there are any writers
chanMap2 <- use channels
let writerView = join $ (readQueue . _writers) <$> Data.Map.Strict.lookup chan chanMap2
case writerView of
Nothing ->
return (Just val)
Just ((writerId, nextWriter), newWriters) -> do
channels . ix chan . writers .= newWriters
local (const writerId) nextWriter
return (Just val)
runConcurrentT
:: Monad m
=> ConcurrentT chanState m ()
-> m ()
runConcurrentT (ConcurrentT routine) =
runIConcurrentT routine
runIConcurrentT
:: Monad m
=> IConcurrentT chanState m ()
-> m ()
runIConcurrentT routine =
let resetAction = do
resetT (runIConcurrentT' routine)
runIConcurrentT' dequeue
in
void $ flip evalStateT freshState $ flip runReaderT (ThreadId 0) $ evalContT resetAction
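-- A usage sketch of the API above (a hypothetical example; `sleep 2' leans
-- on Duration's numeric instances): two simulated threads handing a value
-- over a bounded channel, all in a pure monad.
_exampleHandoff :: Monad m => ConcurrentT Int m ()
_exampleHandoff = do
    chan <- newChannel (Just 1)
    fork $ do
        sleep 2               -- advances simulated time only
        writeChannel chan 42
    _ <- readChannel chan     -- suspends until the forked thread writes
    return ()
-- run it purely with: runConcurrentT _exampleHandoff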

src/Deli.hs (new file, 274 lines added)
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
module Deli
( Deli
, HasJobTiming(..)
, JobTiming(..)
, TimesliceStats(..)
, DeliState(..)
-- re-exported from Control.Monad.Concurrent
, Concurrent.Time(..)
, Concurrent.Duration(..)
, Concurrent.Channel
, Concurrent.ThreadId
, Concurrent.addDuration
, Concurrent.microsecond
, Concurrent.millisecond
, Concurrent.millisecondsToDuration
, Concurrent.millisecondsToTime
, Concurrent.microsecondsToDuration
, Concurrent.microsecondsToTime
, Concurrent.subtractTime
, fork
, threadId
, sleep
, now
, newChannel
, writeChannel
, writeChannelNonblocking
, readChannel
, readChannelNonblocking
, runDeli
, runJob
, priority
, simulate
) where
import Control.Lens (Getter, makeLenses, to, use, (%~), (+~), (.~), (^.))
import Control.Monad.Random.Strict
import Control.Monad.State.Strict (State, execState, modify')
import Control.Monad.Trans.Class (lift)
import Data.Function ((&))
import Data.Map.Strict
import Data.Maybe (fromJust)
import Data.TDigest (TDigest, tdigest, quantile)
import Data.Time
import System.Random (StdGen)
import qualified Control.Monad.Concurrent as Concurrent
import qualified Data.TDigest as TDigest
data JobTiming = JobTiming
{ _jobStart :: !Concurrent.Time
, _jobDuration :: !Concurrent.Duration
} deriving (Show, Eq, Ord)
class HasJobTiming a where
jobTiming :: Getter a JobTiming
instance HasJobTiming JobTiming where
jobTiming = to id
data FinishedJob = FinishedJob
{ _jobFinishTime :: Concurrent.Time
, _jobWait :: Concurrent.Duration
} deriving (Show, Eq, Ord)
data TimesliceStats = TimesliceStats
{
-- inclusive
_sliceStart :: Concurrent.Time
, _response50 :: Concurrent.Duration
, _response99 :: Concurrent.Duration
} deriving (Show)
data DeliState = DeliState
{ _sojournStatistics :: !(TDigest 10)
, _perfectStatistics :: !(TDigest 10)
, _waitStatistics :: !(TDigest 10)
, _temporalStats :: !(Map Concurrent.Time TimesliceStats)
, _currentMinute :: !Concurrent.Time
, _currentDigest :: !(TDigest 10)
, _numProcessed :: !Integer
} deriving (Show)
makeLenses ''DeliState
freshState :: DeliState
freshState =
DeliState
{ _sojournStatistics = emptyDigest
, _perfectStatistics = emptyDigest
, _waitStatistics = emptyDigest
, _temporalStats = Data.Map.Strict.empty
, _currentMinute = 0
, _currentDigest = emptyDigest
, _numProcessed = 0
}
where emptyDigest = tdigest []
newtype Deli chanState a =
Deli
{ _getDeli :: Concurrent.ConcurrentT chanState (RandT StdGen (State DeliState)) a
} deriving (Functor, Applicative, Monad)
instance MonadRandom (Deli chanState) where
    -- delegate to the RandT StdGen layer underneath ConcurrentT; note these
    -- must lift into the inner monad rather than recurse into this very
    -- instance
    getRandomR = Deli . lift . getRandomR
    getRandom = Deli (lift getRandom)
    getRandomRs = Deli . lift . getRandomRs
    getRandoms = Deli (lift getRandoms)
------------------------------------------------------------------------------
-- ## Wrappers around the Control.Monad.Concurrent API
------------------------------------------------------------------------------
fork
:: Deli chanState ()
-> Deli chanState ()
fork (Deli conc) =
Deli $ Concurrent.fork conc
threadId
:: Deli chanState Concurrent.ThreadId
threadId =
Deli Concurrent.threadId
sleep
:: Concurrent.Duration
-> Deli chanState ()
sleep = Deli . Concurrent.sleep
now
:: Deli chanState Concurrent.Time
now = Deli Concurrent.now
newChannel
:: Maybe Int
-> Deli chanState (Concurrent.Channel chanState)
newChannel = Deli . Concurrent.newChannel
writeChannel
:: Concurrent.Channel chanState
-> chanState
-> Deli chanState ()
writeChannel chan item =
Deli (Concurrent.writeChannel chan item)
writeChannelNonblocking
:: Concurrent.Channel chanState
-> chanState
-> Deli chanState (Maybe chanState)
writeChannelNonblocking chan item =
Deli (Concurrent.writeChannelNonblocking chan item)
readChannel
:: Concurrent.Channel chanState
-> Deli chanState chanState
readChannel = Deli . Concurrent.readChannel
readChannelNonblocking
:: Concurrent.Channel chanState
-> Deli chanState (Maybe chanState)
readChannelNonblocking = Deli . Concurrent.readChannelNonblocking
------------------------------------------------------------------------------
-- ## Time Conversion
------------------------------------------------------------------------------
-- Round down a `Concurrent.Time' to the nearest minute
clampMinutes
:: Concurrent.Time
-> Concurrent.Time
clampMinutes (Concurrent.Time t) =
let picosPerMinute = 60000000000000
inPicos = diffTimeToPicoseconds t
toMinute = inPicos `quot` picosPerMinute
in Concurrent.Time (picosecondsToDiffTime (toMinute * picosPerMinute))
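-- For example, 90 seconds clamps down to the enclosing minute boundary
-- (`secondsToDiffTime' comes with the Data.Time import above):
--
--     clampMinutes (Concurrent.Time (secondsToDiffTime 90))
--         == Concurrent.Time (secondsToDiffTime 60)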
doubleToDuration :: Double -> Concurrent.Duration
doubleToDuration = fromRational . toRational
------------------------------------------------------------------------------
-- ## Simulation
------------------------------------------------------------------------------
runDeli
:: StdGen
-> Deli chanState ()
-> DeliState
runDeli gen (Deli conc) =
let !randomAction = Concurrent.runConcurrentT conc
!writerAction = evalRandT randomAction gen
!res = execState writerAction freshState
in res
runJob
:: HasJobTiming j
=> j
-> Deli chanState ()
runJob j = do
let (JobTiming start duration) = j ^. jobTiming
beforeJob <- Deli Concurrent.now
Deli (Concurrent.sleep duration)
nowTime <- Deli Concurrent.now
let !sojourn = Concurrent.subtractTime nowTime start
!waitTime = Concurrent.subtractTime beforeJob start
modifier s = s & numProcessed +~ 1
& sojournStatistics %~ TDigest.insert (realToFrac sojourn)
& waitStatistics %~ TDigest.insert (realToFrac waitTime)
& perfectStatistics %~ TDigest.insert (realToFrac duration)
Deli $ modify' modifier
updateTemporalStats (FinishedJob nowTime waitTime)
priority
:: HasJobTiming j
=> Concurrent.Time
-> j
-> Concurrent.Duration
priority time j =
let (JobTiming start duration) = j ^. jobTiming
numerator = Concurrent.subtractTime time start + duration
denominator = duration
in
if denominator == 0
then 0
else numerator / denominator
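-- The value above is the job's "slowdown" if it finished right now:
-- (time waited so far + service time) / service time. For example, a job
-- that arrived at t=0 and needs 2s of service has, at t=6, priority
-- (6 - 0 + 2) / 2 = 4, while a job that has not waited at all scores 1.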
updateTemporalStats
:: FinishedJob
-> Deli chanState ()
updateTemporalStats (FinishedJob endTime sojourn) = do
let clampedEnd = clampMinutes endTime
currentSlice <- Deli $ use currentMinute
if currentSlice == clampedEnd
then do
let modifier s =
s & currentDigest %~ TDigest.insert (realToFrac sojourn)
Deli $ modify' modifier
else do
let modifier s =
s & currentMinute .~ clampedEnd
& currentDigest .~ TDigest.singleton (realToFrac sojourn)
& temporalStats %~ Data.Map.Strict.insert currentSlice (digestToTimeSlice currentSlice (s ^. currentDigest))
Deli $ modify' modifier
digestToTimeSlice
:: Concurrent.Time
-> TDigest compression
-> TimesliceStats
digestToTimeSlice minute stats =
TimesliceStats
{ _sliceStart = minute
, _response50 = doubleToDuration (fromJust (quantile 0.5 stats))
, _response99 = doubleToDuration (fromJust (quantile 0.99 stats))
}
simulate
:: HasJobTiming j
=> StdGen
-> [j]
-> (Concurrent.Channel j -> Deli j ())
-> DeliState
simulate gen jobs process =
runDeli gen $ do
mainChan <- Deli (Concurrent.newChannel Nothing)
let insertQueue = Concurrent.writeChannel mainChan
scheduled = [(_jobStart (job ^. jobTiming), insertQueue job) | job <- jobs]
Deli (Concurrent.lazySchedule scheduled)
process mainChan
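-- A sketch of an end-to-end run (hypothetical example: one worker draining
-- jobs that arrive one second apart, each needing 1ms of service;
-- `forever' is from Control.Monad, `newStdGen' from System.Random, and
-- `printResults' from Deli.Printer):
--
--     main :: IO ()
--     main = do
--         gen <- newStdGen
--         let jobs = [ JobTiming (fromIntegral n) (Concurrent.millisecondsToDuration 1)
--                    | n <- [0 .. 999 :: Int] ]
--             worker chan = forever (readChannel chan >>= runJob)
--         printResults (simulate gen jobs worker)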

src/Deli/Printer.hs (new file, 43 lines added)
module Deli.Printer
( printResults
) where
import Data.Maybe (fromJust)
import Data.TDigest
import Deli
import Text.Printf (printf)
uQuantile
:: Double
-> TDigest comp
-> Double
uQuantile q digest =
1000 * fromJust (quantile q digest)
printTruncate :: String -> Double -> IO ()
printTruncate s d = do
printf s d
putStrLn ""
printResults :: DeliState -> IO ()
printResults res = do
putStrLn "Simulated wait (milliseconds):"
printTruncate "simulated 99th: %.2f" (uQuantile 0.99 (_waitStatistics res))
printTruncate "simulated 95th: %.2f" (uQuantile 0.95 (_waitStatistics res))
printTruncate "simulated 75th: %.2f" (uQuantile 0.75 (_waitStatistics res))
printTruncate "simulated 50th: %.2f" (uQuantile 0.50 (_waitStatistics res))
putStrLn ""
putStrLn "Simulated sojourn (milliseconds):"
printTruncate "simulated 99th: %.2f" (uQuantile 0.99 (_sojournStatistics res))
printTruncate "simulated 95th: %.2f" (uQuantile 0.95 (_sojournStatistics res))
printTruncate "simulated 75th: %.2f" (uQuantile 0.75 (_sojournStatistics res))
printTruncate "simulated 50th: %.2f" (uQuantile 0.50 (_sojournStatistics res))
putStrLn ""
putStrLn "Overall processing:"
putStrLn $ "total number processed: " ++ show (_numProcessed res)

src/Deli/Random.hs (new file, 68 lines added)
{-# LANGUAGE BangPatterns #-}
module Deli.Random
( distributionToJobs
, distributionToList
, arrivalTimePoissonDistribution
, durationExponentialDistribution
, durationParetoDistribution
) where
import Data.Random (RVar, sampleState)
import Data.Random.Distribution.Exponential (exponential)
import Data.Random.Distribution.Pareto (pareto)
import Data.Random.Source.PureMT (PureMT)
import Data.Time.Clock (diffTimeToPicoseconds)
import Deli (Time, Duration(..), JobTiming(..), microsecondsToDuration, microsecondsToTime)
distributionToJobs
:: RVar Time
-> RVar Duration
-> PureMT
-> [JobTiming]
distributionToJobs timing duration gen =
let jobTimingR = JobTiming <$> timing <*> duration
jobs = distributionToList jobTimingR gen
addTimings (JobTiming a _) (JobTiming b d) = JobTiming (a + b) d
in scanl1 addTimings jobs
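-- `scanl1 addTimings' turns each sampled value from an inter-arrival gap
-- into an absolute start time while leaving durations untouched: gaps of
-- 1s, 2s, 3s become jobs starting at t=1, t=3, and t=6.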
distributionToList
:: RVar a
-> PureMT
-> [a]
distributionToList dist gen =
    let (!val, newGen) = sampleState dist gen
    in val : distributionToList dist newGen
arrivalTimePoissonDistribution
:: Double -- ^ Mean number of arrivals per second
-> RVar Time
arrivalTimePoissonDistribution rate =
let inverseRate = 1 / rate
expDist = exponential inverseRate
doubleToTime d = round (d * 1000 * 1000)
in microsecondsToTime . doubleToTime <$> expDist
durationExponentialDistribution
:: Duration -- ^ Mean service time
-> RVar Duration
durationExponentialDistribution (Duration diffTime) =
    let picosDuration = diffTimeToPicoseconds diffTime
        -- 10^9 picoseconds per millisecond: the exponential's mean is
        -- expressed in milliseconds, then scaled to microseconds below
        picosPerMillisecond = 1000 * 1000 * 1000 :: Double
        expDist = exponential (fromIntegral picosDuration / picosPerMillisecond)
        doubleToDuration d = round (d * 1000)
    in microsecondsToDuration . doubleToDuration <$> expDist
-- | Create a Duration Pareto distribution from a mean service time. Note that
-- this hardcodes an alpha (α) of 1.16 (log_4 5), which yields the 80-20
-- "Pareto principle" distribution.
durationParetoDistribution
:: Duration -- ^ Mean service time
-> RVar Duration
durationParetoDistribution (Duration diffTime) =
    let picosDuration = diffTimeToPicoseconds diffTime
        -- scale (x_m) chosen so the distribution's mean equals the request:
        -- x_m = mean * (α - 1) / α ≈ mean * 0.1379 for α = 1.16
        picosMeanToScale = fromIntegral picosDuration * (0.1379 :: Double)
        picosPerMillisecond = 1000 * 1000 * 1000 :: Double
        paretoDist = pareto (picosMeanToScale / picosPerMillisecond) 1.16
        doubleToDuration d = round (d * 1000)
    in microsecondsToDuration . doubleToDuration <$> paretoDist
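-- Where 0.1379 comes from: a Pareto distribution with scale x_m and shape
-- α > 1 has mean x_m * α / (α - 1). Solving for the scale that produces a
-- requested mean:
--
--     x_m = mean * (α - 1) / α = mean * 0.16 / 1.16 ≈ mean * 0.1379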

stack.yaml (new file, 73 lines added)
# This file was automatically generated by 'stack init'
#
# Some commonly used options have been documented as comments in this file.
# For advanced use and comprehensive documentation of the format, please see:
# https://docs.haskellstack.org/en/stable/yaml_configuration/
# Resolver to choose a 'specific' stackage snapshot or a compiler version.
# A snapshot resolver dictates the compiler version and the set of packages
# to be used for project dependencies. For example:
#
# resolver: lts-3.5
# resolver: nightly-2015-09-21
# resolver: ghc-7.10.2
# resolver: ghcjs-0.1.0_ghc-7.10.2
# resolver:
# name: custom-snapshot
# location: "./custom-snapshot.yaml"
resolver: nightly-2017-10-19
# User packages to be built.
# Various formats can be used as shown in the example below.
#
# packages:
# - some-directory
# - https://example.com/foo/bar/baz-0.0.2.tar.gz
# - location:
# git: https://github.com/commercialhaskell/stack.git
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
# - location: https://github.com/commercialhaskell/stack/commit/e7b331f14bcffb8367cd58fbfc8b40ec7642100a
# extra-dep: true
# subdirs:
# - auto-update
# - wai
#
# A package marked 'extra-dep: true' will only be built if demanded by a
# non-dependency (i.e. a user package), and its test suites and benchmarks
# will not be run. This is useful for tweaking upstream packages.
packages:
- .
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps:
- transformers-0.5.5.0
- random-fu-0.2.7.0
- rvar-0.2.0.3
- MonadPrompt-1.0.0.5
- log-domain-0.12
- random-source-0.3.0.6
- flexible-defaults-0.0.1.2
# Override default flag values for local packages and extra-deps
flags: {}
# Extra package databases containing global packages
extra-package-dbs: []
# Control whether we use the GHC we find on the path
# system-ghc: true
#
# Require a specific version of stack, using version ranges
# require-stack-version: -any # Default
# require-stack-version: ">=1.5"
#
# Override the architecture used by stack, especially useful on Windows
# arch: i386
# arch: x86_64
#
# Extra directories used by stack for building
# extra-include-dirs: [/path/to/dir]
# extra-lib-dirs: [/path/to/dir]
#
# Allow a newer minor version of GHC than the snapshot specifies
# compiler-check: newer-minor

test/Spec.hs (new file, 2 lines added)
main :: IO ()
main = putStrLn "Test suite not yet implemented"