Created
December 9, 2020 21:45
-
-
Save bbstilson/e12232d140f5a1119ebfdd62b7d00a8c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.allenai.sqs.reader._ | |
import scala.collection.parallel.CollectionConverters._ | |
import scala.concurrent.ExecutionContext | |
import scala.collection.parallel.ExecutionContextTaskSupport | |
import java.util.concurrent.Executors | |
import com.amazonaws.services.sqs.AmazonSQSClientBuilder | |
/** Moves messages from the `FROM` SQS queue to the `TO` SQS queue.
  *
  * Each worker reads batches from the upstream queue, appends the message bodies to a
  * randomly named file under `dataDir`, forwards the batch to the downstream queue and
  * then deletes the batch from the upstream queue. A worker stops at the first empty batch.
  */
object Main {
  val cores = Runtime.getRuntime().availableProcessors()
  val FILE_SIZE = 100000
  val PREFIX = "https://sqs.us-west-2.amazonaws.com/896129387501"
  val FROM = s"$PREFIX/s2-espresso-prod"
  val TO = s"$PREFIX/s2-espresso-backfill-prod"
  val letters = 'a' to 'z'
  val dataDir = os.pwd / "data"
  val random = new scala.util.Random()

  /** Number of processed messages between two "check in" progress lines. */
  private val CheckInEvery = 10000

  /** Returns a fresh 20-character lowercase file name. */
  def newFile: String = List.fill(20)(letters(random.nextInt(letters.size))).mkString

  /** Whole messages per second; the divisor is clamped to 1 so that a sub-second
    * interval (which truncates to 0 seconds) cannot cause a division by zero.
    */
  private def perSecond(count: Int, seconds: Long): Long = count / math.max(seconds, 1L)

  /** Starts `cores * 2` workers on a dedicated fixed-size pool and blocks until all finish.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val workers = cores * 2
    // os.write.append does not create missing parent folders by default.
    os.makeDir.all(dataDir)

    val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(workers))
    // `until` (not `to`): exactly one task per pool thread.
    val col = (0 until workers).par
    col.tasksupport = new ExecutionContextTaskSupport(pool)

    try col.foreach(_ => drain())
    // The pool threads are non-daemon; without a shutdown the JVM never exits.
    finally pool.shutdown()
  }

  /** Runs one worker: drains the upstream queue until an empty batch is returned. */
  private def drain(): Unit = {
    // Resolved here so that every log line names the worker thread, not the launcher.
    val tn = Thread.currentThread().getName()
    val upstream = new SingleQueueReader(
      FROM,
      10,
      AmazonSQSClientBuilder.defaultClient(),
      10000
    )
    val downstream = new SQSUtil(TO)

    var file = newFile
    var fileLength = 0
    var nextCheckIn = CheckInEvery
    var start = System.currentTimeMillis()

    upstream.batches.takeWhile(_.nonEmpty).foreach { batch =>
      val messages = batch.map { item =>
        val m = item.message
        m.getMessageId() -> m.getBody()
      }

      // write contents to file
      os.write.append(
        dataDir / file,
        messages.map(_._2).mkString("\n") + "\n"
      )
      fileLength += batch.size

      // send to downstream queue
      // NOTE(review): the returned result is not inspected for failed entries, so the
      // upstream delete below happens even for entries the downstream queue rejected.
      downstream.sendMessageBatch(messages)

      // delete from upstream
      batch.foreach(_.delete())

      // Threshold comparison instead of `% CheckInEvery == 0`: a short batch can make
      // the counter jump over an exact multiple, which would silently skip the report.
      if (fileLength >= nextCheckIn) {
        val took = (System.currentTimeMillis() - start) / 1000
        println(
          s"$tn - (check in) Processed $fileLength messages in $took seconds (~${perSecond(fileLength, took)} mps)."
        )
        nextCheckIn = (fileLength / CheckInEvery + 1) * CheckInEvery
      }

      if (fileLength >= FILE_SIZE) {
        val took = (System.currentTimeMillis() - start) / 1000
        println(
          s"$tn - (file complete) Processed $fileLength messages in $took seconds (~${perSecond(fileLength, took)} mps)."
        )
        // Roll over to a new file and restart the per-file counters.
        start = System.currentTimeMillis()
        file = newFile
        fileLength = 0
        nextCheckIn = CheckInEvery
      }
    }
  }
}
import com.amazonaws.services.sqs._ | |
import com.amazonaws.services.sqs.model._ | |
import scala.jdk.CollectionConverters._ | |
/** Sends message batches to the single SQS queue identified by `queueUrl`.
  *
  * @param queueUrl URL of the queue every batch is sent to
  */
class SQSUtil(queueUrl: String) {
  private[this] val sqs = AmazonSQSClientBuilder.defaultClient()

  /** Sends all given messages in one batch request.
    *
    * @param messages pairs of (batch entry id, message body)
    * @return the result object returned by the SQS client for the batch call
    */
  def sendMessageBatch(
      messages: List[(String, String)]
  ): SendMessageBatchResult = {
    // One request entry per (id, body) pair, in input order.
    val batchEntries = messages.map { pair =>
      new SendMessageBatchRequestEntry(pair._1, pair._2)
    }
    val request = new SendMessageBatchRequest(queueUrl).withEntries(batchEntries.asJava)
    sqs.sendMessageBatch(request)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment