diff --git a/.gitignore b/.gitignore index a014a4b..a879044 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ cabal.sandbox.config examples/Group *.sublime-* dist-newstyle/ +cabal.project.local +cabal.project.local~ diff --git a/Data/Atomic.hs b/Data/Atomic.hs index 98a5501..ce28d2a 100644 --- a/Data/Atomic.hs +++ b/Data/Atomic.hs @@ -1,4 +1,9 @@ -{-# LANGUAGE BangPatterns, ForeignFunctionInterface #-} +{-# LANGUAGE BangPatterns + , CPP + , ForeignFunctionInterface + , MagicHash + , UnboxedTuples + #-} -- | An atomic integer value. All operations are thread safe. module Data.Atomic ( @@ -12,53 +17,112 @@ module Data.Atomic , subtract ) where -import Data.Int (Int64) -import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr) -import Foreign.Ptr (Ptr) -import Foreign.Storable (poke) +#include "MachDeps.h" +#ifndef SIZEOF_HSINT +#error "MachDeps.h didn't define SIZEOF_HSINT" +#endif + import Prelude hiding (read, subtract) +import GHC.Int + +#if SIZEOF_HSINT == 8 + +-- 64-bit imports +import GHC.IO +import GHC.Prim + +#else + +-- 32-bit imports +import Data.IORef + +#endif + + +-- 64-bit machine, Int ~ Int64, do it the fast way: +#if SIZEOF_HSINT == 8 + +#if MIN_VERSION_base(4,17,0) +int64ToInt :: Int64# -> Int# +int64ToInt = int64ToInt# + +intToInt64 :: Int# -> Int64# +intToInt64 = intToInt64# +#else +int64ToInt :: Int# -> Int# +int64ToInt i = i + +intToInt64 :: Int# -> Int# +intToInt64 i = i +#endif + -- | A mutable, atomic integer. -newtype Atomic = C (ForeignPtr Int64) +data Atomic = C (MutableByteArray# RealWorld) -- | Create a new, zero initialized, atomic. 
new :: Int64 -> IO Atomic -new n = do - fp <- mallocForeignPtr - withForeignPtr fp $ \ p -> poke p n - return $ C fp +new (I64# n64) = IO $ \s -> + case newByteArray# SIZEOF_HSINT# s of { (# s1, mba #) -> + case atomicWriteIntArray# mba 0# (int64ToInt n64) s1 of { s2 -> + (# s2, C mba #) }} read :: Atomic -> IO Int64 -read (C fp) = withForeignPtr fp cRead - -foreign import ccall unsafe "hs_atomic_read" cRead :: Ptr Int64 -> IO Int64 +read (C mba) = IO $ \s -> + case atomicReadIntArray# mba 0# s of { (# s1, n #) -> + (# s1, I64# (intToInt64 n) #)} -- | Set the atomic to the given value. write :: Atomic -> Int64 -> IO () -write (C fp) n = withForeignPtr fp $ \ p -> cWrite p n +write (C mba) (I64# n64) = IO $ \s -> + case atomicWriteIntArray# mba 0# (int64ToInt n64) s of { s1 -> + (# s1, () #) } -foreign import ccall unsafe "hs_atomic_write" cWrite - :: Ptr Int64 -> Int64 -> IO () +-- | Increase the atomic by the given amount. +add :: Atomic -> Int64 -> IO () +add (C mba) (I64# n64) = IO $ \s -> + case fetchAddIntArray# mba 0# (int64ToInt n64) s of { (# s1, _ #) -> + (# s1, () #) } --- | Increase the atomic by one. -inc :: Atomic -> IO () -inc atomic = add atomic 1 +-- | Decrease the atomic by the given amount. +subtract :: Atomic -> Int64 -> IO () +subtract (C mba) (I64# n64) = IO $ \s -> + case fetchSubIntArray# mba 0# (int64ToInt n64) s of { (# s1, _ #) -> + (# s1, () #) } --- | Decrease the atomic by one. -dec :: Atomic -> IO () -dec atomic = subtract atomic 1 +#else + +-- 32-bit machine, Int ~ Int32, fall back to IORef. This could be replaced with +-- faster implementations for specific 32-bit machines in the future, but the +-- idea is to preserve 64-bit width for counters. + +newtype Atomic = C (IORef Int64) + +-- | Create a new, zero initialized, atomic. +new :: Int64 -> IO Atomic +new = fmap C . newIORef + +read :: Atomic -> IO Int64 +read (C ior) = readIORef ior + +-- | Set the atomic to the given value. 
+write :: Atomic -> Int64 -> IO () +write (C ior) !i = atomicWriteIORef ior i -- | Increase the atomic by the given amount. add :: Atomic -> Int64 -> IO () -add (C fp) n = withForeignPtr fp $ \ p -> cAdd p n +add (C ior) !i = atomicModifyIORef' ior (\(!n) -> (n+i, ())) -- | Decrease the atomic by the given amount. subtract :: Atomic -> Int64 -> IO () -subtract (C fp) n = withForeignPtr fp $ \ p -> cSubtract p n +subtract (C ior) !i = atomicModifyIORef' ior (\(!n) -> (n-i, ())) --- | Increase the atomic by the given amount. -foreign import ccall unsafe "hs_atomic_add" cAdd :: Ptr Int64 -> Int64 -> IO () +#endif --- | Increase the atomic by the given amount. -foreign import ccall unsafe "hs_atomic_subtract" cSubtract - :: Ptr Int64 -> Int64 -> IO () +-- | Increase the atomic by one. +inc :: Atomic -> IO () +inc atomic = add atomic 1 + +-- | Decrease the atomic by one. +dec :: Atomic -> IO () +dec atomic = subtract atomic 1 diff --git a/System/Metrics.hs b/System/Metrics.hs index 60e22b0..0d4c7ea 100644 --- a/System/Metrics.hs +++ b/System/Metrics.hs @@ -68,7 +68,6 @@ module System.Metrics , Value(..) ) where -import Control.Applicative ((<$>)) import Control.Monad (forM) import Data.Int (Int64) import qualified Data.IntMap.Strict as IM diff --git a/System/Metrics/Distribution.hsc b/System/Metrics/Distribution.hsc index 8e53e29..46166b1 100644 --- a/System/Metrics/Distribution.hsc +++ b/System/Metrics/Distribution.hsc @@ -1,9 +1,12 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE ForeignFunctionInterface #-} +{-# LANGUAGE MagicHash #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE UnboxedTuples #-} {-# OPTIONS_GHC -funbox-strict-fields #-} #include "distrib.h" +#include "MachDeps.h" -- | This module defines a type for tracking statistics about a series -- of events. 
An event could be handling of a request and the value @@ -28,85 +31,150 @@ module System.Metrics.Distribution ) where import Control.Monad (forM_, replicateM) -import Data.Int (Int64) -import Foreign.C.Types (CInt) -import Foreign.ForeignPtr (ForeignPtr, mallocForeignPtr, withForeignPtr) -import Foreign.Ptr (Ptr) -import Foreign.Storable (Storable(alignment, peek, poke, sizeOf), peekByteOff, - pokeByteOff) import Prelude hiding (max, min, read, sum) +import Foreign.Storable (sizeOf) + +import GHC.Float +import GHC.Int (Int(..), Int64(..)) +import GHC.IO +import GHC.Prim + import Data.Array import System.Metrics.Distribution.Internal (Stats(..)) import System.Metrics.ThreadId +-- Three cases for these definitions +-- 64-bit, GHC has Int64 +-- 64-bit, GHC doesn't have Int64 +-- 32-bit + +-- 64-bit machine, Int ~ Int64, do it the fast way: +#if SIZEOF_HSINT == 8 + +#if MIN_VERSION_base(4,17,0) +int64ToDouble :: Int64## -> Double## +int64ToDouble i = int2Double## (int64ToInt## i) + +intToInt64 :: Int## -> Int64## +intToInt64 = intToInt64## + +plusInt64 :: Int64## -> Int64## -> Int64## +plusInt64 = plusInt64## + +eqInt64 :: Int64## -> Int64## -> Int## +eqInt64 = eqInt64## + +readInt64Array :: MutableByteArray## d -> Int## -> State## d -> (## State## d, Int64## ##) +readInt64Array = readInt64Array## + +writeInt64Array :: MutableByteArray## d -> Int## -> Int64## -> State## d -> State## d +writeInt64Array = writeInt64Array## + +#else +int64ToDouble :: Int## -> Double## +int64ToDouble = int2Double## + +intToInt64 :: Int## -> Int## +intToInt64 i = i + +plusInt64 :: Int## -> Int## -> Int## +plusInt64 = (+##) + +eqInt64 :: Int## -> Int## -> Int## +eqInt64 = (==##) + +readInt64Array :: MutableByteArray## d -> Int## -> State## d -> (## State## d, Int## ##) +readInt64Array = readIntArray## + +writeInt64Array :: MutableByteArray## d -> Int## -> Int## -> State## d -> State## d +writeInt64Array = writeIntArray## +#endif + +#else +-- NB: I've only tested these with the WASM backend: 
+ +intToInt64 :: Int## -> Int64## +intToInt64 = intToInt64## + +plusInt64 :: Int64## -> Int64## -> Int64## +plusInt64 = plusInt64## + +eqInt64 :: Int64## -> Int64## -> Int## +eqInt64 = eqInt64## + +readInt64Array :: MutableByteArray## d -> Int## -> State## d -> (## State## d, Int64## ##) +readInt64Array = readInt64Array## + +writeInt64Array :: MutableByteArray## d -> Int## -> Int64## -> State## d -> State## d +writeInt64Array = writeInt64Array## + +-- I don't know a better way on 32-bit machines... +int64ToDouble :: Int64## -> Double## +int64ToDouble i = + case fromIntegral (I64## i) of (D## d) -> d +#endif + -- | An metric for tracking events. newtype Distribution = Distribution { unD :: Array Stripe } -data Stripe = Stripe - { stripeFp :: !(ForeignPtr CDistrib) - } - -data CDistrib = CDistrib - { cCount :: !Int64 - , cMean :: !Double - , cSumSqDelta :: !Double - , cSum :: !Double - , cMin :: !Double - , cMax :: !Double - , cLock :: !Int64 -- ^ 0 - unlocked, 1 - locked - } - -instance Storable CDistrib where - sizeOf _ = (#size struct distrib) - alignment _ = alignment (undefined :: CInt) - - peek p = do - cCount <- (#peek struct distrib, count) p - cMean <- (#peek struct distrib, mean) p - cSumSqDelta <- (#peek struct distrib, sum_sq_delta) p - cSum <- (#peek struct distrib, sum) p - cMin <- (#peek struct distrib, min) p - cMax <- (#peek struct distrib, max) p - cLock <- (#peek struct distrib, lock) p - return $! 
CDistrib - { cCount = cCount - , cMean = cMean - , cSumSqDelta = cSumSqDelta - , cSum = cSum - , cMin = cMin - , cMax = cMax - , cLock = cLock - } - - poke p CDistrib{..} = do - (#poke struct distrib, count) p cCount - (#poke struct distrib, mean) p cMean - (#poke struct distrib, sum_sq_delta) p cSumSqDelta - (#poke struct distrib, sum) p cSum - (#poke struct distrib, min) p cMin - (#poke struct distrib, max) p cMax - (#poke struct distrib, lock) p cLock - -newCDistrib :: IO (ForeignPtr CDistrib) -newCDistrib = do - fp <- mallocForeignPtr - withForeignPtr fp $ \ p -> poke p $ CDistrib - { cCount = 0 - , cMean = 0.0 - , cSumSqDelta = 0.0 - , cSum = 0.0 - , cMin = 0.0 - , cMax = 0.0 - , cLock = 0 - } - return fp +newtype Stripe = Stripe { stripeD :: Distrib } + +data Distrib = Distrib (MutableByteArray## RealWorld) + +unI :: Int -> Int## +unI (I## x) = x +{-# INLINE unI #-} + +distribLen :: Int +distribLen = (#size struct distrib) + +lockPos :: Int +lockPos = div (#offset struct distrib, lock) + (sizeOf (undefined :: Int)) + +countPos :: Int +countPos = div (#offset struct distrib, count) + (sizeOf (undefined :: Int64)) + +meanPos :: Int +meanPos = div (#offset struct distrib, mean) + (sizeOf (undefined :: Double)) + +sumSqDeltaPos :: Int +sumSqDeltaPos = div (#offset struct distrib, sum_sq_delta) + (sizeOf (undefined :: Double)) + +sumPos :: Int +sumPos = div (#offset struct distrib, sum) + (sizeOf (undefined :: Double)) + +minPos :: Int +minPos = div (#offset struct distrib, min) + (sizeOf (undefined :: Double)) + +maxPos :: Int +maxPos = div (#offset struct distrib, max) + (sizeOf (undefined :: Double)) + +newDistrib :: IO Distrib +newDistrib = IO $ \s -> + case newByteArray## (unI distribLen) s of { (## s1, mba ##) -> + -- probably unnecessary + case atomicWriteIntArray## mba (unI lockPos) 0## s1 of { s2 -> + case writeInt64Array mba (unI countPos) (intToInt64 0##) s2 of { s3 -> + case writeDoubleArray## mba (unI meanPos) 0.0#### s3 of { s4 -> + case 
writeDoubleArray## mba (unI sumSqDeltaPos) 0.0#### s4 of { s5 -> + case writeDoubleArray## mba (unI sumPos) 0.0#### s5 of { s6 -> + case writeDoubleArray## mba (unI minPos) 0.0#### s6 of { s7 -> + case writeDoubleArray## mba (unI maxPos) 0.0#### s7 of { s8 -> + (## s8, Distrib mba ##) }}}}}}}} newStripe :: IO Stripe newStripe = do - fp <- newCDistrib + d <- newDistrib return $! Stripe - { stripeFp = fp + { stripeD = d } -- | Number of lock stripes. Should be greater or equal to the number @@ -132,34 +200,113 @@ new = (Distribution . fromList numStripes) `fmap` add :: Distribution -> Double -> IO () add distrib val = addN distrib val 1 -foreign import ccall unsafe "hs_distrib_add_n" cDistribAddN - :: Ptr CDistrib -> Double -> Int64 -> IO () +{-# INLINE spinLock #-} +spinLock :: MutableByteArray## RealWorld -> State## RealWorld -> State## RealWorld +spinLock mba = \s -> + case casIntArray## mba (unI lockPos) 0## 1## s of { (## s1, r ##) -> + case r of { 0## -> s1; _ -> + case yield## s1 of { s2 -> + spinLock mba s2 }}} + +{-# INLINE spinUnlock #-} +spinUnlock :: MutableByteArray## RealWorld -> State## RealWorld -> State## RealWorld +spinUnlock mba = \s -> + case atomicWriteIntArray## mba (unI lockPos) 0## s of { s2 -> s2 } -- atomic store (full barrier): the stripe's field writes must be visible before the lock reads as free + -- | Add the same value to the distribution N times. 
+-- Mean and variance are computed according to +-- http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm addN :: Distribution -> Double -> Int64 -> IO () -addN distrib val n = do - stripe <- myStripe distrib - withForeignPtr (stripeFp stripe) $ \ p -> - cDistribAddN p val n +addN distribution (D## val) (I64## n) = IO $ \s -> + case myStripe distribution of { (IO myStripe') -> + case myStripe' s of { (## s1, (Stripe (Distrib mba)) ##) -> + case spinLock mba s1 of { s2 -> + case readInt64Array mba (unI countPos) s2 of { (## s3, count ##) -> + case readDoubleArray## mba (unI meanPos) s3 of { (## s4, mean ##) -> + case readDoubleArray## mba (unI sumSqDeltaPos) s4 of { (## s5, sumSqDelta ##) -> + case readDoubleArray## mba (unI sumPos) s5 of { (## s6, dSum ##) -> + case readDoubleArray## mba (unI minPos) s6 of { (## s7, dMin ##) -> + case readDoubleArray## mba (unI maxPos) s7 of { (## s8, dMax ##) -> + case plusInt64 count n of { count' -> + case val -#### mean of { delta -> + case mean +#### ((int64ToDouble n) *#### delta /#### (int64ToDouble count')) of { mean' -> + case sumSqDelta +#### (delta *#### (val -#### mean') *#### (int64ToDouble n)) of { sumSqDelta' -> + case writeInt64Array mba (unI countPos) count' s8 of { s9 -> + case writeDoubleArray## mba (unI meanPos) mean' s9 of { s10 -> + case writeDoubleArray## mba (unI sumSqDeltaPos) sumSqDelta' s10 of { s11 -> + case writeDoubleArray## mba (unI sumPos) (dSum +#### (val *#### (int64ToDouble n))) s11 of { s12 -> -- val is added n times, so sum grows by n * val (keeps sum consistent with count and mean) + case (case val <#### dMin of { 0## -> dMin; _ -> val }) of { dMin' -> + case (case val >#### dMax of { 0## -> dMax; _ -> val }) of { dMax' -> + case writeDoubleArray## mba (unI minPos) dMin' s12 of { s13 -> + case writeDoubleArray## mba (unI maxPos) dMax' s13 of { s14 -> + case spinUnlock mba s14 of { s15 -> + (## s15, () ##) }}}}}}}}}}}}}}}}}}}}}} -foreign import ccall unsafe "hs_distrib_combine" combine - :: Ptr CDistrib -> Ptr CDistrib -> IO () +-- | Combine 'b' with 'a', writing the result in 'a'. 
Takes the lock of +-- 'b' while combining, but doesn't otherwise modify 'b'. 'a' is +-- assumed to not be used concurrently. +-- See also: +-- http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm +combine :: Distrib -> Distrib -> IO () +combine (Distrib bMBA) (Distrib aMBA) = IO $ \s -> + case spinLock bMBA s of { s1 -> + case readInt64Array aMBA (unI countPos) s1 of { (## s2, aCount ##) -> + case readInt64Array bMBA (unI countPos) s2 of { (## s3, bCount ##) -> + case plusInt64 aCount bCount of { count' -> + case readDoubleArray## aMBA (unI meanPos) s3 of { (## s4, aMean ##) -> + case readDoubleArray## bMBA (unI meanPos) s4 of { (## s5, bMean ##) -> + case bMean -#### aMean of { delta -> + case ( (((int64ToDouble aCount) *#### aMean) +#### ((int64ToDouble bCount) *#### bMean)) + /#### (int64ToDouble count') + ) of { mean' -> + case readDoubleArray## aMBA (unI sumSqDeltaPos) s5 of { (## s6, aSumSqDelta ##) -> + case readDoubleArray## bMBA (unI sumSqDeltaPos) s6 of { (## s7, bSumSqDelta ##) -> + case ( aSumSqDelta + +#### bSumSqDelta + +#### ( delta + *#### delta + *#### ( (int64ToDouble aCount) *#### (int64ToDouble bCount) + /#### (int64ToDouble count') + ) + ) + ) of { sumSqDelta' -> + case writeInt64Array aMBA (unI countPos) count' s7 of { s8 -> + case (case eqInt64 count' (intToInt64 0##) of { 0## -> mean'; _ -> 0.0#### }) of { writeMean -> + case writeDoubleArray## aMBA (unI meanPos) writeMean s8 of { s9 -> + case writeDoubleArray## aMBA (unI sumSqDeltaPos) sumSqDelta' s9 of { s10 -> + case readDoubleArray## aMBA (unI sumPos) s10 of { (## s11, aSum ##) -> + case readDoubleArray## bMBA (unI sumPos) s11 of { (## s12, bSum ##) -> + case writeDoubleArray## aMBA (unI sumPos) (aSum +#### bSum) s12 of { s13 -> + case readDoubleArray## bMBA (unI minPos) s13 of { (## s14, bMin ##) -> + case writeDoubleArray## aMBA (unI minPos) bMin s14 of { s15 -> + -- This is slightly hacky, but ok: see + -- 813aa426be78e8abcf1c7cdd43433bcffa07828e + case 
readDoubleArray## bMBA (unI maxPos) s15 of { (## s16, bMax ##) -> + case writeDoubleArray## aMBA (unI maxPos) bMax s16 of { s17 -> + case spinUnlock bMBA s17 of { s18 -> + (## s18, () ##) }}}}}}}}}}}}}}}}}}}}}}} -- | Get the current statistical summary for the event being tracked. read :: Distribution -> IO Stats read distrib = do - result <- newCDistrib - CDistrib{..} <- withForeignPtr result $ \ resultp -> do - forM_ (toList $ unD distrib) $ \ stripe -> - withForeignPtr (stripeFp stripe) $ \ p -> - combine p resultp - peek resultp - return $! Stats - { mean = cMean - , variance = if cCount == 0 then 0.0 - else cSumSqDelta / fromIntegral cCount - , count = cCount - , sum = cSum - , min = cMin - , max = cMax - } + result@(Distrib mba) <- newDistrib + forM_ (toList $ unD distrib) $ \(Stripe d) -> + combine d result + IO $ \s -> + case readInt64Array mba (unI countPos) s of { (## s1, count ##) -> + case readDoubleArray## mba (unI meanPos) s1 of { (## s2, mean ##) -> + case readDoubleArray## mba (unI sumSqDeltaPos) s2 of { (## s3, sumSqDelta ##) -> + case readDoubleArray## mba (unI sumPos) s3 of { (## s4, dSum ##) -> + case readDoubleArray## mba (unI minPos) s4 of { (## s5, dMin ##) -> + case readDoubleArray## mba (unI maxPos) s5 of { (## s6, dMax ##) -> + (## s6 + , Stats { mean = (D## mean) + , variance = if (I64## count) == 0 then 0.0 + else (D## sumSqDelta) / (D## (int64ToDouble count)) + , count = (I64## count) + , sum = (D## dSum) + , min = (D## dMin) + , max = (D## dMax) + } + ##) }}}}}} diff --git a/benchmarks/Counter.hs b/benchmarks/Counter.hs index eea2785..5f4d8fe 100644 --- a/benchmarks/Counter.hs +++ b/benchmarks/Counter.hs @@ -5,18 +5,21 @@ module Main where import Control.Concurrent import Control.Monad -import System.Metrics.Counter +import qualified System.Metrics.Counter as C main :: IO () main = do - counter <- new + counter <- C.new locks <- replicateM n newEmptyMVar mapM_ (forkIO . 
work counter iters) locks mapM_ takeMVar locks + fin <- C.read counter + when (fin /= fromIntegral (n * iters)) $ + fail "Data.Atomic is broken!" where n = 100 iters = 100000 - work :: Counter -> Int -> MVar () -> IO () + work :: C.Counter -> Int -> MVar () -> IO () work !_ 0 !lock = putMVar lock () - work counter i lock = inc counter >> work counter (i - 1) lock + work counter i lock = C.inc counter >> work counter (i - 1) lock diff --git a/cbits/atomic.c b/cbits/atomic.c deleted file mode 100644 index e9f01da..0000000 --- a/cbits/atomic.c +++ /dev/null @@ -1,17 +0,0 @@ -#include "HsFFI.h" - -void hs_atomic_add(volatile StgInt64* atomic, StgInt64 n) { - __sync_fetch_and_add(atomic, n); -} - -void hs_atomic_subtract(volatile StgInt64* atomic, StgInt64 n) { - __sync_fetch_and_sub(atomic, n); -} - -StgInt64 hs_atomic_read(volatile const StgInt64* atomic) { - return *atomic; -} - -void hs_atomic_write(volatile StgInt64* atomic, StgInt64 n) { - *atomic = n; -} diff --git a/cbits/distrib.c b/cbits/distrib.c deleted file mode 100644 index ae2a824..0000000 --- a/cbits/distrib.c +++ /dev/null @@ -1,44 +0,0 @@ -#include "HsFFI.h" -#include "distrib.h" - -static void hs_lock(volatile StgInt64* lock) { - while(!__sync_bool_compare_and_swap(lock, 0, 1)); -} - -static void hs_unlock(volatile StgInt64* lock) { - *lock = 0; -} - -// Mean and variance are computed according to -// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm -void hs_distrib_add_n(struct distrib* distrib, StgDouble val, StgInt64 n) { - hs_lock(&distrib->lock); - const StgInt64 count = distrib->count + n; - const StgDouble delta = val - distrib->mean; - const StgDouble mean = distrib->mean + n * delta / count; - const StgDouble sum_sq_delta = distrib->sum_sq_delta + delta * (val - mean) * n; - distrib->count = count; - distrib->mean = mean; - distrib->sum_sq_delta = sum_sq_delta; - distrib->sum += val; - distrib->min = val < distrib->min ? 
val : distrib->min; - distrib->max = val > distrib->max ? val : distrib->max; - hs_unlock(&distrib->lock); -} - -// http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm -void hs_distrib_combine(struct distrib* b, struct distrib* a) { - hs_lock(&b->lock); - const StgInt64 count = a->count + b->count; - const StgDouble delta = b->mean - a->mean; - const StgDouble mean = (a->count * a->mean + b->count * b->mean) / count; - const StgDouble sum_sq_delta = (a->sum_sq_delta + b->sum_sq_delta + - delta * delta * (a->count * b->count) / count); - a->count = count; - a->mean = (count == 0) ? 0.0 : mean; // divide-by-zero gives NaN - a->sum_sq_delta = sum_sq_delta; - a->sum = a->sum + b->sum; - a->min = b->min; // This is slightly hacky, but ok: see - a->max = b->max; // 813aa426be78e8abcf1c7cdd43433bcffa07828e - hs_unlock(&b->lock); -} diff --git a/cbits/distrib.h b/cbits/distrib.h index 6111b35..ead588f 100644 --- a/cbits/distrib.h +++ b/cbits/distrib.h @@ -1,20 +1,22 @@ +#include "MachDeps.h" + #include "HsFFI.h" -struct distrib { - StgInt64 count; - StgDouble mean; - StgDouble sum_sq_delta; - StgDouble sum; - StgDouble min; - StgDouble max; - volatile StgInt64 lock; -}; +#include <stdint.h> -void hs_distrib_add_n(struct distrib* distrib, StgDouble val, StgInt64 n); +#if WORD_SIZE_IN_BYTES > 32 /* NOTE(review): MachDeps.h appears to define WORD_SIZE_IN_BITS, not WORD_SIZE_IN_BYTES, so this test is presumably always false and lock is int64_t on every target - confirm intent before changing, the Haskell field offsets depend on this layout */ +#define HSINTTYPE int32_t +#else +#define HSINTTYPE int64_t +#endif -/* - * Combine 'b' with 'a', writing the result in 'a'. Takes the lock of - * 'b' while combining, but doesn't otherwise modify 'b'. 'a' is - * assumed to not be used concurrently. 
- */ -void hs_distrib_combine(struct distrib* b, struct distrib* a); +struct distrib { + // 0 -> unlocked; 1 -> locked + HSINTTYPE lock; + int64_t count; + double mean; + double sum_sq_delta; + double sum; + double min; + double max; +}; diff --git a/ekg-core.cabal b/ekg-core.cabal index 3141c8e..2ff3ebf 100644 --- a/ekg-core.cabal +++ b/ekg-core.cabal @@ -48,21 +48,20 @@ library build-depends: base >=4.6 && <4.22 , containers >=0.5 && <0.9 + , ghc-prim >= 0.4 && < 0.14 , text <2.2 , unordered-containers <0.3 default-language: Haskell2010 ghc-options: -Wall + ghc-options: -Wall -O2 if arch(i386) cc-options: -march=i686 includes: distrib.h install-includes: distrib.h include-dirs: cbits - c-sources: - cbits/atomic.c - cbits/distrib.c benchmark counter main-is: Counter.hs @@ -73,7 +72,7 @@ benchmark counter default-language: Haskell2010 hs-source-dirs: benchmarks - ghc-options: -O2 -threaded -Wall + ghc-options: -O2 -threaded -rtsopts -Wall benchmark distribution main-is: Distribution.hs @@ -84,7 +83,7 @@ benchmark distribution default-language: Haskell2010 hs-source-dirs: benchmarks - ghc-options: -O2 -threaded -Wall + ghc-options: -O2 -threaded -rtsopts -Wall source-repository head type: git