On second glance I wasn't that fond of my solution, so I tweaked it as
shown below:
// Bounded-spin variant of put(): tries to grab the mutex opportunistically
// before blocking, waits until there is room, then appends `t`.
//
// Review note ("there is a problem, as queue_ is not protected under
// lock"): deciding whether to keep spinning by calling queue_.size() while
// NOT holding m_ races with concurrent push_back/pop_front on the same
// std::deque, which is undefined behavior. The spin decision below
// therefore looks only at the lock; queue_ is touched exclusively while
// `lock` owns m_.
void put(T t)
{
    constexpr int kMaxSpins = 3250;  // tuning constant carried over from the benchmark
    std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
    // Opportunistic phase: yield and retry the lock a bounded number of times.
    for (int spin = 0; !lock.owns_lock() && spin < kMaxSpins; ++spin)
    {
        std::this_thread::yield();
        lock.try_lock();
    }
    // Still contended after spinning: block until the mutex is ours.
    if (!lock.owns_lock())
        lock.lock();
    while (queue_.size() >= capacity_)
        c_full_.wait(lock);
    queue_.push_back(std::move(t));
    lock.unlock();
    // Notify on every insertion rather than only on the empty -> non-empty
    // transition: with several consumers, a transition-only notify_one can
    // leave a consumer asleep while items are available.
    c_empty_.notify_one();
}
// Bounded-spin variant of take(): tries to grab the mutex opportunistically
// before blocking, waits until the queue is non-empty, then removes and
// returns the front element.
//
// queue_ is never read without holding m_: consulting queue_.size() while
// unlocked (to pick a spin strategy) races with a producer's push_back and
// is undefined behavior, so the spin decision depends only on the lock.
T take()
{
    constexpr int kMaxSpins = 3250;  // tuning constant carried over from the benchmark
    std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
    // Opportunistic phase: yield and retry the lock a bounded number of times.
    for (int spin = 0; !lock.owns_lock() && spin < kMaxSpins; ++spin)
    {
        std::this_thread::yield();
        lock.try_lock();
    }
    // Still contended after spinning: block until the mutex is ours.
    if (!lock.owns_lock())
        lock.lock();
    while (queue_.empty())
        c_empty_.wait(lock);
    T tmp = std::move(queue_.front());
    queue_.pop_front();
    lock.unlock();
    // Wake one producer per removal; notifying only on the full -> not-full
    // transition can strand a second waiting producer.
    c_full_.notify_one();
    return tmp;
}
I am admittedly coding to the benchmark (which is not a great
idea). But I got the time on my system down from 13.7 seconds to
about 9.7 seconds (roughly 29% less run time, i.e. about 40% more throughput).
This is a version backed by a fixed-size array (essentially the same
implementation as Java's ArrayBlockingQueue) instead of a std::deque (I
used an atomic type for count_). It runs at about the same speed as the
Java version; the atomic type slows it down a bit.
Sadly, there is no noticeable difference between the deque and array versions.
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <memory>
#include <mutex>
#include <new>
#include <thread>
/// Bounded, blocking FIFO queue stored in a fixed-size ring buffer.
///
/// put() blocks while the queue holds `capacity` elements; take() blocks
/// while it is empty. The buffer, the read/write indices and count_ are
/// only modified while m_ is held, and every put()/take() signals the
/// opposite condition variable, so several producer and consumer threads
/// may share one queue. count_ is additionally atomic so the spin
/// heuristic in acquireLock() can peek at it without holding m_.
///
/// @tparam T element type; must be move-constructible.
template <class T>
class BlockingQueue{
public:
    /// Creates an empty queue that holds at most @p cap elements.
    /// Raw storage for all @p cap slots is allocated here; no T is
    /// constructed until put() is called. With cap == 0 every put() and
    /// take() blocks forever.
    BlockingQueue(unsigned cap)
        :
        queue_(std::allocator<T>().allocate(cap)),
        insPos_(0),
        extPos_(0),
        count_(0),
        capacity_(cap)
    {
    }
    BlockingQueue(const BlockingQueue&)=delete;
    BlockingQueue& operator=(const BlockingQueue&)=delete;

    /// Destroys the elements still in the queue, then frees the buffer.
    /// No other thread may be using the queue when this runs.
    ~BlockingQueue()
    {
        // Only the count_ slots starting at extPos_ (wrapping at capacity_)
        // hold live objects; they must be destroyed before the raw storage
        // is released, or e.g. std::string contents would leak.
        unsigned pos = extPos_;
        for (unsigned left = count_; left > 0; --left)
        {
            queue_[pos].~T();
            inc(pos);
        }
        std::allocator<T>().deallocate(queue_, capacity_);
    }

    /// Appends @p t, blocking while the queue is full.
    void put(T t)
    {
        // Spin heuristic (benchmark-tuned): keep spinning only while the
        // queue is more than a quarter full.
        std::unique_lock<std::mutex> lock = acquireLock(
            [this]{ return count_.load(std::memory_order_relaxed) > capacity_ / 4; });
        c_full_.wait(lock, [this]{ return count_ < capacity_; });
        // Construct in place before bumping the indices so a throwing move
        // constructor leaves the queue unchanged.
        ::new (static_cast<void*>(queue_ + insPos_)) T(std::move(t));
        inc(insPos_);
        ++count_;
        lock.unlock();
        // Signal on every insertion: notifying only on the 0 -> 1
        // transition can leave a second waiting consumer asleep.
        c_empty_.notify_one();
    }

    /// Removes and returns the oldest element, blocking while the queue is empty.
    T take()
    {
        // Mirror of put(): keep spinning only while the queue is less than
        // three quarters full (capacity_ - capacity_/4 avoids overflowing
        // 3*capacity_ for very large capacities).
        std::unique_lock<std::mutex> lock = acquireLock(
            [this]{ return count_.load(std::memory_order_relaxed) < capacity_ - capacity_ / 4; });
        c_empty_.wait(lock, [this]{ return count_ > 0; });
        T item = std::move(queue_[extPos_]);
        queue_[extPos_].~T();
        inc(extPos_);
        --count_;
        lock.unlock();
        // Signal on every removal for the same reason as in put().
        c_full_.notify_one();
        return item;
    }

    /// Returns true if the queue holds no elements at the moment of the
    /// call; with other threads active the answer may be stale on return.
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m_);
        return count_ == 0;
    }
private:
    /// Maximum number of yield + try_lock rounds before acquireLock() blocks.
    static constexpr int kMaxSpins = 1000;

    // Acquires m_. While worthSpinning() holds and fewer than kMaxSpins
    // rounds have elapsed, yields and retries try_lock(); otherwise blocks
    // in lock(). Retrying try_lock() on each round is what lets the spin
    // phase actually win the lock early instead of being a pure delay.
    template <class Pred>
    std::unique_lock<std::mutex> acquireLock(Pred worthSpinning)
    {
        std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
        for (int spins = 0; !lock.owns_lock(); ++spins)
        {
            if (spins < kMaxSpins && worthSpinning())
            {
                std::this_thread::yield();
                lock.try_lock();
            }
            else
            {
                lock.lock();
            }
        }
        return lock;
    }

    // Advances a ring-buffer index, wrapping to 0 at capacity_.
    void inc(unsigned& i) const
    {
        ++i;
        if (i == capacity_) i = 0;
    }

    mutable std::mutex m_;                  // guards the buffer, indices and count_ updates
    std::condition_variable c_empty_;       // consumers wait here while the queue is empty
    std::condition_variable c_full_;        // producers wait here while the queue is full
    T* queue_;                              // raw storage for capacity_ slots
    unsigned insPos_, extPos_;              // next slot to write / next slot to read
    std::atomic<unsigned> count_;           // number of live elements
    const unsigned capacity_;
};
// Throughput benchmark: one producer pushes nitems ints through a
// 1000-slot BlockingQueue while one consumer drains them, storing each
// value at a pseudo-random index of a local buffer.
int main()
{
    BlockingQueue<int> produced(1000);
    const int nitems = 100000000;
    std::srand(12345);
    // Plain lambdas: std::thread accepts any callable, so wrapping them in
    // std::function only added type erasure (and required <functional>).
    auto f_prod = [&]() {
        int i = nitems;
        while (i-- > 0) {
            produced.put(i);
        }
    };
    std::thread producer1(f_prod);
    auto f_cons = [&]() {
        const int size = 10000;
        int arr[size];
        int i = nitems;
        // std::rand is called only from this thread; srand ran before the
        // thread was started.
        while (i-- > 0)
        {
            arr[std::rand() % size] = produced.take();
        }
    };
    std::thread consumer1(f_cons);
    producer1.join();
    consumer1.join();
}