Fast lock-based FIFO alias bounded buffer

S

Szabolcs Ferenczi

I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

I have given it a try and checked it out how one could construct a
bounded buffer in C++ where the producer and the consumer can proceed
simultaneously.

template< typename T >
class BoundedBuffer {
enum {THREAD_SHARED=0, CLOSED=0};
public:
BoundedBuffer(const unsigned int limit,
Advancer *const first,
Advancer *const last)
: m_buf(limit),
m_first(first),
m_last(last) {
Sem_init(&m_full, THREAD_SHARED, CLOSED);
Sem_init(&m_empty, THREAD_SHARED, limit);
}
~BoundedBuffer() {
Sem_destroy(&m_full);
Sem_destroy(&m_empty);
}
void put(T item) {
Sem_wait(&m_empty);
m_buf[m_last->get_and_increment()] = item;
Sem_post(&m_full);
}
T get() {
Sem_wait(&m_full);
T aux = m_buf[m_first->get_and_increment()];
Sem_post(&m_empty);
return aux;
}
private:
std::vector< T > m_buf;
Advancer *const m_first;
Advancer *const m_last;
sem_t m_full;
sem_t m_empty;
};

For the construction, it needs two objects that implement atomic
increment of an index. By injecting these objects, one can supply
different advancers for the different needs. The indexes are used to
keep track of the first full and last empty positions, respectively.
The operation get_and_increment() atomically increments the index and
returns the value before the increment. If the index overflows, it is
reseted, so that the buffer slots are reused.

The class Advancer defines an abstract interface. I have used a
concrete class AtomicLockedAdvancer but anyone can use any other
technique as well.

The construction allows that producers and consumers can work on
different slots at the same time. It is solved by the two mutexes in
the two Advancers instead of the one mutex in the traditional monitor-
like solution. One mutex protects one index only. Thus the critical
region is smaller.

If a library does not provide semaphores, e.g. the Boost Library does
not, it can be solved by condition variables and mutexes as well. The
idea is the same, two mutexes must be employed instead of the one
mutex traditionally used in monitors.

Perhaps if the synchronization elements, i.e. the semaphores, are also
factored out, the class can be independent of any concrete thread
library.

Best Regards,
Szabolcs
 
C

Chris Thomasson

Szabolcs Ferenczi said:
I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.
[...]

Efficient lock-based solutions are great, thanks for posting this; I am
looking forward to examining it. BTW, have you checked out Joe Seighs
semaphore algorithm which has wait-free fast-paths? They would definitely be
an asset to your algorithm. IIRC, here is how it went (in windows):

<sketch of 'joe_seigh_sem.hpp' which should compile>
___________________________________________________________________
#if ! defined(JOE_SEIGH_SEM_INCLUDE_HPP)
# define JOE_SEIGH_SEM_INCLUDE_HPP
# include <windows.h>
# include <exception>
# include <cassert>
# include <climits>
# define JOE_SEIGH_SEM_UNEXPECTED() \
assert(false); std::terminate()
/*===========================================================*/




class joe_seigh_sem {
LONG m_state;
HANDLE m_wset;


public:
joe_seigh_sem(LONG CONST state = 0)
: m_state(state),
m_wset(CreateSemaphore(NULL, 0, LONG_MAX, NULL)) {
if (! m_wset || state < 0) {
if (m_wset) { CloseHandle(m_wset); }
throw std::exception();
}
}

~joe_seigh_sem() throw() {
if (! CloseHandle(m_wset)) {
JOE_SEIGH_SEM_UNEXPECTED();
}
}


public:
void post() throw() {
if (InterlockedIncrement(&m_state) < 1) {
if (! ReleaseSemaphore(m_wset, 1, NULL)) {
JOE_SEIGH_SEM_UNEXPECTED();
}
}
}

void wait() throw() {
if (InterlockedDecrement(&m_state) < 0) {
if (WaitForSingleObject(m_wset, INFINITE) !=
WAIT_OBJECT_0) {
JOE_SEIGH_SEM_UNEXPECTED();
}
}
}
};




/*===========================================================*/
#endif
___________________________________________________________________



what do you think?
 
C

Chris Thomasson

Szabolcs Ferenczi said:
I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

I have given it a try and checked it out how one could construct a
bounded buffer in C++ where the producer and the consumer can proceed
simultaneously.

