зеркало из https://github.com/github/deli.git
Fix space leak in writeChannel
Note: need to probably address this in writeChannelNonblocking, as well. I don't fully understand what's going on here, but my intuition is that somehow the combination of `forever` and `writeChannel` wasn't tail recursive, and so stack space was being built-up. By registering the writer with the scheduler, and letting the stack unwind (I think...), this seems to fix the issue.
This commit is contained in:
Родитель
294910df22
Коммит
2a9da36c90
|
@ -25,3 +25,6 @@ cabal.project.local
|
||||||
|
|
||||||
# datasets
|
# datasets
|
||||||
*.csv
|
*.csv
|
||||||
|
|
||||||
|
# profiling artifacts
|
||||||
|
*.ps
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
import Control.Monad (forever)
|
||||||
|
import Deli (Channel, Deli, JobTiming(..))
|
||||||
|
import Deli.Printer (printResults)
|
||||||
|
import System.Random
|
||||||
|
import qualified Deli
|
||||||
|
|
||||||
|
singleQueue
|
||||||
|
:: Channel JobTiming
|
||||||
|
-> Deli JobTiming ()
|
||||||
|
singleQueue queue =
|
||||||
|
forever $ do
|
||||||
|
job <- Deli.readChannel queue
|
||||||
|
Deli.runJob job
|
||||||
|
|
||||||
|
singleQueueExample :: IO ()
|
||||||
|
singleQueueExample = do
|
||||||
|
gen <- newStdGen
|
||||||
|
let durations = repeat 0.5
|
||||||
|
count = 1000 * 100
|
||||||
|
times = [0,1..(count - 1)]
|
||||||
|
jobs = zipWith JobTiming times durations
|
||||||
|
res = Deli.simulate gen jobs singleQueue
|
||||||
|
printResults res
|
||||||
|
|
||||||
|
chainedQueues
|
||||||
|
:: Channel JobTiming
|
||||||
|
-> Deli JobTiming ()
|
||||||
|
chainedQueues queue = do
|
||||||
|
middleChan <- Deli.newChannel Nothing
|
||||||
|
Deli.fork $ forever $ do
|
||||||
|
job <- Deli.readChannel middleChan
|
||||||
|
Deli.runJob job
|
||||||
|
forever $ do
|
||||||
|
job <- Deli.readChannel queue
|
||||||
|
Deli.writeChannel middleChan job
|
||||||
|
|
||||||
|
chainedQueueExample :: IO ()
|
||||||
|
chainedQueueExample = do
|
||||||
|
gen <- newStdGen
|
||||||
|
let durations = repeat 0.5
|
||||||
|
count = 1000 * 100
|
||||||
|
times = [0,1..(count - 1)]
|
||||||
|
jobs = zipWith JobTiming times durations
|
||||||
|
res = Deli.simulate gen jobs chainedQueues
|
||||||
|
printResults res
|
||||||
|
|
||||||
|
oneThread
|
||||||
|
:: Channel JobTiming
|
||||||
|
-> Deli JobTiming ()
|
||||||
|
oneThread queue = do
|
||||||
|
middleChan <- Deli.newChannel Nothing
|
||||||
|
forever $ do
|
||||||
|
jobA <- Deli.readChannel queue
|
||||||
|
Deli.writeChannel middleChan jobA
|
||||||
|
jobB <- Deli.readChannel middleChan
|
||||||
|
Deli.runJob jobB
|
||||||
|
|
||||||
|
oneThreadExample :: IO ()
|
||||||
|
oneThreadExample = do
|
||||||
|
gen <- newStdGen
|
||||||
|
let durations = repeat 0.5
|
||||||
|
count = 1000 * 100
|
||||||
|
times = [0,1..(count - 1)]
|
||||||
|
jobs = zipWith JobTiming times durations
|
||||||
|
res = Deli.simulate gen jobs oneThread
|
||||||
|
printResults res
|
||||||
|
|
||||||
|
main :: IO ()
|
||||||
|
main = do
|
||||||
|
newline
|
||||||
|
putStrLn "## singleQueueExample ##"
|
||||||
|
newline
|
||||||
|
singleQueueExample
|
||||||
|
newline
|
||||||
|
|
||||||
|
putStrLn "## chainedQueueExample ##"
|
||||||
|
newline
|
||||||
|
chainedQueueExample
|
||||||
|
newline
|
||||||
|
|
||||||
|
putStrLn "## oneThreadExample ##"
|
||||||
|
newline
|
||||||
|
oneThreadExample
|
||||||
|
newline
|
||||||
|
|
||||||
|
where newline = putStrLn "\n"
|
10
deli.cabal
10
deli.cabal
|
@ -56,13 +56,13 @@ executable tutorial
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
ghc-options: -threaded -rtsopts -with-rtsopts=-N -O1
|
ghc-options: -threaded -rtsopts -with-rtsopts=-N -O1
|
||||||
|
|
||||||
test-suite deli-test
|
executable performance
|
||||||
type: exitcode-stdio-1.0
|
hs-source-dirs: app
|
||||||
hs-source-dirs: test
|
main-is: Performance.hs
|
||||||
main-is: Spec.hs
|
|
||||||
build-depends: base
|
build-depends: base
|
||||||
, deli
|
, deli
|
||||||
ghc-options: -threaded -rtsopts -with-rtsopts=-N
|
, random
|
||||||
|
ghc-options: -rtsopts
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
|
|
||||||
source-repository head
|
source-repository head
|
||||||
|
|
|
@ -449,6 +449,7 @@ iwriteChannel chan@(Channel _ident mMaxSize) item = do
|
||||||
Just ((readerId, nextReader), newReaders) -> do
|
Just ((readerId, nextReader), newReaders) -> do
|
||||||
channels . ix chan . readers .= newReaders
|
channels . ix chan . readers .= newReaders
|
||||||
local (const readerId) nextReader
|
local (const readerId) nextReader
|
||||||
|
register (ischeduleDuration 0 myId)
|
||||||
|
|
||||||
writeChannelNonblocking
|
writeChannelNonblocking
|
||||||
:: Monad m
|
:: Monad m
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
main :: IO ()
|
|
||||||
main = putStrLn "Test suite not yet implemented"
|
|
Загрузка…
Ссылка в новой задаче