Skip to content

Instantly share code, notes, and snippets.

@bonifaido
Last active October 6, 2017 12:16
Show Gist options
  • Save bonifaido/5117342 to your computer and use it in GitHub Desktop.
Save bonifaido/5117342 to your computer and use it in GitHub Desktop.
Playing with Java Exchangers.
package me.nandork.exchangertest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Micro-benchmark that measures how fast batches of messages can be handed
 * from one producer thread to several consumer threads through
 * {@link Exchanger}s.
 *
 * <p>The producer collects {@code BUFFER_SIZE} messages in a buffer and swaps
 * the full buffer for an emptied one with a consumer, visiting the consumers
 * round-robin. Handing a {@code null} buffer to a consumer tells it to stop.
 *
 * @author Nandor Kracser
 */
class ExchangerTest {

    /** Total number of messages generated by the producer. */
    private static final long ITERATIONS = 1000L * 1000L * 100L;

    /** Number of messages collected before a buffer is handed to a consumer. */
    private static final int BUFFER_SIZE = 100;

    /**
     * Generates {@code ITERATIONS} messages and distributes them in batches to
     * the consumers it was constructed with.
     *
     * <p>The timeout counter is a plain field without synchronization;
     * {@link #exchangerTest()} reads it only after the thread pool has
     * terminated.
     */
    static class ExchangeProducer implements Runnable {

        /** One exchanger per consumer, in constructor-argument order. */
        private final List<Exchanger<List<String>>> exchangers;

        /** Number of exchange attempts that timed out because the consumer was busy. */
        private long timeouts = 0;

        /** Index of the exchanger the next hand-off attempt will use. */
        private int nextIndex = 0;

        /**
         * Creates a producer that feeds the given consumers.
         *
         * @param exchangeConsumers the consumers to hand buffers to; at least one
         * @throws IllegalArgumentException if no consumer is given
         */
        public ExchangeProducer(ExchangeConsumer... exchangeConsumers) {
            if (exchangeConsumers.length == 0) {
                // Without this check the failure would only show up later, as an
                // index error on the first hand-off inside run().
                throw new IllegalArgumentException("at least one consumer is required");
            }
            this.exchangers = new ArrayList<Exchanger<List<String>>>(exchangeConsumers.length);
            for (ExchangeConsumer consumer : exchangeConsumers) {
                exchangers.add(consumer.getExchanger());
            }
        }

        /**
         * Returns the exchanger to try next and advances the round-robin cursor.
         *
         * @return the next exchanger in round-robin order
         */
        private Exchanger<List<String>> nextExchanger() {
            Exchanger<List<String>> next = exchangers.get(nextIndex++);
            if (nextIndex == exchangers.size()) {
                nextIndex = 0;
            }
            return next;
        }

        /**
         * Hands a buffer to the first consumer that accepts it, trying the
         * consumers round-robin with a 10 microsecond wait each and counting
         * every attempt that times out.
         *
         * @param buffer the buffer to give away
         * @return the buffer received from the consumer in return
         * @throws InterruptedException if the thread is interrupted while exchanging
         */
        private List<String> handOff(List<String> buffer) throws InterruptedException {
            while (true) {
                try {
                    return nextExchanger().exchange(buffer, 10, TimeUnit.MICROSECONDS);
                } catch (TimeoutException ex) {
                    // This consumer was not waiting; count it and try the next one.
                    timeouts++;
                }
            }
        }

        /**
         * Produces all messages, hands every full buffer to a consumer and
         * finally sends each consumer the {@code null} shutdown signal.
         * Stops early, with the interrupt status set, if the thread is
         * interrupted.
         */
        @Override
        public void run() {
            List<String> currentBuffer = new ArrayList<String>(BUFFER_SIZE);
            try {
                for (long i = 0; i < ITERATIONS; i++) {
                    currentBuffer.add(produce());
                    if (currentBuffer.size() == BUFFER_SIZE) {
                        currentBuffer = handOff(currentBuffer);
                    }
                }
                // Deliver a trailing partial batch so no message is lost when
                // ITERATIONS is not a multiple of BUFFER_SIZE.
                if (!currentBuffer.isEmpty()) {
                    handOff(currentBuffer);
                }
                // shutdown consumers
                for (Exchanger<List<String>> exchanger : exchangers) {
                    exchanger.exchange(null);
                }
            } catch (InterruptedException ex) {
                // Restore the flag so the code running this task can still
                // observe that an interrupt was requested.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Creates the next message.
         *
         * @return the message to enqueue
         */
        String produce() {
            return "Message";
        }

        /**
         * @return how many exchange attempts timed out so far
         */
        public long getTimeouts() {
            return timeouts;
        }
    }

    /**
     * Receives buffers through its own {@link Exchanger}, consumes every
     * message in them and gives the emptied buffer back on the next exchange.
     * Terminates when it receives {@code null} instead of a buffer.
     *
     * <p>The message counter is a plain field without synchronization;
     * {@link #exchangerTest()} reads it only after the thread pool has
     * terminated.
     */
    static class ExchangeConsumer implements Runnable {

        /** Number of messages passed to {@link #consume(String)}. */
        private long messageCount = 0;

        /** Meeting point shared with the producer. */
        private final Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        /**
         * Consumes buffers until the {@code null} shutdown signal arrives.
         * Stops early, with the interrupt status set, if the thread is
         * interrupted.
         */
        @Override
        public void run() {
            List<String> currentBuffer = new ArrayList<String>(BUFFER_SIZE);
            try {
                while (currentBuffer != null) {
                    // Messages are consumed from the last element to the first.
                    for (int i = currentBuffer.size() - 1; i >= 0; i--) {
                        consume(currentBuffer.get(i));
                    }
                    // Give back an empty buffer for the producer to refill.
                    currentBuffer.clear();
                    currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) {
                // Restore the flag so the code running this task can still
                // observe that an interrupt was requested.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Processes one message; this implementation only counts it.
         *
         * @param msg the message to process
         */
        void consume(String msg) {
            messageCount++;
        }

        /**
         * @return the exchanger a producer must use to reach this consumer
         */
        public Exchanger<List<String>> getExchanger() {
            return exchanger;
        }

        /**
         * @return how many messages have been consumed so far
         */
        public long getMessageCount() {
            return messageCount;
        }
    }

    /**
     * Runs one producer and two consumers on a three-thread pool, waits for
     * them to finish and prints timeout, per-consumer and throughput figures.
     *
     * @throws InterruptedException if interrupted while waiting for the pool
     */
    static void exchangerTest() throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        ExchangeConsumer exchangeConsumer = new ExchangeConsumer();
        ExchangeConsumer exchangeConsumer1 = new ExchangeConsumer();
        ExchangeProducer exchangeProducer = new ExchangeProducer(exchangeConsumer, exchangeConsumer1);

        // nanoTime is monotonic, so the measurement is immune to wall-clock changes.
        long start = System.nanoTime();
        threadPool.execute(exchangeProducer);
        threadPool.execute(exchangeConsumer);
        threadPool.execute(exchangeConsumer1);
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.DAYS);
        long time = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Throughput covers the messages of both consumers, not just the first.
        long delivered = exchangeConsumer.getMessageCount() + exchangeConsumer1.getMessageCount();
        // Avoid dividing by zero when the run finishes in under a millisecond.
        float seconds = Math.max(time, 1L) / 1000f;

        System.out.println("Timeouts: " + exchangeProducer.getTimeouts());
        System.out.println("Exchanger Delivered messages: " + exchangeConsumer.getMessageCount() + " time: " + time + " ms");
        System.out.println("Exchanger Delivered messages: " + exchangeConsumer1.getMessageCount() + " time: " + time + " ms");
        System.out.println("This is ~" + (long) (delivered / seconds) + " messages/s");
    }

    /**
     * Program entry point; runs the benchmark once.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while waiting for the benchmark
     */
    public static void main(String[] args) throws InterruptedException {
        exchangerTest();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment