Created
October 29, 2015 16:32
-
-
Save nhachicha/7596e912c81aae38d721 to your computer and use it in GitHub Desktop.
Examples using synchronizers (CountDownLatch, CyclicBarrier, Phaser)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ********************************************************************* // | |
// ************************** CountDownLatch ************************** // | |
// ********************************************************************* // | |
/**
 * {@link CountDownLatch} example: {@code NB_THREADS} worker threads each publish one
 * random integer, and the calling thread waits on a latch until all of them are done
 * before printing the average.
 *
 * <p>Not designed for concurrent calls to {@link #randomIntAvg()} on the same instance.
 */
public class RandomIntAverage {
    private final static int NB_THREADS = 3;

    // Latch for the run in progress. A CountDownLatch cannot be reset, so every call
    // to randomIntAvg() installs a fresh one here.
    CountDownLatch controller = new CountDownLatch(NB_THREADS);
    // Values published by the workers. Never cleared, so it accumulates across runs.
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final Random random = new Random(System.currentTimeMillis());

    /**
     * Starts {@code NB_THREADS} workers, blocks until each has counted down, then
     * prints the average of everything currently in the queue.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void randomIntAvg() throws InterruptedException {
        // A latch that already reached zero lets await() return immediately, so reusing
        // the previous one would compute the average before the new workers finish.
        CountDownLatch latch = new CountDownLatch(NB_THREADS);
        controller = latch;
        for (int i = 0; i < NB_THREADS; i++) {
            new Thread(new Task()).start();
        }
        latch.await(); // wait until all participating threads finish
        computeAverage();
    }

    /** Worker: publishes one random integer, then signals completion on its latch. */
    class Task implements Runnable {
        // Captured at construction so the task always signals the run that created it.
        private final CountDownLatch latch = controller;

        @Override
        public void run() {
            try {
                queue.add(random.nextInt());
            } finally {
                // Always count down, otherwise the waiting thread would block forever.
                latch.countDown();
            }
        }
    }

    /** Prints the number of queued values and their arithmetic mean (NaN when empty). */
    void computeAverage() {
        double sum = 0;
        int count = 0;
        for (Integer value : queue) {
            sum += value;
            count++;
        }
        System.out.println("Average of " + count + " random int = " + sum / count);
    }

    public static void main(String[] args) throws InterruptedException {
        RandomIntAverage main = new RandomIntAverage();
        main.randomIntAvg();
    }
}
// ********************************************************************* // | |
// ************************** CyclicBarrier *************************** //
// ********************************************************************* // | |
// Basic use case
/**
 * {@link CyclicBarrier} example: {@code NB_THREADS} worker threads each publish one
 * random integer and then meet the calling thread at a shared barrier. Once the barrier
 * trips, the calling thread prints the average.
 */
public class RandomIntAverage {
    private final static int NB_THREADS = 3;

    // NB_THREADS workers + 1 to account for the thread calling randomIntAvg().
    final CyclicBarrier controller = new CyclicBarrier(NB_THREADS + 1);
    // Values published by the workers. Never cleared, so it accumulates across runs.
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final Random random = new Random(System.currentTimeMillis());

    /**
     * Starts {@code NB_THREADS} workers, waits with them at the barrier, then prints
     * the average of everything currently in the queue.
     *
     * @throws InterruptedException   if the calling thread is interrupted while waiting
     * @throws BrokenBarrierException if the barrier is broken while the caller waits
     */
    public void randomIntAvg() throws InterruptedException, BrokenBarrierException {
        for (int i = 0; i < NB_THREADS; i++) {
            new Thread(new Task()).start();
        }
        controller.await(); // wait until all participating threads have arrived
        computeAverage();
    }

    /** Worker: publishes one random integer, then waits at the barrier. */
    class Task implements Runnable {
        @Override
        public void run() {
            queue.add(random.nextInt());
            try {
                controller.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Prints the number of queued values and their arithmetic mean (NaN when empty). */
    void computeAverage() {
        double sum = 0;
        int count = 0;
        for (Integer value : queue) {
            sum += value;
            count++;
        }
        System.out.println("Average of " + count + " random int = " + sum / count);
    }

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        RandomIntAverage main = new RandomIntAverage();
        main.randomIntAvg();
    }
}
// Reusing the barrier | |
/**
 * {@link CyclicBarrier} example that reuses the barrier for two rounds. Each worker
 * publishes one random integer per round; the barrier action prints the average and
 * empties the queue before the workers are released into the next round.
 */
public class RandomIntAverageReusingBarrier {
    private final static int NB_THREADS = 3;

    // Values published during the current round; emptied by the barrier action.
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final Random random = new Random(System.currentTimeMillis());
    // Only the workers are parties; Aggregate runs each time all of them have arrived.
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(NB_THREADS, new Aggregate());

    /** Starts {@code NB_THREADS} workers and returns without waiting for them. */
    public void randomAvg() {
        for (int i = 0; i < NB_THREADS; i++) {
            new Thread(new Task()).start();
        }
    }

    /** Worker: contributes one value per round, for two rounds. */
    class Task implements Runnable {
        @Override
        public void run() {
            try {
                queue.add(random.nextInt());
                cyclicBarrier.await();
                // Reusing the barrier. The queue must NOT be asserted empty here:
                // another released worker may already have added its second value.
                queue.add(random.nextInt());
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Barrier action: runs once per round, after the last worker has arrived. */
    class Aggregate implements Runnable {
        @Override
        public void run() {
            // All threads arrived at the barrier
            computeAverage();
            // clear the queue for reuse
            queue.clear();
            // Safe to check here: the other parties are still blocked in await()
            // until this action returns.
            assert queue.isEmpty();
        }
    }

    /** Prints the number of queued values and their arithmetic mean (NaN when empty). */
    void computeAverage() {
        double sum = 0;
        int count = 0;
        for (Integer value : queue) {
            sum += value;
            count++;
        }
        System.out.println("Average of " + count + " random int = " + sum / count);
    }

    public static void main(String[] args) {
        RandomIntAverageReusingBarrier main = new RandomIntAverageReusingBarrier();
        main.randomAvg();
    }
}
// ********************************************************************* // | |
// ************************** Phaser ************************** // | |
// ********************************************************************* // | |
// Basic use case
/**
 * {@link Phaser} example with a dynamic number of parties: the calling thread registers
 * one party per worker, each worker publishes one random integer, and the calling
 * thread waits for the phase to advance before printing the average.
 */
public class RandomIntAverage {
    // One party registered up front for the thread that calls randomIntAvg().
    final Phaser phaser = new Phaser(1);
    // Values published by the workers. Never cleared, so it accumulates across runs.
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final Random random = new Random(System.currentTimeMillis());

    /**
     * Starts {@code n} workers, waits for all of them to arrive, then prints the
     * average of everything currently in the queue.
     *
     * @param n number of worker threads to start
     */
    public void randomIntAvg(int n) {
        for (int i = 0; i < n; i++) {
            phaser.register();
            new Thread(new Task()).start();
        }
        phaser.arriveAndAwaitAdvance(); // wait until all registered threads arrive
        computeAverage();
    }

    /** Worker: publishes one random integer, then arrives and leaves the phaser. */
    class Task implements Runnable {
        @Override
        public void run() {
            try {
                queue.add(random.nextInt());
            } finally {
                // Deregister on arrival. With a plain arrive() the worker would stay
                // registered after it exits, and a later randomIntAvg() call would
                // wait forever for parties that never arrive again.
                phaser.arriveAndDeregister();
            }
        }
    }

    /** Prints the number of queued values and their arithmetic mean (NaN when empty). */
    void computeAverage() {
        double sum = 0;
        int count = 0;
        for (Integer value : queue) {
            sum += value;
            count++;
        }
        System.out.println("Average of " + count + " random int = " + sum / count);
    }

    public static void main(String[] args) {
        RandomIntAverage main = new RandomIntAverage();
        main.randomIntAvg(3);
    }
}
// Reusing the barrier
/**
 * {@link Phaser} example that runs several phases. In each phase every worker publishes
 * one random integer; when all workers have arrived, {@code onAdvance} prints the
 * average, empties the queue, and decides whether the phaser terminates.
 */
public class RandomIntAverageMultiplePhases {
    private final static int NB_PHASES = 2;
    private final static int NB_THREADS = 3;

    // Values published during the current phase; emptied in onAdvance.
    final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    final Random random = new Random(System.currentTimeMillis());

    // One party registered up front for the thread that calls randomIntAvg().
    final Phaser phaser = new Phaser(1) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phase #" + phase);
            computeAverage();
            queue.clear();
            // return true if the Phaser should terminate, false if it should continue
            // with the next phase
            return phase >= NB_PHASES - 1 || registeredParties == 0;
        }
    };

    /**
     * Registers and starts {@code n} workers, then deregisters the calling thread and
     * returns without waiting for the phases to complete.
     *
     * @param n number of worker threads to start
     */
    public void randomIntAvg(int n) {
        for (int i = 0; i < n; i++) {
            phaser.register();
            new Thread(new Task()).start();
        }
        // The calling thread has finished initialising. It deregisters so that each
        // phase is triggered by the n workers alone, not by n + 1 parties; staying
        // registered would let the condition be met only once.
        phaser.arriveAndDeregister();
    }

    /** Worker: publishes one value per phase until the phaser terminates. */
    class Task implements Runnable {
        @Override
        public void run() {
            while (!phaser.isTerminated()) {
                queue.add(random.nextInt());
                phaser.arriveAndAwaitAdvance();
            }
        }
    }

    /** Prints the number of queued values and their arithmetic mean (NaN when empty). */
    void computeAverage() {
        double sum = 0;
        int count = 0;
        for (Integer value : queue) {
            sum += value;
            count++;
        }
        System.out.println("Average of " + count + " random int = " + sum / count);
    }

    public static void main(String[] args) {
        RandomIntAverageMultiplePhases main = new RandomIntAverageMultiplePhases();
        main.randomIntAvg(NB_THREADS);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment