Use MVar over IORef
Prevents race conditions when updating the Maps
This commit is contained in:
parent
b9ae32c4c0
commit
08a1f272c6
3 changed files with 45 additions and 42 deletions
|
@ -8,6 +8,7 @@ module Myriad.Core
|
||||||
, LanguageConfig(..)
|
, LanguageConfig(..)
|
||||||
, MyriadT
|
, MyriadT
|
||||||
, MonadWithIO
|
, MonadWithIO
|
||||||
|
, MyriadIO
|
||||||
, runMyriadT
|
, runMyriadT
|
||||||
, initEnv
|
, initEnv
|
||||||
) where
|
) where
|
||||||
|
@ -23,8 +24,8 @@ import qualified Data.Text as T
|
||||||
import Dhall
|
import Dhall
|
||||||
import GHC.Generics (Generic)
|
import GHC.Generics (Generic)
|
||||||
|
|
||||||
|
import Control.Concurrent.MVar
|
||||||
import Control.Concurrent.QSem
|
import Control.Concurrent.QSem
|
||||||
import Data.IORef.Lifted
|
|
||||||
import Data.Snowflake
|
import Data.Snowflake
|
||||||
|
|
||||||
type Language = T.Text
|
type Language = T.Text
|
||||||
|
@ -35,9 +36,9 @@ data EvalResult = EvalOk BL.ByteString | EvalTimedOut | EvalErrored
|
||||||
|
|
||||||
data Env = Env
|
data Env = Env
|
||||||
{ config :: MyriadConfig
|
{ config :: MyriadConfig
|
||||||
, containers :: IORef (M.Map Language ContainerName)
|
, containers :: MVar (M.Map Language ContainerName)
|
||||||
, containerSems :: IORef (M.Map Language QSem)
|
, containerSems :: MVar (M.Map Language QSem)
|
||||||
, evalSems :: IORef (M.Map Language QSem)
|
, evalSems :: MVar (M.Map Language QSem)
|
||||||
, snowflakeGen :: SnowflakeGen
|
, snowflakeGen :: SnowflakeGen
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +67,8 @@ type MyriadT m = ReaderT Env (LoggingT m)
|
||||||
|
|
||||||
type MonadWithIO m = (MonadIO m, MonadBase IO m, MonadBaseControl IO m)
|
type MonadWithIO m = (MonadIO m, MonadBase IO m, MonadBaseControl IO m)
|
||||||
|
|
||||||
|
type MyriadIO a = forall m. MonadWithIO m => MyriadT m a
|
||||||
|
|
||||||
readConfig :: T.Text -> IO MyriadConfig
|
readConfig :: T.Text -> IO MyriadConfig
|
||||||
readConfig = input auto
|
readConfig = input auto
|
||||||
|
|
||||||
|
@ -73,9 +76,9 @@ initEnv :: T.Text -> IO Env
|
||||||
initEnv configInput =
|
initEnv configInput =
|
||||||
Env
|
Env
|
||||||
<$> readConfig configInput
|
<$> readConfig configInput
|
||||||
<*> newIORef M.empty
|
<*> newMVar M.empty
|
||||||
<*> newIORef M.empty
|
<*> newMVar M.empty
|
||||||
<*> newIORef M.empty
|
<*> newMVar M.empty
|
||||||
<*> newSnowflakeGen defaultConfig 0
|
<*> newSnowflakeGen defaultConfig 0
|
||||||
|
|
||||||
runMyriadT :: MonadIO m => Env -> MyriadT m a -> m a
|
runMyriadT :: MonadIO m => Env -> MyriadT m a -> m a
|
||||||
|
|
|
@ -16,21 +16,21 @@ import Control.Monad.Reader
|
||||||
|
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
|
||||||
import Control.Concurrent.Lifted (fork, threadDelay)
|
|
||||||
import Control.Concurrent.Async.Lifted
|
import Control.Concurrent.Async.Lifted
|
||||||
|
import Control.Concurrent.Lifted (fork, threadDelay)
|
||||||
|
import Control.Concurrent.MVar.Lifted
|
||||||
import Control.Concurrent.QSem.Lifted
|
import Control.Concurrent.QSem.Lifted
|
||||||
import Control.Exception.Lifted
|
import Control.Exception.Lifted
|
||||||
import Data.IORef.Lifted
|
|
||||||
import Data.Snowflake
|
import Data.Snowflake
|
||||||
import System.Process.Typed
|
import System.Process.Typed
|
||||||
|
|
||||||
import Myriad.Core
|
import Myriad.Core
|
||||||
import Myriad.Util
|
import Myriad.Util
|
||||||
|
|
||||||
exec :: MonadWithIO m => String -> MyriadT m ()
|
exec :: String -> MyriadIO ()
|
||||||
exec = runProcess_ . shell
|
exec = runProcess_ . shell
|
||||||
|
|
||||||
buildImage :: MonadWithIO m => LanguageConfig -> MyriadT m ()
|
buildImage :: LanguageConfig -> MyriadIO ()
|
||||||
buildImage lang@LanguageConfig { name, concurrent } = do
|
buildImage lang@LanguageConfig { name, concurrent } = do
|
||||||
logInfoN $ mconcat ["Building image ", cvs $ imageName lang]
|
logInfoN $ mconcat ["Building image ", cvs $ imageName lang]
|
||||||
let cmd = mconcat ["docker build -t ", imageName lang, " ./languages/", cvs name]
|
let cmd = mconcat ["docker build -t ", imageName lang, " ./languages/", cvs name]
|
||||||
|
@ -39,35 +39,35 @@ buildImage lang@LanguageConfig { name, concurrent } = do
|
||||||
Env { config = MyriadConfig { prepareContainers }, containerSems, evalSems } <- ask
|
Env { config = MyriadConfig { prepareContainers }, containerSems, evalSems } <- ask
|
||||||
csem <- newQSem 1 -- We only want one container to be set up at a time
|
csem <- newQSem 1 -- We only want one container to be set up at a time
|
||||||
esem <- newQSem $ fromIntegral concurrent
|
esem <- newQSem $ fromIntegral concurrent
|
||||||
modifyIORef' containerSems $ M.insert name csem
|
modifyMVar_ containerSems $ pure . M.insert name csem
|
||||||
modifyIORef' evalSems $ M.insert name esem
|
modifyMVar_ evalSems $ pure . M.insert name esem
|
||||||
when_ prepareContainers $ setupContainer lang
|
when_ prepareContainers $ setupContainer lang
|
||||||
|
|
||||||
buildAllImages :: MonadWithIO m => MyriadT m ()
|
buildAllImages :: MyriadIO ()
|
||||||
buildAllImages = do
|
buildAllImages = do
|
||||||
MyriadConfig { languages, buildConcurrently } <- asks config
|
MyriadConfig { languages, buildConcurrently } <- asks config
|
||||||
if buildConcurrently
|
if buildConcurrently
|
||||||
then forConcurrently_ languages buildImage
|
then forConcurrently_ languages buildImage
|
||||||
else forM_ languages buildImage
|
else forM_ languages buildImage
|
||||||
|
|
||||||
startCleanup :: MonadWithIO m => MyriadT m ()
|
startCleanup :: MyriadIO ()
|
||||||
startCleanup = do
|
startCleanup = do
|
||||||
MyriadConfig { cleanupInterval } <- asks config
|
MyriadConfig { cleanupInterval } <- asks config
|
||||||
when_ (cleanupInterval > 0) do
|
when_ (cleanupInterval > 0) do
|
||||||
let t = fromIntegral cleanupInterval * 60000000
|
let t = fromIntegral cleanupInterval * 60000000
|
||||||
fork $ timer t
|
fork $ timer t
|
||||||
where
|
where
|
||||||
timer :: MonadWithIO m => Int -> MyriadT m ()
|
timer :: Int -> MyriadIO ()
|
||||||
timer t = forever do
|
timer t = forever do
|
||||||
threadDelay t
|
threadDelay t
|
||||||
n <- killAllContainersMaybe
|
n <- killAllContainersMaybe
|
||||||
logInfoN $ mconcat ["Cleaned up ", cvs $ show n, " containers"]
|
logInfoN $ mconcat ["Cleaned up ", cvs $ show n, " containers"]
|
||||||
timer t
|
timer t
|
||||||
|
|
||||||
setupContainer :: MonadWithIO m => LanguageConfig -> MyriadT m ContainerName
|
setupContainer :: LanguageConfig -> MyriadIO ContainerName
|
||||||
setupContainer lang@LanguageConfig { name, memory, cpus } = do
|
setupContainer lang@LanguageConfig { name, memory, cpus } = do
|
||||||
ref <- asks containers
|
ref <- asks containers
|
||||||
cnts <- readIORef ref
|
cnts <- readMVar ref
|
||||||
case cnts M.!? name of
|
case cnts M.!? name of
|
||||||
Just x -> pure x
|
Just x -> pure x
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
|
@ -91,60 +91,60 @@ setupContainer lang@LanguageConfig { name, memory, cpus } = do
|
||||||
-- 711 so that users can't traverse into other people's code
|
-- 711 so that users can't traverse into other people's code
|
||||||
exec $ mconcat ["docker exec ", cnt, " mkdir eval"]
|
exec $ mconcat ["docker exec ", cnt, " mkdir eval"]
|
||||||
exec $ mconcat ["docker exec ", cnt, " chmod 711 eval"]
|
exec $ mconcat ["docker exec ", cnt, " chmod 711 eval"]
|
||||||
modifyIORef' ref $ M.insert name cnt
|
modifyMVar_ ref $ pure . M.insert name cnt
|
||||||
logInfoN $ mconcat ["Started container ", cvs cnt]
|
logInfoN $ mconcat ["Started container ", cvs cnt]
|
||||||
pure cnt
|
pure cnt
|
||||||
|
|
||||||
killContainer :: MonadWithIO m => Language -> MyriadT m ()
|
killContainer :: Language -> MyriadIO ()
|
||||||
killContainer lang = do
|
killContainer lang = do
|
||||||
ref <- asks containers
|
ref <- asks containers
|
||||||
containers <- readIORef ref
|
containers <- readMVar ref
|
||||||
case containers M.!? lang of
|
case containers M.!? lang of
|
||||||
Nothing -> pure ()
|
Nothing -> pure ()
|
||||||
Just cnt -> do
|
Just cnt -> do
|
||||||
modifyIORef' ref $ M.delete lang
|
modifyMVar_ ref $ pure . M.delete lang
|
||||||
let cmd = mconcat ["docker kill ", cnt]
|
let cmd = mconcat ["docker kill ", cnt]
|
||||||
runProcess_ . setStderr nullStream . setStdout nullStream $ shell cmd
|
runProcess_ . setStderr nullStream . setStdout nullStream $ shell cmd
|
||||||
logInfoN $ mconcat ["Killed container ", cvs cnt]
|
logInfoN $ mconcat ["Killed container ", cvs cnt]
|
||||||
|
|
||||||
killContainerMaybe :: MonadWithIO m => Language -> MyriadT m Bool
|
killContainerMaybe :: Language -> MyriadIO Bool
|
||||||
killContainerMaybe lang = do
|
killContainerMaybe lang = do
|
||||||
containers <- asks containers >>= readIORef
|
containers <- asks containers >>= readMVar
|
||||||
case containers M.!? lang of
|
case containers M.!? lang of
|
||||||
Nothing -> pure False
|
Nothing -> pure False
|
||||||
Just cnt -> do
|
Just cnt -> do
|
||||||
res :: Either SomeException () <- try $ killContainer lang
|
res <- try @_ @SomeException $ killContainer lang
|
||||||
case res of
|
case res of
|
||||||
Left err -> do
|
Left err -> do
|
||||||
logErrorN $ mconcat ["An exception occured when killing ", cvs cnt, ":\n", cvs $ show err]
|
logErrorN $ mconcat ["An exception occured when killing ", cvs cnt, ":\n", cvs $ show err]
|
||||||
pure False
|
pure False
|
||||||
Right _ -> pure True
|
Right _ -> pure True
|
||||||
|
|
||||||
killAllContainers :: MonadWithIO m => MyriadT m ()
|
killAllContainers :: MyriadIO ()
|
||||||
killAllContainers = do
|
killAllContainers = do
|
||||||
containers <- asks containers >>= readIORef
|
containers <- asks containers >>= readMVar
|
||||||
forConcurrently_ (M.keys containers) $ killContainer
|
forConcurrently_ (M.keys containers) $ killContainer
|
||||||
|
|
||||||
killAllContainersMaybe :: MonadWithIO m => MyriadT m [ContainerName]
|
killAllContainersMaybe :: MyriadIO [ContainerName]
|
||||||
killAllContainersMaybe = do
|
killAllContainersMaybe = do
|
||||||
containers <- asks containers >>= readIORef
|
containers <- asks containers >>= readMVar
|
||||||
xs <- forConcurrently (M.toList containers) \(k, v) -> (v,) <$> killContainerMaybe k
|
xs <- forConcurrently (M.toList containers) \(k, v) -> (v,) <$> killContainerMaybe k
|
||||||
pure . map fst $ filter snd xs
|
pure . map fst $ filter snd xs
|
||||||
|
|
||||||
evalCode :: MonadWithIO m => LanguageConfig -> Int -> String -> MyriadT m EvalResult
|
evalCode :: LanguageConfig -> Int -> String -> MyriadIO EvalResult
|
||||||
evalCode lang@LanguageConfig { name, timeout, retries } numRetries code = do
|
evalCode lang@LanguageConfig { name, timeout, retries } numRetries code = do
|
||||||
Env { containerSems, evalSems } <- ask
|
Env { containerSems, evalSems } <- ask
|
||||||
csem <- (M.! name) <$> readIORef containerSems
|
csem <- (M.! name) <$> readMVar containerSems
|
||||||
esem <- (M.! name) <$> readIORef evalSems
|
esem <- (M.! name) <$> readMVar evalSems
|
||||||
bracket_ (waitQSem esem) (signalQSem esem) $ do
|
bracket_ (waitQSem esem) (signalQSem esem) $ do
|
||||||
cnt <- bracket_ (waitQSem csem) (signalQSem csem) $ setupContainer lang
|
cnt <- bracket_ (waitQSem csem) (signalQSem csem) $ setupContainer lang
|
||||||
doneRef <- newIORef False -- For keeping track of if the evaluation is done, i.e. succeeded or timed out.
|
doneRef <- newMVar False -- For keeping track of if the evaluation is done, i.e. succeeded or timed out.
|
||||||
void . fork $ timer doneRef -- `race` could not have been used here since some evals can't be cancelled.
|
void . fork $ timer doneRef -- `race` could not have been used here since some evals can't be cancelled.
|
||||||
res <- try $ eval cnt
|
res <- try $ eval cnt
|
||||||
case res of
|
case res of
|
||||||
Left (SomeException err) -> do
|
Left (SomeException err) -> do
|
||||||
void $ killContainerMaybe name
|
void $ killContainerMaybe name
|
||||||
done <- readIORef doneRef
|
done <- readMVar doneRef
|
||||||
if done
|
if done
|
||||||
-- If we find the eval is done from an exception, then it was timed out.
|
-- If we find the eval is done from an exception, then it was timed out.
|
||||||
then do
|
then do
|
||||||
|
@ -152,25 +152,25 @@ evalCode lang@LanguageConfig { name, timeout, retries } numRetries code = do
|
||||||
pure EvalTimedOut
|
pure EvalTimedOut
|
||||||
-- Otherwise, the container was killed from another eval, so we should retry.
|
-- Otherwise, the container was killed from another eval, so we should retry.
|
||||||
else do
|
else do
|
||||||
writeIORef doneRef True
|
modifyMVar_ doneRef $ pure . const True
|
||||||
if numRetries < fromIntegral retries
|
if numRetries < fromIntegral retries
|
||||||
then evalCode lang (numRetries + 1) code
|
then evalCode lang (numRetries + 1) code
|
||||||
else do
|
else do
|
||||||
logErrorN $ mconcat ["An exception occured when evaluating in ", cvs cnt, ":\n", cvs $ show err]
|
logErrorN $ mconcat ["An exception occured when evaluating in ", cvs cnt, ":\n", cvs $ show err]
|
||||||
pure EvalErrored
|
pure EvalErrored
|
||||||
Right x -> do
|
Right x -> do
|
||||||
writeIORef doneRef True
|
modifyMVar_ doneRef $ pure . const True
|
||||||
pure x
|
pure x
|
||||||
where
|
where
|
||||||
timer :: MonadWithIO m => IORef Bool -> MyriadT m ()
|
timer :: MVar Bool -> MyriadIO ()
|
||||||
timer doneRef = do
|
timer doneRef = do
|
||||||
threadDelay $ fromIntegral timeout * 1000000
|
threadDelay $ fromIntegral timeout * 1000000
|
||||||
done <- readIORef doneRef
|
done <- readMVar doneRef
|
||||||
unless_ done do
|
unless_ done do
|
||||||
writeIORef doneRef True
|
modifyMVar_ doneRef $ pure . const True
|
||||||
killContainerMaybe name
|
killContainerMaybe name
|
||||||
|
|
||||||
eval :: MonadWithIO m => ContainerName -> MyriadT m EvalResult
|
eval :: ContainerName -> MyriadIO EvalResult
|
||||||
eval cnt = do
|
eval cnt = do
|
||||||
logInfoN $ mconcat ["Running code in container ", cvs cnt, ":\n", cvs code]
|
logInfoN $ mconcat ["Running code in container ", cvs cnt, ":\n", cvs code]
|
||||||
snowflakeGen <- asks snowflakeGen
|
snowflakeGen <- asks snowflakeGen
|
||||||
|
|
|
@ -15,7 +15,7 @@ import qualified Data.Text as T
|
||||||
import GHC.Generics
|
import GHC.Generics
|
||||||
|
|
||||||
import Control.Concurrent.Async.Lifted
|
import Control.Concurrent.Async.Lifted
|
||||||
import Data.IORef.Lifted
|
import Control.Concurrent.MVar.Lifted
|
||||||
import Servant
|
import Servant
|
||||||
|
|
||||||
import Myriad.Core
|
import Myriad.Core
|
||||||
|
@ -62,7 +62,7 @@ serverT = handleLanguages :<|> handleEval :<|> handleContainers :<|> handleClean
|
||||||
handleContainers :: MyriadT m [T.Text]
|
handleContainers :: MyriadT m [T.Text]
|
||||||
handleContainers = do
|
handleContainers = do
|
||||||
logInfoN $ mconcat ["GET /containers"]
|
logInfoN $ mconcat ["GET /containers"]
|
||||||
containers <- asks containers >>= readIORef
|
containers <- asks containers >>= readMVar
|
||||||
pure . map cvs $ M.elems containers
|
pure . map cvs $ M.elems containers
|
||||||
|
|
||||||
handleCleanup :: MyriadT m [T.Text]
|
handleCleanup :: MyriadT m [T.Text]
|
||||||
|
|
Loading…
Reference in a new issue