template< typename T >
class BoundedBuffer {
enum {THREAD_SHARED=0, CLOSED=0};
public:
BoundedBuffer(const unsigned int limit,
Advancer *const first,
Advancer *const last)
: m_buf(limit),
m_first(first),
m_last(last) {
Sem_init(&m_full, THREAD_SHARED, CLOSED);
[...]

Humm, the 'BoundedBuffer<T>::m_full' semaphore has an initial count of
'BoundedBuffer<T>::THREAD_SHARED' and a maximum count of
'BoundedBuffer<T>::CLOSED'; is that correct? Please briefly explain the
semantics of the 'Sem_init()' function; thanks.
 
C

Chris Thomasson

Chris Thomasson said:
Szabolcs Ferenczi said:
I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

I have given it a try and checked it out how one could construct a
bounded buffer in C++ where the producer and the consumer can proceed
simultaneously.
[...]
Humm, the 'BoundedBuffer<T>::m_full' semaphore has an initial count of
'BoundedBuffer<T>::THREAD_SHARED' and a maximum count of
'BoundedBuffer<T>::CLOSED'; is that correct? Please briefly explain the
semantics of the 'Sem_init()' function; thanks.

Ahh. Okay, it has to act just like the POSIX 'sem_init()' function:

http://www.opengroup.org/onlinepubs/007908775/xsh/sem_init.html

fine. The uppercase 'S' in 'Sem_init()' confused me for a moment. Sorry
about that.
 
C

Chris Thomasson

Szabolcs Ferenczi said:
I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

[...]

I just remembered that Dmitriy V'jukov posted a lock-free bounded buffer:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/1a3ef233d3717505

This will aid your performance tests for sure.
 
D

Dmitriy V'jukov

I was re-reading the concurrent language tools of the Edison language,
when one interesting note caught my eyes: "... But the programmer is
no longer tied to the monitor concept, but can use simpler concepts,
such as semaphores ... Semaphores can then be used to implement a
multislot buffer in which sending and receiving can take place
simultaneously from different slots"
It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

[...]

I just remembered that Dmitriy V'jukov posted a lock-free bounded buffer:

http://groups.google.com/group/comp.programming.threads/browse_frm/th...

This will aid your performance tests for sure.


Note: In order to get 'blocking semantics' one must bolt eventcount
logic on top of this queue.
For example this one:
http://appcore.home.comcast.net/~appcore/appcore/include/ac_eventcount_algo1_h.html
http://appcore.home.comcast.net/~appcore/appcore/src/ac_eventcount_algo1_c.html


Dmitriy V'jukov
 
D

Dmitriy V'jukov

The construction allows that producers and consumers can work on
different slots at the same time.


This will work only in single-producer/single-consumer setup. So there
is no *producers* and *consumers*.


Dmitriy V'jukov
 
D

Dmitriy V'jukov

It seems that a solution does not necessarily have to be lock-free in
order to be efficient.

Lock-free in original sense is not about efficiency at all. It's only
about forward-progress.
There is no generally accepted term for 'efficient synchronization
algorithms'. I saw somewhere that David Dice used term 'atomic-free'.
But I think that 'atomic-free' is bad term for 'efficient *lock-based*
synchronization algorithms'.

Dmitriy V'jukov
 
S

Szabolcs Ferenczi

S

Szabolcs Ferenczi

Note: In order to get 'blocking semantics' one must bolt eventcount
logic on top of this queue.
For example this one:http://appcore.home.comcast.net/~ap...ast.net/~appcore/appcore/src/ac_eventcount_al...

Do not use those examples if you want to be on the safe side. Those
fragments contain the usual beginner's mistake of using the condition
variable for event signaling.

Note that you can check whether you use the condition variable
correctly by removing it from your algorithm and it must still be
correct functionally. (To be more exact, replace the signal/broadcast
with empty operation and replace wait with an unlock-lock sequence.)
If the functionality is broken, you use the condition variable in a
wrong way.

See:

http://groups.google.com/group/comp.programming.threads/msg/5503afa56835fd6d

Best Regards,
Szabolcs
 
S

Szabolcs Ferenczi

This will work only in single-producer/single-consumer setup. So there
is no *producers* and *consumers*.

De facto it works with thousand of producers and thousand of
consumers.

You can also see that if you carefully check how it works. The
simplicity comes from applying semaphores. They will only influence
the producers or consumers when the condition of the queue is not such
that the operation can be meaningfully finish. Remember, in a
concurrent programming environment the "once false, always false"
principle is not valid, contrary to sequential programming
environments, so if the queue is empty now it does not mean that the
get operation should return with false.

Producers only exclude each other for the very short time frame of
updating the index to the last location and consumers only exclude
each other for the very short time frame of updating the index to the
first location. Otherwise producers are not interfering with
consumers. It is not the case in the traditional monitor-based
solution where producers and consumers exclude each other for the time
the buffer is maintained.

I hope this helps.

Best Regards,
Szabolcs
 
D

Dmitriy V'jukov

De facto it works with thousand of producers and thousand of
consumers.

You can also see that if you carefully check how it works. The
simplicity comes from applying semaphores. They will only influence
the producers or consumers when the condition of the queue is not such
that the operation can be meaningfully finish. Remember, in a
concurrent programming environment the "once false, always false"
principle is not valid, contrary to sequential programming
environments, so if the queue is empty now it does not mean that the
get operation should return with false.


1. Queue is empty
2. Thread 1 calls get()
3. Thread 1 blocked on m_full
4. Thread 2 calls put()
5. Thread 2 acquires m_empty
6. Thread 2 executes m_last->get_and_increment()
7. Thread 3 calls put()
8. Thread 3 acquires m_empty
9. Thread 3 executes m_last->get_and_increment()
10. Thread 3 stores item in the buffer in the slot 2
11. Thread 3 releases m_full
12. Thread 1 acquires m_full
13. Thread 1 executes m_first->get_and_increment()
14. Thread 1 reads TRASH from slot 1
15. Thread 1 releases m_empty
16. Thread 1 returns TRASH to the user


It's excusable neglect for the one making first uncertain steps in the
Lock-Free World, and removing his first global mutex/monitor.


Dmitriy V'jukov
 
C

Chris Thomasson

Do not use those examples if you want to be on the safe side.

Do you even know what an eventcount is? Well, it's a general solution for
adding conditional blocking functionality to existing non-blocking
algorithms. It's kind of like a condition-variable for non-blocking
algorithms. In fact, it would be a great candidate for standardization. I
should propose it for Boost.Threads. What do you think Dmitriy?



Those
fragments contain the usual beginner's mistake of using the condition
variable for event signaling.

This is just totally false. Funny how you think its a mistake to use a
condition variable to signal about state-changes. Just to let you know, a
state-change that warrents a signal is analogous to an event. But, of
course, we have already been over this:

http://groups.google.com/group/comp.programming.threads/msg/a934807273c2d95b

http://groups.google.com/group/comp.programming.threads/msg/6882cd76a7ec4970

http://groups.google.com/group/comp.programming.threads/msg/cdfc27e51cee41b2


Please stop making that bogus assertion because some newbie might think you
know what your talking about!



Note that you can check whether you use the condition variable
correctly by removing it from your algorithm and it must still be
correct functionally. (To be more exact, replace the signal/broadcast
with empty operation and replace wait with an unlock-lock sequence.)

You should really try and understand my eventcount algorithm; here is a
simple, easy to read version:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/aa8c62ad06dbb380


It does not need to do a loop because I simply transfer spurious wakes out
of the slow-path and back into to the lock-free algorithm predicate test.
Joe Seigh demonstrated a usage pattern for it here:

http://groups.google.com/group/comp.programming.threads/msg/8e7a0379b55557c0
(at end of post...)



If the functionality is broken, you use the condition variable in a
wrong way.

I don't think that you have any idea on how to use condition variables
because of all the false assertions you make about them:

http://groups.google.com/group/comp.programming.threads/msg/6fba67075f21ca2f

http://groups.google.com/group/comp.programming.threads/msg/daa95ce7d49fcba4
 
S

Szabolcs Ferenczi

1. Queue is empty
2. Thread 1 calls get()
3. Thread 1 blocked on m_full
4. Thread 2 calls put()
5. Thread 2 acquires m_empty
6. Thread 2 executes m_last->get_and_increment()
7. Thread 3 calls put()
8. Thread 3 acquires m_empty
9. Thread 3 executes m_last->get_and_increment()
10. Thread 3 stores item in the buffer in the slot 2
11. Thread 3 releases m_full
12. Thread 1 acquires m_full
13. Thread 1 executes m_first->get_and_increment()
14. Thread 1 reads TRASH from slot 1
15. Thread 1 releases m_empty
16. Thread 1 returns TRASH to the user

It's excusable neglect for the one making first uncertain steps in the
Lock-Free World, and removing his first global mutex/monitor.

Dmitriy V'jukov- Hide quoted text -

- Show quoted text -

It looks like a good catch. Thanks.

Best Regards,
Szabolcs
 
D

Dmitriy V'jukov

Producers only exclude each other for the very short time frame of
updating the index to the last location and consumers only exclude
each other for the very short time frame of updating the index to the
first location. Otherwise producers are not interfering with
consumers. It is not the case in the traditional monitor-based
solution where producers and consumers exclude each other for the time
the buffer is maintained.

I hope this helps.


In reality if you have at least 1 atomic operation on shared location,
then threads cannot be acting in parallel provided then length of
operation is less than about 300 cycles.
So when you are saying "very short time frame of updating the index"
it makes me laugh.
Actually push() operation must described this way:
1. Mutually exclusive part for 300 cycles - sema_wait()
2. Mutually exclusive part for 300 cycles - index update
3. Truly parallel part for 2 cycles - storing item
4. Mutually exclusive part for 300 cycles - sema_post()

Benchmark with heavy load will show the same results - queue is very
slow and have negative scalability.


Dmitriy V'jukov
 
D

Dmitriy V'jukov

Do not use those examples if you want to be on the safe side. Those
fragments contain the usual beginner's mistake of using the condition
variable for event signaling.

Note that you can check whether you use the condition variable
correctly by removing it from your algorithm and it must still be
correct functionally. (To be more exact, replace the signal/broadcast
with empty operation and replace wait with an unlock-lock sequence.)
If the functionality is broken, you use the condition variable in a
wrong way.


What about it? Where eventcount algorithm will be broken if I remove
condition variable?


Dmitriy V'jukov
 
C

Chris Thomasson

Forget about it. It is so free that the API is simply not concurrent
programming compliant.

Huh? Please explain...



The dequeue operation returns a boolean if the
FIFO is empty. This is a sequential programming API.

:^/

Who cares if the dequeue operation returns boolean? As Dmitriy already
pointed out, you can use an eventcount to get conditional blocking; this is
how it could possibly look using my basic algorithm:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/aa8c62ad06dbb380

http://groups.google.com/group/comp.programming.threads/msg/8e7a0379b55557c0
(Joe Seigh points out where to place the memory barriers...)

<sketch>
____________________________________________________________
class simple_bounded_queue_wrapper {
bounded_queue m_impl;
ec m_full;
ec m_empty;


public:
bool enqueue_try(void* value) {
if (m_impl.enqueue(value)) {
m_empty.signal();
return true;
}
return false;
}

bool dequeue_try(void*& value) {
if (m_impl.dequeue(value)) {
m_full.signal();
return true;
}
return false;
}


public:
void enqueue_wait(void* value) {
while (! enqueue_try(value)) {
int const key = m_full.get();
if (enqueue_try(value)) { return; }
m_full.wait(key);
}
}

void dequeue_wait(void*& value) {
while (! dequeue_try(value)) {
int const key = m_empty.get();
if (dequeue_try(value)) { return; }
m_empty.wait(key);
}
}
};
____________________________________________________________



What's wrong with that? It maintains the lock-free properties of Dmitriy's
algorithm on the fast-paths. It will only block when certain conditions
arise (e.g., full/empty) in the slow-paths.
 
C

Chris Thomasson

Dmitriy V'jukov said:
What about it? Where eventcount algorithm will be broken if I remove
condition variable?

It won't be broken, however, it will use a horrible busy-loop predicated on
the result of the user algorithm.
 
D

Dmitriy V'jukov

Do you even know what an eventcount is? Well, it's a general solution for
adding conditional blocking functionality to existing non-blocking
algorithms. It's kind of like a condition-variable for non-blocking
algorithms. In fact, it would be a great candidate for standardization. I
should propose it for Boost.Threads. What do you think Dmitriy?


I have only good impressions of your eventcount. It's very simple and
very efficient. I used it to combine several queues and scheduler for
thread.

As for Boost.Threads I'm not sure. If someone will add event count to
Boost.Threads then he must also add some queues and stacks, and
educate people how correctly use it.


Dmitriy V'jukov
 
S

Szabolcs Ferenczi

Let us see how we can fix the problem pointed out in:

http://groups.google.com/group/comp.programming.threads/msg/c7aca39d932e0e30

The hazard can arise because the access of the array cell is not
included into the critical region. Typical mistake in concurrent
programs which, at the moment, can only be detected by good reviews.

The fix can be along the lines of including the access of the array of
cells into the atomic action:

m_last->put_and_increment(m_buf, item);

T aux = m_first->get_and_increment(m_buf);

A reference to the shared resource m_buf can be passed to the atomic
increment operation because the enclosing semaphore operations ensure
that the get and put indexes can access disjoint cells of the array.

Now the advancer becomes something like a primitive buffer, where the
two advancers are mapped to the same set of cells (array) so that the
subsets are always disjoint despite of the rearrangements. We need to
refactor the data (m_buf) to where it is handled (advancer). With this
background in mind, we can approach the problem with a fresh look from
top to down like this:

If only we had an atomic buffer, the class of the multi-slot buffers
could be derived from the atomic buffer like this:

template< typename T >
class BoundedBuffer : public AtomicBuffer< T > {
enum {THREAD_SHARED=0, CLOSED=0};
public:
BoundedBuffer(unsigned int limit)
: AtomicBuffer< T >(limit) {
Sem_init(&m_full, THREAD_SHARED, CLOSED);
Sem_init(&m_empty, THREAD_SHARED, limit);
}
~BoundedBuffer() {
Sem_destroy(&m_full);
Sem_destroy(&m_empty);
}
void put(T item) {
Sem_wait(&m_empty);
AtomicBuffer< T >::put(item);
Sem_post(&m_full);
}
T get() {
Sem_wait(&m_full);
T aux = AtomicBuffer< T >::get();
Sem_post(&m_empty);
return aux;
}
private:
sem_t m_full;
sem_t m_empty;
};

At this level we have the semaphores that wrap around the operations
of an atomic buffer. The atomic buffer is not defined yet, so let us
define it. If only we had a circular buffer we could make it atomic
with help of mutexes like this:

template< typename T >
class AtomicBuffer : public CircularBuffer< T > {
public:
AtomicBuffer(unsigned int limit)
: CircularBuffer< T >(limit) {
Pthread_mutex_init(&m_put_guard, NULL);
Pthread_mutex_init(&m_get_guard, NULL);
}
~AtomicBuffer() {
Pthread_mutex_destroy(&m_get_guard);
Pthread_mutex_destroy(&m_put_guard);
}
void put(T item) {
Pthread_mutex_lock(&m_put_guard);
CircularBuffer< T >::put(item);
Pthread_mutex_unlock(&m_put_guard);
}
T get() {
Pthread_mutex_lock(&m_get_guard);
T aux = CircularBuffer< T >::get();
Pthread_mutex_unlock(&m_get_guard);
return aux;
}
private:
pthread_mutex_t m_put_guard;
pthread_mutex_t m_get_guard;
};

At this level, we have the added mutexes, which we need to implement
the critical regions around a circular buffer. Note that although it
provides atomic access, the atomic buffer in itself is not safe in a
concurrent environment, i.e. without the wrapper with the semaphores.
Namely, producers and consumers may interfere with each other if the
buffer capacity is exceeded. On the other hand, the two mutexes make
it possible that the put and the get operations can happen
simultaneously on disjoint cells. Remember that the interference
between puts and gets are filtered out at the other level with the
semaphores.

We did not define the circular buffer yet, so let us define it:

template< typename T >
class CircularBuffer {
public:
CircularBuffer(unsigned int limit)
: m_buf(limit), m_first(0), m_last(0), m_limit(limit) { }
void put(T item) {
m_buf[m_last] = item; m_last = (m_last + 1) % m_limit;}
T get() {
T e = m_buf[m_first];
m_first = (m_first + 1) % m_limit; return e;}

private:
std::vector< T > m_buf;
unsigned int m_first, m_last, m_limit;
};

And so we have completed the task of redesigning a multi-slot bounded
buffer that allows producers and consumers to work on disjoint parts
of the buffer simultaneously. The original nice property remains
there. As a bonus, a hierarchical class structure emerged that
presents a nice layered object-oriented design solving one requirement
at a layer.

Best Regards,
Szabolcs
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,961
Messages
2,570,131
Members
46,689
Latest member
liammiller

Latest Threads

Top