Created
June 22, 2021 00:41
-
-
Save mwmitchell/9958d53d6b60481a2d401d658373921b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lucidworks.connectors.jobs.leader; | |
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.redisson.api.RLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
public class RedisLeaderElection implements LeaderElection { | |
private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class); | |
private static final int WAIT_SECONDS = 5; | |
private final String resourceId; | |
private final RedissonReactiveClient redissonClient; | |
private final List<Listener> listeners = new ArrayList<>(); | |
private final AtomicBoolean isLeader = new AtomicBoolean(); | |
private final AtomicBoolean stopping = new AtomicBoolean(); | |
private Disposable subscription; | |
public RedisLeaderElection(RedissonClient redissonClient, String resourceId) { | |
this.resourceId = resourceId; | |
this.redissonClient = redissonClient.reactive(); | |
} | |
public boolean isLeader() { | |
return isLeader.get(); | |
} | |
@PostConstruct | |
@Override | |
public void start() { | |
logger.info("Starting leadership election for resourceId={}", resourceId); | |
RSemaphoreReactive sem = redissonClient.getSemaphore(resourceId); | |
subscription = sem | |
.trySetPermits(1) | |
.flatMap(__ -> sem.tryAcquire(1, WAIT_SECONDS, TimeUnit.SECONDS)) | |
.flatMap(result -> { | |
if (result) { | |
return Mono.just(true); | |
} | |
return Mono.error(new RuntimeException()); | |
}) | |
.retry() | |
.repeat() | |
.doFinally(signalType -> { | |
try { | |
if(isLeader.get()){ | |
logger.info("Unlocking resourceId={}", resourceId); | |
sem.release().flatMap(__ -> sem.drainPermits()).block(); | |
logger.info("Unlocked resourceId={}", resourceId); | |
} | |
} catch (Exception e) { | |
logger.info("Error while unlocking resourceId={}, error={}", resourceId, e.getMessage()); | |
} finally { | |
logger.info("Leader election subscription for resourceId={} has been disposed", resourceId); | |
} | |
}) | |
.subscribe(lockObtained -> { | |
if (lockObtained) { | |
if (isLeader.compareAndSet(false, true)) { | |
listeners.forEach(Listener::onElected); | |
} | |
} else { | |
if (isLeader.compareAndSet(true, false)) { | |
listeners.forEach(Listener::onUnElected); | |
} | |
} | |
}); | |
} | |
@PreDestroy | |
@Override | |
public void stop() { | |
if (subscription != null) { | |
subscription.dispose(); | |
} | |
isLeader.set(false); | |
logger.info("Leadership instance for resourceId={} shutdown and released", resourceId); | |
} | |
@Override | |
public void addListener(Listener electionListener) { | |
if (listeners.contains(electionListener)) { | |
return; | |
} | |
listeners.add(electionListener); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment