Common Concurrency Abstractions in Haskell, MVar

Posted in category haskell on 2015-07-31

Table of Contents

Haskell has many concurrency abstractions built into the base library as well as lots more in the form of libraries. This series of blog posts continueus with synchronized mutable variable, MVar.

Synchronized mutable variable, MVar

Official Control.Concurrent.MVar module documentation can be found here.

MVar is intermediate level blocking sycnhronization mechanism that allows threads to communicate with each other. Concept of synchronizing variable is similar to a box for just one single element - it is either full or empty. Therefore, logic for MVar variable is very simple:

MVar implementation is fair in such a way that it blocks threads in a FIFO queue which provides some guarantees that as long as values are repeatedly replaced in a variable, all threads are gettings eventually unblocked.

Overhead in using MVar is typically higher than with IORef. Most obvious reason is that IORef is non-blocking mechanism that utilizes compare-and-swap low level atomic processor instruction, whether MVar is a blocking mechanism that only lets single executing thread inside critical section at a time. Another reason is that MVar allows to implement any logic inside a critical section that may contain side effects. This possess another risk – locking your little transaction for too long which is not desirable in most cases.

MVar is used with the following set of key functions:

-- | Creates a new variable holding a value.
newMVar :: a -> IO (MVar a)

-- | Creates a new variable holding no value.
newEmptyMVar :: IO (MVar a)

-- | Gets value out of variable. Blocks if variable is yet empty.
takeMVar :: MVar a -> IO a

-- | Puts value into a variable. Blocks if variable is yet full.
putMVar :: a -> MVar a -> IO ()

-- | Atomically reads value from a variable. Blocks if variable is yet empty.
readMVar :: MVar a -> IO a

-- | Non-blocking version of readMVar.
tryReadMVar :: MVar a -> IO (Maybe a)

-- | Exception-safe way to modify the value of a variable.
-- This is a safe combination of takeMVar and putMVar.
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b

-- | Exception-safe way to execute critical section on a value.
withMVar :: MVar a -> (a -> IO b) -> IO b

Last two exception-safe versions of functions make sure that in case of any exception in a critical section, putMVar is always called making sure that lock is released and other threads waiting for it will not stall.

Typical cross-thread communication scenarios with using MVar

MVar can be used in many different way. Here are some typical cross-thread communication scenarios:

Round-robin request handling

MVar can be used for handling requests in a round-robin fashion - next request is handled by first vacant handler. Please consider following example:

module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  x <- newEmptyMVar :: IO (MVar Int)
  void $ forkIO $ forever $ do -- request handler #1
    val <- takeMVar x -- blocks until value is received
    putStrLn $ "Received value (Thread #1): " ++ show val
  void $ forkIO $ forever $ do -- request handler #2
    val <- takeMVar x -- blocks until value is received
    putStrLn $ "Received value (Thread #2): " ++ show val
  mapM_ (putMVar x) [0..9]
  threadDelay 100000

Notice that threadDelay 100000 is used after setting all values? It is there to ensure that forked thread will have a chance to complete its job before whole application will be stopped, since Haskell runtime will terminate all background threads upon the exit of the main thread including threads that have still something to do.

This program will produce following trace:

# Received value (Thread #1): 0
# Received value (Thread #2): 1
# Received value (Thread #1): 2
# Received value (Thread #2): 3
# Received value (Thread #1): 4
# Received value (Thread #2): 5
# Received value (Thread #1): 6
# Received value (Thread #2): 7
# Received value (Thread #1): 8
# Received value (Thread #2): 9

As you can see each other request is handled by next vacant handler making them alternating.

Fan-in request handling

MVar can easily be used for creating simple one-way channels when logging thread(s) put something in MVar while one other persisting thread receives it on the other side and then persists it in one way or another. A very straight forward example for such communication could be simple logger which exposes MVar while other threads write into it.

Consider following example:

module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  x <- startLogger
  logInfo x "Application started"
  logInfo x "Application is doing something useful"
  void $ forkIO $ do -- start processor #1
    logInfo x "Processor #1: starting"
    logInfo x "Processor #1: doing something interesting"
    logInfo x "Processor #1: stopping"
  void $ forkIO $ do -- start processor #2
    logInfo x "Processor #2: starting"
    logInfo x "Processor #2: doing something interesting"
    logInfo x "Processor #2: stopping"
  threadDelay 100000
  logInfo x "Application stopped"

startLogger :: IO (MVar String)
startLogger = do
  x <- newEmptyMVar
  void $ forkIO $ forever $ do -- start logger service
    line <- takeMVar x
    putStrLn line
  return x

logInfo :: MVar String -> String -> IO ()
logInfo = putMVar

This program emits following trace:

# Application started
# Application is doing something useful
# Processor #1: starting
# Processor #2: starting
# Processor #1: doing something interesting
# Processor #2: doing something interesting
# Processor #1: stopping
# Processor #2: stopping
# Application stopped

