Request for comments about synchronized queue using boost

Nordlöw

I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

/Nordlöw

The file synched_queue.hpp follows:

#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP

/*!
 * @file synched_queue.hpp
 * @brief Synchronized (Thread Safe) Container Wrapper on std::queue
 *        using Boost::Thread.
 */

#include <queue>
#include <iostream>

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

// ============================================================================

template <typename T>
class synched_queue
{
    std::queue<T> q;              ///< Queue.
    boost::mutex m;               ///< Mutex.
public:
    /*!
     * Push @p value.
     */
    void push(const T & value) {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        q.push(value);
    }

    /*!
     * Try and pop into @p value, returning directly in any case.
     * @return true if pop was success, false otherwise.
     */
    bool try_pop(T & value) {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        if (q.size()) {
            value = q.front();
            q.pop();
            return true;
        }
        return false;
    }

    /// Pop and return value, possibly waiting forever.
    T wait_pop() {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        // wait until queue has at least one element
        c.wait(sl, boost::bind(&std::queue<T>::size, q));
        T value = q.front();
        q.pop();
        return value;
    }

    size_type size() const {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        return q.size();
    }

    bool empty() const {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        return q.empty();
    }

};

// ============================================================================

#endif
 
Maxim Yegorushkin

I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?
/Nordlöw

The file synched_queue.hpp follows:

#ifndef PNW__SYNCHED_QUEUE_HPP
#define PNW__SYNCHED_QUEUE_HPP

/*!
 * @file synched_queue.hpp
 * @brief Synchronized (Thread Safe) Container Wrapper on std::queue
 *        using Boost::Thread.
 */

#include <queue>
#include <iostream>

#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

// ============================================================================

template <typename T>
class synched_queue
{
    std::queue<T> q;              ///< Queue.
    boost::mutex m;             ///< Mutex.

A member variable is missing here:

boost::condition c;
public:
    /*!
     * Push @p value.
     */
    void push(const T & value) {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        q.push(value);

You need to notify other threads waiting on the queue:

c.notify_one();
    }

    /*!
     * Try and pop into @p value, returning directly in any case.
     * @return true if pop was success, false otherwise.
     */
    bool try_pop(T & value) {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        if (q.size()) {
            value = q.front();
            q.pop();
            return true;
        }
        return false;
    }

    /// Pop and return value, possibly waiting forever.
    T wait_pop() {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        // wait until queue has at least one element

The following line:
        c.wait(sl, boost::bind(&std::queue<T>::size, q));

boost::bind(&std::queue<T>::size, q) stores a copy of the queue in the
object created by boost::bind, so that the wait never finishes if the
queue is empty (and if the condition variable is not notified (see
above)).

It should be as simple as:

while(q.empty())
c.wait(sl);
        T value = q.front();
        q.pop();
        return value;
    }

    size_type size() const {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        return q.size();
    }

    bool empty() const {
        boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        return q.empty();
    }

};

// ============================================================================

#endif
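
To illustrate the boost::bind remark above, here is a minimal sketch. It assumes C++11 std::bind and std::ref as stand-ins for boost::bind and boost::ref; both bind flavours copy their bound arguments unless a reference wrapper is used. Box, Observed and observe are hypothetical names chosen for the example, not part of the posted code.

```cpp
#include <functional>

// Hypothetical stand-in for the queue: only its size matters here.
struct Box
{
    int n = 0;
    int size() const { return n; }
};

// What each bound predicate reports after the original object changed.
struct Observed
{
    int via_copy;
    int via_ref;
};

Observed observe()
{
    Box b;
    auto by_copy = std::bind(&Box::size, b);            // stores a copy of b
    auto by_ref  = std::bind(&Box::size, std::ref(b));  // stores a reference to b
    b.n = 3;                                            // "push" three elements
    // by_copy still looks at the snapshot taken when it was bound.
    return Observed{ by_copy(), by_ref() };
}
```

The copy-bound predicate never sees later changes, which is why a wait on it would not finish for a queue that was empty at bind time. The plain while(q.empty()) loop sidesteps the question entirely.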


The other thing is that the queue does not support destruction: the
destructor does not unblock any threads blocked in wait.

Apart from that, the mutex is held for too long. You don't really need
to hold the lock when allocating memory for elements and when invoking
the copy constructor of the elements.

Here is an improved version (although a bit simplified):

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <list>
#include <stdexcept>

template<class T>
class atomic_queue : private boost::noncopyable
{
private:
    boost::mutex mtx_;
    boost::condition cnd_;
    bool cancel_;
    unsigned waiting_;

    // use list as a queue because it allows for splicing:
    // moving elements between lists without any memory allocation and
    // copying
    typedef std::list<T> queue_type;
    queue_type q_;

public:
    struct cancelled : std::logic_error
    {
        cancelled() : std::logic_error("cancelled") {}
    };

    atomic_queue()
        : cancel_()
        , waiting_()
    {}

    ~atomic_queue()
    {
        // cancel all waiting threads
        this->cancel();
    }

    void cancel()
    {
        // cancel all waiting threads
        boost::mutex::scoped_lock l(mtx_);
        cancel_ = true;
        cnd_.notify_all();
        // and wait till they are done
        while(waiting_)
            cnd_.wait(l);
    }

    void push(T const& t)
    {
        // this avoids an allocation inside the critical section below
        queue_type tmp(&t, &t + 1);
        {
            boost::mutex::scoped_lock l(mtx_);
            q_.splice(q_.end(), tmp);
        }
        cnd_.notify_one();
    }

    // this function provides only basic exception safety if T's copy
    // ctor can throw or strong exception safety if T's copy ctor is
    // nothrow
    T pop()
    {
        // this avoids copying T inside the critical section below
        queue_type tmp;
        {
            boost::mutex::scoped_lock l(mtx_);
            ++waiting_;
            while(!cancel_ && q_.empty())
                cnd_.wait(l);
            --waiting_;
            if(cancel_)
            {
                cnd_.notify_all();
                throw cancelled();
            }
            tmp.splice(tmp.end(), q_, q_.begin());
        }
        return tmp.front();
    }
};

typedef boost::function<void()> unit_of_work;
typedef atomic_queue<unit_of_work> work_queue;

void typical_thread_pool_working_thread(work_queue* q)
try
{
    for(;;)
        q->pop()();
}
catch(work_queue::cancelled&)
{
    // time to terminate the thread
}

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

I would recommend the book "Programming with POSIX Threads" by David R.
Butenhof.
 
Thomas J. Gritzan

Hendrik said:
Nordlöw said:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

comp.programming.threads?

/Nordlöw

[...]

/// Pop and return value, possibly waiting forever.
T wait_pop() {
boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
// wait until queue has at least one element
c.wait(sl, boost::bind(&std::queue<T>::size, q));
T value = q.front();
q.pop();
return value;
}

I haven't done any threading in a decade or so, but I wonder how
in the above code anything could be put into the locked queue.
What am I missing?
Oh, and I wonder what 'c' is.

c is a condition variable:
http://www.boost.org/doc/libs/1_36_...ation.html#thread.synchronization.condvar_ref

You lock the mutex, then wait for a condition, which (automatically)
unlocks the mutex, and locks it again if the condition occurs.
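
The unlock-while-waiting behaviour described above can be sketched as follows. This is an illustration only, assuming C++11 std::mutex and std::condition_variable in place of the Boost types (the wait semantics are the same idea); wait_for_value and its local variables are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns the value published by a second thread while this thread waits.
int wait_for_value()
{
    std::mutex m;
    std::condition_variable c;
    bool ready = false;
    int value = 0;

    std::unique_lock<std::mutex> lock(m);   // the mutex is held from here on

    // The setter can only lock m once wait() below has released it.
    std::thread setter([&] {
        std::lock_guard<std::mutex> guard(m);
        value = 42;
        ready = true;
        c.notify_one();
    });

    while (!ready)       // re-check the condition: wakeups may be spurious
        c.wait(lock);    // unlocks m while sleeping, relocks before returning

    lock.unlock();
    setter.join();
    return value;
}
```

If wait() did not release the mutex, the setter thread could never run its critical section and the function would deadlock. That release is what lets a producer push into the "locked" queue.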
 
Nordlöw

Doesn't the push-argument "T const & t" instead of my version "const T
& t" mean that we don't copy at all here? I believe &t evaluates to
the memory pointer of t:

    void push(T const& t)
    {
        // this avoids an allocation inside the critical section below
        queue_type tmp(&t, &t + 1);
        {
            boost::mutex::scoped_lock l(mtx_);
            q_.splice(q_.end(), tmp);
        }
        cnd_.notify_one();
    }

/Nordlöw
 
Nordlöw

Thomas said:
[...]
 I haven't done any threading in a decade or so, but I wonder how
 in the above code anything could be put into the locked queue.
 What am I missing?
 Oh, and I wonder what 'c' is.
You lock the mutex, then wait for a condition, which (automatically)
unlocks the mutex, and locks it again if the condition occurs.

  Ah, thanks. I haven't looked at boost's threads yet.

  Schobi

How can I use your queue structure in the following code example?


#include "../synched_queue.hpp"
#include "../threadpool/include/threadpool.hpp"
#include <iostream>

using namespace boost::threadpool;

template <typename T>
void produce(synched_queue<T> & q, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        T x = i;
        q.push(x);
        std::cout << "i:" << i << " produced: " << x << std::endl;
    }
}

template <typename T>
void consume(synched_queue<T> & q, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        T x = q.wait_pop();
        std::cout << "i:" << i << " consumed: " << x << std::endl;
    }
}

int main()
{
    typedef float Elm;
    synched_queue<float> q;
    // boost::thread pt(boost::bind(produce<Elm>, q, 10));
    // boost::thread ct(boost::bind(consume<Elm>, q, 10));
    // pt.join();
    // ct.join();
    return 0;
}


Thanks in advance,
/Nordlöw
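
One possible way to get the commented-out thread lines working, sketched under assumptions: C++11 std::thread, std::mutex and std::condition_variable replace the Boost types, and the queue already contains the condition variable and the while(q.empty()) loop suggested earlier in the thread. The thread constructor copies its arguments, so the queue has to be passed through std::ref (with Boost, boost::ref inside boost::bind plays the same role). The sum parameter and run_demo are hypothetical additions that make the result checkable.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class synched_queue
{
    std::queue<T> q;
    std::mutex m;
    std::condition_variable c;
public:
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> sl(m);
            q.push(value);
        }
        c.notify_one();              // wake one waiting consumer
    }

    T wait_pop() {
        std::unique_lock<std::mutex> sl(m);
        while (q.empty())            // loop guards against spurious wakeups
            c.wait(sl);
        T value = q.front();
        q.pop();
        return value;
    }
};

template <typename T>
void produce(synched_queue<T>& q, std::size_t n)
{
    for (std::size_t i = 0; i < n; ++i)
        q.push(static_cast<T>(i));
}

template <typename T>
void consume(synched_queue<T>& q, std::size_t n, T& sum)
{
    for (std::size_t i = 0; i < n; ++i)
        sum += q.wait_pop();
}

// Runs one producer and one consumer and returns the consumed total.
float run_demo(std::size_t n)
{
    synched_queue<float> q;
    float sum = 0;
    // std::ref: share the one queue instead of copying it into each thread.
    std::thread pt(produce<float>, std::ref(q), n);
    std::thread ct(consume<float>, std::ref(q), n, std::ref(sum));
    pt.join();
    ct.join();
    return sum;
}
```

With n = 10 the consumer receives 0 through 9, so run_demo(10) should return 45. The printing from the original example is left out to keep the sketch short.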
 
Maxim Yegorushkin

Doesn't the push-argument "T const & t" instead of my version "const T
& t" mean that we don't copy at all here?

No, T const& and const T& are the same thing: a reference to a constant
T.

I believe &t evaluates to
the memory pointer of t:

    void push(T const& t)
    {
        // this avoids an allocation inside the critical section below
        queue_type tmp(&t, &t + 1);
        {
            boost::mutex::scoped_lock l(mtx_);
            q_.splice(q_.end(), tmp);
        }
        cnd_.notify_one();
    }

The trick here is that element t is first inserted in a temporary list
tmp on the stack.

queue_type tmp(&t, &t + 1); // create a list with a copy of t

This involves allocating memory and copying t. And here it is done
without holding the lock because allocating memory may be expensive
(might cause the system to do swapping) and as you hold the lock all
the worker threads won't be able to pop elements from the queue during
such time. Next, the lock is acquired and the element is moved from
list tmp into q_:

q_.splice(q_.end(), tmp);

This operation does not involve any memory allocation or copying of
elements (splicing only relinks the nodes of the doubly-linked
lists), which makes the critical section execute really fast
without stalling the worker threads for too long.
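
The claim that the element is copied exactly once, and outside the lock, can be checked with a small sketch. It assumes C++11 std::mutex instead of boost::mutex; Tracked, shared_list, shared_mutex and push_spliced are hypothetical names introduced for this illustration.

```cpp
#include <list>
#include <mutex>

// Hypothetical element type that counts how often it is copy-constructed.
struct Tracked
{
    static int copies;
    int id;
    explicit Tracked(int i) : id(i) {}
    Tracked(const Tracked& other) : id(other.id) { ++copies; }
};
int Tracked::copies = 0;

std::list<Tracked> shared_list;   // the queue shared between threads
std::mutex shared_mutex;          // protects shared_list

void push_spliced(const Tracked& t)
{
    // Node allocation and the copy of t happen here, with no lock held.
    std::list<Tracked> tmp(&t, &t + 1);

    std::lock_guard<std::mutex> lock(shared_mutex);
    // splice relinks the existing node: no allocation, no further copy.
    shared_list.splice(shared_list.end(), tmp);
}
```

After one call, Tracked::copies is 1: the only copy was made while building tmp, before the mutex was taken.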
 
Maxim Yegorushkin

I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

You can also try concurrent_queue from
http://www.threadingbuildingblocks.org/codesamples.php#concurrent_queue

Scout around that link for more documentation.
 
James Kanze

[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data-structures?

You have to be very careful with googling in cases like this.
There's an awful lot of junk on the net. Just looking at the
first hit, for example, it's quite clear that the author doesn't
know what he's talking about, and I suspect that that's true in
a large number of cases.
 
James Kanze

On Oct 15, 2:36 pm, Nordlöw wrote:

[...]
Apart from that, the mutex is held for too long. You don't
really need to hold the lock when allocating memory for
elements and when invoking the copy constructor of the
elements.

Which sounds a lot like premature optimization to me. First
get the queue working, then see if there is a performance
problem, and only then, do something about it. (Given that
clients will normally only call functions on the queue when they
have nothing to do, waiting a couple of microseconds longer on
the lock won't impact anything.)

Here is an improved version (although a bit simplified):
void push(T const& t)

Actually, I'd name this "send", and not "push". Just because
the standard library uses very poor names doesn't mean we have
to.

    {
        // this avoids an allocation inside the critical section below
        queue_type tmp(&t, &t + 1);
        {
            boost::mutex::scoped_lock l(mtx_);
            q_.splice(q_.end(), tmp);
        }
        cnd_.notify_one();
    }

This is unnecessary complexity. And probably loses runtime
efficiency (not that it's important): his initial version uses
std::deque, which doesn't have to allocate at each
insertion---in fact, in all of the uses I've measured, the queue
tends to hit its maximum size pretty quickly, and there are no
more allocations after that.

Yet another case where premature optimization turns out to be
pessimization.

[...]
// this function provides only basic exception safety if T's
// copy ctor can throw or strong exception safety if T's copy
// ctor is nothrow

:).

In practice, I find that almost all of my inter-thread queues
need to contain polymorphic objects. Which means that the queue
contains pointers, and that all of the objects will in fact be
dynamically allocated. The result is that I use std::auto_ptr
in the interface (so the producer can't access the object once it
has been passed off, and the consumer knows to delete it).

Of course, std::auto_ptr has a no-throw copy constructor, so the
queue itself has a strong exception safety guarantee.
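
A sketch of this kind of ownership-transferring interface, under one loudly stated assumption: std::unique_ptr is used in place of std::auto_ptr, which was deprecated in C++11 and removed in C++17. Message, Ping and message_queue are hypothetical names, and the std synchronization types stand in for Boost.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical polymorphic message hierarchy.
struct Message
{
    virtual ~Message() {}
    virtual int id() const = 0;
};

struct Ping : Message
{
    int id() const override { return 1; }
};

class message_queue
{
    std::queue<std::unique_ptr<Message>> q;
    std::mutex m;
    std::condition_variable c;
public:
    // Taking the pointer by value forces the producer to give up ownership.
    void send(std::unique_ptr<Message> msg) {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push(std::move(msg));
        }
        c.notify_one();
    }

    // The consumer receives sole ownership and deletes the message
    // automatically when the returned pointer goes out of scope.
    std::unique_ptr<Message> receive() {
        std::unique_lock<std::mutex> lock(m);
        while (q.empty())
            c.wait(lock);
        std::unique_ptr<Message> msg = std::move(q.front());
        q.pop();
        return msg;
    }
};
```

After mq.send(std::move(p)) the producer's pointer p is null, so it cannot touch the object any more. Moving a unique_ptr does not throw, which matches the exception-safety remark above.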

I would recommend "Programming with POSIX Threads" book by
David R. Butenhof.

Very much so, for the basics. (Formally, it's only Unix, but
practically, Boost threads are modeled after pthreads.) For the
data structures, it's less obvious, and of course, Butenhof
doesn't go into the issues which are particular to C++ (local
statics with dynamic initialization, the fact that pointers to
functions have to be ``extern "C"'', etc.).
 
Szabolcs Ferenczi

[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data-structures?

Sure. http://www.google.nl/search?q=boost+thread+safe+queue=

You have to be very careful with googling in cases like this.
There's an awful lot of junk on the net.  Just looking at the
first hit, for example, it's quite clear that the author doesn't
know what he's talking about, and I suspect that that's true in
a large number of cases.

Hmmmm... For me the first hit is a didactic piece by Anthony Williams:

Implementing a Thread-Safe Queue using Condition Variables ...
In those cases, it might be worth using something like boost::optional
to avoid this requirement ... Tags: threading, thread safe, queue,
condition variable ...
www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

Saying that "it's quite clear that the author doesn't know what he's
talking about" is, hmmm..., at least indicates something about you.

I do not want to defend him but if you just read it to the end, you
must have learnt something, I guess. You should not stop by the first
fragment which is just a starting point illustrating the problem.

I agree that he should not have suggested such a bad habit of
handling a shared resource in the front part of his article or, at
least, he should have warned the smattering reader that it is not the
correct way.

Happy reading.

Best Regards,
Szabolcs
 
Maxim Yegorushkin

In practice, I find that almost all of my inter-thread queues
need to contain polymorphic objects.  Which means that the queue
contains pointers, and that all of the objects will in fact be
dynamically allocated.  The result is that I use std::auto_ptr
in the interface (so the producer can't access the object once it
has been passed off, and the consumer knows to delete it).

I agree with you that holding work elements by value is not most
practical. boost::function<> and std::list<> were used only for
simplicity here.

As you said, in practice, the work units are dynamically allocated
polymorphic objects. Naturally, the work unit base class is also a
(singly-linked) list node and the queue is implemented as an intrusive
list. This way, once a work unit has been allocated, the operations on
the inter-thread queue do not involve any memory allocations.

Something like this:

struct WorkUnit
{
    WorkUnit* next;

    WorkUnit()
        : next()
    {}

    virtual ~WorkUnit() = 0;
    virtual void execute() = 0;
    virtual void release() { delete this; }
};

// a pure virtual destructor still needs a definition,
// because the destructors of derived classes call it
inline WorkUnit::~WorkUnit() {}

template<class T>
struct IntrusiveQueue
{
    T *head, **tail;

    IntrusiveQueue()
        : head()
        , tail(&head)
    {}

    void push_back(T* n)
    {
        *tail = n;
        tail = &n->next;
    }

    T* pop_front()
    {
        T* n = head;
        if(head && !(head = head->next))
            tail = &head;
        return n;
    }
};
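
A usage sketch for the intrusive queue, single-threaded to keep it short (a real inter-thread queue would wrap push_back and pop_front in a mutex). WorkUnit and IntrusiveQueue are repeated in simplified form so the block is self-contained: the destructor is made an ordinary virtual and release() is dropped. Record and drain are hypothetical names added for the illustration.

```cpp
#include <vector>

struct WorkUnit
{
    WorkUnit* next;
    WorkUnit() : next() {}
    virtual ~WorkUnit() {}
    virtual void execute() = 0;
};

template<class T>
struct IntrusiveQueue
{
    T *head, **tail;

    IntrusiveQueue() : head(), tail(&head) {}

    void push_back(T* n)
    {
        *tail = n;          // link behind the current last node (or set head)
        tail = &n->next;
    }

    T* pop_front()
    {
        T* n = head;
        if(head && !(head = head->next))
            tail = &head;   // queue became empty: reset tail
        return n;
    }
};

// Hypothetical work unit that records the order in which units ran.
struct Record : WorkUnit
{
    std::vector<int>& log;
    int value;
    Record(std::vector<int>& l, int v) : log(l), value(v) {}
    void execute() override { log.push_back(value); }
};

// Executes and deletes every queued unit; returns how many ran.
int drain(IntrusiveQueue<WorkUnit>& q)
{
    int count = 0;
    while (WorkUnit* w = q.pop_front()) {
        w->execute();
        delete w;
        ++count;
    }
    return count;
}
```

The only allocations are the new expressions that create the work units; pushing and popping just rewrite the next pointers.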
 
Andy

Nordlöw said:
I am currently designing a synchronized queue used to communicate
between threads. Is the code given below a good solution? Am I
using mutex lock/unlock more than needed?

Are there any resources out there on the Internet on how to design
*thread-safe* *efficient* data-structures?

/Nordlöw

The November issue of DDJ has a good article by Herb Sutter, "Writing
a Generalized Concurrent Queue". Some may consider it full of
premature optimizations, but it's still a good read. Watch the DDJ
site for it to appear online soon.

Andy.
 
James Kanze

On Oct 16, 5:42 pm, Szabolcs Ferenczi wrote:

[...]
Are there any resources out there on the Internet on how to
design *thread-safe* *efficient* data-structures?

Sure. http://www.google.nl/search?q=boost+thread+safe+queue=

You have to be very careful with googling in cases like
this. There's an awful lot of junk on the net. Just
looking at the first hit, for example, it's quite clear that
the author doesn't know what he's talking about, and I
suspect that that's true in a large number of cases.

Hmmmm... For me the first hit is a didactic piece by Anthony
Williams:

Implementing a Thread-Safe Queue using Condition Variables ...
In those cases, it might be worth using something like boost::optional
to avoid this requirement ... Tags: threading, thread safe, queue,
condition variable ...
www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-...

Yep. That was the example I was talking about.

Saying that "it's quite clear that the author doesn't know
what he's talking about" is, hmmm..., at least indicates
something about you.

Yes. That I know a bit about programming in a multithreaded
environment, and can spot basic stupidities right away.

I do not want to defend him but if you just read it to the
end, you must have learnt something, I guess.

I learned that he doesn't fully understand the implications.

You should not stop by the first fragment which is just a
starting point illustrating the problem.

When I see threaded code returning a reference into a protected
structure, after having freed the lock, I stop. No one who
understands threading would ever write something like that.

I agree in that he should not have suggest such a bad habit of
handling a shared resource in the front part of his article
or, at least, he should have warned the smattering reader that
it is not the correct way.

The rest just goes on to present what everyone knows anyway.
 
gpderetta

When I see threaded code returning a reference into a protected
structure, after having freed the lock, I stop.  No one who
understands threading would ever write something like that.

Well, the queue is a single consumer object (the author explicitly
says that), so as long as the consumer doesn't pop, the reference is
never invalidated (it uses a deque internally), at least with all sane
implementations of deque (and I'm fairly sure that c++0x will require
it to work).

The rest just goes on to present what everyone knows anyway.

'Everyone' is a fairly strong word. In my experience, many programmers
have no idea of what a condition variable is or how to use it.
 
James Kanze

Well, the queue is a single consumer object (the author
explicitly says that), so as long as the consumer doesn't pop,
the reference is never invalidated (it uses a deque
internally), at least with all sane implementations of deque
(and I'm fairly sure that c++0x will require it to work).

In other words, if it works, it works. The author does vaguely
mention something about "single consumer" after presenting the
code, but there's certainly nothing fundamental in his code
which prevents the client from reading from several different
threads. *IF* you're claiming any sort of thread safety
contract which doesn't require external synchronization, you
don't allow pointers and references to external data to escape.

'Everyone' is a fairly strong word. In my experience, many
programmers have no idea of what a condition variable is or
how to use it.

Yes. I should have said: everyone who knows anything about
programming with threads. From what I've seen, that's really a
minority.

My problem with the article is twofold: first, he presents a
simplistic implementation which is simply too dangerous to
consider. He does mention a vague constraint concerning its
use, but without explaining why, or covering any of the basic
principles. And he then goes on to explain how to use one
particular construct, a condition, again without explaining any
of the basic details. If someone reads just that article,
they're going to mess things up seriously, because they don't
know the basic principles. And if they've learned the basic
principles, say by reading Butenhof, then they already know
everything the article presents.
 
