Created
September 21, 2018 06:29
-
-
Save hoangtrucit/28be985af2655563b1ab758de20fd749 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kinesisdemo.kinesisdemo.service.kinesis.consumer; | |
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisClient; | |
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisConfig; | |
import com.kinesisdemo.kinesisdemo.service.mysql.UserRepository; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.CommandLineRunner; | |
import org.springframework.stereotype.Component; | |
import javax.annotation.PreDestroy; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
@Component | |
public class Consumer implements CommandLineRunner { | |
@Autowired | |
private KinesisConfig kinesisConfig; | |
@Autowired | |
private KinesisClient kinesisClient; | |
@Autowired | |
private UserRepository userRepository; | |
private ScheduledExecutorService executor; | |
@Override | |
public void run(String... strings){ | |
try{ | |
executor = Executors.newScheduledThreadPool(4); | |
QueryEvent queryEvent1 = new QueryEvent() | |
.setShardId(kinesisConfig.getShardOne()) | |
.setKinesisClient(kinesisClient) | |
.setKinesisConfig(kinesisConfig) | |
.setUserRepository(userRepository) | |
.getPointerFromKinesis(); | |
QueryEvent queryEvent2 = new QueryEvent() | |
.setShardId(kinesisConfig.getShardTwo()) | |
.setKinesisClient(kinesisClient) | |
.setKinesisConfig(kinesisConfig) | |
.setUserRepository(userRepository) | |
.getPointerFromKinesis(); | |
// executor.scheduleAtFixedRate(queryEvent1, 1, 1, TimeUnit.SECONDS); | |
// executor.scheduleAtFixedRate(queryEvent2, 1, 1, TimeUnit.SECONDS); | |
}catch (Throwable e){ | |
e.printStackTrace(); | |
System.out.println(e.getMessage()); | |
} | |
} | |
@PreDestroy | |
public void onDestroy() throws Exception { | |
executor.shutdown(); | |
executor.shutdownNow(); | |
while (!executor.isTerminated()) { | |
Thread.sleep(100); | |
executor.shutdownNow(); | |
} | |
System.out.println("Success stop consumer"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kinesisdemo.kinesisdemo.service.kinesis.consumer; | |
import com.amazonaws.services.kinesis.model.*; | |
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisClient; | |
import com.kinesisdemo.kinesisdemo.service.kinesis.config.KinesisConfig; | |
import com.kinesisdemo.kinesisdemo.service.mysql.ProccessData; | |
import com.kinesisdemo.kinesisdemo.service.mysql.UserRepository; | |
import java.util.Date; | |
import java.util.List; | |
public class QueryEvent implements Runnable { | |
private Boolean lock = false; | |
private String shardIterator = ""; | |
private String shardId; | |
private KinesisConfig kinesisConfig; | |
private KinesisClient kinesisClient; | |
private ShardIteratorType shardIteratorType = ShardIteratorType.LATEST; | |
private Long millisBehindLatest; | |
private Date date; | |
private UserRepository userRepository; | |
public UserRepository getUserRepository() { | |
return userRepository; | |
} | |
public QueryEvent setUserRepository(UserRepository userRepository) { | |
this.userRepository = userRepository; | |
return this; | |
} | |
public QueryEvent setDate(Date date) { | |
this.date = date; | |
return this; | |
} | |
public QueryEvent setShardIteratorType(ShardIteratorType shardIteratorType) { | |
this.shardIteratorType = shardIteratorType; | |
return this; | |
} | |
public QueryEvent setShardId(String shardId) { | |
this.shardId = shardId; | |
return this; | |
} | |
public QueryEvent setKinesisConfig(KinesisConfig kinesisConfig) { | |
this.kinesisConfig = kinesisConfig; | |
return this; | |
} | |
public QueryEvent setKinesisClient(KinesisClient kinesisClient) { | |
this.kinesisClient = kinesisClient; | |
return this; | |
} | |
public QueryEvent getPointerFromKinesis() { | |
GetShardIteratorResult getShardIteratorResult = this.kinesisClient | |
.getAmazonKinesis() | |
.getShardIterator( | |
this.buildQuery() | |
); | |
this.shardIterator = getShardIteratorResult.getShardIterator(); | |
return this; | |
} | |
private GetShardIteratorRequest buildQuery() { | |
Shard shard = new Shard(); | |
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); | |
getShardIteratorRequest.setStreamName(kinesisConfig.getStreamName()); | |
getShardIteratorRequest.setShardId(shard.getShardId()); | |
getShardIteratorRequest.setShardId(this.shardId); | |
getShardIteratorRequest.setShardIteratorType(shardIteratorType); | |
switch (shardIteratorType) { | |
case AT_TIMESTAMP: | |
getShardIteratorRequest.setTimestamp(date); | |
break; | |
} | |
return getShardIteratorRequest; | |
} | |
private void getData() { | |
lock = true; | |
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); | |
getRecordsRequest.setShardIterator(shardIterator); | |
getRecordsRequest.setLimit(5000); | |
GetRecordsResult result = kinesisClient | |
.getAmazonKinesis() | |
.getRecords(getRecordsRequest); | |
List<Record> recordList = result.getRecords(); | |
shardIterator = result.getNextShardIterator(); | |
millisBehindLatest = result.getMillisBehindLatest(); | |
System.out.println(String.format( | |
"[shard]: %s [query type]: %s [total records]: %s [latest]: %s", | |
shardId, | |
shardIteratorType.toString(), | |
this.formatTotalRecords(recordList.size()), | |
millisBehindLatest.toString() | |
)); | |
if (recordList.size() > 0) { | |
new Thread(new ProccessData(result.getRecords(), userRepository)).start(); | |
} | |
lock = false; | |
} | |
private String formatTotalRecords(int x) { | |
return String.format(x > 0 ? "\033[31;1m%d\033[0m" : "%d", x); | |
} | |
public void proccessData() { | |
} | |
@Override | |
public void run() { | |
if (lock || this.shardIterator.length() <= 0) return; | |
this.getData(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment