Last active
July 27, 2021 19:30
-
-
Save ova2/728c48ef913d1d27f1ef5634709722bd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Finds a value by key in an in-memory cache or load it from a remote source. | |
* The loaded value will be cached. | |
*/ | |
public Mono<OVALUE> lookup(KEY key) { | |
return Mono.defer(() -> getValueAsMono(key) | |
.switchIfEmpty(Mono.defer(() -> onCacheMissResume(key))) | |
); | |
} | |
private Mono<OVALUE> getValueAsMono(KEY key) { | |
final Lock readLock = lock.readLock(); | |
readLock.lock(); | |
try { | |
return Mono.justOrEmpty(cache.get(key)).flatMap(CacheMonoValue::toMono); | |
} finally { | |
readLock.unlock(); | |
} | |
} | |
private Mono<OVALUE> onCacheMissResume(KEY key) { | |
final Lock writeLock = lock.writeLock(); | |
writeLock.lock(); | |
try { | |
// check if value was already cached by another thread | |
final var cachedValue = cache.get(key); | |
if (cachedValue == null) { | |
final Mono<OVALUE> monoValue; | |
if (valuePublisher != null) { | |
// get value from external value publisher | |
monoValue = valuePublisher | |
.filter(value -> Objects.equals(keyExtractor.apply(value), key)) | |
.map(valueExtractor) | |
.next(); | |
} else if (valueSupplier != null) { | |
// get value from external supplier | |
monoValue = valueSupplier.apply(key); | |
} else { | |
throw new IllegalStateException("Value can be not determined," + | |
"neither valuePublisher nor valueSupplier were set"); | |
} | |
// cache Mono as value immediately | |
cache.put(key, new CacheMonoValue<>(monoValue)); | |
// cache success and error values encapsulated in signal when it is available | |
return monoValue.doOnEach(signal -> { | |
if (signal.isOnNext()) { | |
cache.put(key, new CacheMonoValue<>( | |
Signal.next(Objects.requireNonNull(signal.get()))) | |
); | |
} else if (signal.isOnError()) { | |
final Signal<OVALUE> errorSignal; | |
if (signal.getThrowable() == null) { | |
errorSignal = Signal.error( | |
new Throwable("Getting value from external provider failed")); | |
} else { | |
errorSignal = Signal.error(signal.getThrowable()); | |
} | |
cache.put(key, new CacheMonoValue<>(errorSignal)); | |
} | |
}); | |
} | |
return Mono.justOrEmpty(cachedValue).flatMap(CacheMonoValue::toMono); | |
} finally { | |
writeLock.unlock(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment