-
-
Save duarten/62841b0c32181bfd7ee9b1e19bcd078e to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
/**
 * A flat-combining executor for {@link Action}s.
 *
 * <p>Each calling thread appends its request to a shared singly linked queue by swapping a node
 * into {@code _tail}. One thread at a time acts as the "combiner": it applies up to {@code limit}
 * queued requests on behalf of their owners and then hands the combiner role to the owner of the
 * first unprocessed node. Because only the combiner applies actions, actions run one at a time.
 *
 * <p>Waiting threads busy-spin on their node, calling {@link Thread#yield()} every 256 spins.
 */
public final class Combiner {

    /** A unit of work submitted through {@link Combiner#combine(Action)}. */
    public interface Action {
        /** Performs the work; invoked on whichever thread is currently combining. */
        void apply();
    }

    /** A queue slot; every thread keeps one spare node between calls in {@code _myNode}. */
    private static final class Node {
        // Action to run; written by the enqueuer before `next` is published.
        Action request;
        // 1 while the owner must keep spinning; set to 0 by the combiner to release the owner.
        final AtomicInteger wait = new AtomicInteger();
        // Set by the combiner once `request` has been applied on the owner's behalf.
        boolean complete;
        // Throwable raised by `request` (null on success); rethrown on the owning thread.
        Throwable failure;
        // Successor in the queue; null while no node has been linked after this one.
        final AtomicReference<Node> next = new AtomicReference<Node>();
    }

    // Per-thread spare node, swapped into the tail on the thread's next combine() call.
    private final ThreadLocal<Node> _myNode = new ThreadLocal<Node>() {
        @Override
        protected Node initialValue() {
            return new Node();
        }
    };
    // Maximum number of requests a single combiner pass applies before handing off.
    private final int _limit;
    // Node into which the next arriving thread will write its request.
    private final AtomicReference<Node> _tail;

    /**
     * Creates a combiner.
     *
     * @param limit maximum number of queued actions one combining thread applies before passing
     *     the combiner role on; must be at least 1
     * @throws IllegalArgumentException if {@code limit < 1}
     */
    public Combiner(int limit) {
        // With limit < 1 the combining thread would release itself without ever applying its own
        // action, leaving its node linked in the queue while it is reused.
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1, was " + limit);
        }
        _limit = limit;
        _tail = new AtomicReference<Node>(new Node());
    }

    /**
     * Applies {@code action} with mutual exclusion against every other combined action, spinning
     * until it has been applied either by this thread or by another thread acting as combiner.
     *
     * <p>If {@code action} throws an unchecked exception or an {@link Error}, the combiner still
     * releases every other waiting thread, and the throwable is rethrown here, on the thread that
     * submitted the action (even if another thread ran it).
     *
     * @param action the work to run; must not be null
     * @throws NullPointerException if {@code action} is null
     * @throws RuntimeException any unchecked exception thrown by {@code action}
     */
    public void combine(Action action) {
        if (action == null) {
            // Reject before touching the queue: a null request would blow up inside the combiner.
            throw new NullPointerException("action");
        }
        Node nextNode = _myNode.get();
        nextNode.complete = false;
        nextNode.wait.set(1);
        Node curNode = _tail.getAndSet(nextNode);
        _myNode.set(curNode);
        //
        // There's now a window where nextNode/_tail can't be reached.
        // So, any communication has to be done via the previous node
        // in the list, curNode.
        //
        curNode.request = action;
        // Release-store: publishes `request` to whichever combiner reads `next`.
        curNode.next.lazySet(nextNode);

        // Wait until our request has been fulfilled or we are the combiner.
        final int maxSpins = 256;
        int spin = 0;
        while (curNode.wait.get() == 1) {
            if (++spin == maxSpins) {
                Thread.yield();
                spin = 0;
            }
        }
        if (curNode.complete) {
            // Another thread applied our action; surface its failure, if any, here.
            rethrowFailure(curNode);
            return;
        }

        // We are now the combiner. We copy n's next field into nn, as n will
        // become untouchable after n.wait.lazySet(0), due to reuse.
        Node n = curNode;
        Node nn;
        for (int c = 0; c < _limit && (nn = n.next.get()) != null; ++c, n = nn) {
            Throwable failure = null;
            try {
                n.request.apply();
            } catch (Throwable t) {
                // Never let one failing action abort the pass: the hand-off below must happen,
                // or every queued thread would spin forever.
                failure = t;
            }
            n.failure = failure;
            n.next.set(null);
            n.request = null;
            n.complete = true;
            n.wait.lazySet(0);
        }
        // Make someone else the combiner.
        n.wait.set(0);
        // curNode was processed first in the loop above and now belongs only to this thread.
        rethrowFailure(curNode);
    }

    /** Rethrows (and clears) the failure recorded on {@code node}, if there is one. */
    private static void rethrowFailure(Node node) {
        Throwable failure = node.failure;
        if (failure == null) {
            return;
        }
        node.failure = null;
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        // Only reachable if an action sneaky-throws a checked exception.
        throw new RuntimeException(failure);
    }

    static final int threads = 100;
    static final int ops = 1 << 20;

    // Baseline: every thread increments five shared AtomicIntegers directly.
    private static void testWithoutCombining() {
        final AtomicInteger[] counters = new AtomicInteger[5];
        for (int i = 0; i < counters.length; ++i) {
            counters[i] = new AtomicInteger();
        }
        final java.util.concurrent.CountDownLatch l = new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        for (AtomicInteger c : counters) {
                            c.incrementAndGet();
                        }
                    }
                    l.countDown();
                }
            }.start();
        }
        awaitUninterruptibly(l);
        for (int i = 0; i < counters.length; ++i) {
            System.out.println(counters[i].get());
        }
    }

    // Same workload on plain ints, relying on the combiner for mutual exclusion.
    private static void testWithCombining() {
        final int[] counters = new int[5];
        final Combiner c = new Combiner(100);
        final java.util.concurrent.CountDownLatch l = new java.util.concurrent.CountDownLatch(threads);
        for (int i = 0; i < threads; ++i) {
            new Thread() {
                @Override
                public void run() {
                    for (int j = 0; j < ops; ++j) {
                        c.combine(new Combiner.Action() {
                            @Override
                            public void apply() {
                                for (int k = 0; k < counters.length; ++k) {
                                    counters[k] += 1;
                                }
                            }
                        });
                    }
                    l.countDown();
                }
            }.start();
        }
        awaitUninterruptibly(l);
        for (int i = 0; i < counters.length; ++i) {
            System.out.println(counters[i]);
        }
    }

    /**
     * Waits for {@code latch} to reach zero, ignoring interrupts while waiting but restoring the
     * thread's interrupt status afterwards so the interruption is not lost.
     */
    private static void awaitUninterruptibly(java.util.concurrent.CountDownLatch latch) {
        boolean interrupted = false;
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long startTime = System.nanoTime();
        testWithCombining();
        // Swap in testWithoutCombining() to time the AtomicInteger baseline instead.
        //testWithoutCombining();
        long estimatedTime = System.nanoTime() - startTime;
        System.out.println("time: " + estimatedTime / 1000000);
    }
}
I can't judge the fidelity of the implementation, but I can give some hints on how to improve it:
- LockSupport/Thread.yield are just methadone: they alleviate the cost of cache-coherency traffic in a way that depends on the running environment and usage, and they hurt latencies by making them less predictable. I would suggest adding a customizable WaitStrategy/IdleStrategy there to let the user choose what to do (n threads < n cores with pinned threads == win).
- Try to pack more data into the data structures (e.g. the nodes) to avoid pointer chasing. Pointer chasing hurts performance both from the GC point of view (the infamous GC barriers) and because of the load/store fences the JVM puts around concurrent operations, which don't spare you the chase needed to load an instance referenced from a field.
2 bis) Try to pack fields into the classes while considering false-sharing protection (JCTools uses abstract classes to control the space between fields), padding by 2 cache lines (common modern x86 architectures prefetch that much, which affects false sharing as well). Please use JMH, to prevent the JVM from performing optimizations (e.g. dropping code that has no visible effect) that wouldn't happen in real code.
- The number of threads in the test should be < n cores, unless you want to measure the cost of context switching or benchmark the OS scheduler :P
I hope my 2 cents are welcome, and compliments on the good work! It is very interesting!
To make my suggestions clearer, I have created a branch of a dummy project of mine: franz1981/java-puzzles@5eb59bb
In it I have copied the original Combiner implementation (adding a custom IdleStrategy to make benchmarking it with JMH easier)
and built another one (OptimizedCombiner) with denser data structures, hopefully packed to avoid false sharing.
In addition, I have replaced some of the sequentially consistent operations with relaxed ones (without breaking the original intent).
The numbers on my box are (using 4 hammering threads):
Benchmark (combinerType) (idleStrategyType) Mode Cnt Score Error Units
CombinerBench.combine vanilla spin avgt 10 516.590 ± 13.473 ns/op
CombinerBench.combine vanilla yield avgt 10 470.046 ± 41.168 ns/op
CombinerBench.combine vanilla park avgt 10 517.785 ± 66.786 ns/op
CombinerBench.combine unsafe spin avgt 10 326.038 ± 3.921 ns/op
CombinerBench.combine unsafe yield avgt 10 346.840 ± 2.885 ns/op
CombinerBench.combine unsafe park avgt 10 346.230 ± 14.160 ns/op
CombinerBench.combine lock spin avgt 10 2157.008 ± 3378.290 ns/op
CombinerBench.combine lock yield avgt 10 353.903 ± 20.211 ns/op
CombinerBench.combine lock park avgt 10 315.353 ± 64.988 ns/op
While with just one thread (to emulate normal single-threaded usage):
Benchmark (combinerType) (idleStrategyType) Mode Cnt Score Error Units
CombinerBench.combine vanilla spin avgt 10 48.550 ± 0.902 ns/op
CombinerBench.combine vanilla yield avgt 10 49.079 ± 1.496 ns/op
CombinerBench.combine vanilla park avgt 10 48.539 ± 0.768 ns/op
CombinerBench.combine unsafe spin avgt 10 26.858 ± 0.502 ns/op
CombinerBench.combine unsafe yield avgt 10 26.689 ± 0.448 ns/op
CombinerBench.combine unsafe park avgt 10 27.732 ± 1.972 ns/op
CombinerBench.combine lock spin avgt 10 16.880 ± 0.196 ns/op
CombinerBench.combine lock yield avgt 10 17.035 ± 0.860 ns/op
CombinerBench.combine lock park avgt 10 17.245 ± 0.417 ns/op
Some notes:
- `vanilla` is the original combiner, while `unsafe` is the other one.
- The benchmark using a (spin) lock is not present in the original, but given the intent of a combiner I suppose it is a good
"contender", since both guarantee exclusive execution of actions.
The memory layout of the optimized version is this one (running `OptimizedCombiner::main`):
# Running 64-bit HotSpot VM.
# Using compressed oop with 3-bit shift.
# Using compressed klass with 3-bit shift.
# Objects are 8 bytes aligned.
# Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes]
# Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes]
red.hat.puzzles.combiner.TailCombiner$Node object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 12 (object header) N/A
12 4 (alignment/padding gap)
16 8 long Pad0Node.p00 N/A
24 8 long Pad0Node.p01 N/A
32 8 long Pad0Node.p02 N/A
40 8 long Pad0Node.p03 N/A
48 8 long Pad0Node.p04 N/A
56 8 long Pad0Node.p05 N/A
64 8 long Pad0Node.p06 N/A
72 8 long Pad0Node.p07 N/A
80 8 long Pad0Node.p10 N/A
88 8 long Pad0Node.p11 N/A
96 8 long Pad0Node.p12 N/A
104 8 long Pad0Node.p13 N/A
112 8 long Pad0Node.p14 N/A
120 8 long Pad0Node.p15 N/A
128 8 long Pad0Node.p16 N/A
136 4 int WaitFieldNode.wait N/A
140 4 (alignment/padding gap)
144 8 long Pad1Node.p00 N/A
152 8 long Pad1Node.p01 N/A
160 8 long Pad1Node.p02 N/A
168 8 long Pad1Node.p03 N/A
176 8 long Pad1Node.p04 N/A
184 8 long Pad1Node.p05 N/A
192 8 long Pad1Node.p06 N/A
200 8 long Pad1Node.p10 N/A
208 8 long Pad1Node.p11 N/A
216 8 long Pad1Node.p12 N/A
224 8 long Pad1Node.p13 N/A
232 8 long Pad1Node.p14 N/A
240 8 long Pad1Node.p15 N/A
248 8 long Pad1Node.p16 N/A
256 8 long Pad1Node.p17 N/A
264 4 java.lang.Runnable Node.request N/A
268 4 red.hat.puzzles.combiner.TailCombiner.Node Node.next N/A
Instance size: 272 bytes
Space losses: 8 bytes internal + 0 bytes external = 8 bytes total
red.hat.puzzles.combiner.OptimizedCombiner object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 12 (object header) N/A
12 4 (alignment/padding gap)
16 8 long Pad0Combiner.p00 N/A
24 8 long Pad0Combiner.p01 N/A
32 8 long Pad0Combiner.p02 N/A
40 8 long Pad0Combiner.p03 N/A
48 8 long Pad0Combiner.p04 N/A
56 8 long Pad0Combiner.p05 N/A
64 8 long Pad0Combiner.p06 N/A
72 8 long Pad0Combiner.p07 N/A
80 8 long Pad0Combiner.p10 N/A
88 8 long Pad0Combiner.p11 N/A
96 8 long Pad0Combiner.p12 N/A
104 8 long Pad0Combiner.p13 N/A
112 8 long Pad0Combiner.p14 N/A
120 8 long Pad0Combiner.p15 N/A
128 8 long Pad0Combiner.p16 N/A
136 4 red.hat.puzzles.combiner.TailCombiner.Node TailCombiner._tail N/A
140 4 (alignment/padding gap)
144 8 long Pad1Combiner.p00 N/A
152 8 long Pad1Combiner.p01 N/A
160 8 long Pad1Combiner.p02 N/A
168 8 long Pad1Combiner.p03 N/A
176 8 long Pad1Combiner.p04 N/A
184 8 long Pad1Combiner.p05 N/A
192 8 long Pad1Combiner.p06 N/A
200 8 long Pad1Combiner.p10 N/A
208 8 long Pad1Combiner.p11 N/A
216 8 long Pad1Combiner.p12 N/A
224 8 long Pad1Combiner.p13 N/A
232 8 long Pad1Combiner.p14 N/A
240 8 long Pad1Combiner.p15 N/A
248 8 long Pad1Combiner.p16 N/A
256 8 long Pad1Combiner.p17 N/A
264 4 int OptimizedCombiner._limit N/A
268 4 java.lang.ThreadLocal OptimizedCombiner._myNode N/A
Instance size: 272 bytes
Space losses: 8 bytes internal + 0 bytes external = 8 bytes total
@stefanofago73
I suppose that access to a shared resource would be a nice use case for a Combiner
I've tried this code on Java 8, on Windows with an i7 and 8GB of RAM.
With the original code, flat combining ran about two times slower...
So I've modified the code with:
...in the combine method... LockSupport.parkNanos
...in testWithCombining... the use of a lambda...
Does that make sense?
Now I get:
with combining time: 6166
without combining time: 19000
Thank you for this interesting code!