Last active
August 29, 2015 14:16
-
-
Save dawnbreaks/2ea8c81568ba8c3a0abb to your computer and use it in GitHub Desktop.
parseq example: https://github.com/linkedin/parseq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.crm.test; | |
import java.util.List; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import com.linkedin.parseq.BaseTask; | |
import com.linkedin.parseq.Context; | |
import com.linkedin.parseq.Engine; | |
import com.linkedin.parseq.EngineBuilder; | |
import com.linkedin.parseq.ParTask; | |
import com.linkedin.parseq.Task; | |
import com.linkedin.parseq.Tasks; | |
import com.linkedin.parseq.promise.Promise; | |
import com.linkedin.parseq.promise.PromiseListener; | |
import com.linkedin.parseq.promise.Promises; | |
import com.linkedin.parseq.promise.SettablePromise; | |
public class TestParseq { | |
private static void sleep(long time){ | |
try { | |
Thread.sleep(time); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private static class MockAsyncHttpClient { | |
private final ScheduledExecutorService scheduler; | |
public MockAsyncHttpClient(ScheduledExecutorService scheduler) { | |
this.scheduler = scheduler; | |
} | |
public Promise<String> fetch(final String url) { | |
final SettablePromise<String> promise = Promises.settable(); | |
scheduler.schedule(new Runnable() { | |
@Override | |
public void run() { | |
try{ | |
System.out.println("fetching "+ url + "...."); | |
promise.done("mock response for url = " + url); | |
} catch (Exception e) { | |
promise.fail(e); | |
} | |
} | |
}, 5, TimeUnit.SECONDS); | |
return promise; | |
} | |
} | |
private static class MockAsyncHttpTask extends BaseTask<String> { | |
private MockAsyncHttpClient httpClient; | |
private String url; | |
public MockAsyncHttpTask(final String name, MockAsyncHttpClient httpClient, String url){ | |
super(name); | |
this.httpClient = httpClient; | |
this.url = url; | |
} | |
@Override | |
protected Promise<String> run(final Context context) throws Exception { | |
return httpClient.fetch(url); | |
} | |
}; | |
public static void main(String[] args) { | |
ScheduledExecutorService serviceScheduler = Executors.newScheduledThreadPool(2); | |
MockAsyncHttpClient httpClient = new MockAsyncHttpClient(serviceScheduler); | |
final int numCores = Runtime.getRuntime().availableProcessors(); | |
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numCores + 1); | |
final Engine engine = new EngineBuilder() | |
.setTaskExecutor(scheduler) | |
.setTimerScheduler(scheduler) | |
.build(); | |
try { | |
testCustomTask(engine, httpClient);//test no-blocking task. work well as we expected. | |
sleep(15); | |
testRunnableAndCallableTask(engine);//test blocking task. par task are not concurrently runing. | |
} finally { | |
engine.shutdown(); | |
scheduler.shutdownNow(); | |
serviceScheduler.shutdown(); | |
} | |
} | |
private static void testCustomTask(Engine engine, MockAsyncHttpClient httpClient) { | |
MockAsyncHttpTask fetchBaidu = new MockAsyncHttpTask("baidu", httpClient, "http://www.baidu.com"); | |
MockAsyncHttpTask fetchGoogle = new MockAsyncHttpTask("google", httpClient, "http://www.google.com"); | |
MockAsyncHttpTask fetchSina = new MockAsyncHttpTask("sina", httpClient, "http://www.sina.com"); | |
MockAsyncHttpTask fetchQq = new MockAsyncHttpTask("qq", httpClient, "http://www.qq.com"); | |
ParTask<String> parTask = Tasks.par(fetchBaidu, fetchGoogle); | |
Task<String> seqTask = Tasks.seq(fetchSina, fetchQq); | |
parTask.addListener(new PromiseListener<List<String>>(){ | |
@Override | |
public void onResolved(Promise<List<String>> promise) { | |
System.out.println("parTask complete|valueListSize=" + promise.get().size()+"|value[0]=" + promise.get().get(0)+"|value[1]=" + promise.get().get(1)); | |
}}); | |
seqTask.addListener(new PromiseListener<String>(){ | |
@Override | |
public void onResolved(Promise<String> promise) { | |
System.out.println("seqTask complete|value=" + promise.get()); | |
}}); | |
engine.run(parTask); | |
engine.run(seqTask); | |
try { | |
parTask.await(); | |
seqTask.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private static void testRunnableAndCallableTask(Engine engine){ | |
Runnable runnable1 = new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("runnable1|before sleep"); | |
sleep(5000); | |
System.out.println("runnable1|after sleep"); | |
} | |
}; | |
Runnable runnable2 = new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("runnable2|before sleep"); | |
sleep(5000); | |
System.out.println("runnable2|after sleep"); | |
} | |
}; | |
Callable<String> callable1 = new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
System.out.println("callable1|before sleep"); | |
sleep(5000); | |
System.out.println("callable1|after sleep"); | |
return "callable1"; | |
} | |
}; | |
Callable<String> callable2 = new Callable<String>() { | |
@Override | |
public String call() throws Exception { | |
System.out.println("callable2|before sleep"); | |
sleep(5000); | |
System.out.println("callable2|after sleep"); | |
return "callable2"; | |
} | |
}; | |
ParTask<Object> parTask = Tasks.par(Tasks.action("parTaskRunnable", runnable1), Tasks.callable("parTaskCallable", callable1)); | |
Task<String> seqTask = Tasks.seq(Tasks.action("seqTaskRunnable", runnable2), Tasks.callable("seqTaskCallable", callable2)); | |
parTask.addListener(new PromiseListener<List<Object>>(){ | |
@Override | |
public void onResolved(Promise<List<Object>> promise) { | |
System.out.println("parTask complete|valueListSize=" + promise.get().size()+"|value[0]=" + promise.get().get(0)+"|value[1]=" + promise.get().get(1)); | |
}}); | |
seqTask.addListener(new PromiseListener<String>(){ | |
@Override | |
public void onResolved(Promise<String> promise) { | |
System.out.println("seqTask complete|value=" + promise.get()); | |
}}); | |
engine.run(parTask); | |
engine.run(seqTask); | |
try { | |
parTask.await(); | |
seqTask.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment