Created
August 10, 2017 09:46
-
-
Save gotterdemarung/a6dec3fb7e51a37abaf009e426fccd40 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
/** | |
* Event emitter, that instantly takes event, but flushes them into registered consumers | |
* on regular basis. | |
* <p> | |
* Not thread-safe for consumer registration. | |
* | |
* @param <T> | |
*/ | |
public class FlushingEventEmitter<T> implements EventEmitter<T>, Function<String, LinkedList<T>>, Runnable { | |
private final HashMap<String, ArrayList<Consumer<T>>> consumers = new HashMap<>(); | |
private final ConcurrentHashMap<String, LinkedList<T>> buffer = new ConcurrentHashMap<>(); | |
private final ExecutorService executorService; | |
private final Object[] locks; | |
private final long sleepMillis; | |
private volatile boolean running; | |
/** | |
* Main constructor. | |
* | |
* @param executorService Executor service, used for multi-threading. | |
* @param locksCount Amount of locks, that may increase performance in high concurrency | |
* @param flushIntervalMillis Flush interval in milliseconds | |
*/ | |
public FlushingEventEmitter( | |
final ExecutorService executorService, | |
final int locksCount, | |
final long flushIntervalMillis | |
) { | |
Objects.requireNonNull(executorService, "executorService"); | |
this.executorService = executorService; | |
this.sleepMillis = flushIntervalMillis; | |
this.locks = new Object[locksCount]; | |
for (int i = 0; i < locksCount; i++) { | |
this.locks[i] = new Object(); | |
} | |
executorService.submit(this); | |
this.running = true; | |
} | |
/** | |
* Constructor | |
* | |
* @param executorService Executor service, used for multi-threading. | |
* @param flushIntervalMillis Flush interval in milliseconds | |
*/ | |
public FlushingEventEmitter( | |
final ExecutorService executorService, | |
final long flushIntervalMillis | |
) { | |
this(executorService, 10, flushIntervalMillis); | |
} | |
@Override | |
public void emit(final String routingKey, final T event) { | |
if (running && routingKey != null && event != null) { | |
ArrayList<Consumer<T>> cons = consumers.get(routingKey); | |
if (cons != null) { | |
// Locking | |
synchronized (locks[routingKey.hashCode() % locks.length]) { | |
LinkedList<T> list = buffer.get(routingKey); | |
if (list == null) { | |
buffer.computeIfAbsent(routingKey, this); | |
list = buffer.get(routingKey); | |
} | |
list.add(event); | |
} | |
} | |
} | |
} | |
@Override | |
public void on(final String routingKey, final Consumer<T> eventConsumer) { | |
Objects.requireNonNull(routingKey, "routingKey"); | |
Objects.requireNonNull(eventConsumer, "eventConsumer"); | |
ArrayList<Consumer<T>> receivers = consumers.get(routingKey); | |
if (receivers == null) { | |
receivers = new ArrayList<>(); | |
consumers.put(routingKey, receivers); | |
} | |
receivers.add(eventConsumer); | |
} | |
/** | |
* Used in flushing thread | |
*/ | |
@Override | |
public void run() { | |
while (running) { | |
try { | |
Thread.sleep(sleepMillis); | |
} catch (InterruptedException e) { | |
running = false; | |
continue; | |
} | |
// Flushing all events | |
for (String routingKey : buffer.keySet()) { | |
LinkedList<T> data; | |
synchronized (locks[routingKey.hashCode() % locks.length]) { | |
data = buffer.remove(routingKey); | |
} | |
// Sending | |
for (Consumer<T> consumer : consumers.get(routingKey)) { | |
executorService.submit(() -> { | |
for (T entry : data) { | |
consumer.accept(entry); | |
} | |
}); | |
} | |
} | |
} | |
} | |
/** | |
* Used for computeIfAbsent | |
*/ | |
@Override | |
public LinkedList<T> apply(final String s) { | |
return new LinkedList<>(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment