Paul Khuong mostly on Lisp


Sat, 04 Dec 2010

 

Concurrency with MVars

[Updated twice to improve readability on planet lisp, and again to remove some useless package qualifiers.]

I originally intended to remind users of condition variables (waitqueues) in SBCL that condition-wait can return spuriously. In other words, you’re likely misusing condition variables if your program isn’t correct with (basically) the following definition of condition-wait:

(defun condition-wait (queue mutex) 
  (declare (ignore queue)) 
  (sb-thread:release-mutex mutex) 
  ;; sleep and/or assume a fair mutex 
  (sb-thread:get-mutex mutex))

One way to almost always use condition variables correctly is to insert calls to condition-wait in a busy-polling loop. I was going to code up a rough example of how to do that for coroutines, but it seemed better to exploit MVars instead.
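As a minimal sketch of that looping pattern, here is a one-shot gate. GATE, WAIT-FOR-GATE and OPEN-GATE are hypothetical names made up for illustration; they are not part of SBCL or of the code in this post. The waiter re-checks its predicate every time condition-wait returns, so it should stay correct even with the pessimistic definition above:

```lisp
;; Sketch only: GATE, WAIT-FOR-GATE and OPEN-GATE are hypothetical names.
(defstruct gate
  (mutex  (sb-thread:make-mutex)     :read-only t)
  (queue  (sb-thread:make-waitqueue) :read-only t)
  (open-p nil))

(defun wait-for-gate (gate)
  "Block until GATE is open.  The predicate is re-checked after every
wake-up, so a spurious return from CONDITION-WAIT is harmless."
  (sb-thread:with-mutex ((gate-mutex gate))
    (loop until (gate-open-p gate)
          do (sb-thread:condition-wait (gate-queue gate)
                                       (gate-mutex gate)))
    t))

(defun open-gate (gate)
  "Open GATE and wake every waiter; the mutex is held while notifying."
  (sb-thread:with-mutex ((gate-mutex gate))
    (setf (gate-open-p gate) t)
    (sb-thread:condition-broadcast (gate-queue gate))))

;; Usage: the waiter returns whether it started waiting before or
;; after the gate was opened.
(let* ((gate   (make-gate))
       (waiter (sb-thread:make-thread (lambda () (wait-for-gate gate)))))
  (open-gate gate)
  (sb-thread:join-thread waiter))
```

The important part is that the state (open-p) lives outside the waitqueue: the waitqueue only says "something may have changed", never "the condition holds".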

1 MVars

(The code for this section is available at http://discontinuity.info/~pkhuong/mvar.lisp.)

The GHC docs say that an MVar “(pronounced “em-var”) is a synchronising variable, used for communication between concurrent threads. It can be thought of as a a [sic] box, which may be empty or full.” Another way to see them is as bounded (one-element) message queues.

To use MVars, we need to be able to create one (make), consume a value (take) and put one in the queue (put). It’s also useful to expose a type (mvar) and a type test (mvar-p).

(defpackage "MVAR" 
    (:use "CL" "SB-THREAD") 
  (:export "MVAR" "MVAR-P" "MAKE" "VALUE" "TAKE" "PUT")) 
 
(in-package "MVAR")

To implement an MVar, we obviously need some way to denote emptiness, a mutable box, and a mutex to protect against concurrent accesses.

(defconstant +empty+ '+empty+)
 
(defstruct (mvar 
             (:constructor %make-mvar)) 
  (mutex      (make-mutex)     :type mutex 
   :read-only t) 
  ;; signaled when reads are possible 
  (read-cvar  (make-waitqueue) :type waitqueue 
   :read-only t) 
  ;; signaled when writes are possible 
  (write-cvar (make-waitqueue) :type waitqueue 
   :read-only t) 
  (value  +empty+))

(defun make (&optional (value +empty+)) 
  (%make-mvar :value value))

The two condition variables (waitqueues) are used to reduce busy-looping. It would also be possible to only have a single condition variable for both reads and writes, but that would result in even more spurious wake-ups. Instead, the code can use condition-notify to only wake a single waiter at a time.

(defun take (mvar) 
  (declare (type mvar mvar)) 
  (let ((mutex (mvar-mutex mvar)) 
        (cvar  (mvar-read-cvar mvar))) 
    (with-mutex (mutex) 
      (loop for value = (mvar-value mvar) 
            do (cond ((eq value +empty+) 
                      (condition-wait cvar mutex)) 
                     (t 
                      (setf (mvar-value mvar) +empty+) 
                      (condition-notify (mvar-write-cvar mvar)) 
                      (return value)))))))

(defun put (mvar new-value) 
  (declare (type mvar mvar)) 
  (assert (not (eq new-value +empty+))) 
  (let ((mutex (mvar-mutex mvar)) 
        (cvar  (mvar-write-cvar mvar))) 
    (with-mutex (mutex) 
      (loop for value = (mvar-value mvar) 
            do (cond ((eq value +empty+) 
                      (setf (mvar-value mvar) new-value) 
                      (condition-notify (mvar-read-cvar mvar)) 
                      (return new-value)) 
                     (t 
                      (condition-wait cvar mutex)))))))

Finally, tiny setf wrappers never hurt:

(declaim (inline value (setf value))) 
(defun value (mvar) 
  (take mvar)) 
 
(defun (setf value) (value mvar) 
  (put mvar value))
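For comparison, here is a rough, self-contained sketch of the single-waitqueue alternative mentioned earlier. MBOX, MBOX-TAKE, MBOX-PUT and MBOX-DEMO are hypothetical names, and the separate fullp flag is a simplification chosen for this sketch rather than the +empty+ marker used above. With one shared queue, condition-notify could wake a waiter of the wrong kind (a reader waking another reader), so this version falls back on condition-broadcast and pays for it with extra wake-ups:

```lisp
;; Sketch only: MBOX and friends are hypothetical names, not part of
;; mvar.lisp.
(defstruct mbox
  (mutex (sb-thread:make-mutex)     :read-only t)
  ;; a single waitqueue shared by readers and writers
  (cvar  (sb-thread:make-waitqueue) :read-only t)
  (value nil)
  (fullp nil))

(defun mbox-take (box)
  (sb-thread:with-mutex ((mbox-mutex box))
    (loop until (mbox-fullp box)
          do (sb-thread:condition-wait (mbox-cvar box) (mbox-mutex box)))
    (setf (mbox-fullp box) nil)
    ;; readers and writers share the queue, so wake everyone
    (sb-thread:condition-broadcast (mbox-cvar box))
    ;; return the stored value and clear the slot
    (shiftf (mbox-value box) nil)))

(defun mbox-put (box new-value)
  (sb-thread:with-mutex ((mbox-mutex box))
    (loop while (mbox-fullp box)
          do (sb-thread:condition-wait (mbox-cvar box) (mbox-mutex box)))
    (setf (mbox-value box) new-value
          (mbox-fullp box) t)
    (sb-thread:condition-broadcast (mbox-cvar box))
    new-value))

(defun mbox-demo ()
  "One producer thread hands five integers to the calling thread."
  (let* ((box      (make-mbox))
         (producer (sb-thread:make-thread
                    (lambda ()
                      (dotimes (i 5)
                        (mbox-put box i))))))
    (prog1 (loop repeat 5 collect (mbox-take box))
      (sb-thread:join-thread producer))))
```

With a single producer and a single consumer, the one-element box forces the two threads to alternate, so (mbox-demo) should return the integers in the order they were put.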

2 Implementing coroutines with MVars

(The code for this section is available at http://discontinuity.info/~pkhuong/coroutine.lisp.)

Coroutines are like functions, except that they allow multiple returns and (re-)entries. Users should be able to create coroutines (coroutine), yield values from coroutines (yield), and grab the next values from a coroutine (next).

(defpackage "COROUTINE" 
    (:use "CL" "SB-THREAD" "SB-EXT") 
  (:export "COROUTINE" "YIELD" "NEXT" "+DEAD+")) 
 
(in-package "COROUTINE")

To implement that, coroutines only need a thread and two MVars, one for arguments and another for return values:

(defstruct (coroutine 
             (:constructor %make-coroutine (thread in out))) 
  (thread nil :type thread    :read-only t) 
  (in     nil :type mvar:mvar :read-only t) 
  (out    nil :type mvar:mvar :read-only t))

next simply has to put fresh argument values, and take return values:

(defun next (coroutine &rest values) 
  (mvar:put (coroutine-in coroutine) values) 
  (values-list (mvar:take (coroutine-out coroutine))))

yield shouldn’t be used outside coroutines, so it’s defined as a stub and a compiler macro:

(defun yield (&rest values) 
  (declare (ignore values)) 
  (error "~S used outside ~S" 'yield 'coroutine))
 
(define-compiler-macro yield (&whole whole &rest values) 
  (declare (ignore values)) 
  (warn "~S used outside ~S" 'yield 'coroutine)
  whole)
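The stub works because a lexical flet binding shadows the global function (and any compiler macro) of the same name. A small thread-free sketch of that mechanism follows; EMIT and WITH-EMIT are hypothetical stand-ins for yield and coroutine, not code from coroutine.lisp:

```lisp
;; Sketch only: EMIT and WITH-EMIT are hypothetical stand-ins.
(defun emit (value)
  "Global stub: only reached when EMIT is called outside WITH-EMIT."
  (declare (ignore value))
  (error "~S used outside ~S" 'emit 'with-emit))

(defmacro with-emit (&body body)
  "Run BODY with a local EMIT that records its argument; return the
recorded values in call order."
  (let ((acc (gensym "ACC")))
    `(let ((,acc '()))
       (flet ((emit (value)
                (push value ,acc)
                value))
         ,@body)
       (nreverse ,acc))))

;; Inside the macro, calls resolve to the local function.
(with-emit
  (emit 1)
  (emit 2))
;; => (1 2)
```

Calling (emit 3) at top level, outside with-emit, reaches the global stub and signals an error, which is the behaviour the yield stub is after.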

Finally, coroutines are just threads with a local yield function.

(defconstant +dead+ '+dead+)
 
(defmacro coroutine (&body body) 
  (let ((_in    (gensym "IN")) 
        (_out   (gensym "OUT")) 
        (_block (gensym "BLOCK"))) 
    `(make-coroutine
      (lambda (,_in ,_out) 
        "IN is the input MVAR and OUT the output MVAR." 
        (lambda () 
          (block ,_block 
            (flet ((yield (&rest values) 
                     (mvar:put ,_out values) 
                     (let ((in (mvar:take ,_in))) 
                       (when (eq in +dead+) 
                         (return-from ,_block)) 
                       (values-list in)))) 
              ;; signal that initialization is complete 
              (yield) 
              (locally 
                  ,@body))) 
          (mvar:put ,_out (list +dead+))))))) 
 
(defun make-coroutine (builder) 
  (let* ((in     (mvar:make)) 
         (out    (mvar:make)) 
         (thread (make-thread (funcall builder in out))) 
         (coroutine (%make-coroutine thread in out))) 
    ;; the coroutine thread and the finalizer don’t hold references 
    ;; to the coroutine struct, so finalize isn’t useless. 
    (finalize coroutine 
              (lambda () 
                (mvar:put in +dead+) 
                (join-thread thread))) 
    ;; return the coroutine and the first yielded values 
    (multiple-value-call #'values
      coroutine 
      (values-list (mvar:take out)))))

3 Same-fringe with coroutines

A classic toy application of coroutines (or, actually, generators, since information only flows out of the coroutines) is the same-fringe problem: deciding whether two trees have the same leaves in the same left-to-right order. We can implement that by first enumerating the leaves of a cons tree:

(defun leaves (tree) 
  (coroutine 
    (labels ((walk (tree) 
               (cond ((consp tree) 
                      (walk (car tree)) 
                      (walk (cdr tree))) 
                     (t 
                      (yield tree))))) 
      (walk tree))))

Then, we only have to read the leaves from the input trees:

(defun same-fringe (tree1 tree2) 
  (loop with leaves1 = (leaves tree1) 
        with leaves2 = (leaves tree2) 
        for leaf1 = (next leaves1) 
        for leaf2 = (next leaves2) 
        do 
     (cond ((and (eq leaf1 +dead+) 
                 (eq leaf2 +dead+)) 
            (return t)) 
           ((not (eql leaf1 leaf2)) 
            (return nil)))))
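When experimenting with the code above, a thread-free reference version can be handy for cross-checking results. The sketch below swaps coroutines for closures over an explicit stack; LEAF-GENERATOR and SAME-FRINGE/CLOSURES are hypothetical names, and this is a different technique from the one in this post. Like leaves above, it treats every non-cons object (including the NIL that terminates a proper list) as a leaf:

```lisp
;; Sketch only: closure-based generators instead of coroutines.
(defun leaf-generator (tree)
  "Return a closure of one argument, DONE.  Each call returns the next
leaf of TREE in left-to-right order, then DONE once TREE is exhausted."
  (let ((stack (list tree)))
    (lambda (done)
      (loop
        (when (null stack)
          (return done))
        (let ((node (pop stack)))
          (if (consp node)
              ;; visit the car before the cdr
              (progn (push (cdr node) stack)
                     (push (car node) stack))
              (return node)))))))

(defun same-fringe/closures (tree1 tree2)
  (let ((next1 (leaf-generator tree1))
        (next2 (leaf-generator tree2))
        ;; a fresh cons can never be a leaf, so it is a safe end marker
        (done  (list nil)))
    (loop for leaf1 = (funcall next1 done)
          for leaf2 = (funcall next2 done)
          do (cond ((and (eq leaf1 done)
                         (eq leaf2 done))
                    (return t))
                   ((not (eql leaf1 leaf2))
                    (return nil))))))

;; Two differently shaped trees with the same fringe: 1, 2, 3.
(same-fringe/closures '((1 . 2) . 3) '(1 2 . 3))
;; => T
```

Both versions stop at the first mismatch; the coroutine version simply leaves its two threads blocked until the finalizers run.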

posted at: 17:22 | /Lisp

Contact me by email: pvk@pvk.ca.