Log entries are mixed from different threads as they keep doing something.

Ensuring boundaries of data (shared state container)

MVar ensures boundaries/consistency of enclosed data. Consider following example where there is a bank account with certain amount of money on it, and how multiple threads are all trying to withraw some money from it:

module Main where

import Control.Concurrent
import Control.Monad

type Amount = Int
type Account = MVar Amount

main :: IO ()
main = do
  account <- newAccount 42
  void $ forkIO $ void $ withdraw account 22
  void $ forkIO $ void $ withdraw account 22
  void $ forkIO $ void $ withdraw account 21
  void $ forkIO $ void $ withdraw account 20
  threadDelay 100000

newAccount :: Amount -> IO Account
newAccount = newMVar

withdraw :: Account -> Amount -> IO Bool
withdraw account amount = modifyMVar account $ \ currentAmount -> do
  if currentAmount < amount
    then do
      putStrLn $ show currentAmount ++ "-" ++ show amount ++ " = not enough money"
      return (currentAmount, False)
    else do
      let netAmount = currentAmount - amount
      putStrLn $ show currentAmount ++ "-" ++ show amount ++ " = " ++ show netAmount ++ " left"
      return (netAmount, True)

This is what the program emits:

# 42-22 = 20 left
# 20-22 = not enough money
# 20-21 = not enough money
# 20-20 = 0 left

As you can see consistency of a bank account is ensured.

Ensuring boundaries of a complex critical section

It is rather typical to use MVar as a lock around critical sections in the code. Taking a value from MVar variable typically means acquiring the lock (starting a critical section) and putting changed value back into MVar variable means releasing the lock (concluding a critical section) subsequently allowing next signle thread in the queue to acquire the lock.

Now let’s take previous example and introduce another bank account. Our goal is to ensure consistency across all bank accounts this time.

module Main where

import Control.Concurrent
import Control.Monad

type Amount = Int
data Account = Account String (MVar Amount)

main :: IO ()
main = do
  lock <- newEmptyMVar :: IO (MVar ())
  account1 <- newAccount "account #1" 42
  account2 <- newAccount "account #2" 48
  void $ forkIO $ do
    takeMVar lock >> putStrLn "[transaction #1] - starting"
    void $ withdraw account1 22
    void $ withdraw account2 22
    putStrLn "[transaction #1] - done" >> putMVar lock ()
  void $ forkIO $ do
    takeMVar lock >> putStrLn "[transaction #2] - starting"
    void $ withdraw account1 22
    void $ withdraw account2 22
    putStrLn "[transaction #2] - done" >> putMVar lock ()
  putMVar lock () -- fire off all transactions
  threadDelay 100000

newAccount :: String -> Amount -> IO Account
newAccount name amount = do
  account <- newMVar amount
  return $ Account name account

withdraw :: Account -> Amount -> IO Bool
withdraw (Account name account) amount = modifyMVar account $ \ currentAmount -> do
  if currentAmount < amount
    then do
      log name $ show currentAmount ++ "-" ++ show amount ++ " = not enough money"
      return (currentAmount, False)
    else do
      let netAmount = currentAmount - amount
      log name $ show currentAmount ++ "-" ++ show amount ++ " = " ++ show netAmount ++ " left"
      return (netAmount, True)
  where
    log :: String -> String -> IO ()
    log accountName line = putStrLn $ accountName ++ " :: " ++ line

This program emits following trace:

# [transaction #1] - starting
# account #1 :: 42-22 = 20 left
# account #2 :: 48-22 = 26 left
# [transaction #1] - done
# [transaction #2] - starting
# account #1 :: 20-22 = not enough money
# account #2 :: 26-22 = 4 left
# [transaction #2] - done

Notice, how different transactions are not overlapping each other ensuring consistency throughout multiple MVar instance and not only within individual MVar instances.

Pitfalls of using low-level concurrency abstractions

When choosing MVar, it is necessary to be mindful about certain pitfalls it is easily possible to get into.

Deadlocks

Deadlock is a situation in which several actions are waiting for each other to finish, and thus neither ever does.

In general any blocking concurrency abstraction that has separate operations for acquiring and releasing locks is susceptible to deadlocks. This is exactly what we have with MVar - takeMVar for acquiring a lock and putMVar for release it.

Consider the following example with two MVar variables and two different threads waiting until another one finishes:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  a <- newMVar ()
  b <- newMVar ()
  void $ forkIO $ do -- start thread #1
    void $ takeMVar a
    putStrLn "Thread #1: started"
    threadDelay 100000
    void $ takeMVar b >> putMVar b () -- waiting for Thread #2 to complete
    void $ putMVar a ()
    putStrLn "Thread #1: completed"
  void $ forkIO $ do -- start thread #2
    void $ takeMVar b
    putStrLn "Thread #2: started"
    threadDelay 100000
    void $ takeMVar a >> putMVar a () -- waiting for Thread #1 to complete
    void $ putMVar b ()
    putStrLn "Thread #2: completed"
  threadDelay 1000000 -- wait a bit
  result <- tryReadMVar a >> tryReadMVar b
  case result of
    Just _ -> putStrLn "Ok"
    Nothing -> putStrLn "Deadlocked!"

