Created
March 12, 2019 14:25
-
-
Save He-Pin/d31882e65c5bd58464ab42a688794efc to your computer and use it in GitHub Desktop.
reactor not working
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.alibaba.wireless.process.message.single.impl; | |
import com.alibaba.wireless.utils.NamedPoolThreadFactory; | |
import com.taobao.wireless.ripple2.core.util.SaneRejectedExecutionHandler; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import reactor.core.publisher.Flux; | |
import reactor.core.scheduler.Schedulers; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.util.Collections; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.concurrent.*; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
/** | |
* @author kerr | |
**/ | |
public class WorkQueueTest2 { | |
private static final ExecutorService asyncThreadPool; | |
private static final Flux<List<Integer>> works; | |
private static final LinkedBlockingQueue<List<Integer>> workQueue = new LinkedBlockingQueue<>(10); | |
static { | |
asyncThreadPool = new ThreadPoolExecutor(2 , 32, 60 * 1000, | |
//保活时间是毫秒时间! | |
TimeUnit.MILLISECONDS, | |
new LinkedBlockingDeque<>(100), | |
new NamedPoolThreadFactory( | |
"integerInsertWorker", true), | |
new SaneRejectedExecutionHandler()); | |
works = Flux.fromIterable(new Iterable<List<Integer>>() { | |
@Override | |
public Iterator<List<Integer>> iterator() { | |
return new Iterator<List<Integer>>() { | |
@Override | |
public boolean hasNext() { | |
return true; | |
} | |
@Override | |
public List<Integer> next() { | |
try { | |
return workQueue.take(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
return null; | |
} | |
} | |
}; | |
} | |
}); | |
works.flatMapIterable(single -> single) | |
.windowTimeout(100, Duration.ofMillis(2)) | |
//.buffer(100,TimeUnit.MILLISECONDS,100) | |
.subscribeOn(Schedulers.fromExecutor(asyncThreadPool)) | |
.flatMap(new Function<Flux<Integer>, Flux<Integer>>() { | |
@Override | |
public Flux<Integer> apply(final Flux<Integer> integers) { | |
return Flux.just(integers.subscribeOn(Schedulers.fromExecutor(asyncThreadPool)) | |
.toStream() | |
.collect(Collectors.toList())) | |
.subscribeOn(Schedulers.fromExecutor(asyncThreadPool)) | |
.map(new Function<List<Integer>, Integer>() { | |
@Override | |
public Integer apply(final List<Integer> integers) { | |
System.out.println("onNext on "+Thread.currentThread() + " size :" + integers.size()); | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
return integers.size(); | |
} | |
}); | |
} | |
}) | |
.subscribe(new Subscriber<Integer>() { | |
private volatile Subscription subscription; | |
@Override | |
public void onSubscribe(final Subscription s) { | |
System.out.println("onSubscribe"); | |
subscription = s; | |
//s.request(Long.MAX_VALUE); | |
} | |
@Override | |
public void onNext(final Integer integer) { | |
System.out.println("size :"+integer); | |
subscription.request(1); | |
} | |
@Override | |
public void onError(final Throwable t) { | |
System.out.println(11111); | |
t.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
} | |
}); | |
} | |
public static void process(int value) throws InterruptedException { | |
System.out.println("process value "+value); | |
workQueue.put(Collections.singletonList(value)); | |
} | |
public static void main(String[] args) throws IOException { | |
//首先开100个线程去发布数据 | |
for (int i = 0; i < 100; i++) { | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
for (int j = 0; j < 1000000; j++) { | |
try { | |
process(j); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
try { | |
Thread.sleep(ThreadLocalRandom.current().nextInt(2, 10)); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
}) | |
.start(); | |
} | |
System.out.println("read...."); | |
System.in.read(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment