зеркало из https://github.com/github/deli.git
This commit is contained in:
Родитель
0c0c6a0d82
Коммит
b9987f5ee4
|
@ -7,6 +7,8 @@ import Control.Monad (replicateM, forM_, forever)
|
|||
import Control.Monad.Loops (iterateM_)
|
||||
import Control.Monad.Random.Class (getRandomR)
|
||||
import Data.Coerce (coerce)
|
||||
import Data.List (elemIndex, foldl1')
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Random.Source.PureMT (newPureMT)
|
||||
import Deli (Channel, Deli, JobTiming(..))
|
||||
import Deli.Printer (printResults)
|
||||
|
@ -50,6 +52,39 @@ randomWorkers num jobChannel = do
|
|||
job <- Deli.readChannel jobChannel
|
||||
Deli.writeChannel workerQueue job
|
||||
|
||||
leastConn
|
||||
:: Int
|
||||
-> Channel JobTiming
|
||||
-> Deli JobTiming ()
|
||||
leastConn num jobChannel = do
|
||||
chans :: [Channel JobTiming] <- replicateM num createWorker
|
||||
forever $ do
|
||||
job <- Deli.readChannel jobChannel
|
||||
|
||||
conns <- mapM Deli.channelLength chans
|
||||
let minIndex = fromMaybe 0 $ elemIndex (foldl1' min conns) conns
|
||||
|
||||
Deli.writeChannel (chans !! minIndex) job
|
||||
|
||||
twoRandomChoices
|
||||
:: Int
|
||||
-> Channel JobTiming
|
||||
-> Deli JobTiming ()
|
||||
twoRandomChoices num jobChannel = do
|
||||
chans :: [Channel JobTiming] <- replicateM num createWorker
|
||||
forever $ do
|
||||
job <- Deli.readChannel jobChannel
|
||||
|
||||
randomWorkerIndexA <- getRandomR (0, length chans - 1)
|
||||
randomWorkerIndexB <- getRandomR (0, length chans - 1)
|
||||
|
||||
aLength <- Deli.channelLength (chans !! randomWorkerIndexA)
|
||||
bLength <- Deli.channelLength (chans !! randomWorkerIndexB)
|
||||
|
||||
if aLength < bLength
|
||||
then Deli.writeChannel (chans !! randomWorkerIndexA) job
|
||||
else Deli.writeChannel (chans !! randomWorkerIndexB) job
|
||||
|
||||
data PriorityChannel = PriorityChannel
|
||||
{ _pduration :: !Deli.Duration
|
||||
, _pchannel :: !(Deli.Channel JobTiming)
|
||||
|
@ -67,7 +102,7 @@ dispatch
|
|||
:: Deli.Channel JobTiming
|
||||
-> (PQueue.MinQueue PriorityChannel, Deli.Time)
|
||||
-> Deli JobTiming (PQueue.MinQueue PriorityChannel, Deli.Time)
|
||||
dispatch readChan !(queue, prevTime) = do
|
||||
dispatch readChan (queue, prevTime) = do
|
||||
job <- Deli.readChannel readChan
|
||||
newTime <- Deli.now
|
||||
|
||||
|
@ -104,13 +139,17 @@ loadBalancerExample = do
|
|||
inputGen <- newPureMT
|
||||
let arrivals = Deli.Random.arrivalTimePoissonDistribution 1500
|
||||
serviceTimes = Deli.Random.durationExponentialDistribution 0.025
|
||||
numTests = 1000 * 1000 * 10
|
||||
numTests = 1000 * 1000 * 1
|
||||
jobsA = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
|
||||
jobsB = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
|
||||
jobsC = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
|
||||
jobsD = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
|
||||
jobsE = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
|
||||
roundRobinRes = Deli.simulate simulationGen jobsA (roundRobinWorkers 48)
|
||||
randomRes = Deli.simulate simulationGen jobsB (randomWorkers 48)
|
||||
leastWorkLeftRes = Deli.simulate simulationGen jobsC (leastWorkLeft 48)
|
||||
twoRandomChoicesRes = Deli.simulate simulationGen jobsD (twoRandomChoices 48)
|
||||
leastConnRes = Deli.simulate simulationGen jobsE (leastConn 48)
|
||||
|
||||
putStrLn "## Round Robin ##"
|
||||
printResults roundRobinRes
|
||||
|
@ -121,6 +160,12 @@ loadBalancerExample = do
|
|||
putStrLn "## LeastWorkLeft ##"
|
||||
printResults leastWorkLeftRes
|
||||
newline
|
||||
putStrLn "## TwoRandomChoices ##"
|
||||
printResults twoRandomChoicesRes
|
||||
newline
|
||||
putStrLn "## LeastConn ##"
|
||||
printResults leastConnRes
|
||||
newline
|
||||
|
||||
where newline = putStrLn "\n"
|
||||
|
||||
|
|
|
@ -14,9 +14,6 @@ extra-source-files: README.md
|
|||
cabal-version: >=1.10
|
||||
|
||||
library
|
||||
if flag(dump-core)
|
||||
build-depends: dump-core
|
||||
ghc-options: -fplugin=DumpCore -fplugin-opt DumpCore:core-html
|
||||
hs-source-dirs: src
|
||||
exposed-modules: Control.Monad.Concurrent
|
||||
, Deli
|
||||
|
|
|
@ -30,6 +30,7 @@ module Control.Monad.Concurrent
|
|||
, writeChannelNonblocking
|
||||
, readChannel
|
||||
, readChannelNonblocking
|
||||
, channelLength
|
||||
, runConcurrentT
|
||||
) where
|
||||
|
||||
|
@ -567,6 +568,21 @@ ireadChannelNonblocking chan = do
|
|||
IConcurrentT (resetT (runIConcurrentT' (local (const writerId) nextWriter)))
|
||||
return (Just val)
|
||||
|
||||
channelLength
|
||||
:: Monad m
|
||||
=> Channel chanState
|
||||
-> ConcurrentT chanState m Int
|
||||
channelLength = ConcurrentT . iChannelLength
|
||||
|
||||
iChannelLength
|
||||
:: Monad m
|
||||
=> Channel chanState
|
||||
-> IConcurrentT chanState m Int
|
||||
iChannelLength chan = do
|
||||
chanMap <- use channels
|
||||
let chanContents = fromMaybe Data.Sequence.empty $ chanMap ^? (ix chan . contents)
|
||||
return (Data.Sequence.length chanContents)
|
||||
|
||||
runConcurrentT
|
||||
:: Monad m
|
||||
=> ConcurrentT chanState m ()
|
||||
|
|
|
@ -32,6 +32,7 @@ module Deli
|
|||
, writeChannelNonblocking
|
||||
, readChannel
|
||||
, readChannelNonblocking
|
||||
, channelLength
|
||||
, runDeli
|
||||
, runJob
|
||||
, priority
|
||||
|
@ -170,6 +171,11 @@ readChannelNonblocking
|
|||
-> Deli chanState (Maybe chanState)
|
||||
readChannelNonblocking = Deli . Concurrent.readChannelNonblocking
|
||||
|
||||
channelLength
|
||||
:: Concurrent.Channel chanState
|
||||
-> Deli chanState Int
|
||||
channelLength = Deli . Concurrent.channelLength
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- ## Time Conversion
|
||||
------------------------------------------------------------------------------
|
||||
|
|
Загрузка…
Ссылка в новой задаче