Skip to content

Instantly share code, notes, and snippets.

@alexvictoor
Last active November 1, 2015 22:43
Show Gist options
  • Save alexvictoor/8c8ebf77e874104c9c5b to your computer and use it in GitHub Desktop.
Save alexvictoor/8c8ebf77e874104c9c5b to your computer and use it in GitHub Desktop.
Jersey + RxJava + Hystrix sandbox
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Jersey + RxJava + Hystrix sandbox tests. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alex</groupId>
    <artifactId>hystrix-sandbox</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the source encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single version shared by the three Jersey Rx client artifacts below. -->
        <jersey.version>2.22.1</jersey.version>
    </properties>

    <dependencies>
        <!-- Hystrix and the libraries it is used with here. -->
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.4.17</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.archaius</groupId>
            <artifactId>archaius-core</artifactId>
            <version>0.4.1</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxjava</artifactId>
            <version>1.0.14</version>
            <scope>compile</scope>
        </dependency>

        <!-- Logging: SLF4J API plus the simple binding. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.6</version>
        </dependency>

        <!-- Jersey reactive client flavours (Java 8, Guava ListenableFuture, RxJava Observable). -->
        <dependency>
            <groupId>org.glassfish.jersey.ext.rx</groupId>
            <artifactId>jersey-rx-client-java8</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.ext.rx</groupId>
            <artifactId>jersey-rx-client-guava</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.ext.rx</groupId>
            <artifactId>jersey-rx-client-rxjava</artifactId>
            <version>${jersey.version}</version>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.github.tomakehurst</groupId>
            <artifactId>wiremock</artifactId>
            <version>1.53</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>1.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.jayway.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>1.6.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- The tests use lambdas and method references, so compile for Java 8. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.google.common.util.concurrent.ListenableFuture;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import org.glassfish.jersey.client.rx.Rx;
import org.glassfish.jersey.client.rx.guava.RxListenableFutureInvoker;
import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import javax.ws.rs.core.Response;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Sandbox tests exploring how the Jersey Rx client, RxJava and Hystrix work together.
 *
 * <p>Each test stubs HTTP responses with WireMock and then calls the stubbed server through a
 * Jersey Rx client, either directly or from inside a Hystrix command.
 */
public class HystrixTest {

    private static final Logger log = LoggerFactory.getLogger(HystrixTest.class);

    /**
     * Base URL targeted by every request in this class.
     * NOTE(review): presumably the address the no-arg {@link WireMockRule} listens on - confirm.
     */
    private static final String BASE_URL = "http://localhost:8080";

    @Rule
    public WireMockRule mock = new WireMockRule();

    /** Performs a blocking GET and checks the stubbed body is returned. */
    @Test
    public void should_call_remote_resource_synchronously() {
        stubHello();

        Response response = Rx.newClient(RxObservableInvoker.class)
                .target(BASE_URL)
                .request()
                .get();

        assertThat(response.readEntity(String.class)).isEqualTo("Hello!");
    }

    /** Performs a GET through the Guava flavour of the Rx client and waits for the future to complete. */
    @Test
    public void should_call_remote_resource_asynchronously() throws ExecutionException, InterruptedException {
        stubHello();

        ListenableFuture<Response> response = Rx.newClient(RxListenableFutureInvoker.class)
                .target(BASE_URL)
                .request()
                .rx()
                .get();

        await().until(() -> assertThat(response.isDone()).isTrue());
        assertThat(response.get().readEntity(String.class)).isEqualTo("Hello!");
    }

    /** Wraps a blocking GET in a {@link HystrixCommand} and consumes its result as an observable. */
    @Test
    public void should_call_remote_resource_synchronously_from_an_hystrix_command() {
        stubHello();

        HystrixCommand<String> cmd = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) {
            @Override
            protected String run() throws Exception {
                log.info("Running command");
                Response response = Rx.newClient(RxObservableInvoker.class)
                        .target(BASE_URL)
                        .request()
                        .get();
                return response.readEntity(String.class);
            }
        };

