From b9987f5ee4288ff42212ac162bb6b2e0a88ba2ca Mon Sep 17 00:00:00 2001 From: Reid Draper Date: Tue, 1 Dec 2020 14:03:46 -0600 Subject: [PATCH] WIP --- app/LoadBalancer.hs | 49 +++++++++++++++++++++++++++++++-- deli.cabal | 3 -- src/Control/Monad/Concurrent.hs | 16 +++++++++++ src/Deli.hs | 6 ++++ 4 files changed, 69 insertions(+), 5 deletions(-) diff --git a/app/LoadBalancer.hs b/app/LoadBalancer.hs index 8e9cb91..1e8b5c4 100644 --- a/app/LoadBalancer.hs +++ b/app/LoadBalancer.hs @@ -7,6 +7,8 @@ import Control.Monad (replicateM, forM_, forever) import Control.Monad.Loops (iterateM_) import Control.Monad.Random.Class (getRandomR) import Data.Coerce (coerce) +import Data.List (elemIndex, foldl1') +import Data.Maybe (fromMaybe) import Data.Random.Source.PureMT (newPureMT) import Deli (Channel, Deli, JobTiming(..)) import Deli.Printer (printResults) @@ -50,6 +52,39 @@ randomWorkers num jobChannel = do job <- Deli.readChannel jobChannel Deli.writeChannel workerQueue job +leastConn + :: Int + -> Channel JobTiming + -> Deli JobTiming () +leastConn num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + forever $ do + job <- Deli.readChannel jobChannel + + conns <- mapM Deli.channelLength chans + let minIndex = fromMaybe 0 $ elemIndex (foldl1' min conns) conns + + Deli.writeChannel (chans !! minIndex) job + +twoRandomChoices + :: Int + -> Channel JobTiming + -> Deli JobTiming () +twoRandomChoices num jobChannel = do + chans :: [Channel JobTiming] <- replicateM num createWorker + forever $ do + job <- Deli.readChannel jobChannel + + randomWorkerIndexA <- getRandomR (0, length chans - 1) + randomWorkerIndexB <- getRandomR (0, length chans - 1) + + aLength <- Deli.channelLength (chans !! randomWorkerIndexA) + bLength <- Deli.channelLength (chans !! randomWorkerIndexB) + + if aLength < bLength + then Deli.writeChannel (chans !! randomWorkerIndexA) job + else Deli.writeChannel (chans !! randomWorkerIndexB) job + data PriorityChannel = PriorityChannel { _pduration :: !Deli.Duration , _pchannel :: !(Deli.Channel JobTiming) @@ -67,7 +102,7 @@ dispatch :: Deli.Channel JobTiming -> (PQueue.MinQueue PriorityChannel, Deli.Time) -> Deli JobTiming (PQueue.MinQueue PriorityChannel, Deli.Time) -dispatch readChan !(queue, prevTime) = do +dispatch readChan (queue, prevTime) = do job <- Deli.readChannel readChan newTime <- Deli.now @@ -104,13 +139,17 @@ loadBalancerExample = do inputGen <- newPureMT let arrivals = Deli.Random.arrivalTimePoissonDistribution 1500 serviceTimes = Deli.Random.durationExponentialDistribution 0.025 - numTests = 1000 * 1000 * 10 + numTests = 1000 * 1000 * 1 jobsA = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen jobsB = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen jobsC = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsD = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen + jobsE = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen roundRobinRes = Deli.simulate simulationGen jobsA (roundRobinWorkers 48) randomRes = Deli.simulate simulationGen jobsB (randomWorkers 48) leastWorkLeftRes = Deli.simulate simulationGen jobsC (leastWorkLeft 48) + twoRandomChoicesRes = Deli.simulate simulationGen jobsD (twoRandomChoices 48) + leastConnRes = Deli.simulate simulationGen jobsE (leastConn 48) putStrLn "## Round Robin ##" printResults roundRobinRes @@ -121,6 +160,12 @@ loadBalancerExample = do putStrLn "## LeastWorkLeft ##" printResults leastWorkLeftRes newline + putStrLn "## TwoRandomChoices ##" + printResults twoRandomChoicesRes + newline + putStrLn "## LeastConn ##" + printResults leastConnRes + newline where newline = putStrLn "\n" diff --git a/deli.cabal b/deli.cabal index 330dfc3..68ac340 100644 --- a/deli.cabal +++ b/deli.cabal @@ -14,9 +14,6 @@ extra-source-files: README.md cabal-version: >=1.10 library - if flag(dump-core) - build-depends: dump-core - ghc-options: -fplugin=DumpCore -fplugin-opt DumpCore:core-html hs-source-dirs: src exposed-modules: Control.Monad.Concurrent , Deli diff --git a/src/Control/Monad/Concurrent.hs b/src/Control/Monad/Concurrent.hs index a7fe4c2..702b664 100644 --- a/src/Control/Monad/Concurrent.hs +++ b/src/Control/Monad/Concurrent.hs @@ -30,6 +30,7 @@ module Control.Monad.Concurrent , writeChannelNonblocking , readChannel , readChannelNonblocking + , channelLength , runConcurrentT ) where @@ -567,6 +568,21 @@ ireadChannelNonblocking chan = do IConcurrentT (resetT (runIConcurrentT' (local (const writerId) nextWriter))) return (Just val) +channelLength + :: Monad m + => Channel chanState + -> ConcurrentT chanState m Int +channelLength = ConcurrentT . iChannelLength + +iChannelLength + :: Monad m + => Channel chanState + -> IConcurrentT chanState m Int +iChannelLength chan = do + chanMap <- use channels + let chanContents = fromMaybe Data.Sequence.empty $ chanMap ^? (ix chan . contents) + return (Data.Sequence.length chanContents) + runConcurrentT :: Monad m => ConcurrentT chanState m () diff --git a/src/Deli.hs b/src/Deli.hs index ad28a90..fdd3d6e 100644 --- a/src/Deli.hs +++ b/src/Deli.hs @@ -32,6 +32,7 @@ module Deli , writeChannelNonblocking , readChannel , readChannelNonblocking + , channelLength , runDeli , runJob , priority @@ -170,6 +171,11 @@ readChannelNonblocking -> Deli chanState (Maybe chanState) readChannelNonblocking = Deli . Concurrent.readChannelNonblocking +channelLength + :: Concurrent.Channel chanState + -> Deli chanState Int +channelLength = Deli . Concurrent.channelLength + ------------------------------------------------------------------------------ -- ## Time Conversion ------------------------------------------------------------------------------