Last active
November 1, 2015 22:43
-
-
Save alexvictoor/8c8ebf77e874104c9c5b to your computer and use it in GitHub Desktop.
Jersey + RxJava + Hystrix sandbox
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>com.alex</groupId> | |
<artifactId>hystrix-sandbox</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<properties> | |
<jersey.version>2.22.1</jersey.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>com.netflix.hystrix</groupId> | |
<artifactId>hystrix-core</artifactId> | |
<version>1.4.17</version> | |
</dependency> | |
<dependency> | |
<groupId>com.netflix.archaius</groupId> | |
<artifactId>archaius-core</artifactId> | |
<version>0.4.1</version> | |
</dependency> | |
<dependency> | |
<groupId>io.reactivex</groupId> | |
<artifactId>rxjava</artifactId> | |
<version>1.0.14</version> | |
<scope>compile</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-api</artifactId> | |
<version>1.7.6</version> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-simple</artifactId> | |
<version>1.7.6</version> | |
</dependency> | |
<dependency> | |
<groupId>org.glassfish.jersey.ext.rx</groupId> | |
<artifactId>jersey-rx-client-java8</artifactId> | |
<version>${jersey.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.glassfish.jersey.ext.rx</groupId> | |
<artifactId>jersey-rx-client-guava</artifactId> | |
<version>${jersey.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.glassfish.jersey.ext.rx</groupId> | |
<artifactId>jersey-rx-client-rxjava</artifactId> | |
<version>${jersey.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.12</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>com.github.tomakehurst</groupId> | |
<artifactId>wiremock</artifactId> | |
<version>1.53</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.assertj</groupId> | |
<artifactId>assertj-core</artifactId> | |
<version>1.7.0</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>com.jayway.awaitility</groupId> | |
<artifactId>awaitility</artifactId> | |
<version>1.6.2</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>3.2</version> | |
<configuration> | |
<source>1.8</source> | |
<target>1.8</target> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.github.tomakehurst.wiremock.junit.WireMockRule; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.netflix.hystrix.HystrixCommand; | |
import com.netflix.hystrix.HystrixCommandGroupKey; | |
import com.netflix.hystrix.HystrixCommandProperties; | |
import com.netflix.hystrix.HystrixObservableCommand; | |
import org.glassfish.jersey.client.rx.Rx; | |
import org.glassfish.jersey.client.rx.guava.RxListenableFutureInvoker; | |
import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.Observable; | |
import javax.ws.rs.core.Response; | |
import java.util.Date; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import static com.github.tomakehurst.wiremock.client.WireMock.*; | |
import static com.jayway.awaitility.Awaitility.await; | |
import static org.assertj.core.api.Assertions.assertThat; | |
public class HystrixTest { | |
private final static Logger log = LoggerFactory.getLogger(HystrixTest.class); | |
@Rule | |
public WireMockRule mock = new WireMockRule(); | |
@Test | |
public void should_call_remote_resource_synchronously() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("Hello!"))); | |
Response response = Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080") | |
.request() | |
.get(); | |
assertThat(response.readEntity(String.class)).isEqualTo("Hello!"); | |
} | |
@Test | |
public void should_call_remote_resource_asynchronously() throws ExecutionException, InterruptedException { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("Hello!"))); | |
ListenableFuture<Response> response = Rx.newClient(RxListenableFutureInvoker.class) | |
.target("http://localhost:8080") | |
.request() | |
.rx() | |
.get(); | |
await().until(() -> assertThat(response.isDone()).isTrue()); | |
assertThat(response.get().readEntity(String.class)).isEqualTo("Hello!"); | |
} | |
@Test | |
public void should_call_remote_resource_synchronously_from_an_hystrix_command() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("Hello!"))); | |
HystrixCommand<String> cmd = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) { | |
@Override | |
protected String run() throws Exception { | |
log.info("Running command"); | |
Response response = Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080") | |
.request() | |
.get(); | |
return response.readEntity(String.class); | |
} | |
}; | |
AtomicBoolean success = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(log::info).subscribe(s -> success.set("Hello!".equals(s))); | |
await().untilTrue(success); | |
} | |
@Test | |
public void should_call_remote_resource_asynchronously_from_an_hystrix_command() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("Hello!"))); | |
HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) { | |
@Override | |
protected Observable<String> construct() { | |
log.info("Running command"); | |
return Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080") | |
.request() | |
.rx() | |
.get() | |
.map(res -> res.readEntity(String.class)); | |
} | |
}; | |
AtomicBoolean success = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(log::info).subscribe(s -> success.set("Hello!".equals(s))); | |
await().untilTrue(success); | |
} | |
@Test | |
public void should_get_fallback_asynchronously() { | |
stubFor(get(urlMatching("/error")) | |
.willReturn(aResponse().withStatus(500))); | |
stubFor(get(urlMatching("/fallback")) | |
.willReturn(aResponse().withStatus(200).withBody("Better than nothing"))); | |
HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) { | |
@Override | |
protected Observable<String> construct() { | |
log.info("Running command"); | |
return Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/error") | |
.request() | |
.rx() | |
.get() | |
.flatMap(res -> { | |
final Observable<String> result; | |
if (res.getStatus() == 200) { | |
result = Observable.just(res.readEntity(String.class)); | |
} else { | |
result = Observable.error(new Exception("Server error")); | |
} | |
return result; | |
}); | |
} | |
@Override | |
protected Observable<String> resumeWithFallback() { | |
log.info("Falling back"); | |
return Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/fallback") | |
.request() | |
.rx() | |
.get() | |
.map(res -> res.readEntity(String.class)); | |
} | |
}; | |
AtomicBoolean success = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(log::info).subscribe(s -> success.set("Better than nothing".equals(s))); | |
await().untilTrue(success); | |
} | |
@Test | |
public void should_get_fallback_after_timeout() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("will never happen").withFixedDelay(1100))); // default hystrix timeout is 1s | |
HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) { | |
@Override | |
protected Observable<String> construct() { | |
log.info("Running command"); | |
return Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/whatever") | |
.request() | |
.rx() | |
.get() | |
.map(res -> res.readEntity(String.class)); | |
} | |
@Override | |
protected Observable<String> resumeWithFallback() { | |
log.info("Falling back"); | |
return Observable.just("hard coded result"); | |
} | |
}; | |
AtomicBoolean success = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(log::info).subscribe(s -> success.set("hard coded result".equals(s))); | |
await().untilTrue(success); | |
} | |
@Test | |
public void should_get_an_error_after_timeout() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("will never happen").withFixedDelay(1100))); // default hystrix timeout is 1s | |
HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) { | |
@Override | |
protected Observable<String> construct() { | |
Observable<String> obs = Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/whatever") | |
.request() | |
.rx() | |
.get() | |
.map(res -> res.readEntity(String.class)); | |
return obs; | |
} | |
}; | |
AtomicBoolean error = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(System.out::println).doOnError(t -> { | |
error.set(true); | |
}).subscribe(); | |
await().untilTrue(error); | |
} | |
@Test | |
public void should_retry_before_fallback() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(500).withBody("Bad!!!"))); | |
HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>( | |
HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Alex")) | |
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(20000)) | |
) { | |
@Override | |
protected Observable<String> construct() { | |
log.info("Running command"); | |
Observable<String> obs = Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/whatever") | |
.request() | |
.rx() | |
.get() | |
.flatMap(res -> { | |
final Observable<String> result; | |
if (res.getStatus() == 200) { | |
result = Observable.just(res.readEntity(String.class)); | |
} else { | |
result = Observable.error(new Exception("Server error " + new Date())); | |
} | |
return result; | |
}); | |
return obs.retryWhen(attempts -> | |
attempts.zipWith(Observable.range(1, 3), (t, i) -> i).flatMap(i -> { | |
log.info("delay retry by " + i + " second(s)"); | |
return Observable.timer(i, TimeUnit.SECONDS); | |
}).concatWith(attempts.flatMap(Observable::error)) | |
); | |
} | |
@Override | |
protected Observable<String> resumeWithFallback() { | |
log.info("falling back"); | |
return Observable.just("fallback!"); | |
} | |
}; | |
AtomicBoolean success = new AtomicBoolean(); | |
cmd.toObservable().doOnNext(log::info).subscribe(s -> success.set("fallback!".equals(s))); | |
await().untilTrue(success); | |
verify(1 + 3, getRequestedFor(urlMatching("/.*"))); | |
} | |
@Test | |
public void should_open_circuit_breaker_and_call_remote_resource_only_once() { | |
stubFor(get(urlMatching("/.*")) | |
.willReturn(aResponse().withStatus(200).withBody("Some text").withFixedDelay(1100))); | |
HystrixObservableCommand<String> cmd | |
= new HystrixObservableCommand<String>( | |
HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Alex")) | |
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(1)) | |
) { | |
@Override | |
protected Observable<String> construct() { | |
return Rx.newClient(RxObservableInvoker.class) | |
.target("http://localhost:8080/whatever") | |
.request() | |
.rx() | |
.get() | |
.map(res -> res.readEntity(String.class)); | |
} | |
@Override | |
protected Observable<String> resumeWithFallback() { | |
return Observable.just("hard coded result"); | |
} | |
}; | |
cmd.toObservable().delay(100, TimeUnit.MILLISECONDS).repeat(10).toBlocking().forEach(log::info); | |
verify(1, getRequestedFor(urlMatching("/.*"))); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment