Skip to content

Instantly share code, notes, and snippets.

@daemin-hwang
Created December 22, 2015 02:07
Show Gist options
  • Save daemin-hwang/f0ed970a2765aded8597 to your computer and use it in GitHub Desktop.
Save daemin-hwang/f0ed970a2765aded8597 to your computer and use it in GitHub Desktop.
Spring Batch flow job example
package kr.co.plaync.lineage2.app.batch.rank.job;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
import kr.co.plaync.lineage2.app.batch.powerbook.listener.CustomJobExecutionListener;
import kr.co.plaync.lineage2.app.batch.rank.job.redis.RedisItemProcessor;
import kr.co.plaync.lineage2.app.batch.rank.job.redis.RedisItemReader;
import kr.co.plaync.lineage2.domain.ingame.RankUser;
import kr.co.plaync.lineage2.opensource.batch.BatchInfo;
import kr.co.plaync.lineage2.opensource.jdbc.config.DbconnInfo;
import kr.co.plaync.lineage2.opensource.jdbc.config.DbconnInfo.LiveReplication;
import kr.co.plaync.lineage2.opensource.redis.repository.RedisRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.JobSynchronizationManager;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.Assert;
import com.ncsoft.ncframework.jdbc.routing.DataSourceContextHolder;
@Slf4j
@Configuration
public class RedisJobConfigurationTest {
// Deprecated delimiter; it is not referenced anywhere else in this file.
@Deprecated
private static final String DELIMITER = ":";
// Separates a step's base name from the data source name that is appended to it.
// job() splits step names on this value and takes element [1] as the connection name.
private static final String STEP_NAME_DELIMITER = ":";
// Shared batch width: the char_id window size when loading Redis, the DELETE TOP row count,
// the chunk size of the DB-save step and the page size of the Redis reader.
private static final Integer BATCH_SIZE = 420;
// Expiry handed to redisRepository.setExpireKey(); 60 * 60 * 6 = 21600.
// NOTE(review): presumably seconds (i.e. 6 hours) - confirm against RedisRepository.
private static final Integer REDIS_KEY_EXPIRE_TIME = 60 * 60 * 6;
/**
 * Game worlds processed by the ranking job.
 *
 * <p>Each constant carries a server type label, the Redis key under which the world's
 * ranking sorted set is stored, and the server id used to look up the matching data source
 * name and to filter rows in the ranking table.
 *
 * <p>Entries for the LIVE worlds 7, 42, 44, 45, 46, 48, 49, 50 and 51 used to be listed here
 * (with an older two-argument constructor) and are currently disabled.
 */
public enum SERVERINFO {
    LINEAGE2_REPLICATION101("CLASSIC", "WORLD_101", "101"),
    LINEAGE2_REPLICATION102("CLASSIC", "WORLD_102", "102"),
    LINEAGE2_REPLICATION104("CLASSIC", "WORLD_104", "104"),
    LINEAGE2_REPLICATION107("CLASSIC", "WORLD_107", "107");

    /** Server type label, e.g. {@code "CLASSIC"}. */
    private final String serverType;
    /** Redis key of this world's sorted set, e.g. {@code "WORLD_101"}. */
    private final String redisKey;
    /** Server id as text, e.g. {@code "101"}. */
    private final String serverId;

    SERVERINFO(String type, String key, String id) {
        serverType = type;
        redisKey = key;
        serverId = id;
    }

    /** @return the server type label */
    public String getServerType() {
        return this.serverType;
    }

    /** @return the Redis key of this world's sorted set */
    public String getRedisKey() {
        return this.redisKey;
    }

    /** @return the server id as text */
    public String getServerId() {
        return this.serverId;
    }
}
// Factory used by job() to create the job builder.
@Autowired
private JobBuilderFactory jobBuilders;
// Listener attached to the job in job().
@Autowired
private CustomJobExecutionListener customJobExecutionListener;
// Factory used to create every step of the job.
@Autowired
private StepBuilderFactory stepBuilders;
// Data sources below are injected by qualifier but are not referenced directly in this file.
// NOTE(review): presumably injected so the routed data sources are initialised - confirm.
@Autowired
@Qualifier(DbconnInfo.LINEAGE2_GAME_INFO)
private DataSource dataSource;
@Autowired
@Qualifier(LiveReplication.LINEAGE2_REPLICATION101)
private DataSource dataSource101;
@Autowired
@Qualifier(LiveReplication.LINEAGE2_REPLICATION102)
private DataSource dataSource102;
@Autowired
@Qualifier(LiveReplication.LINEAGE2_REPLICATION104)
private DataSource dataSource104;
@Autowired
@Qualifier(LiveReplication.LINEAGE2_REPLICATION107)
private DataSource dataSource107;
// Disabled data source injections (qualified through OpDevReplication) for worlds
// 7, 42, 44, 45, 46, 48, 49, 50 and 51.
/*@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION7)
private DataSource dataSource7;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION42)
private DataSource dataSource42;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION44)
private DataSource dataSource44;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION45)
private DataSource dataSource45;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION46)
private DataSource dataSource46;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION48)
private DataSource dataSource48;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION49)
private DataSource dataSource49;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION50)
private DataSource dataSource50;
@Autowired
@Qualifier(OpDevReplication.LINEAGE2_REPLICATION51)
private DataSource dataSource51;*/
// Template used for every SQL statement issued by this class (max char_id lookup,
// char_id-window select and the batched delete).
// NOTE(review): presumably backed by a routing data source driven by
// DataSourceContextHolder - confirm.
@Autowired
private JdbcTemplate jdbcTemplate;
// Redis access used to fill, size and expire the per-world sorted sets.
@Autowired
private RedisRepository redisRepository;
// Entity manager factory handed to the JPA item writer created in writer().
@Autowired
@Qualifier(BatchInfo.DEFAULT_ENTITY_MANAGER_FACTORY)
private EntityManagerFactory emf;
/**
 * Assembles the {@code ingameRankJobTest} job from three flows that run one after another.
 *
 * <p>Every flow starts with a fresh {@link #beforeStep()} and then receives one split per
 * step: flow 1 gets the steps of {@link #step1List()} on asynchronous executors, flow 2 the
 * steps of {@link #step2List()} on synchronous executors, and flow 3 the steps of
 * {@link #step3List()} on asynchronous executors.
 *
 * @return the fully built job with {@code customJobExecutionListener} attached
 */
@Bean(name = "ingameRankJobTest")
public Job job() {
    JobBuilder rankJobBuilder = jobBuilders.get("ingameRankJobTest").listener(customJobExecutionListener);

    FlowBuilder<Flow> redisLoadFlow = new FlowBuilder<Flow>("splitflowBuilder1").start(beforeStep());
    FlowBuilder<Flow> dbDeleteFlow = new FlowBuilder<Flow>("splitflowBuilder2").start(beforeStep());
    FlowBuilder<Flow> dbSaveFlow = new FlowBuilder<Flow>("splitflowBuilder3").start(beforeStep());

    addSplitFlows(redisLoadFlow, step1List(), true);
    addSplitFlows(dbDeleteFlow, step2List(), false);
    addSplitFlows(dbSaveFlow, step3List(), true);

    return rankJobBuilder.start(redisLoadFlow.build())
            .next(dbDeleteFlow.build())
            .next(dbSaveFlow.build())
            .end().build();
}

/**
 * Wraps each step in a single-step flow named after the step and adds it to {@code target}
 * as its own split.
 *
 * <p>The connection name is the second element of the step name split on
 * {@code STEP_NAME_DELIMITER}; it is passed to the executor factory so the step runs with
 * that data source name.
 *
 * @param target flow builder that receives the splits
 * @param steps  steps to add, in iteration order
 * @param async  {@code true} to use {@code taskExecutor}, {@code false} to use
 *               {@code syncTaskExecutor}
 */
private void addSplitFlows(FlowBuilder<Flow> target, List<Step> steps, boolean async) {
    for (Step step : steps) {
        String stepName = step.getName();
        Flow stepFlow = new FlowBuilder<Flow>(stepName).from(step).build();
        String connName = stepName.split(STEP_NAME_DELIMITER)[1];
        TaskExecutor executor = async ? taskExecutor(connName) : syncTaskExecutor(connName);
        target.split(executor).add(stepFlow);
    }
}
/**
 * Synchronous task executor that runs each task with a fixed data source name selected.
 *
 * <p>Around the task it sets the data source name on {@code DataSourceContextHolder} and
 * registers the submitting thread's {@code JobExecution} with
 * {@code JobSynchronizationManager}; both are undone in a {@code finally} block once the
 * task returns or throws.
 */
class DynamicDataSourceSyncTaskExecutor extends SyncTaskExecutor {
    private static final long serialVersionUID = 1L;

