J
Joe Seigh
Here's a port of a fast-pathed semaphore I wrote elsewhere. It
only supports single-permit acquire and release. If you use it
in conjunction with ConcurrentLinkedQueue you can get a blocking
queue that's up to 3X faster than LinkedBlockingQueue under
contention.
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
 * Counting semaphore whose uncontended acquire and release touch only an atomic counter.
 *
 * <p>{@code count} is the permit balance. When it is positive, that many permits can be taken
 * without blocking. Every blocking or timed acquirer decrements it unconditionally; if the
 * result is negative the acquirer falls back to waiting on the slow semaphore {@code sem},
 * which is created with zero permits and is only ever released by {@link #release()}.
 *
 * <p>An acquirer that gives up (interrupt or timeout) has already decremented {@code count}.
 * It records this in {@code cancel}; a later {@link #release()} hands the recorded
 * cancellations to {@link #processCancels(int)}, which adds them back into {@code count}.
 *
 * <p>Only single-permit operations are provided.
 */
public class FastSemaphore {
    // The fields are final so that any thread obtaining a reference to this object is
    // guaranteed to see them initialized (final-field semantics of the Java memory model).

    /** Semaphore count: permits available when positive, pending acquirers when negative. */
    final AtomicInteger count;

    /** Deferred cancellation count: acquirers that gave up after decrementing {@code count}. */
    final AtomicInteger cancel;

    /** Slow semaphore on which acquirers block once the fast path fails. */
    final Semaphore sem;

    /**
     * Creates a semaphore with the given initial count.
     *
     * <p>NOTE(review): a negative {@code z} is not rejected here; confirm whether any caller
     * relies on that before adding validation.
     *
     * @param z initial value of the semaphore count
     * @param fair fairness setting passed to the underlying slow {@link Semaphore}
     */
    public FastSemaphore(int z, boolean fair) {
        count = new AtomicInteger(z);
        cancel = new AtomicInteger(0);
        sem = new Semaphore(0, fair);
    }

    /**
     * Adds {@code cancelCount} to the current count without letting the count rise above zero.
     *
     * <p>While {@code count < 0}, increments it by {@code min(cancelCount, -count)}. Whatever
     * part of {@code cancelCount} could not be transferred is added back into {@code cancel}.
     *
     * @param cancelCount number of cancellations to fold into the count; values {@code <= 0}
     *     are ignored
     */
    void processCancels(int cancelCount) {
        if (cancelCount > 0) {
            int oldCount;
            // Retry the CAS until it succeeds or the count is no longer negative.
            while ((oldCount = count.get()) < 0) {
                // Clamp at zero: cancellations only offset pending acquirers.
                int newCount = Math.min(oldCount + cancelCount, 0);
                if (count.compareAndSet(oldCount, newCount)) {
                    cancelCount -= newCount - oldCount; // amount actually transferred
                    break;
                }
            }
        }
        // Add any untransferred cancelCount back into cancel.
        if (cancelCount > 0) {
            cancel.addAndGet(cancelCount);
        }
    }

    /**
     * Acquires one permit, blocking on the slow semaphore if none is available.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the slow
     *     semaphore; the abandoned wait is recorded in {@code cancel} before rethrowing
     */
    public void acquire() throws InterruptedException {
        if (count.decrementAndGet() < 0) {
            try {
                sem.acquire();
            } catch (InterruptedException e) {
                // Deferred cancellation: only record it here and let a later release() fold
                // it into count. The eager alternative would be processCancels(1); exactly
                // one of the two must be used.
                cancel.incrementAndGet();
                throw e;
            }
        }
    }

    /**
     * Acquires one permit, waiting on the slow semaphore for at most the given time.
     *
     * @param timeout maximum time to wait on the slow semaphore
     * @param unit unit of {@code timeout}
     * @return {@code true} if a permit was acquired, {@code false} if the wait timed out
     * @throws InterruptedException if the thread is interrupted while waiting on the slow
     *     semaphore; the abandoned wait is recorded in {@code cancel} before rethrowing
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        if (count.decrementAndGet() >= 0) {
            return true; // fast path: a permit was available
        }
        boolean acquired;
        try {
            acquired = sem.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            // Deferred cancellation; the eager alternative would be processCancels(1).
            cancel.incrementAndGet();
            throw e;
        }
        if (!acquired) {
            // Timed out: same deferred-cancellation bookkeeping as for an interrupt.
            cancel.incrementAndGet();
        }
        return acquired;
    }

    /**
     * Acquires one permit only if the count is currently positive; never blocks.
     *
     * @return {@code true} if the count was positive and was decremented, {@code false}
     *     otherwise
     */
    public boolean tryAcquire() {
        int oldCount;
        do {
            oldCount = count.get();
        } while (oldCount > 0 && !count.compareAndSet(oldCount, oldCount - 1));
        return oldCount > 0;
    }

    /**
     * Releases one permit.
     *
     * <p>Pending cancellations are first folded into the count when the count is negative.
     * The count is then incremented, and if it is still not positive the slow semaphore is
     * released once.
     */
    public void release() {
        if (cancel.get() > 0 && count.get() < 0) {
            processCancels(cancel.getAndSet(0));
        }
        if (count.incrementAndGet() <= 0) {
            sem.release();
        }
    }
}
/*-*/
only supports single-permit acquire and release. If you use it
in conjunction with ConcurrentLinkedQueue you can get a blocking
queue that's up to 3X faster than LinkedBlockingQueue under
contention.
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
 * Counting semaphore whose uncontended acquire and release touch only an atomic counter.
 *
 * <p>{@code count} is the permit balance. When it is positive, that many permits can be taken
 * without blocking. Every blocking or timed acquirer decrements it unconditionally; if the
 * result is negative the acquirer falls back to waiting on the slow semaphore {@code sem},
 * which is created with zero permits and is only ever released by {@link #release()}.
 *
 * <p>An acquirer that gives up (interrupt or timeout) has already decremented {@code count}.
 * It records this in {@code cancel}; a later {@link #release()} hands the recorded
 * cancellations to {@link #processCancels(int)}, which adds them back into {@code count}.
 *
 * <p>Only single-permit operations are provided.
 */
public class FastSemaphore {
    // The fields are final so that any thread obtaining a reference to this object is
    // guaranteed to see them initialized (final-field semantics of the Java memory model).

    /** Semaphore count: permits available when positive, pending acquirers when negative. */
    final AtomicInteger count;

    /** Deferred cancellation count: acquirers that gave up after decrementing {@code count}. */
    final AtomicInteger cancel;

    /** Slow semaphore on which acquirers block once the fast path fails. */
    final Semaphore sem;

    /**
     * Creates a semaphore with the given initial count.
     *
     * <p>NOTE(review): a negative {@code z} is not rejected here; confirm whether any caller
     * relies on that before adding validation.
     *
     * @param z initial value of the semaphore count
     * @param fair fairness setting passed to the underlying slow {@link Semaphore}
     */
    public FastSemaphore(int z, boolean fair) {
        count = new AtomicInteger(z);
        cancel = new AtomicInteger(0);
        sem = new Semaphore(0, fair);
    }

    /**
     * Adds {@code cancelCount} to the current count without letting the count rise above zero.
     *
     * <p>While {@code count < 0}, increments it by {@code min(cancelCount, -count)}. Whatever
     * part of {@code cancelCount} could not be transferred is added back into {@code cancel}.
     *
     * @param cancelCount number of cancellations to fold into the count; values {@code <= 0}
     *     are ignored
     */
    void processCancels(int cancelCount) {
        if (cancelCount > 0) {
            int oldCount;
            // Retry the CAS until it succeeds or the count is no longer negative.
            while ((oldCount = count.get()) < 0) {
                // Clamp at zero: cancellations only offset pending acquirers.
                int newCount = Math.min(oldCount + cancelCount, 0);
                if (count.compareAndSet(oldCount, newCount)) {
                    cancelCount -= newCount - oldCount; // amount actually transferred
                    break;
                }
            }
        }
        // Add any untransferred cancelCount back into cancel.
        if (cancelCount > 0) {
            cancel.addAndGet(cancelCount);
        }
    }

    /**
     * Acquires one permit, blocking on the slow semaphore if none is available.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the slow
     *     semaphore; the abandoned wait is recorded in {@code cancel} before rethrowing
     */
    public void acquire() throws InterruptedException {
        if (count.decrementAndGet() < 0) {
            try {
                sem.acquire();
            } catch (InterruptedException e) {
                // Deferred cancellation: only record it here and let a later release() fold
                // it into count. The eager alternative would be processCancels(1); exactly
                // one of the two must be used.
                cancel.incrementAndGet();
                throw e;
            }
        }
    }

    /**
     * Acquires one permit, waiting on the slow semaphore for at most the given time.
     *
     * @param timeout maximum time to wait on the slow semaphore
     * @param unit unit of {@code timeout}
     * @return {@code true} if a permit was acquired, {@code false} if the wait timed out
     * @throws InterruptedException if the thread is interrupted while waiting on the slow
     *     semaphore; the abandoned wait is recorded in {@code cancel} before rethrowing
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        if (count.decrementAndGet() >= 0) {
            return true; // fast path: a permit was available
        }
        boolean acquired;
        try {
            acquired = sem.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            // Deferred cancellation; the eager alternative would be processCancels(1).
            cancel.incrementAndGet();
            throw e;
        }
        if (!acquired) {
            // Timed out: same deferred-cancellation bookkeeping as for an interrupt.
            cancel.incrementAndGet();
        }
        return acquired;
    }

    /**
     * Acquires one permit only if the count is currently positive; never blocks.
     *
     * @return {@code true} if the count was positive and was decremented, {@code false}
     *     otherwise
     */
    public boolean tryAcquire() {
        int oldCount;
        do {
            oldCount = count.get();
        } while (oldCount > 0 && !count.compareAndSet(oldCount, oldCount - 1));
        return oldCount > 0;
    }

    /**
     * Releases one permit.
     *
     * <p>Pending cancellations are first folded into the count when the count is negative.
     * The count is then incremented, and if it is still not positive the slow semaphore is
     * released once.
     */
    public void release() {
        if (cancel.get() > 0 && count.get() < 0) {
            processCancels(cancel.getAndSet(0));
        }
        if (count.incrementAndGet() <= 0) {
            sem.release();
        }
    }
}
/*-*/