Here is the log of execution of the above program:

# Thread #1: started
# Thread #2: started
# Deadlocked!

Just to prove that the program works fine without waiting for another action to finish, comment out lines 14 and 21. The program looks like following now:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  a <- newMVar ()
  b <- newMVar ()
  void $ forkIO $ do -- start thread #1
    void $ takeMVar a
    putStrLn "Thread #1: started"
    threadDelay 100000
    -- void $ takeMVar b >> putMVar b () -- waiting for Thread #2 to complete
    void $ putMVar a ()
    putStrLn "Thread #1: completed"
  void $ forkIO $ do -- start thread #2
    void $ takeMVar b
    putStrLn "Thread #2: started"
    threadDelay 100000
    -- void $ takeMVar a >> putMVar a () -- waiting for Thread #1 to complete
    void $ putMVar b ()
    putStrLn "Thread #2: completed"
  threadDelay 1000000 -- wait a bit
  result <- tryReadMVar a >> tryReadMVar b
  case result of
    Just _ -> putStrLn "Ok"
    Nothing -> putStrLn "Deadlocked!"

Then execution log will look way better:

# Thread #1: started
# Thread #2: started
# Thread #2: completed
# Thread #1: completed
# Ok

This was probably simplest deadlock scenario with using MVar. There are many more other ways to deadlock your pragrams most of which are not as easy to resolve as in the above example, so be careful.

Resource management

Due to the fact that acquiring and releasing a lock are two distinct operations might lead to another kind of problem – reliable resource management. Consider the following program:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  a <- newMVar ()
  -- start thread #1
  void $ forkIO $ do
    void $ takeMVar a
    putStrLn "Thread #1: started"
    void $ fail "Unexpected reason ..." -- <-- unexpected failure
    void $ putMVar a ()                 -- <-- release the lock (never happens)
    putStrLn "Thread #1: completed"
  threadDelay 500000
  -- start thread #2
  void $ forkIO $ do
    void $ takeMVar a                   -- <-- waits for Thread #1
    putStrLn "Thread #2: started"
    -- doing something useful
    void $ putMVar a ()
    putStrLn "Thread #2: completed"
  -- wait a bit
  threadDelay 1000000
  result <- tryReadMVar a
  case result of
    Just _ -> putStrLn "Ok"
    Nothing -> putStrLn "Deadlocked!"

Between 12~13 lines is the critical section for Thread #1 which in this case fails. This leads to the situation when the lock is never released (line 14). As a result no other thread can acquire it again – somewhat a deadlock.

In order to avoid these kinds of errors one could either fallback to using exception-safe functions like withMVar or implement try-finally pattern manually. Here is how withMVar function is implemented in the base package:

withMVar :: MVar a -> (a -> IO b) -> IO b
withMVar m io =
  mask $ \restore -> do
    a <- takeMVar m
    b <- restore (io a) `onException` putMVar m a
    putMVar m a
    return b

Aside from all the interesting functions (like mask, onException) this code does a very simple thing - it makes sure that in case of any unforeseen error putMVar will anyway be called, thus a lock will be released in any case. So, let’s see how our previous example would look like with using withMVar instead of using explicit pair of takeMVar and putMVar:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
module Main where

import Control.Concurrent
import Control.Monad

main :: IO ()
main = do
  a <- newMVar ()
  void $ forkIO $ void $ withMVar a $ \_ -> do -- start thread #1
    putStrLn "Thread #1: started"
    void $ fail "Unexpected reason ..." -- <-- unexpected failure
    putStrLn "Thread #1: completed"
  threadDelay 500000
  void $ forkIO $ void $ withMVar a $ \_ -> do -- start thread #2
    putStrLn "Thread #2: started"
    -- doing something useful
    putStrLn "Thread #2: completed"
  threadDelay 1000000 -- wait a bit
  result <- tryReadMVar a
  case result of
    Just _ -> putStrLn "Ok"
    Nothing -> putStrLn "Deadlocked!"

This program will run with the following trace, demonstrating that this time any unforeseen errors are handled correctly with regards to releasing a shared resource (lock in this case):

# Thread #1: started
# Program.hs: user error (Unexpected reason ...)
# Thread #2: started
# Thread #2: completed
# Ok

References

  1. “Concurrent and Multicore Programming” chapter in Real World Haskell Book
  2. “Basic Concurrency: Threads and MVars” chapter in the exceptional Simon Marlow’s book, “Parallel and Concurrent Programming in Haskell”

“Common Concurrency Abstractions in Haskell” Series