        awaitEmission(cmd.toObservable(), "Hello!");
    }

    /** Wraps an observable GET in a {@link HystrixObservableCommand}. */
    @Test
    public void should_call_remote_resource_asynchronously_from_an_hystrix_command() {
        stubHello();

        HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) {
            @Override
            protected Observable<String> construct() {
                log.info("Running command");
                return rxGetBody("");
            }
        };

        awaitEmission(cmd.toObservable(), "Hello!");
    }

    /** The primary call gets a 500 and is turned into an error; the fallback performs a second HTTP call. */
    @Test
    public void should_get_fallback_asynchronously() {
        stubFor(get(urlMatching("/error"))
                .willReturn(aResponse().withStatus(500)));
        stubFor(get(urlMatching("/fallback"))
                .willReturn(aResponse().withStatus(200).withBody("Better than nothing")));

        HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) {
            @Override
            protected Observable<String> construct() {
                log.info("Running command");
                return rxGet("/error").flatMap(res -> bodyOrError(res, "Server error"));
            }

            @Override
            protected Observable<String> resumeWithFallback() {
                log.info("Falling back");
                return rxGetBody("/fallback");
            }
        };

        awaitEmission(cmd.toObservable(), "Better than nothing");
    }

    /** The stub answers after 1100 ms, so the command is expected to time out and emit the fallback. */
    @Test
    public void should_get_fallback_after_timeout() {
        stubFor(get(urlMatching("/.*"))
                .willReturn(aResponse().withStatus(200).withBody("will never happen").withFixedDelay(1100))); // default hystrix timeout is 1s

        HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) {
            @Override
            protected Observable<String> construct() {
                log.info("Running command");
                return rxGetBody("/whatever");
            }

            @Override
            protected Observable<String> resumeWithFallback() {
                log.info("Falling back");
                return Observable.just("hard coded result");
            }
        };

        awaitEmission(cmd.toObservable(), "hard coded result");
    }

    /** Same slow stub as above but without a fallback: the subscriber is expected to receive an error. */
    @Test
    public void should_get_an_error_after_timeout() {
        stubFor(get(urlMatching("/.*"))
                .willReturn(aResponse().withStatus(200).withBody("will never happen").withFixedDelay(1100))); // default hystrix timeout is 1s

        HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("Alex")) {
            @Override
            protected Observable<String> construct() {
                return rxGetBody("/whatever");
            }
        };

        AtomicBoolean error = new AtomicBoolean();
        // Handle the failure in the subscriber itself: a bare subscribe() has no onError handler,
        // so RxJava would rethrow the expected error as OnErrorNotImplementedException.
        cmd.toObservable().subscribe(System.out::println, t -> error.set(true));
        await().untilTrue(error);
    }

    /**
     * The server always answers 500. The command retries three times, waiting 1, 2 and 3 seconds,
     * before the error is propagated and the fallback value is emitted; 1 + 3 requests are expected.
     */
    @Test
    public void should_retry_before_fallback() {
        stubFor(get(urlMatching("/.*"))
                .willReturn(aResponse().withStatus(500).withBody("Bad!!!")));

        HystrixObservableCommand<String> cmd = new HystrixObservableCommand<String>(
                HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Alex"))
                        // Presumably raised so that the delayed retries are not cut short by the command timeout.
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(20000))
        ) {
            @Override
            protected Observable<String> construct() {
                log.info("Running command");
                Observable<String> obs = rxGet("/whatever")
                        .flatMap(res -> bodyOrError(res, "Server error " + new Date()));

                // Pair each failure with a counter 1..3 and resubscribe after that many seconds;
                // the concatenated part is meant to forward a later failure as an error.
                return obs.retryWhen(attempts ->
                        attempts.zipWith(Observable.range(1, 3), (t, i) -> i).flatMap(i -> {
                            log.info("delay retry by " + i + " second(s)");
                            return Observable.timer(i, TimeUnit.SECONDS);
                        }).concatWith(attempts.flatMap(Observable::error))
                );
            }

            @Override
            protected Observable<String> resumeWithFallback() {
                log.info("falling back");
                return Observable.just("fallback!");
            }
        };

        awaitEmission(cmd.toObservable(), "fallback!");
        verify(1 + 3, getRequestedFor(urlMatching("/.*")));
    }

    /**
     * Subscribes to the same command observable repeatedly against a stub that answers after
     * 1100 ms, with the circuit breaker request volume threshold set to 1, and verifies that
     * only a single request reached the server.
     */
    @Test
    public void should_open_circuit_breaker_and_call_remote_resource_only_once() {
        stubFor(get(urlMatching("/.*"))
                .willReturn(aResponse().withStatus(200).withBody("Some text").withFixedDelay(1100)));

        HystrixObservableCommand<String> cmd
                = new HystrixObservableCommand<String>(
                HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Alex"))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(1))
        ) {
            @Override
            protected Observable<String> construct() {
                return rxGetBody("/whatever");
            }

            @Override
            protected Observable<String> resumeWithFallback() {
                return Observable.just("hard coded result");
            }
        };

        cmd.toObservable().delay(100, TimeUnit.MILLISECONDS).repeat(10).toBlocking().forEach(log::info);
        verify(1, getRequestedFor(urlMatching("/.*")));
    }

    /** Stubs every GET path with a 200 response whose body is "Hello!". */
    private static void stubHello() {
        stubFor(get(urlMatching("/.*"))
                .willReturn(aResponse().withStatus(200).withBody("Hello!")));
    }

    /**
     * Issues a GET as an observable through a new Jersey Rx client.
     *
     * @param path path appended to {@link #BASE_URL}; may be empty
     * @return observable of the HTTP response
     */
    private static Observable<Response> rxGet(String path) {
        return Rx.newClient(RxObservableInvoker.class)
                .target(BASE_URL + path)
                .request()
                .rx()
                .get();
    }

    /** Same as {@link #rxGet(String)} but maps the response to its body read as a string. */
    private static Observable<String> rxGetBody(String path) {
        return rxGet(path).map(res -> res.readEntity(String.class));
    }

    /**
     * Maps a response to its body when the status is 200, otherwise to an error observable.
     *
     * @param res          response to inspect
     * @param errorMessage message of the exception emitted for non-200 responses
     */
    private static Observable<String> bodyOrError(Response res, String errorMessage) {
        if (res.getStatus() == 200) {
            return Observable.just(res.readEntity(String.class));
        }
        return Observable.error(new Exception(errorMessage));
    }

    /**
     * Subscribes to {@code source}, logs every emitted item and waits until an item equal to
     * {@code expected} has been seen. A failure of the observable is logged instead of being
     * left without a handler, so the cause is visible if the wait times out.
     */
    private static void awaitEmission(Observable<String> source, String expected) {
        AtomicBoolean matched = new AtomicBoolean();
        source.doOnNext(log::info).subscribe(
                value -> matched.set(expected.equals(value)),
                failure -> log.error("Observable terminated with an error", failure));
        await().untilTrue(matched);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment