java.util.concurrent.ConcurrentLinkedQueue is incredibly slow and eats up memory on multi-processor systems.
Compare with java.util.concurrent.LinkedBlockingQueue.
This code below, performs xxxQueue.offer(xx) in a loop, by multiple threads. The LinkedBlockingQueue requires very little memory. the ConcurrentLinkedQueue requires almost 3 times bigger heap to just complete.
One a 2 core machine, CLQ is slower than LBQ. One 8 cores, CLQ sometimes takes forever to complete. I've had to kill it sometimes.
Any thoughts?
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedQTest {
public static void main(String[] args) {
int x = 1000;
int y = 1000;
System.err.println("Warmup..");
runTest(8, 100, 100);
System.err.println("Testing..");
runTest(8, 1000, 1000);
System.err.println("Testing..");
runTest(16, 1000, 1000);
}
static void runTest(final int threadCount, final int x, final int y) {
System.err.println("Threads: " + threadCount);
//-------------
final LinkedBlockingQueue lbq = new LinkedBlockingQueue(Integer.MAX_VALUE);
Thread[] lbqThreads = new Thread[threadCount];
for (int i = 0; i < lbqThreads.length; i++) {
lbqThreads[i] = new Thread() {
@Override
public void run() {
testLBQ(lbq, x, y);
}
};
}
long startLBQ = System.nanoTime();
for (Thread lbqThread : lbqThreads) {
lbqThread.start();
}
for (Thread lbqThread : lbqThreads) {
try {
lbqThread.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
long endLBQ = System.nanoTime();
int sizeLBQ = lbq.size();
if (sizeLBQ != (x * y * threadCount)) {
System.err.println(
"Wrong number of items: " + sizeLBQ + ". Expected: " + (x * y * threadCount));
}
long timeLBQ = (endLBQ - startLBQ) / (1000 * 1000);
System.err.println("LBQ: Size: " + sizeLBQ + ", Time millis: " + timeLBQ);
//-------------
final ConcurrentLinkedQueue clq = new ConcurrentLinkedQueue();
Thread[] clqThreads = new Thread[threadCount];
for (int i = 0; i < clqThreads.length; i++) {
clqThreads[i] = new Thread() {
@Override
public void run() {
testCLQ(clq, x, y);
}
};
}
long startCLQ = System.nanoTime();
for (Thread clqThread : clqThreads) {
clqThread.start();
}
for (Thread clqThread : clqThreads) {
try {
clqThread.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
long endCLQ = System.nanoTime();
int sizeCLQ = lbq.size();
if (sizeCLQ != (x * y * threadCount)) {
System.err.println(
"Wrong number of items: " + sizeCLQ + ". Expected: " + (x * y * threadCount));
}
long timeCLQ = (endCLQ - startCLQ) / (1000 * 1000);
System.err.println("CLQ: Size: " + sizeCLQ + ", Time millis: " + timeCLQ);
//-------------
System.gc();
}
static long testLBQ(LinkedBlockingQueue q, int x, int y) {
Object o = new Object();
long start = System.nanoTime();
for (int m = 0; m < x; m++) {
lbqOffer(q, y, o);
}
long end = System.nanoTime();
return end - start;
}
private static void lbqOffer(LinkedBlockingQueue q, int y, Object o) {
for (int i = 0; i < y; i++) {
q.offer(o);
}
}
static long testCLQ(ConcurrentLinkedQueue q, int x, int y) {
Object o = new Object();
long start = System.nanoTime();
for (int m = 0; m < x; m++) {
clqOffer(q, y, o);
}
long end = System.nanoTime();
return end - start;
}
private static void clqOffer(ConcurrentLinkedQueue q, int y, Object o) {
for (int i = 0; i < y; i++) {
q.offer(o);
}
}
}