    /** Data source name selected while a task is running. */
    private final String dataSourceName;

    public DynamicDataSourceSyncTaskExecutor(String dataSourceName) {
        this.dataSourceName = dataSourceName;
    }

    @Override
    public void execute(final Runnable task) {
        // Captured up front, on the submitting thread.
        final JobExecution submittingJobExecution = JobSynchronizationManager.getContext().getJobExecution();
        Runnable routedTask = new Runnable() {
            @Override
            public void run() {
                DataSourceContextHolder.setDataSourceName(dataSourceName);
                JobSynchronizationManager.register(submittingJobExecution);
                try {
                    task.run();
                } finally {
                    DataSourceContextHolder.clearDataSourceName();
                    JobSynchronizationManager.release();
                }
            }
        };
        super.execute(routedTask);
    }
}
/**
 * Creates a synchronous executor bound to the given data source name.
 *
 * @param dataSourceName data source name the executor selects while running a task
 * @return a new {@code DynamicDataSourceSyncTaskExecutor}
 */
private TaskExecutor syncTaskExecutor(final String dataSourceName) {
    TaskExecutor routedExecutor = new DynamicDataSourceSyncTaskExecutor(dataSourceName);
    return routedExecutor;
}
/**
 * Asynchronous task executor that runs each task with a fixed data source name selected.
 *
 * <p>The {@code JobExecution} is read from {@code JobSynchronizationManager} inside
 * {@code doExecute}, before the task is handed to the superclass. The wrapped task then sets
 * the data source name on {@code DataSourceContextHolder} and registers that
 * {@code JobExecution}; both are undone in a {@code finally} block.
 */
class DynamicDataSourceSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor {
    private static final long serialVersionUID = 1L;

    /** Data source name selected while a task is running. */
    private final String dataSourceName;

    public DynamicDataSourceSimpleAsyncTaskExecutor(String dataSourceName) {
        this.dataSourceName = dataSourceName;
    }

    @Override
    protected void doExecute(final Runnable task) {
        // Captured here, before the wrapped task is handed to the superclass.
        final JobExecution submittingJobExecution = JobSynchronizationManager.getContext().getJobExecution();
        Runnable routedTask = new Runnable() {
            @Override
            public void run() {
                DataSourceContextHolder.setDataSourceName(dataSourceName);
                JobSynchronizationManager.register(submittingJobExecution);
                try {
                    task.run();
                } finally {
                    DataSourceContextHolder.clearDataSourceName();
                    JobSynchronizationManager.release();
                }
            }
        };
        super.doExecute(routedTask);
    }
}
/**
 * Creates an asynchronous executor bound to the given data source name.
 *
 * @param dataSourceName data source name the executor selects while running a task
 * @return a new {@code DynamicDataSourceSimpleAsyncTaskExecutor}
 */
private TaskExecutor taskExecutor(final String dataSourceName) {
    TaskExecutor routedExecutor = new DynamicDataSourceSimpleAsyncTaskExecutor(dataSourceName);
    return routedExecutor;
}
/**
 * Finds the data source name that belongs to the given server.
 *
 * <p>The first name returned by {@code DbconnInfo.getDataSourceNames(LiveReplication.class)}
 * that contains the server id as a substring is used.
 *
 * @param serverInfo server whose data source name is wanted
 * @return the matching data source name, never {@code null}
 * @throws IllegalStateException if no data source name contains the server id; previously
 *         {@code null} was returned, which ended up as the literal text {@code "null"} in
 *         the step name and as the data source name used for routing
 */
private String lookupDBConnName(final SERVERINFO serverInfo) {
    String serverId = serverInfo.getServerId();
    String[] dataSourceNames = DbconnInfo.getDataSourceNames(LiveReplication.class);
    for (String dataSourceName : dataSourceNames) {
        if (dataSourceName.contains(serverId)) {
            return dataSourceName;
        }
    }
    // Fail fast at configuration time instead of building a step bound to a bogus name.
    throw new IllegalStateException(
            "No LiveReplication data source name contains server id '" + serverId
                    + "' (server " + serverInfo + ")");
}
/**
 * Creates the step named {@code beforeStep} that every flow of the job starts with.
 * Its tasklet only writes an info log line and finishes.
 *
 * @return a new tasklet step (prototype scoped)
 */
@Scope("prototype")
@Bean
public Step beforeStep() {
    return stepBuilders.get("beforeStep")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    log.info("[start ingameRankJob before step.]");
                    return RepeatStatus.FINISHED;
                }
            })
            .build();
}
/**
 * Builds one Redis-loading step per {@code SERVERINFO} constant.
 *
 * <p>Each step is named {@code saveIngameRankToRedisStep}, the step-name delimiter and the
 * data source name looked up for that server, and runs the tasklet from
 * {@code tasklet(serverInfo)}.
 *
 * @return the steps, in {@code SERVERINFO.values()} order
 */
public List<Step> step1List() {
    List<Step> redisLoadSteps = new ArrayList<Step>();
    for (SERVERINFO server : SERVERINFO.values()) {
        String stepName = "saveIngameRankToRedisStep" + STEP_NAME_DELIMITER + lookupDBConnName(server);
        redisLoadSteps.add(
                stepBuilders.get(stepName)
                        .tasklet(tasklet(server))
                        .build());
    }
    return redisLoadSteps;
}
/**
 * Builds one delete step per {@code SERVERINFO} constant, each running
 * {@code tasklet2(serverInfo)}.
 *
 * <p>NOTE(review): the step name is built from constants only, so every step returned here
 * carries the same name; confirm that this is intended.
 *
 * @return the steps, in {@code SERVERINFO.values()} order
 */
public List<Step> step2List() {
    // The name does not depend on the server, so it is computed once.
    final String stepName = "deleteIngameRankToDbStep" + STEP_NAME_DELIMITER + DbconnInfo.LINEAGE2_GAME_INFO;
    List<Step> deleteSteps = new ArrayList<Step>();
    for (SERVERINFO server : SERVERINFO.values()) {
        deleteSteps.add(
                stepBuilders.get(stepName)
                        .tasklet(tasklet2(server))
                        .build());
    }
    return deleteSteps;
}
/**
 * Builds one chunk-oriented step per {@code SERVERINFO} constant.
 *
 * <p>Each step reads {@code TypedTuple<String>} items through {@code reader(redisKey)},
 * passes them through a new {@code RedisItemProcessor} and writes {@code RankUser} items
 * with {@code writer()}, in chunks of {@code BATCH_SIZE}.
 *
 * <p>NOTE(review): the step name is built from constants only, so every step returned here
 * carries the same name; confirm that this is intended.
 *
 * @return the steps, in {@code SERVERINFO.values()} order
 */
public List<Step> step3List() {
    // The name does not depend on the server, so it is computed once.
    final String stepName = "saveIngameRankToDbStep" + STEP_NAME_DELIMITER + DbconnInfo.LINEAGE2_GAME_INFO;
    List<Step> dbSaveSteps = new ArrayList<Step>();
    for (SERVERINFO server : SERVERINFO.values()) {
        String worldRedisKey = server.getRedisKey();
        Step dbSaveStep = stepBuilders.get(stepName)
                .<TypedTuple<String>, RankUser> chunk(BATCH_SIZE)
                .reader(reader(worldRedisKey))
                .processor(new RedisItemProcessor())
                .writer(writer())
                .build();
        dbSaveSteps.add(dbSaveStep);
    }
    return dbSaveSteps;
}
/**
 * Creates the tasklet that copies one world's characters into its Redis sorted set.
 *
 * <p>The tasklet walks the {@code char_id} range from 1 up to the value returned by
 * {@code selectServerUserCount()} in windows of {@code BATCH_SIZE} ids. Every row of a
 * window is added to the sorted set stored under the world's Redis key, scored by the
 * character's exp. Afterwards the size of the sorted set is read, the key is given an
 * expiry of {@code REDIS_KEY_EXPIRE_TIME}, and the size is reported as the step's write
 * count.
 *
 * @param serverInfo world whose Redis key is filled
 * @return a new tasklet (prototype scoped)
 */
@Scope("prototype")
@Bean
protected Tasklet tasklet(final SERVERINFO serverInfo) {
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
            final String redisKey = serverInfo.getRedisKey();

            // Upper bound of the char_id range to scan.
            final long maxCharId = selectServerUserCount();
            final long windowCount = maxCharId / BATCH_SIZE + 1;

            for (long window = 0L; window < windowCount; window++) {
                // Inclusive id window: [1, BATCH_SIZE], [BATCH_SIZE + 1, 2 * BATCH_SIZE], ...
                long fromCharId = window * BATCH_SIZE + 1;
                long toCharId = (window + 1) * BATCH_SIZE;
                for (RankUser rankUser : selectRankUserList(fromCharId, toCharId)) {
                    String member = rankUser.generateRankUserRedisValue();
                    redisRepository.addSortedSet(redisKey, member, rankUser.getExp());
                }
            }

            // Read back how many members the sorted set holds now.
            Long storedCount = redisRepository.getSortedSize(redisKey);
            redisRepository.setExpireKey(redisKey, REDIS_KEY_EXPIRE_TIME);
            contribution.incrementWriteCount(storedCount.intValue());
            log.info("{} insertIngameRankToRedisTask FINISHED", redisKey);
            return RepeatStatus.FINISHED;
        }
    };
}
/**
 * Returns the highest {@code char_id} among the rows of {@code user_data} that have
 * {@code builder = 0}, a positive {@code account_id} and no {@code temp_delete_date}.
 *
 * <p>Despite the method name this is a maximum id, not a row count; the caller uses it as
 * the upper bound of the {@code char_id} range it scans.
 *
 * @return the maximum matching {@code char_id}, or {@code 0} when no row matches
 */
private long selectServerUserCount() {
    StringBuilder countSql = new StringBuilder();
    countSql.append(" SELECT MAX(char_id)");
    countSql.append(" FROM user_data");
    countSql.append(" WHERE builder = 0");
    countSql.append(" AND account_id > 0");
    countSql.append(" AND temp_delete_date is null");
    // MAX() yields SQL NULL when no row matches, which arrives here as a null Long.
    // Unboxing it straight into a long would throw NullPointerException.
    Long maxCharId = jdbcTemplate.queryForObject(countSql.toString(), Long.class);
    return maxCharId != null ? maxCharId.longValue() : 0L;
}
/**
 * Loads the characters whose {@code char_id} lies in the inclusive range
 * {@code [min, max]}, restricted to rows with {@code builder = 0}, a positive
 * {@code account_id} and no {@code temp_delete_date}.
 *
 * <p>The table is read with the {@code READUNCOMMITTED} hint and rows are mapped by
 * {@code UserDataMapper}.
 *
 * @param min lowest {@code char_id} to include
 * @param max highest {@code char_id} to include
 * @return the mapped rows
 */
private List<RankUser> selectRankUserList(long min, long max) {
    String windowSql = " SELECT char_id,world,char_name,pledge_id,lev,class,exp "
            + " FROM user_data WITH (READUNCOMMITTED)"
            + " WHERE builder = 0"
            + " AND account_id > 0"
            + " AND temp_delete_date is null"
            + " AND char_id >= ? and char_id <= ?";
    Object[] bounds = new Object[] {min, max};
    return jdbcTemplate.query(windowSql, new UserDataMapper(), bounds);
}
/**
 * Creates the tasklet that removes today's ranking rows of one server before new rows are
 * written, so that running the batch twice does not register the data twice.
 *
 * <p>The rows are deleted from {@code GrowRanking} in a T-SQL loop that removes at most
 * {@code BATCH_SIZE} rows per statement and stops once a statement affects no row. The
 * original note on this method states that deleting in batches is meant to avoid lock
 * problems.
 *
 * @param serverInfo server whose rows are deleted
 * @return a new tasklet (prototype scoped)
 */
@Scope("prototype")
@Bean
protected Tasklet tasklet2(final SERVERINFO serverInfo) {
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext context) {
            // Both interpolated values are compile-time constants of this class.
            String batchedDeleteSql = " WHILE (1>0) "
                    + " BEGIN"
                    + " DELETE TOP (" + BATCH_SIZE + ")"
                    + " FROM GrowRanking "
                    + " WHERE insertDate = CONVERT(CHAR(10), getdate(), 112) "
                    + " AND serverId = " + serverInfo.getServerId()
                    + " IF @@ROWCOUNT = 0 BREAK"
                    + " END";
            jdbcTemplate.execute(batchedDeleteSql);
            log.info("{} deleteIngameRankToDbTask FINISHED", serverInfo.getRedisKey());
            return RepeatStatus.FINISHED;
        }
    };
}
/////////////////////////////////
// STEP 3 section
/////////////////////////////////
/**
 * Creates the item reader of the DB-save step.
 *
 * @param worldId value handed to the {@code RedisItemReader} constructor; the caller in
 *                this file passes the world's Redis key
 * @return a new {@code RedisItemReader} with its page size set to {@code BATCH_SIZE}
 */
@Scope("prototype")
@Bean
public ItemReader<TypedTuple<String>> reader(String worldId) {
    RedisItemReader redisReader = new RedisItemReader(worldId);
    redisReader.setPageSize(BATCH_SIZE);
    return redisReader;
}
/**
 * Creates the item writer of the DB-save step.
 *
 * @return a new {@code JpaItemWriter} for {@code RankUser} that uses the injected entity
 *         manager factory
 */
@Scope("prototype")
@Bean
public ItemWriter<RankUser> writer() {
    JpaItemWriter<RankUser> jpaWriter = new JpaItemWriter<RankUser>();
    jpaWriter.setEntityManagerFactory(emf);
    return jpaWriter;
}
/**
 * Maps one row of the {@code user_data} query onto a {@code RankUser}.
 *
 * <p>Columns read: {@code char_id}, {@code char_name}, {@code class}, {@code lev},
 * {@code world} and {@code pledge_id} as int, {@code exp} as long.
 */
public class UserDataMapper implements RowMapper<RankUser> {
    @Override
    public RankUser mapRow(ResultSet row, int rowNum) throws SQLException {
        final RankUser rankUser = new RankUser();
        rankUser.setCharacterId(row.getInt("char_id"));
        rankUser.setCharacterName(row.getString("char_name"));
        rankUser.setClassId(row.getInt("class"));
        rankUser.setCharacterLevel(row.getInt("lev"));
        rankUser.setServerId(row.getInt("world"));
        rankUser.setPledgeId(row.getInt("pledge_id"));
        rankUser.setExp(row.getLong("exp"));
        return rankUser;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment