Skip to content

Instantly share code, notes, and snippets.

@jasontedor
Created November 24, 2015 20:09
Show Gist options
  • Save jasontedor/050e7f33e9c12d40ac5f to your computer and use it in GitHub Desktop.
InternalClusterService.java synchronization
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index 3407a57..73d014e 100644
--- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -90,7 +90,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>();
- private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>();
+ private final ConcurrentMap<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new ConcurrentHashMap<>();
// TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API
private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>();
private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners);
@@ -285,9 +285,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
try {
final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener);
- synchronized (updateTasksPerExecutor) {
- updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask);
- }
+ updateTasksPerExecutor.merge(
+ executor,
+ new ArrayList<>(Arrays.asList(updateTask)),
+ (oldValue, value) -> { List<UpdateTask> newValue = new ArrayList<>(oldValue); newValue.addAll(value); return newValue; });
if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), new Runnable() {
@@ -374,21 +375,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
<T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
final ArrayList<String> sources = new ArrayList<>();
- synchronized (updateTasksPerExecutor) {
- List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
- if (pending != null) {
- for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) {
- UpdateTask task = iter.next();
- if (task.processed.getAndSet(true) == false) {
- logger.trace("will process [{}]", task.source);
- toExecute.add((UpdateTask<T>) task);
- sources.add(task.source);
- } else {
- logger.trace("skipping [{}], already processed", task.source);
- }
+ List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
+ if (pending != null) {
+ for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) {
+ UpdateTask task = iter.next();
+ if (task.processed.getAndSet(true) == false) {
+ logger.trace("will process [{}]", task.source);
+ toExecute.add((UpdateTask<T>) task);
+ sources.add(task.source);
+ } else {
+ logger.trace("skipping [{}], already processed", task.source);
}
}
}
+
if (toExecute.isEmpty()) {
return;
}
(END)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment