Skip to content

Instantly share code, notes, and snippets.

Forked from nhachicha/
Last active May 8, 2016 14:28
Show Gist options
  • Save JeppeLeth/25518f6febb172717fb8548f7649beec to your computer and use it in GitHub Desktop.
Save JeppeLeth/25518f6febb172717fb8548f7649beec to your computer and use it in GitHub Desktop.
Examples using synchronizers (CountDownLatch, CyclicBarrier, Phaser)
// Talk and slides:
// ********************************************************************* //
// ************************** CountDownLatch ************************** //
// ********************************************************************* //
/**
 * CountDownLatch example: the caller starts NB_THREADS worker threads and then
 * blocks on a latch that was created with a count of NB_THREADS.
 *
 * NOTE(review): this listing is an incomplete extract -- closing braces and the
 * body of Task.run() are missing, so it does not compile as shown. Presumably
 * each task adds a random int to the queue and counts the latch down; confirm
 * against the original gist before relying on that.
 */
public class RandomIntAverage {
// Latch whose initial count equals the number of worker threads.
CountDownLatch controller = new CountDownLatch(NB_THREADS);
/**
 * Starts one thread per Task and waits on the latch.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting on the latch
 */
public void randomIntAvg() throws InterruptedException {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
controller.await(); // wait until all participating threads finish
// Work item executed by each worker thread.
class Task implements Runnable {
// NOTE(review): the body of run() is not present in this extract.
public void run() {
// Number of worker threads; also used as the initial latch count.
private final static int NB_THREADS = 3;
// Thread-safe queue holding the collected integers.
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Random source seeded with the current wall-clock time in milliseconds.
Random random = new Random(System.currentTimeMillis());
/**
 * Sums every element currently in the queue and prints the average to
 * standard output.
 */
void computeAverage () {
double sum = 0;
// The loop variable "random" shadows the "random" field inside this loop.
for (Integer random : queue) {
sum += random;
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
/**
 * Entry point: creates the example instance.
 * NOTE(review): the remainder of main is not visible in this extract.
 */
public static void main(String[] args) throws InterruptedException {
RandomIntAverage main = new RandomIntAverage();
// ********************************************************************* //
// ************************** CyclicBarrier ************************** //
// ********************************************************************* //
// basic UC
/**
 * CyclicBarrier example: the caller starts NB_THREADS worker threads and then
 * waits on a barrier sized for NB_THREADS + 1 parties (the workers plus the
 * calling thread).
 *
 * NOTE(review): this listing is an incomplete extract -- closing braces and the
 * contents of the try/catch in Task.run() are missing, so it does not compile
 * as shown. Presumably each task adds a random int to the queue and then awaits
 * the barrier; confirm against the original gist.
 */
public class RandomIntAverage {
CyclicBarrier controller = new CyclicBarrier(NB_THREADS + 1); // account for main thread
/**
 * Starts one thread per Task and waits on the barrier.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws BrokenBarrierException if the barrier is broken while the caller is waiting
 */
public void randomIntAvg() throws InterruptedException, BrokenBarrierException {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
controller.await(); // wait until all participating threads finish
// Work item executed by each worker thread.
class Task implements Runnable {
public void run() {
// NOTE(review): the try body and the catch handler are not present in this extract.
try {
} catch (InterruptedException | BrokenBarrierException e) {
// Number of worker threads; the barrier is sized one larger than this.
private final static int NB_THREADS = 3;
// Thread-safe queue holding the collected integers.
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Random source seeded with the current wall-clock time in milliseconds.
Random random = new Random(System.currentTimeMillis());
/**
 * Sums every element currently in the queue and prints the average to
 * standard output.
 */
void computeAverage () {
double sum = 0;
// The loop variable "random" shadows the "random" field inside this loop.
for (Integer random : queue) {
sum += random;
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
/**
 * Entry point: creates the example instance.
 * NOTE(review): the remainder of main is not visible in this extract.
 */
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
RandomIntAverage main = new RandomIntAverage();
// Reusing the barrier
/**
 * CyclicBarrier reuse example: the barrier is sized for NB_THREADS parties and
 * is given an Aggregate instance as its barrier action.
 *
 * NOTE(review): this listing is an incomplete extract -- closing braces and the
 * bodies of Task.run() and Aggregate.run() are mostly missing, so it does not
 * compile as shown. The surviving comments suggest Aggregate computes the
 * average and clears the queue so the barrier can be used again; confirm
 * against the original gist.
 */
public class RandomIntAverageReusingBarrier {
// Barrier for the worker threads only; Aggregate is passed as the barrier action.
CyclicBarrier cyclicBarrier = new CyclicBarrier(NB_THREADS, new Aggregate());
/** Starts one thread per Task; no wait on the barrier is visible in this method. */
public void randomAvg() {
for (int i = 0; i < NB_THREADS; i++) {
new Thread(new Task()).start();
// Work item executed by each worker thread.
class Task implements Runnable {
public void run() {
// NOTE(review): most statements of this try block are not present in this extract.
try {
// reusing the barrier
// The queue is expected to be empty at this point (only checked when assertions are enabled).
assert queue.size() == 0;
} catch (InterruptedException | BrokenBarrierException e) {
// Runnable handed to the CyclicBarrier constructor as its barrier action.
class Aggregate implements Runnable {
public void run() {
// All threads arrived at barrier
// clear the queue for reuse
// NOTE(review): the statements these two comments describe are not present in this extract.
// Number of worker threads and of barrier parties.
private final static int NB_THREADS = 3;
// Thread-safe queue holding the collected integers.
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Random source seeded with the current wall-clock time in milliseconds.
Random random = new Random(System.currentTimeMillis());
/**
 * Sums every element currently in the queue and prints the average to
 * standard output.
 */
void computeAverage () {
double sum = 0;
// The loop variable "random" shadows the "random" field inside this loop.
for (Integer random : queue) {
sum += random;
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
/**
 * Entry point: creates the example instance.
 * NOTE(review): the remainder of main is not visible in this extract.
 */
public static void main(String[] args) {
RandomIntAverageReusingBarrier main = new RandomIntAverageReusingBarrier();
// ********************************************************************* //
// ************************** Phaser ************************** //
// ********************************************************************* //
//basic UC
/**
 * Phaser example: the phaser is created with one party, and the caller starts
 * n worker threads before arriving at the phaser and waiting for it to advance.
 *
 * NOTE(review): this listing is an incomplete extract -- closing braces and the
 * body of Task.run() are missing, so it does not compile as shown. Presumably
 * each task registers with the phaser, adds a random int to the queue and then
 * arrives; confirm against the original gist.
 */
public class RandomIntAverage {
// Phaser constructed with a single party.
Phaser phaser = new Phaser(1);
/**
 * Starts one thread per Task, then arrives at the phaser and waits for the
 * phase to advance.
 *
 * @param n number of worker threads to start
 */
public void randomIntAvg(int n) {
for (int i = 0; i < n; i++) {
new Thread(new Task()).start();
phaser.arriveAndAwaitAdvance(); // wait until all registered threads arrive
// Work item executed by each worker thread.
class Task implements Runnable {
// NOTE(review): the body of run() is not present in this extract.
public void run() {
// Thread-safe queue holding the collected integers.
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Random source seeded with the current wall-clock time in milliseconds.
Random random = new Random(System.currentTimeMillis());
/**
 * Sums every element currently in the queue and prints the average to
 * standard output.
 */
void computeAverage () {
double sum = 0;
// The loop variable "random" shadows the "random" field inside this loop.
for (Integer random : queue) {
sum += random;
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
/**
 * Entry point: creates the example instance.
 * NOTE(review): the remainder of main is not visible in this extract.
 */
public static void main(String[] args) {
RandomIntAverage main = new RandomIntAverage();
// Reusing the barrier
/**
 * Multi-phase Phaser example: an anonymous Phaser subclass overrides onAdvance
 * to print each phase number and to decide when the phaser terminates; worker
 * tasks loop until the phaser reports termination.
 *
 * NOTE(review): this listing is an incomplete extract -- closing braces and the
 * body of the while loop in Task.run() are missing, so it does not compile as
 * shown. Confirm the missing statements against the original gist.
 */
public class RandomIntAverageMultiplePhases {
// Phaser constructed with a single party and a customised onAdvance hook.
Phaser phaser = new Phaser(1) {
/**
 * Prints the phase number and decides whether the phaser should terminate.
 *
 * @param phase the phase number passed in by the phaser
 * @param registeredParties the number of registered parties passed in by the phaser
 * @return true once phase reaches NB_PHASES - 1 or no parties remain registered
 */
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase #" + phase);
// return true if the Phaser should terminate, false if Phaser should continue with next phase
return phase >= NB_PHASES - 1 || registeredParties == 0;
/**
 * Starts one thread per Task, then arrives at the phaser and deregisters the
 * calling thread.
 *
 * @param n number of worker threads to start
 */
public void randomIntAvg(int n) {
for (int i = 0; i < n; i++) {
new Thread(new Task()).start();
// The main thread has finished initialising; deregister it to unblock the waiting threads.
// If we don't deregister, the condition is met only once, but we want to reuse the barrier
// and trigger a new phase every time n registered threads arrive, not n+1.
phaser.arriveAndDeregister(); // deregister to start reusing the barrier
// Work item executed by each worker thread.
class Task implements Runnable {
public void run() {
// Keep looping until the phaser has terminated.
// NOTE(review): the loop body is not present in this extract.
while (!phaser.isTerminated()) {
// Phase-count threshold used by onAdvance to decide termination.
private final static int NB_PHASES = 2;
// Worker-thread count constant; not referenced by the lines visible in this extract.
private final static int NB_THREADS = 3;
// Thread-safe queue holding the collected integers.
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// Random source seeded with the current wall-clock time in milliseconds.
Random random = new Random(System.currentTimeMillis());
/**
 * Sums every element currently in the queue and prints the average to
 * standard output.
 */
void computeAverage () {
double sum = 0;
// The loop variable "random" shadows the "random" field inside this loop.
for (Integer random : queue) {
sum += random;
System.out.println("Average of " + queue.size() + " random int = " + sum / queue.size());
/**
 * Entry point: creates the example instance.
 * NOTE(review): the remainder of main is not visible in this extract.
 */
public static void main(String[] args) {
RandomIntAverageMultiplePhases main = new RandomIntAverageMultiplePhases();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment