Last active
July 27, 2021 19:24
-
-
Save ova2/7c3f99a74aa1d9c20786f07c88e0bff2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Slf4j | |
public class CacheMono<KEY, IVALUE, OVALUE> { | |
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | |
private final Map<KEY, CacheMonoValue<OVALUE>> cache = new HashMap<>(); | |
/** | |
* External value supplier which should be provided if "valuePublisher" with "keyExtractor" | |
* are not set | |
*/ | |
private final Function<KEY, Mono<OVALUE>> valueSupplier; | |
/** | |
* External source publisher stream which should be provided if "valueSupplier" is not set | |
*/ | |
private final Flux<IVALUE> valuePublisher; | |
/** | |
* Key extractor for emitted items provided by "valuePublisher" | |
*/ | |
private final Function<IVALUE, KEY> keyExtractor; | |
/** | |
* Value extractor for emitted items provided by "valuePublisher" | |
*/ | |
private final Function<IVALUE, OVALUE> valueExtractor; | |
/**
 * Stores the given collaborators as-is, without validation. The static factory methods
 * decide which of them are set: supplier-based instances pass {@code null} for the three
 * publisher-related arguments, publisher-based instances pass {@code null} for the supplier.
 *
 * @param valueSupplier  key-to-value lookup used in "pull" mode, or {@code null}
 * @param valuePublisher stream of input items used in "push" mode, or {@code null}
 * @param keyExtractor   maps a published item to its cache key, or {@code null}
 * @param valueExtractor maps a published item to its cached value, or {@code null}
 */
private CacheMono(
        Function<KEY, Mono<OVALUE>> valueSupplier,
        Flux<IVALUE> valuePublisher,
        Function<IVALUE, KEY> keyExtractor,
        Function<IVALUE, OVALUE> valueExtractor) {
    // "Push" mode collaborators.
    this.valuePublisher = valuePublisher;
    this.keyExtractor = keyExtractor;
    this.valueExtractor = valueExtractor;
    // "Pull" mode collaborator.
    this.valueSupplier = valueSupplier;
}
/** | |
* Factory method to create a CacheMono instance from an external value supplier. The value | |
* supplier is called by this CacheMono instance for retrieving values when they are missing | |
* in cache ("pull" principle to retrieve not yet cached values). | |
*/ | |
public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromSupplier( | |
@NonNull Function<KEY, Mono<VALUE>> valueSupplier) { | |
Objects.requireNonNull(valueSupplier); | |
return new CacheMono<>(valueSupplier, null, null, null); | |
} | |
/** | |
* Factory method to create a CacheMono instance from an external value publisher. | |
* Published values will fill this cache (reactive "push" way). | |
*/ | |
public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromPublisher( | |
@NonNull Flux<VALUE> valuePublisher, @NonNull Function<VALUE, KEY> keyExtractor) { | |
Objects.requireNonNull(valuePublisher); | |
Objects.requireNonNull(keyExtractor); | |
return createCacheMono(valuePublisher, keyExtractor, Function.identity()); | |
} | |
/** | |
* Factory method to create a CacheMono instance from an external value publisher. | |
* Published values will fill this cache (reactive "push" way). | |
*/ | |
public static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> fromPublisher( | |
@NonNull Flux<IVALUE> valuePublisher, | |
@NonNull Function<IVALUE, KEY> keyExtractor, | |
@NonNull Function<IVALUE, OVALUE> valueExtractor) { | |
Objects.requireNonNull(valuePublisher); | |
Objects.requireNonNull(keyExtractor); | |
return createCacheMono(valuePublisher, keyExtractor, valueExtractor); | |
} | |
private static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> createCacheMono( | |
@NonNull Flux<IVALUE> valuePublisher, | |
@NonNull Function<IVALUE, KEY> keyExtractor, | |
@NonNull Function<IVALUE, OVALUE> valueExtractor) { | |
var cacheMono = new CacheMono<>(null, valuePublisher, keyExtractor, valueExtractor); | |
valuePublisher.doOnEach(signal -> { | |
if (signal.hasValue()) { | |
final var inputValue = signal.get(); | |
final var outputSignal = Signal.next(valueExtractor.apply(inputValue)); | |
cacheMono.cache.put(keyExtractor.apply(inputValue), | |
new CacheMonoValue<>(outputSignal)); | |
} else if (signal.isOnError()) { | |
if (signal.getThrowable() == null) { | |
log.error("Error from value publisher"); | |
} else { | |
log.error("Error from value publisher, message = {}", | |
signal.getThrowable().getMessage()); | |
} | |
} | |
}).subscribe(); | |
return cacheMono; | |
} | |
... | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment