Created
August 10, 2019 03:11
-
-
Save Charlyzzz/3b128e87878b008688c408726d43245c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private def cloudwatchLogs(logGroup: String, streamName: String): Source[OutputLogEvent, NotUsed] = { | |
implicit val ec: ExecutionContextExecutor = ExecutionContext.global | |
val credentialsProvider = new AWSStaticCredentialsProvider(new ProfileCredentialsProvider("10pines").getCredentials) | |
val logsClient = AWSLogsClientBuilder.standard | |
.withCredentials(credentialsProvider) | |
.withRegion(Regions.US_WEST_2) | |
.withClientConfiguration(new ClientConfiguration()) | |
.build | |
Source.unfoldAsync("") { token => | |
val request = new GetLogEventsRequest(logGroup, streamName) | |
.withStartTime(0L) | |
.withEndTime(System.currentTimeMillis()) | |
if (token.nonEmpty) request.setNextToken(token) | |
Future { logsClient.getLogEvents(request) } | |
.map { logs => | |
val newToken = logs.getNextForwardToken | |
if (token == newToken) { | |
None | |
} else { | |
import scala.jdk.CollectionConverters._ | |
val events: List[OutputLogEvent] = logs.getEvents.asScala.toList | |
Some((newToken, events)) | |
} | |
} | |
}.mapConcat(identity) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment