Created
July 1, 2019 09:13
-
-
Save abitofhelp/57985ed06e903d77121842b6cc747ebb to your computer and use it in GitHub Desktop.
Reactive streaming from Azure Blob to rxjava generator/parser. This snippet shows how to stream content from Azure Blob Storage using their Java SDK v11 and reactive streams. The blob's stream is fed into a rxjava/rxkotlin generator, which parses sample data from the blob stream. Subscribers use the Flowable to work with each sample parsed from …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Method generator parses sample data from a json stream. | |
* @param sampleJsonUrl String is the URL to the Azure Blob Storage sample data. | |
* @return Flowable<Sample> is a cold, synchronous, stateful and backpressure-aware | |
* generator of features. | |
*/ | |
fun generator(sampleJsonUrl: String) = | |
blobStorage.downloadBlob(sampleJsonUrl) | |
.map { bbuf: ByteBuffer -> JsonFactory().createParser(bbuf.array()) } | |
.map { jParser -> | |
Flowable.generate<Sample, JsonParser>( | |
java.util.concurrent.Callable { jParser.gobbleJsonToSamples() }, | |
io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> { | |
parser: JsonParser, emitter: Emitter<Sample> -> | |
pullOrComplete(parser, emitter) | |
}, | |
Consumer<JsonParser> { jParser.close() } | |
) | |
}.flatMapPublisher { it } | |
/** | |
* Method downloadBlob will retrieve a blob from storage and return a | |
* reactive stream of bytes. | |
* @param url String is the url to the blob. | |
* @return Single<ByteBuffer> is the blob's content. | |
*/ | |
fun downloadBlob(url: String): Single<ByteBuffer> = | |
BlockBlobURL(URL(url), pipeline) | |
.download(null, null, false, null) | |
.flatMap { | |
FlowableUtil | |
.collectBytesInBuffer( | |
it.body(ReliableDownloadOptions().withMaxRetryRequests(3)) | |
) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment