Skip to content

Instantly share code, notes, and snippets.

@dawnbreaks
Last active August 29, 2015 14:16
Show Gist options
  • Save dawnbreaks/2ea8c81568ba8c3a0abb to your computer and use it in GitHub Desktop.
Save dawnbreaks/2ea8c81568ba8c3a0abb to your computer and use it in GitHub Desktop.
package com.crm.test;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.linkedin.parseq.BaseTask;
import com.linkedin.parseq.Context;
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.ParTask;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.Tasks;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.PromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
public class TestParseq {
/**
 * Pauses the calling thread for roughly {@code time} milliseconds.
 *
 * <p>If the thread is interrupted while sleeping, the method returns early,
 * prints the stack trace, and re-asserts the thread's interrupt flag so that
 * the caller can still observe the interruption.
 *
 * @param time how long to sleep, in milliseconds
 */
private static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Thread.sleep clears the interrupt flag when it throws; put it back
        // rather than silently discarding the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Stand-in for a non-blocking HTTP client: each fetch is answered with a
 * canned string, produced later by a job scheduled on the supplied executor.
 */
private static class MockAsyncHttpClient {
    private final ScheduledExecutorService scheduler;

    /**
     * @param scheduler executor on which the delayed mock responses are produced
     */
    public MockAsyncHttpClient(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Starts a simulated fetch of {@code url} and returns immediately.
     *
     * @param url address to pretend to download
     * @return a promise that the scheduled job completes with a mock response
     *         string after a five-second delay, or fails if that job throws
     */
    public Promise<String> fetch(final String url) {
        final SettablePromise<String> result = Promises.settable();

        // The job that resolves the promise once the simulated delay elapses.
        final Runnable respond = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("fetching " + url + "....");
                    result.done("mock response for url = " + url);
                } catch (Exception ex) {
                    result.fail(ex);
                }
            }
        };

        scheduler.schedule(respond, 5, TimeUnit.SECONDS);
        return result;
    }
}
/**
 * Task whose {@code run} simply hands back the promise returned by
 * {@link MockAsyncHttpClient#fetch(String)} for a fixed URL.
 */
private static class MockAsyncHttpTask extends BaseTask<String> {
    private final String url;
    private final MockAsyncHttpClient httpClient;

    /**
     * @param name       task name, forwarded to the {@code BaseTask} constructor
     * @param httpClient client used to perform the mock fetch
     * @param url        address passed to the client when the task runs
     */
    public MockAsyncHttpTask(final String name, MockAsyncHttpClient httpClient, String url) {
        super(name);
        this.url = url;
        this.httpClient = httpClient;
    }

    /**
     * Delegates to the client; the task's result is the client's promise.
     */
    @Override
    protected Promise<String> run(final Context context) throws Exception {
        final Promise<String> response = httpClient.fetch(url);
        return response;
    }
}
/**
 * Entry point of the demo.
 *
 * <p>Creates a two-thread scheduler for the mock HTTP client and a second
 * scheduled pool (available processors + 1 threads) that serves the engine as
 * both task executor and timer scheduler. It then runs the non-blocking demo
 * followed by the blocking demo, and finally shuts the engine and both pools
 * down.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    ScheduledExecutorService serviceScheduler = Executors.newScheduledThreadPool(2);
    MockAsyncHttpClient httpClient = new MockAsyncHttpClient(serviceScheduler);

    final int numCores = Runtime.getRuntime().availableProcessors();
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1);
    final Engine engine = new EngineBuilder()
            .setTaskExecutor(scheduler)
            .setTimerScheduler(scheduler)
            .build();

    try {
        // Non-blocking tasks: works as expected.
        testCustomTask(engine, httpClient);
        // Brief 15 ms pause between the two demos.
        sleep(15);
        // Blocking tasks: the par branches were observed NOT to run concurrently.
        testRunnableAndCallableTask(engine);
    } finally {
        engine.shutdown();
        try {
            // Give the engine a moment to finish outstanding work before the
            // executor underneath it is stopped forcibly.
            engine.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interruption visible and carry on with the shutdown.
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        serviceScheduler.shutdown();
    }
}
/**
 * Non-blocking demo: combines two mock fetches with {@code Tasks.par} and
 * chains two more with {@code Tasks.seq}, runs both on the engine, and blocks
 * until both have resolved.
 *
 * <p>Each listener prints the result on success. On failure it prints the
 * error instead of calling {@code get()}, which would throw inside the
 * listener.
 *
 * @param engine     engine used to run the two composed tasks
 * @param httpClient mock client shared by the four fetch tasks
 */
private static void testCustomTask(Engine engine, MockAsyncHttpClient httpClient) {
    MockAsyncHttpTask fetchBaidu = new MockAsyncHttpTask("baidu", httpClient, "http://www.baidu.com");
    MockAsyncHttpTask fetchGoogle = new MockAsyncHttpTask("google", httpClient, "http://www.google.com");
    MockAsyncHttpTask fetchSina = new MockAsyncHttpTask("sina", httpClient, "http://www.sina.com");
    MockAsyncHttpTask fetchQq = new MockAsyncHttpTask("qq", httpClient, "http://www.qq.com");

    ParTask<String> parTask = Tasks.par(fetchBaidu, fetchGoogle);
    Task<String> seqTask = Tasks.seq(fetchSina, fetchQq);

    parTask.addListener(new PromiseListener<List<String>>() {
        @Override
        public void onResolved(Promise<List<String>> promise) {
            if (promise.isFailed()) {
                System.err.println("parTask failed|error=" + promise.getError());
                return;
            }
            List<String> values = promise.get();
            System.out.println("parTask complete|valueListSize=" + values.size()
                    + "|value[0]=" + values.get(0) + "|value[1]=" + values.get(1));
        }
    });
    seqTask.addListener(new PromiseListener<String>() {
        @Override
        public void onResolved(Promise<String> promise) {
            if (promise.isFailed()) {
                System.err.println("seqTask failed|error=" + promise.getError());
                return;
            }
            System.out.println("seqTask complete|value=" + promise.get());
        }
    });

    engine.run(parTask);
    engine.run(seqTask);

    try {
        parTask.await();
        seqTask.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
        // await() clears the interrupt flag when it throws; restore it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Blocking demo: wraps sleeping {@code Runnable}s and {@code Callable}s as
 * tasks, combines one pair with {@code Tasks.par} and another with
 * {@code Tasks.seq}, runs both on the engine, and blocks until both have
 * resolved.
 *
 * <p>Each listener prints the result on success. On failure it prints the
 * error instead of calling {@code get()}, which would throw inside the
 * listener.
 *
 * @param engine engine used to run the two composed tasks
 */
private static void testRunnableAndCallableTask(Engine engine) {
    Runnable runnable1 = sleepingRunnable("runnable1");
    Runnable runnable2 = sleepingRunnable("runnable2");
    Callable<String> callable1 = sleepingCallable("callable1");
    Callable<String> callable2 = sleepingCallable("callable2");

    ParTask<Object> parTask = Tasks.par(Tasks.action("parTaskRunnable", runnable1), Tasks.callable("parTaskCallable", callable1));
    Task<String> seqTask = Tasks.seq(Tasks.action("seqTaskRunnable", runnable2), Tasks.callable("seqTaskCallable", callable2));

    parTask.addListener(new PromiseListener<List<Object>>() {
        @Override
        public void onResolved(Promise<List<Object>> promise) {
            if (promise.isFailed()) {
                System.err.println("parTask failed|error=" + promise.getError());
                return;
            }
            List<Object> values = promise.get();
            System.out.println("parTask complete|valueListSize=" + values.size()
                    + "|value[0]=" + values.get(0) + "|value[1]=" + values.get(1));
        }
    });
    seqTask.addListener(new PromiseListener<String>() {
        @Override
        public void onResolved(Promise<String> promise) {
            if (promise.isFailed()) {
                System.err.println("seqTask failed|error=" + promise.getError());
                return;
            }
            System.out.println("seqTask complete|value=" + promise.get());
        }
    });

    engine.run(parTask);
    engine.run(seqTask);

    try {
        parTask.await();
        seqTask.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
        // await() clears the interrupt flag when it throws; restore it.
        Thread.currentThread().interrupt();
    }
}

/** Builds a Runnable that logs, blocks for five seconds, then logs again, tagged with {@code label}. */
private static Runnable sleepingRunnable(final String label) {
    return new Runnable() {
        @Override
        public void run() {
            System.out.println(label + "|before sleep");
            sleep(5000);
            System.out.println(label + "|after sleep");
        }
    };
}

/** Builds a Callable that logs, blocks for five seconds, logs again, and returns {@code label}. */
private static Callable<String> sleepingCallable(final String label) {
    return new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println(label + "|before sleep");
            sleep(5000);
            System.out.println(label + "|after sleep");
            return label;
        }
    };
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment