@valencik
Created July 9, 2024 20:19
Elasticsearch Bulk Indexing with a stream of JSON strings
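The Elasticsearch Bulk API expects a newline-delimited body in which every action line is immediately followed by the document it applies to, and the whole body ends with a newline. As a rough illustration (the index name and documents below are made up), a single payload emitted by the pipe in this gist has approximately this shape:

// Illustrative only: the approximate shape of one emitted bulk payload
val examplePayload: String =
  """{ "index" : { "_index" : "my-index" } }
    |{"id":1,"title":"first document"}
    |{ "index" : { "_index" : "my-index" } }
    |{"id":2,"title":"second document"}
    |""".stripMargin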
import cats.effect.IO
import fs2.{Pipe, Pull, Stream}

object JsonlPayloadPipe {
  /** Builds a pipe that transforms a stream of JSON strings into payloads for the Elasticsearch
    * Bulk API. Each input string represents a JSON document to be sent to the Bulk API. The JSONs
    * are gathered, interspersed with `delimiter`, and packaged into payload strings up to a
    * maximum size of `batchMax` bytes.
    *
    * @param delimiter
    *   This is the "action" ES should apply to the document following it.
    *   A typical value might be `{ "index" : { "_index" : "my-index" } }`
    * @param batchMax
    *   Maximum payload size in bytes
    * @return
    *   A pipe that yields a stream of row counts and payloads
    */
  def formatUpToMaxBytes(delimiter: String, batchMax: Int): Pipe[IO, String, (Int, String)] = {
    // Pre-size the builder to the maximum payload size.
    // The builder is created once per call, so apply the returned pipe to only one stream at a time.
    val sb = new StringBuilder(batchMax)

    def finalizePayload(): String = {
      // ES requires an extra trailing newline at the end of bulk payloads
      sb.append("\n")
      val res = sb.result()
      sb.clear()
      res
    }
    // Recursive function that processes the stream while accumulating the current
    // payload size and the current number of rows in the payload
    def go(jsons: Stream[IO, String], currSize: Int, currNum: Int): Pull[IO, (Int, String), Unit] =
      // Separate the stream into the current Chunk and the remaining Stream
      jsons.pull.uncons.flatMap {
        case Some((headChunk, tailStream)) => {
          var i = 0
          var takeMore = true
          var size = currSize
          // Accumulate jsonRows into the string builder
          while (takeMore && i < headChunk.size) {
            val jsonRow = headChunk(i)
            // +2 accounts for the newline after the action line and the newline after the document.
            // (.size counts characters, which equals bytes only for single-byte content.)
            val newSize = size + delimiter.size + jsonRow.size + 2
            if (newSize < batchMax && jsonRow.nonEmpty) {
              sb.append(delimiter)
              // the Bulk API needs the action line and its document on separate lines
              sb.append("\n")
              sb.append(jsonRow)
              sb.append("\n")
              size = newSize
              i += 1
            } else {
              // Couldn't fit this jsonRow (or it was empty), don't take any more
              takeMore = false
            }
          }
          if (takeMore) {
            // We finished this chunk and still have room left in the payload
            go(tailStream, currSize = size, currNum = currNum + i)
          } else {
            // Either the payload is full or we hit an empty row.
            // The string builder currently holds currNum + i rows.
            // If it holds nothing (the next row is empty or can never fit), stop with no output.
            // Otherwise emit the number of rows in the payload together with the payload,
            // and keep processing the rest of the stream.
            if (currNum + i == 0)
              Pull.done
            else {
              // Recursively call ourselves with a Stream of the remaining JSON strings
              val remaining = Stream.chunk(headChunk.drop(i)) ++ tailStream
              val continue = go(remaining, currSize = 0, currNum = 0)
              Pull.output1((currNum + i, finalizePayload())) >> continue
            }
          }
        }
        case None => {
          // The jsons stream is now empty.
          // If we haven't accumulated any rows, stop with no output.
          // Otherwise emit the number of rows processed and the payload.
          if (currNum == 0) Pull.done else Pull.output1((currNum, finalizePayload()))
        }
      }

    rows => go(rows, 0, 0).stream
  }
}
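
For completeness, here is a minimal sketch (not part of the original gist) of how the pipe might be driven, assuming fs2 3.x with fs2-io and cats-effect 3 on the classpath: it reads newline-delimited JSON from a file, batches it with formatUpToMaxBytes, and hands each payload to a hypothetical sendBulk function. The file name documents.jsonl, the index name, the 5 MB batchMax, and sendBulk itself are all assumptions for illustration; a real job would POST each payload to the _bulk endpoint with an HTTP client.

import cats.effect.{IO, IOApp}
import fs2.text
import fs2.io.file.{Files, Path}

object BulkIngestExample extends IOApp.Simple {

  // Hypothetical stand-in for an HTTP POST to the _bulk endpoint
  def sendBulk(payload: String): IO[Unit] =
    IO.println(s"POST _bulk with ${payload.length} chars")

  // The "action" line that precedes every document (assumed index name)
  val action = """{ "index" : { "_index" : "my-index" } }"""

  def run: IO[Unit] =
    Files[IO]
      .readAll(Path("documents.jsonl")) // assumed input: one JSON document per line
      .through(text.utf8.decode)
      .through(text.lines)
      .through(JsonlPayloadPipe.formatUpToMaxBytes(action, batchMax = 5 * 1024 * 1024))
      .evalMap { case (rows, payload) =>
        IO.println(s"sending $rows documents") >> sendBulk(payload)
      }
      .compile
      .drain
}

Sizing note: Elasticsearch's default http.max_content_length is 100 MB, and a few megabytes per bulk request is a common starting point, so batchMax should normally sit well below that limit.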