Skip to content

Instantly share code, notes, and snippets.

@gotterdemarung
Created August 10, 2017 09:46
Show Gist options
  • Save gotterdemarung/a6dec3fb7e51a37abaf009e426fccd40 to your computer and use it in GitHub Desktop.
Save gotterdemarung/a6dec3fb7e51a37abaf009e426fccd40 to your computer and use it in GitHub Desktop.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
/**
 * Event emitter that accepts events immediately but delivers them to the registered
 * consumers in batches, on a regular interval.
 * <p>
 * Events are buffered per routing key. A flushing task, submitted to the supplied executor
 * service by the constructor, sleeps for the flush interval, then drains every buffer and
 * submits one delivery task per consumer registered for the corresponding routing key.
 * Events emitted for a routing key that has no registered consumer are dropped.
 * <p>
 * Not thread-safe for consumer registration: {@link #on(String, Consumer)} mutates plain,
 * unsynchronized collections that {@link #emit(String, Object)} and the flushing task read,
 * so consumers should be registered before events start flowing.
 *
 * @param <T> type of the emitted events
 */
public class FlushingEventEmitter<T> implements EventEmitter<T>, Function<String, LinkedList<T>>, Runnable {
    /** Registered consumers, grouped by routing key. Not synchronized. */
    private final HashMap<String, ArrayList<Consumer<T>>> consumers = new HashMap<>();
    /** Events waiting for the next flush, grouped by routing key. */
    private final ConcurrentHashMap<String, LinkedList<T>> buffer = new ConcurrentHashMap<>();
    private final ExecutorService executorService;
    /** Lock stripes guarding the per-key lists stored in {@link #buffer}. */
    private final Object[] locks;
    private final long sleepMillis;
    /** {@code true} while the flushing task is (or is about to start) running. */
    private volatile boolean running;

    /**
     * Main constructor. Submits the flushing task to the given executor service.
     *
     * @param executorService     Executor service, used both for the flushing task and for
     *                            the delivery tasks.
     * @param locksCount          Amount of locks, that may increase performance in high
     *                            concurrency; must be positive
     * @param flushIntervalMillis Flush interval in milliseconds; must not be negative
     * @throws NullPointerException     if {@code executorService} is {@code null}
     * @throws IllegalArgumentException if {@code locksCount} is not positive or
     *                                  {@code flushIntervalMillis} is negative
     */
    public FlushingEventEmitter(
            final ExecutorService executorService,
            final int locksCount,
            final long flushIntervalMillis
    ) {
        Objects.requireNonNull(executorService, "executorService");
        if (locksCount <= 0) {
            throw new IllegalArgumentException("locksCount must be positive, got " + locksCount);
        }
        if (flushIntervalMillis < 0) {
            throw new IllegalArgumentException(
                    "flushIntervalMillis must not be negative, got " + flushIntervalMillis);
        }
        this.executorService = executorService;
        this.sleepMillis = flushIntervalMillis;
        this.locks = new Object[locksCount];
        for (int i = 0; i < locksCount; i++) {
            this.locks[i] = new Object();
        }
        // The flag must be raised BEFORE the task is handed to the executor; otherwise the
        // task may start first, observe running == false and terminate immediately.
        this.running = true;
        executorService.submit(this);
    }

    /**
     * Constructor using 10 locks.
     *
     * @param executorService     Executor service, used both for the flushing task and for
     *                            the delivery tasks.
     * @param flushIntervalMillis Flush interval in milliseconds; must not be negative
     */
    public FlushingEventEmitter(
            final ExecutorService executorService,
            final long flushIntervalMillis
    ) {
        this(executorService, 10, flushIntervalMillis);
    }

    /**
     * Buffers the event until the next flush.
     * <p>
     * The event is silently ignored when the emitter is no longer running, when either
     * argument is {@code null}, or when no consumer is registered for the routing key.
     *
     * @param routingKey routing key the event belongs to
     * @param event      event to deliver
     */
    @Override
    public void emit(final String routingKey, final T event) {
        if (!running || routingKey == null || event == null) {
            return;
        }
        if (!consumers.containsKey(routingKey)) {
            return;
        }
        // The stripe lock makes "find or create the list, then append" atomic with respect
        // to the flushing task removing the same list from the buffer.
        synchronized (lockFor(routingKey)) {
            buffer.computeIfAbsent(routingKey, this).add(event);
        }
    }

    /**
     * Registers a consumer for the given routing key.
     *
     * @param routingKey    routing key to listen on
     * @param eventConsumer consumer that receives the events emitted with that key
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void on(final String routingKey, final Consumer<T> eventConsumer) {
        Objects.requireNonNull(routingKey, "routingKey");
        Objects.requireNonNull(eventConsumer, "eventConsumer");
        consumers.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(eventConsumer);
    }

    /**
     * Used in flushing thread: sleeps for the flush interval, then flushes the buffered
     * events, until the emitter is stopped or the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (running) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for the owner of the thread and stop.
                    Thread.currentThread().interrupt();
                    return;
                }
                flush();
            }
        } finally {
            // Whatever ended the loop (interrupt or an unexpected failure), stop accepting
            // events; nothing would flush them any more and the buffer would grow forever.
            running = false;
        }
    }

    /**
     * Used for computeIfAbsent
     */
    @Override
    public LinkedList<T> apply(final String s) {
        return new LinkedList<>();
    }

    /** Drains every buffered routing key and submits one delivery task per consumer. */
    private void flush() {
        for (String routingKey : buffer.keySet()) {
            final LinkedList<T> batch;
            synchronized (lockFor(routingKey)) {
                batch = buffer.remove(routingKey);
            }
            final ArrayList<Consumer<T>> receivers = consumers.get(routingKey);
            if (batch == null || batch.isEmpty() || receivers == null) {
                continue;
            }
            // Once removed from the buffer the batch is no longer modified, so it can be
            // shared by all delivery tasks.
            for (Consumer<T> consumer : receivers) {
                executorService.submit(() -> {
                    for (T entry : batch) {
                        consumer.accept(entry);
                    }
                });
            }
        }
    }

    /** Returns the lock stripe for the routing key; floorMod keeps the index non-negative. */
    private Object lockFor(final String routingKey) {
        return locks[Math.floorMod(routingKey.hashCode(), locks.length)];
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment