Skip to content

Instantly share code, notes, and snippets.

@vietj
Created September 11, 2024 16:31
Show Gist options
  • Save vietj/02ce9dc89c8a1c11dabe8828f760f973 to your computer and use it in GitHub Desktop.
Save vietj/02ce9dc89c8a1c11dabe8828f760f973 to your computer and use it in GitHub Desktop.
diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java
index 4426909..11761b6 100644
--- a/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java
+++ b/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java
@@ -970,10 +970,10 @@ public class WebClientTest extends WebClientTestBase {
write(buffer, promise);
return promise.future();
}
- public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
+ public void write(Buffer buffer, Promise<Void> promise) {
received.addAndGet(buffer.length());
- if (handler != null) {
- handler.handle(Future.succeededFuture());
+ if (promise != null) {
+ promise.complete();
}
}
public Future<Void> close() {
diff --git a/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java b/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java
index 0a6673b..0b30e4e 100644
--- a/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java
+++ b/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java
@@ -129,6 +129,10 @@ public class GraphQLRequest {
return this;
}
+ void send(HttpClient client, Promise<JsonObject> handler) throws Exception {
+ send(client, 200, handler::handle);
+ }
+
void send(HttpClient client, Handler<AsyncResult<JsonObject>> handler) throws Exception {
send(client, 200, handler);
}
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java
index b2fb515..5d1c16a 100644
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java
@@ -67,7 +67,7 @@ public class ChainAuthHandlerImpl extends AuthenticationHandlerImpl<Authenticati
}
}
- private void iterate(final int idx, final RoutingContext ctx, User result, Throwable exception, Handler<AsyncResult<User>> handler) {
+ private void iterate(final int idx, final RoutingContext ctx, User result, Throwable exception, Promise<User> handler) {
// stop condition
if (idx >= handlers.size()) {
if (all) {
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java
index 75220da..61a298c 100644
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java
@@ -434,7 +434,7 @@ public class SessionHandlerImpl implements SessionHandler {
return context.succeededFuture(session);
});
}
- private void doGetSession(Vertx vertx, long startTime, String sessionID, Handler<AsyncResult<Session>> resultHandler) {
+ private void doGetSession(Vertx vertx, long startTime, String sessionID, Promise<Session> resultHandler) {
sessionStore.get(sessionID)
.onComplete(res -> {
if (res.succeeded()) {
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java
index 306bf79..a4c2927 100644
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java
@@ -73,7 +73,7 @@ class SockJSSession extends SockJSSocketBase implements Shareable {
private final long timeout;
private final Handler<SockJSSocket> sockHandler;
private final long heartbeatID;
- private final List<Handler<AsyncResult<Void>>> writeAcks = new ArrayList<>();
+ private final List<Promise<Void>> writeAcks = new ArrayList<>();
private TransportListener listener;
private boolean closed;
private boolean openWritten;
@@ -308,7 +308,7 @@ class SockJSSession extends SockJSSocketBase implements Shareable {
final TransportListener listener = this.listener;
if (listener != null) {
final String json;
- final List<Handler<AsyncResult<Void>>> acks;
+ final List<Promise<Void>> acks;
synchronized (this) {
if (!pendingWrites.isEmpty()) {
json = JsonCodec.encode(pendingWrites.toArray(new String[0]));
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java
index 3d85990..90fdd14 100644
--- a/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java
+++ b/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java
@@ -116,7 +116,7 @@ public class HealthCheckHandlerImpl implements HealthCheckHandler {
if (this.resultMapper != null) {
Promise<CheckResult> promise = Promise.promise();
promise.future().flatMap(resultMapper).onComplete(handler);
- return promise;
+ return promise::handle;
}
return handler;
}
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
index 8aab6c2..178c538 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
@@ -154,7 +154,7 @@ public class RedisClusterClient extends BaseRedisClient implements Redis {
return promise.future();
}
- private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnected) {
+ private void connect(Slots slots, Promise<RedisConnection> onConnected) {
// create a cluster connection
final Set<Throwable> failures = ConcurrentHashMap.newKeySet();
final AtomicInteger counter = new AtomicInteger();
@@ -180,7 +180,7 @@ public class RedisClusterClient extends BaseRedisClient implements Redis {
}
private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections,
- Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnected) {
+ Set<Throwable> failures, Promise<RedisConnection> onConnected) {
if (counter.incrementAndGet() == slots.endpoints().length) {
// end condition
if (!failures.isEmpty()) {
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
index ecf8395..42a3b15 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
@@ -266,7 +266,7 @@ public class RedisClusterConnection implements RedisConnection {
return map;
}
- void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) {
+ void send(String endpoint, int retries, Request command, Promise<Response> handler) {
PooledRedisConnection connection = connections.get(endpoint);
if (connection == null) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
@@ -319,13 +319,14 @@ public class RedisClusterConnection implements RedisConnection {
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
if (ask) {
- send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), resp -> {
- if (resp.failed()) {
- handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
- } else {
- send(newEndpoint, retries - 1, command, handler);
- }
- });
+ Future.<Response>future(p -> send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), p))
+ .onComplete(resp -> {
+ if (resp.failed()) {
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
+ } else {
+ send(newEndpoint, retries - 1, command, handler);
+ }
+ });
} else {
send(newEndpoint, retries - 1, command, handler);
}
@@ -447,7 +448,7 @@ public class RedisClusterConnection implements RedisConnection {
return promise.future();
}
- private void batch(String endpoint, int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
+ private void batch(String endpoint, int retries, List<Request> commands, Promise<List<Response>> handler) {
RedisConnection connection = connections.get(endpoint);
if (connection == null) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
@@ -500,13 +501,14 @@ public class RedisClusterConnection implements RedisConnection {
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
if (ask) {
- batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> {
- if (resp.failed()) {
- handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
- } else {
- batch(newEndpoint, retries - 1, commands, handler);
- }
- });
+ Future.<List<Response>>future(p -> batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), p))
+ .onComplete(resp -> {
+ if (resp.failed()) {
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
+ } else {
+ batch(newEndpoint, retries - 1, commands, handler);
+ }
+ });
} else {
batch(newEndpoint, retries - 1, commands, handler);
}
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java
index 243c724..10d2400 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java
@@ -83,14 +83,15 @@ public class RedisClusterImpl implements RedisCluster {
return;
}
- conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, ar -> {
- if (ar.succeeded()) {
- result.add(ar.result());
- onAllNodes(endpoints, index + 1, request, result, conn, promise);
- } else {
- promise.fail(ar.cause());
- }
- });
+ Future.<Response>future(p -> conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, p))
+ .onComplete(ar -> {
+ if (ar.succeeded()) {
+ result.add(ar.result());
+ onAllNodes(endpoints, index + 1, request, result, conn, promise);
+ } else {
+ promise.fail(ar.cause());
+ }
+ });
}
@Override
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java
index 4c0480e..e90c448 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java
@@ -117,7 +117,7 @@ public class RedisReplicationClient extends BaseRedisClient implements Redis {
return promise.future();
}
- private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnect) {
+ private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Promise<RedisConnection> onConnect) {
if (index >= endpoints.size()) {
// stop condition
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
index 61d67ad..6ad3e5b 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
@@ -20,6 +20,8 @@ import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
+import io.vertx.core.internal.ContextInternal;
+import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
@@ -73,7 +75,7 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis {
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();
- createConnectionInternal(connectOptions, connectOptions.getRole(), createConnection -> {
+ createConnectionInternal(vertx.getOrCreateContext(), connectOptions, connectOptions.getRole()).onComplete(createConnection -> {
if (createConnection.failed()) {
promise.fail(createConnection.cause());
return;
@@ -125,12 +127,12 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis {
}
private Future<PooledRedisConnection> createConnectionInternal(RedisRole role) {
- Promise<PooledRedisConnection> promise = Promise.promise();
- createConnectionInternal(connectOptions, role, promise);
- return promise.future();
+ return createConnectionInternal(vertx.getOrCreateContext(), connectOptions, role);
}
- private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Handler<AsyncResult<PooledRedisConnection>> onCreate) {
+ private Future<PooledRedisConnection> createConnectionInternal(ContextInternal ctx, RedisSentinelConnectOptions options, RedisRole role) {
+
+ PromiseInternal<PooledRedisConnection> onCreate = ctx.promise();
final Handler<AsyncResult<RedisURI>> createAndConnect = resolve -> {
if (resolve.failed()) {
@@ -164,6 +166,8 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis {
resolveClient(this::getReplicaFromEndpoint, options, createAndConnect);
break;
}
+
+ return onCreate.future();
}
/**
diff --git a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java
index b095157..16f7d70 100644
--- a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java
+++ b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java
@@ -55,7 +55,7 @@ class SharedSlots {
}
}
- private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<Slots>> onGotSlots) {
+ private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Promise<Slots> onGotSlots) {
if (index >= endpoints.size()) {
// stop condition
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
index 931f00f..b7e68e4 100644
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
@@ -146,7 +146,7 @@ public class ProxiedRequest implements ProxyRequest {
return new ProxiedResponse(this, proxiedRequest.response());
}
- void sendRequest(Handler<AsyncResult<ProxyResponse>> responseHandler) {
+ void sendRequest(Promise<ProxyResponse> responseHandler) {
request.response().<ProxyResponse>map(r -> {
r.pause(); // Pause it
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
index 5d0690f..36a1d20 100644
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
@@ -172,7 +172,7 @@ class ProxiedResponse implements ProxyResponse {
return promise.future();
}
- public void send(Handler<AsyncResult<Void>> completionHandler) {
+ public void send(Promise<Void> completionHandler) {
// Set stuff
proxiedResponse.setStatusCode(statusCode);
@@ -261,7 +261,7 @@ class ProxiedResponse implements ProxyResponse {
return this;
}
- private void sendResponse(ReadStream<Buffer> body, Handler<AsyncResult<Void>> completionHandler) {
+ private void sendResponse(ReadStream<Buffer> body, Promise<Void> completionHandler) {
Pipe<Buffer> pipe = body.pipe();
pipe.endOnSuccess(true);
pipe.endOnFailure(false);
diff --git a/src/test/java/io/vertx/ext/mongo/MongoClientTest.java b/src/test/java/io/vertx/ext/mongo/MongoClientTest.java
index 89af155..424d636 100644
--- a/src/test/java/io/vertx/ext/mongo/MongoClientTest.java
+++ b/src/test/java/io/vertx/ext/mongo/MongoClientTest.java
@@ -177,7 +177,7 @@ public class MongoClientTest extends MongoClientTestBase {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<List<String>> foos = new AtomicReference<>();
mongoClient.createCollection(collection).onComplete(onSuccess(res -> {
- insertDocs(mongoClient, collection, numDocs, onSuccess(res2 -> {
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res2 -> {
FindOptions findOptions = new FindOptions().setSort(new JsonObject().put("counter", 1)).setBatchSize(1);
ReadStream<JsonObject> stream = mongoClient.findBatchWithOptions(collection, new JsonObject(), findOptions);
streamReference.set(stream);
@@ -239,7 +239,7 @@ public class MongoClientTest extends MongoClientTestBase {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong count = new AtomicLong();
mongoClient.createCollection(collection).onComplete(onSuccess(res -> {
- insertDocs(mongoClient, collection, numDocs, onSuccess(res2 -> {
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res2 -> {
mongoClient.aggregate(collection,
new JsonArray().add(new JsonObject().put("$match", new JsonObject().put("foo", new JsonObject().put("$regex", "bar1"))))
.add(new JsonObject().put("$count", "foo_starting_with_bar1")))
@@ -263,7 +263,7 @@ public class MongoClientTest extends MongoClientTestBase {
final CountDownLatch fetchLatch = new CountDownLatch(numDocs);
final CountDownLatch endLatch = new CountDownLatch(1);
final String collection = randomCollection();
- insertDocs(mongoClient, collection, numDocs, onSuccess(res -> {
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res -> {
mongoClient.aggregateWithOptions(collection, pipeline, aggregateOptions)
.exceptionHandler(this::fail)
.handler(j -> fetchLatch.countDown())
diff --git a/src/test/java/io/vertx/ext/mongo/MongoTestBase.java b/src/test/java/io/vertx/ext/mongo/MongoTestBase.java
index b325267..1e23323 100644
--- a/src/test/java/io/vertx/ext/mongo/MongoTestBase.java
+++ b/src/test/java/io/vertx/ext/mongo/MongoTestBase.java
@@ -19,6 +19,7 @@ package io.vertx.ext.mongo;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.test.core.TestUtils;
@@ -126,7 +127,7 @@ public abstract class MongoTestBase extends VertxTestBase {
return "ext-mongo" + TestUtils.randomAlphaString(20);
}
- protected void insertDocs(MongoClient mongoClient, String collection, int num, Handler<AsyncResult<Void>> resultHandler) {
+ protected void insertDocs(MongoClient mongoClient, String collection, int num, Promise<Void> resultHandler) {
insertDocs(mongoClient, collection, num, this::createDoc, resultHandler);
}
@@ -138,7 +139,7 @@ public abstract class MongoTestBase extends VertxTestBase {
return Future.future(h -> insertDocs(mongoClient, collection, num, docSupplier, h));
}
- protected void insertDocs(MongoClient mongoClient, String collection, int num, Function<Integer, JsonObject> docSupplier, Handler<AsyncResult<Void>> resultHandler) {
+ protected void insertDocs(MongoClient mongoClient, String collection, int num, Function<Integer, JsonObject> docSupplier, Promise<Void> resultHandler) {
if (num != 0) {
AtomicInteger cnt = new AtomicInteger();
for (int i = 0; i < num; i++) {
diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java
index 63d8f4f..9893381 100644
--- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java
+++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java
@@ -17,6 +17,7 @@ package io.vertx.db2client.impl;
import io.netty.channel.ChannelPipeline;
import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;
@@ -85,7 +86,13 @@ public class DB2SocketConnection extends SocketConnectionBase {
if (txCmd.kind == TxCommand.Kind.BEGIN) {
// DB2 always implicitly starts a transaction with each query, and does
// not support the 'BEGIN' keyword. Instead we can no-op BEGIN commands
- cmd.handler = handler;
+ cmd.handler = (res, err) -> {
+ if (err != null) {
+ handler.handle(Future.failedFuture(err));
+ } else {
+ handler.handle(Future.succeededFuture(res));
+ }
+ };
cmd.complete(CommandResponse.success(txCmd.result).toAsyncResult());
} else {
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(txCmd.kind.sql, false, false,
diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java
index 49b6ad8..17cf422 100644
--- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java
+++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java
@@ -173,7 +173,7 @@ public class OracleJdbcConnection implements Connection {
return promise.future();
}
- private <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
+ private <R> void doSchedule(CommandBase<R> cmd, Promise<R> handler) {
cmd.handler = handler;
if (closePromise == null) {
pending.add(cmd);
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java
index 2d3de59..83ab86a 100644
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java
@@ -106,7 +106,7 @@ public class PgSubscriberImpl implements PgSubscriber {
if (!closed) {
Long val = reconnectPolicy.apply(count);
if (val >= 0) {
- tryConnect(val, ar -> {
+ Future.<Void>future(p -> tryConnect(val, p)).onComplete(ar -> {
if (ar.failed()) {
checkReconnect(count + 1);
}
@@ -150,7 +150,7 @@ public class PgSubscriberImpl implements PgSubscriber {
return promise.future();
}
- private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) {
+ private void tryConnect(long delayMillis, Promise<Void> handler) {
if (!connecting) {
connecting = true;
if (delayMillis > 0) {
@@ -161,11 +161,11 @@ public class PgSubscriberImpl implements PgSubscriber {
}
}
- private void doConnect(Handler<AsyncResult<Void>> completionHandler) {
+ private void doConnect(Promise<Void> completionHandler) {
PgConnection.connect(vertx, options).onComplete(ar -> handleConnectResult(completionHandler, ar));
}
- private synchronized void handleConnectResult(Handler<AsyncResult<Void>> completionHandler, AsyncResult<PgConnection> ar1) {
+ private synchronized void handleConnectResult(Promise<Void> completionHandler, AsyncResult<PgConnection> ar1) {
connecting = false;
if (ar1.succeeded()) {
conn = ar1.result();
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java
index 2cc46eb..0fc7d16 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java
@@ -45,6 +45,7 @@ import io.vertx.sqlclient.spi.DatabaseMetadata;
import java.util.ArrayDeque;
import java.util.List;
+import java.util.function.BiConsumer;
import java.util.function.Predicate;
/**
@@ -207,6 +208,16 @@ public abstract class SocketConnectionBase implements Connection {
}
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
+ doSchedule(cmd, (r, err) -> {
+ if (err == null) {
+ handler.handle(Future.succeededFuture(r));
+ } else {
+ handler.handle(Future.failedFuture(err));
+ }
+ });
+ }
+
+ protected <R> void doSchedule(CommandBase<R> cmd, BiConsumer<R, Throwable> handler) {
if (handler == null) {
throw new IllegalArgumentException();
}
@@ -220,7 +231,7 @@ public abstract class SocketConnectionBase implements Connection {
CompositeCommand composite = (CompositeCommand) cmd;
List<CommandBase<?>> commands = composite.commands();
pending.addAll(commands);
- composite.handler.handle(Future.succeededFuture());
+ composite.handler.accept(null, null);
} else {
pending.add(cmd);
}
@@ -285,10 +296,10 @@ public abstract class SocketConnectionBase implements Connection {
private PrepareStatementCommand prepareCommand(ExtendedQueryCommand<?> queryCmd, boolean cache, boolean sendParameterTypes) {
PrepareStatementCommand prepareCmd = new PrepareStatementCommand(queryCmd.sql(), null, cache, sendParameterTypes ? queryCmd.parameterTypes() : null);
- prepareCmd.handler = ar -> {
+ prepareCmd.handler = (r, err) -> {
paused = false;
- if (ar.succeeded()) {
- PreparedStatement ps = ar.result();
+ if (err == null) {
+ PreparedStatement ps = r;
if (cache) {
cacheStatement(ps);
}
@@ -303,7 +314,7 @@ public abstract class SocketConnectionBase implements Connection {
ctx.flush();
}
} else {
- Throwable cause = ar.cause();
+ Throwable cause = err;
if (isIndeterminatePreparedStatementError(cause) && !sendParameterTypes) {
ChannelHandlerContext ctx = socket.channelHandlerContext();
// We cannot cache this prepared statement because it might be executed with another type
@@ -340,9 +351,9 @@ public abstract class SocketConnectionBase implements Connection {
if (psCache != null && psCache.isFull()) {
PreparedStatement evicted = psCache.evict();
CloseStatementCommand closeCmd = new CloseStatementCommand(evicted);
- closeCmd.handler = ar -> {
- if (ar.failed()) {
- logger.error("Error when closing cached prepared statement", ar.cause());
+ closeCmd.handler = (r, err) -> {
+ if (err != null) {
+ logger.error("Error when closing cached prepared statement", err);
}
};
return closeCmd;
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java
index d2e8193..8492478 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java
@@ -28,6 +28,8 @@ import io.vertx.sqlclient.internal.Connection;
import io.vertx.sqlclient.internal.command.CommandBase;
import io.vertx.sqlclient.internal.command.TxCommand;
+import java.util.function.BiConsumer;
+
public class TransactionImpl implements Transaction {
private final ContextInternal context;
@@ -59,18 +61,18 @@ public class TransactionImpl implements Transaction {
}
private <R> void execute(CommandBase<R> cmd) {
- Handler<AsyncResult<R>> handler = cmd.handler;
+ BiConsumer<R, Throwable> handler = cmd.handler;
connection.schedule(context, cmd).onComplete(handler);
}
- private <T> Handler<AsyncResult<T>> wrap(CommandBase<?> cmd, Promise<T> handler) {
- return ar -> {
+ private <T> BiConsumer<T, Throwable> wrap(CommandBase<?> cmd, Promise<T> handler) {
+ return (r, err) -> {
CommandBase<?> abc = cmd;
synchronized (TransactionImpl.this) {
pendingQueries--;
}
checkEnd();
- handler.handle(ar);
+ handler.handle(err == null ? Future.succeededFuture(r) : Future.failedFuture(err));
};
}
@@ -150,11 +152,11 @@ public class TransactionImpl implements Transaction {
private TxCommand<Void> txCommand(TxCommand.Kind kind) {
TxCommand<Void> cmd = new TxCommand<>(kind, null);
- cmd.handler = ar -> {
- if (ar.succeeded()) {
+ cmd.handler = (res, err) -> {
+ if (err == null) {
completion.complete(kind);
} else {
- completion.fail(ar.cause());
+ completion.fail(err);
}
};
return cmd;
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
index de4a84f..af71e8f 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java
@@ -150,7 +150,7 @@ public class SqlConnectionPool {
public void evict() {
long now = System.currentTimeMillis();
- pool.evict(conn -> conn.shouldEvict(now), ar -> {
+ pool.evict(conn -> conn.shouldEvict(now)).onComplete(ar -> {
if (ar.succeeded()) {
List<PooledConnection> res = ar.result();
for (PooledConnection conn : res) {
@@ -184,7 +184,7 @@ public class SqlConnectionPool {
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
Promise<Lease<PooledConnection>> p = context.promise();
Object metric = enqueueMetric();
- pool.acquire(context, 0, p);
+ pool.acquire(context, 0).onComplete(p);
return p.future().compose(lease -> {
dequeueMetric(metric);
PooledConnection pooled = lease.get();
@@ -204,7 +204,7 @@ public class SqlConnectionPool {
});
}
- public void acquire(ContextInternal context, long timeout, Handler<AsyncResult<PooledConnection>> handler) {
+ public void acquire(ContextInternal context, long timeout, Promise<PooledConnection> handler) {
class PoolRequest implements PoolWaiter.Listener<PooledConnection>, Handler<AsyncResult<Lease<PooledConnection>>> {
private final Object metric;
@@ -249,7 +249,7 @@ public class SqlConnectionPool {
public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
if (timeout > 0L && timerID == -1L) {
timerID = context.setTimer(timeout, id -> {
- pool.cancel(waiter, ar -> {
+ pool.cancel(waiter).onComplete(ar -> {
if (ar.succeeded()) {
if (ar.result()) {
handler.handle(Future.failedFuture("Timeout"));
@@ -269,12 +269,12 @@ public class SqlConnectionPool {
}
Object metric = enqueueMetric();
PoolRequest request = new PoolRequest(metric);
- pool.acquire(context, request, 0, request);
+ pool.acquire(context, request, 0).onComplete(request);
}
public Future<Void> close() {
Promise<Void> promise = vertx.promise();
- pool.close(ar1 -> {
+ pool.close().onComplete(ar1 -> {
if (ar1.succeeded()) {
List<Future<Void>> results = ar1
.result()
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java
index c591c96..6c0b894 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java
@@ -21,13 +21,15 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
+import java.util.function.BiConsumer;
+
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public abstract class CommandBase<R> {
- public Handler<AsyncResult<R>> handler;
+ public BiConsumer<R, Throwable> handler;
public final void fail(Throwable err) {
complete(Future.failedFuture(err));
@@ -39,7 +41,7 @@ public abstract class CommandBase<R> {
public final void complete(AsyncResult<R> resp) {
if (handler != null) {
- handler.handle(resp);
+ handler.accept(resp.result(), resp.cause());
}
}
}
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java
index 85c123f..845f632 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java
@@ -43,7 +43,7 @@ public class CommandResponse<R> {
public void fire() {
if (cmd.handler != null) {
- cmd.handler.handle(toAsyncResult());
+ cmd.handler.accept(toAsyncResult().result(), toAsyncResult().cause());
}
}
}
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
index 2fad432..921232e 100644
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java
@@ -175,7 +175,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
return pool.execute(context, cmd);
}
- private void acquire(ContextInternal context, long timeout, Handler<AsyncResult<SqlConnectionPool.PooledConnection>> completionHandler) {
+ private void acquire(ContextInternal context, long timeout, Promise<SqlConnectionPool.PooledConnection> completionHandler) {
pool.acquire(context, timeout, completionHandler);
}
diff --git a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java
index b24f98d..16feb8e 100644
--- a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java
+++ b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java
@@ -131,7 +131,13 @@ public class ConsulBackendService implements ServiceDiscoveryBackend {
@Override
public void getRecord(String uuid, Handler<AsyncResult<Record>> resultHandler) {
Promise<List<Record>> recordList = Promise.promise();
- getRecords(recordList);
+ getRecords(ar -> {
+ if (ar.succeeded()) {
+ recordList.complete(ar.result());
+ } else {
+ recordList.fail(ar.cause());
+ }
+ });
recordList.future().map(l -> l.stream().filter(r -> uuid.equals(r.getRegistration())).findFirst().orElse(null)).onComplete(resultHandler);
}
diff --git a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java
index 359b5af..d4701c6 100644
--- a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java
+++ b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java
@@ -228,7 +228,13 @@ public class ZookeeperBackendService implements ServiceDiscoveryBackend, Connect
List<Future<Record>> futures = new ArrayList<>();
for (String child : children) {
Promise<Record> promise = Promise.promise();
- getRecord(child, promise);
+ getRecord(child, ar -> {
+ if (ar.succeeded()) {
+ promise.complete(ar.result());
+ } else {
+ promise.fail(ar.cause());
+ }
+ });
futures.add(promise.future());
}
diff --git a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java
index ffc3d54..5cd8371 100644
--- a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java
+++ b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java
@@ -334,7 +334,7 @@ public class KubernetesServiceImporter implements ServiceImporter {
return result.onSuccess(tk -> this.token = tk).mapEmpty();
}
- private void publishRecord(Record record, Handler<AsyncResult<Record>> completionHandler) {
+ private void publishRecord(Record record, Promise<Record> completionHandler) {
publisher.publish(record).onComplete(ar -> {
if (completionHandler != null) {
completionHandler.handle(ar);
diff --git a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java
index 59485ab..ea4d87a 100644
--- a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java
+++ b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java
@@ -170,7 +170,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
}
public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration,
- Handler<AsyncResult<Void>> completionHandler) {
+ Promise<Void> completionHandler) {
JsonObject conf;
if (configuration == null) {
conf = new JsonObject();
@@ -216,7 +216,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
}
public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration,
- Handler<AsyncResult<Void>> completionHandler) {
+ Promise<Void> completionHandler) {
JsonObject conf;
if (configuration == null) {
conf = new JsonObject();
@@ -275,7 +275,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
});
}
- public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) {
+ public void publish(Record record, Promise<Record> resultHandler) {
Status status = record.getStatus() == null || record.getStatus() == Status.UNKNOWN
? Status.UP : record.getStatus();
@@ -305,7 +305,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) {
+ public void unpublish(String id, Promise<Void> resultHandler) {
backend.remove(id, record -> {
if (record.failed()) {
resultHandler.handle(Future.failedFuture(record.cause()));
@@ -334,7 +334,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
}
public void getRecord(JsonObject filter,
- Handler<AsyncResult<Record>> resultHandler) {
+ Promise<Record> resultHandler) {
boolean includeOutOfService = false;
Function<Record, Boolean> accept;
if (filter == null) {
@@ -354,8 +354,14 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecord(String id, Handler<AsyncResult<@Nullable Record>> resultHandler) {
- backend.getRecord(id, resultHandler);
+ public void getRecord(String id, Promise<@Nullable Record> resultHandler) {
+ backend.getRecord(id, ar -> {
+ if (ar.succeeded()) {
+ resultHandler.complete(ar.result());
+ } else {
+ resultHandler.fail(ar.cause());
+ }
+ });
}
@Override
@@ -365,7 +371,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecord(Function<Record, Boolean> filter, Handler<AsyncResult<Record>> resultHandler) {
+ public void getRecord(Function<Record, Boolean> filter, Promise<Record> resultHandler) {
getRecord(filter, false, resultHandler);
}
@@ -376,8 +382,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<Record>>
- resultHandler) {
+ public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Promise<Record> resultHandler) {
Objects.requireNonNull(filter);
backend.getRecords(list -> {
if (list.failed()) {
@@ -403,7 +408,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecords(JsonObject filter, Handler<AsyncResult<List<Record>>> resultHandler) {
+ public void getRecords(JsonObject filter, Promise<List<Record>> resultHandler) {
boolean includeOutOfService = false;
Function<Record, Boolean> accept;
if (filter == null) {
@@ -423,7 +428,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecords(Function<Record, Boolean> filter, Handler<AsyncResult<List<Record>>> resultHandler) {
+ public void getRecords(Function<Record, Boolean> filter, Promise<List<Record>> resultHandler) {
getRecords(filter, false, resultHandler);
}
@@ -434,7 +439,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<List<Record>>> resultHandler) {
+ public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Promise<List<Record>> resultHandler) {
Objects.requireNonNull(filter);
backend.getRecords(list -> {
if (list.failed()) {
@@ -457,7 +462,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {
return promise.future();
}
- public void update(Record record, Handler<AsyncResult<Record>> resultHandler) {
+ public void update(Record record, Promise<Record> resultHandler) {
backend.update(record, ar -> {
if (ar.failed()) {
resultHandler.handle(Future.failedFuture(ar.cause()));
diff --git a/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java b/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java
index 2309179..1d07231 100644
--- a/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java
+++ b/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java
@@ -94,7 +94,7 @@ public class ResultSetImpl implements ResultSet {
return resultSetRef.get().wasApplied();
}
- private void loadMore(Context context, List<Row> loaded, Handler<AsyncResult<List<Row>>> handler) {
+ private void loadMore(Context context, List<Row> loaded, Promise<List<Row>> handler) {
int availableWithoutFetching = resultSetRef.get().remaining();
List<Row> rows = new ArrayList<>(loaded.size() + availableWithoutFetching);
rows.addAll(loaded);
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
index bb62cef..07b7f11 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java
@@ -787,7 +787,7 @@ public class CircuitBreakerImplTest {
List<AsyncResult<String>> results = new ArrayList<>();
for (int i = 0; i < options.getMaxFailures(); i++) {
breaker.<String>execute(future -> future.fail("expected failure"))
- .onComplete(results::add);
+ .onComplete(ar -> results.add(ar));
}
await().until(() -> results.size() == options.getMaxFailures());
results.forEach(ar -> {
@@ -799,7 +799,7 @@ public class CircuitBreakerImplTest {
await().until(() -> breaker.state() == CircuitBreakerState.OPEN);
breaker.<String>execute(future -> future.fail("expected failure"))
- .onComplete(results::add);
+ .onComplete(ar -> results.add(ar));
await().until(() -> results.size() == 1);
results.forEach(ar -> {
assertThat(ar.failed()).isTrue();
@@ -820,7 +820,7 @@ public class CircuitBreakerImplTest {
// Ignored.
}
})
- .onComplete(results::add);
+ .onComplete(ar -> results.add(ar));
await().until(() -> results.size() == 1);
results.forEach(ar -> {
assertThat(ar.failed()).isTrue();
@@ -845,7 +845,7 @@ public class CircuitBreakerImplTest {
t -> {
throw new RuntimeException("boom");
})
- .onComplete(results::add);
+ .onComplete(ar -> results.add(ar));
}
await().until(() -> results.size() == options.getMaxFailures());
results.forEach(ar -> {
@@ -861,7 +861,7 @@ public class CircuitBreakerImplTest {
t -> {
throw new RuntimeException("boom");
})
- .onComplete(results::add);
+ .onComplete(ar -> results.add(ar));
await().until(() -> results.size() == 1);
results.forEach(ar -> {
assertThat(ar.failed()).isTrue();
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java b/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java
index 0f5021a..f445e69 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java
@@ -27,7 +27,7 @@ import io.vertx.core.Promise;
*/
public class MyAsyncOperations {
- public static void operation(int a, int b, Handler<AsyncResult<Integer>> handler) {
+ public static void operation(int a, int b, Promise<Integer> handler) {
handler.handle(Future.succeededFuture(a + b));
}
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
index 91b921a..5b93da3 100644
--- a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
+++ b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java
@@ -3,10 +3,7 @@ package io.vertx.circuitbreaker.impl;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
@@ -161,7 +158,7 @@ public class UsageTest {
private List<String> items = new ArrayList<>();
- public void asyncWrite(String content, Scenario scenario, Handler<AsyncResult<Void>> resultHandler) {
+ public void asyncWrite(String content, Scenario scenario, Promise<Void> resultHandler) {
long random = (long) (Math.random() * 1000);
switch (scenario) {
case TIMEOUT:
diff --git a/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java b/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java
index 1491aec..f4a734b 100644
--- a/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java
+++ b/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java
@@ -40,6 +40,7 @@ import io.vertx.ext.shell.command.CommandRegistry;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
@@ -90,7 +91,7 @@ public class CommandRegistryImpl implements CommandRegistry {
return registerCommand(Command.create(vertx, command));
}
- public CommandRegistry registerCommand(Class<? extends AnnotatedCommand> command, Handler<AsyncResult<Command>> completionHandler) {
+ public CommandRegistry registerCommand(Class<? extends AnnotatedCommand> command, Promise<Command> completionHandler) {
return registerCommand(Command.create(vertx, command), completionHandler);
}
@@ -101,13 +102,13 @@ public class CommandRegistryImpl implements CommandRegistry {
return promise.future();
}
- public CommandRegistry registerCommand(Command command, Handler<AsyncResult<Command>> completionHandler) {
- return registerCommands(Collections.singletonList(command), ar -> {
+ public CommandRegistry registerCommand(Command command, Promise<Command> completionHandler) {
+ return registerCommands(Collections.singletonList(command), (res, err) -> {
if (completionHandler != null) {
- if (ar.succeeded()) {
- completionHandler.handle(Future.succeededFuture(ar.result().get(0)));
+ if (err == null) {
+ completionHandler.handle(Future.succeededFuture(res.get(0)));
} else {
- completionHandler.handle(Future.failedFuture(ar.cause()));
+ completionHandler.handle(Future.failedFuture(err));
}
}
});
@@ -120,7 +121,7 @@ public class CommandRegistryImpl implements CommandRegistry {
return promise.future();
}
- public CommandRegistry registerCommands(List<Command> commands, Handler<AsyncResult<List<Command>>> doneHandler) {
+ public CommandRegistry registerCommands(List<Command> commands, BiConsumer<List<Command>, Throwable> doneHandler) {
if (closed) {
throw new IllegalStateException();
}
@@ -153,9 +154,9 @@ public class CommandRegistryImpl implements CommandRegistry {
filter(reg -> ar.result().equals(reg.deploymendID)).
map(reg -> reg.command).
collect(Collectors.toList());
- doneHandler.handle(Future.succeededFuture(regs));
+ doneHandler.accept(regs, null);
} else {
- doneHandler.handle(Future.failedFuture(ar.cause()));
+ doneHandler.accept(null, ar.cause());
}
});
return this;
@@ -168,7 +169,7 @@ public class CommandRegistryImpl implements CommandRegistry {
return promise.future();
}
- public CommandRegistry unregisterCommand(String name, Handler<AsyncResult<Void>> completionHandler) {
+ public CommandRegistry unregisterCommand(String name, Promise<Void> completionHandler) {
if (closed) {
throw new IllegalStateException();
}
diff --git a/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java b/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java
index f30f69a..d93ef17 100644
--- a/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java
+++ b/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java
@@ -118,7 +118,7 @@ public class ShellServiceImpl implements ShellService {
return p.future();
}
- private void startServer(List<CommandResolver> resolvers, Handler<AsyncResult<Void>> startHandler) {
+ private void startServer(List<CommandResolver> resolvers, Promise<Void> startHandler) {
TelnetTermOptions telnetOptions = options.getTelnetOptions();
SSHTermOptions sshOptions = options.getSSHOptions();
HttpTermOptions webOptions = options.getHttpOptions();
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java b/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java
index c156bb8..6a3b797 100644
--- a/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java
+++ b/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java
@@ -33,10 +33,7 @@
package io.vertx.ext.shell.term.impl;
import io.termd.core.readline.Keymap;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
@@ -87,7 +84,7 @@ public class HttpTermServer implements TermServer {
return this;
}
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) {
+ public TermServer listen(Promise<Void> listenHandler) {
Charset charset = Charset.forName(options.getCharset());
@@ -169,7 +166,7 @@ public class HttpTermServer implements TermServer {
return Future.future(this::close);
}
- public void close(Handler<AsyncResult<Void>> completionHandler) {
+ public void close(Promise<Void> completionHandler) {
if (server != null) {
server.close()
.onComplete(completionHandler);
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java b/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java
index ab314ae..c31c19a 100644
--- a/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java
+++ b/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java
@@ -117,7 +117,7 @@ public class SSHServer implements TermServer {
return this;
}
- public SSHServer listen(Handler<AsyncResult<Void>> listenHandler) {
+ public SSHServer listen(Promise<Void> listenHandler) {
if (!status.compareAndSet(STATUS_STOPPED, STATUS_STARTING)) {
listenHandler.handle(Future.failedFuture("Invalid state:" + status.get()));
return this;
@@ -219,7 +219,7 @@ public class SSHServer implements TermServer {
return nativeServer.getPort();
}
- public void close(Handler<AsyncResult<Void>> completionHandler) {
+ public void close(Promise<Void> completionHandler) {
if (!status.compareAndSet(STATUS_STARTED, STATUS_STOPPING)) {
completionHandler.handle(Future.failedFuture("Invalid state:" + status.get()));
return;
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java b/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java
index 630b55d..298aae2 100644
--- a/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java
+++ b/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java
@@ -34,10 +34,7 @@ package io.vertx.ext.shell.term.impl;
import io.termd.core.readline.Keymap;
import io.termd.core.telnet.TelnetTtyConnection;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
@@ -76,7 +73,7 @@ public class TelnetTermServer implements TermServer {
return this;
}
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) {
+ public TermServer listen(Promise<Void> listenHandler) {
Charset charset = Charset.forName(options.getCharset());
if (server == null) {
server = vertx.createNetServer(options);
@@ -104,7 +101,7 @@ public class TelnetTermServer implements TermServer {
return this;
}
- public void close(Handler<AsyncResult<Void>> completionHandler) {
+ public void close(Promise<Void> completionHandler) {
if (server != null) {
server.close()
.onComplete(completionHandler);
diff --git a/src/test/java/io/vertx/ext/shell/support/TestTermServer.java b/src/test/java/io/vertx/ext/shell/support/TestTermServer.java
index b48e1a6..6cf17c6 100644
--- a/src/test/java/io/vertx/ext/shell/support/TestTermServer.java
+++ b/src/test/java/io/vertx/ext/shell/support/TestTermServer.java
@@ -33,10 +33,7 @@
package io.vertx.ext.shell.support;
import io.termd.core.tty.TtyConnection;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
import io.vertx.ext.shell.term.Term;
import io.vertx.ext.shell.term.TermServer;
@@ -73,7 +70,7 @@ public class TestTermServer implements TermServer {
throw new UnsupportedOperationException();
}
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) {
+ public TermServer listen(Promise<Void> listenHandler) {
listenHandler.handle(Future.succeededFuture());
return this;
}
@@ -83,7 +80,7 @@ public class TestTermServer implements TermServer {
throw new UnsupportedOperationException();
}
- public void close(Handler<AsyncResult<Void>> completionHandler) {
+ public void close(Promise<Void> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}
diff --git a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java
index c1594e2..25c39d3 100644
--- a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java
+++ b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java
@@ -16,11 +16,7 @@
package io.vertx.ext.stomp.impl;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Context;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.logging.Logger;
@@ -420,7 +416,7 @@ public class DefaultStompHandler implements StompServerHandler {
public StompServerHandler onAuthenticationRequest(StompServerConnection connection,
String login, String passcode,
- Handler<AsyncResult<Boolean>> handler) {
+ Promise<Boolean> handler) {
final AuthenticationProvider auth;
synchronized (this) {
// Stack contention.
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java
index 35ff102..9fcd5b8 100644
--- a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java
@@ -224,7 +224,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public synchronized StompClientConnection send(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) {
+ public synchronized StompClientConnection send(Frame frame, Promise<Frame> receiptHandler) {
if (receiptHandler != null && frame.getCommand() != Command.PING) {
String receiptId = UUID.randomUUID().toString();
frame.addHeader(Frame.RECEIPT, receiptId);
@@ -252,7 +252,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
}
public StompClientConnection send(String destination, Map<String, String> headers, Buffer body,
- Handler<AsyncResult<Frame>> receiptHandler) {
+ Promise<Frame> receiptHandler) {
// No need for synchronization, no field access, except client (final)
if (headers == null) {
headers = new Headers();
@@ -282,7 +282,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection subscribe(String destination, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) {
+ public StompClientConnection subscribe(String destination, Handler<Frame> handler, Promise<String> receiptHandler) {
return subscribe(destination, null, handler, receiptHandler);
}
@@ -293,7 +293,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public synchronized StompClientConnection subscribe(String destination, Map<String, String> headers, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) {
+ public synchronized StompClientConnection subscribe(String destination, Map<String, String> headers, Handler<Frame> handler, Promise<String> receiptHandler) {
Objects.requireNonNull(destination);
Objects.requireNonNull(handler);
@@ -319,7 +319,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
}
Frame frame = new Frame(Command.SUBSCRIBE, headers, null);
- send(frame, ar -> {
+ Future.<Frame>future(p -> send(frame, p)).onComplete(ar -> {
if (receiptHandler != null) {
receiptHandler.handle(ar.map(id));
}
@@ -342,7 +342,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public synchronized StompClientConnection unsubscribe(String destination, Map<String, String> headers, Handler<AsyncResult<Frame>>
+ public synchronized StompClientConnection unsubscribe(String destination, Map<String, String> headers, Promise<Frame>
receiptHandler) {
Objects.requireNonNull(destination);
if (headers == null) {
@@ -382,7 +382,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return this;
}
- public StompClientConnection beginTX(String id, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection beginTX(String id, Promise<Frame> receiptHandler) {
return beginTX(id, new Headers(), receiptHandler);
}
@@ -398,7 +398,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection beginTX(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection beginTX(String id, Map<String, String> headers, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id);
Objects.requireNonNull(headers);
@@ -410,7 +410,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return commit(id, new Headers());
}
- public StompClientConnection commit(String id, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection commit(String id, Promise<Frame> receiptHandler) {
return commit(id, new Headers(), receiptHandler);
}
@@ -421,7 +421,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection commit(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection commit(String id, Map<String, String> headers, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id);
Objects.requireNonNull(headers);
return send(new Frame().setCommand(Command.COMMIT).setTransaction(id), receiptHandler);
@@ -432,7 +432,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return abort(id, new Headers());
}
- public StompClientConnection abort(String id, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection abort(String id, Promise<Frame> receiptHandler) {
return abort(id, new Headers(), receiptHandler);
}
@@ -443,7 +443,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection abort(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection abort(String id, Map<String, String> headers, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id);
Objects.requireNonNull(headers);
return send(new Frame().setCommand(Command.ABORT).setTransaction(id), receiptHandler);
@@ -463,16 +463,17 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection disconnect(Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection disconnect(Promise<Frame> receiptHandler) {
return disconnect(new Frame().setCommand(Command.DISCONNECT), receiptHandler);
}
- public StompClientConnection disconnect(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection disconnect(Frame frame, Promise<Frame> receiptHandler) {
Objects.requireNonNull(frame);
synchronized (this) {
if (status == Status.CONNECTED) {
status = Status.CLOSING;
- send(frame, f -> {
+ Future.<Frame>future(p -> send(frame, p))
+ .onComplete(f -> {
if (receiptHandler != null) {
receiptHandler.handle(f);
}
@@ -493,7 +494,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection ack(String id, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection ack(String id, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id);
send(new Frame(Command.ACK, Headers.create(Frame.ID, id), null), receiptHandler);
return this;
@@ -505,7 +506,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection nack(String id, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection nack(String id, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id);
send(new Frame(Command.NACK, Headers.create(Frame.ID, id), null), receiptHandler);
return this;
@@ -518,7 +519,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection ack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection ack(String id, String txId, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id, "A ACK frame must contain the ACK id");
Objects.requireNonNull(txId);
@@ -534,7 +535,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler
return promise.future();
}
- public StompClientConnection nack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) {
+ public StompClientConnection nack(String id, String txId, Promise<Frame> receiptHandler) {
Objects.requireNonNull(id, "A NACK frame must contain the ACK id");
Objects.requireNonNull(txId);
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java
index 7df05c4..9511bdf 100644
--- a/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java
@@ -55,7 +55,7 @@ public class StompClientImpl implements StompClient {
this.client = (NetClientInternal) vertx.createNetClient(options);
}
- public StompClient connect(Handler<AsyncResult<StompClientConnection>> resultHandler) {
+ public StompClient connect(Promise<StompClientConnection> resultHandler) {
return connect(options.getPort(), options.getHost(), resultHandler);
}
@@ -142,7 +142,7 @@ public class StompClientImpl implements StompClient {
return client == null;
}
- public synchronized StompClient connect(int port, String host, Handler<AsyncResult<StompClientConnection>> resultHandler) {
+ public synchronized StompClient connect(int port, String host, Promise<StompClientConnection> resultHandler) {
Handler<Frame> r = receivedFrameHandler;
Handler<Frame> w = writingFrameHandler;
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java
index 86044a8..020573b 100644
--- a/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java
@@ -81,7 +81,7 @@ public class StompServerImpl implements StompServer {
return promise.future();
}
- public StompServer listen(Handler<AsyncResult<StompServer>> handler) {
+ public StompServer listen(Promise<StompServer> handler) {
return listen(options.getPort(), options.getHost(), handler);
}
@@ -97,7 +97,7 @@ public class StompServerImpl implements StompServer {
return promise.future();
}
- public StompServer listen(int port, String host, Handler<AsyncResult<StompServer>> handler) {
+ public StompServer listen(int port, String host, Promise<StompServer> handler) {
if (port == -1) {
handler.handle(Future.failedFuture("TCP server disabled. The port is set to '-1'."));
return this;
@@ -202,7 +202,7 @@ public class StompServerImpl implements StompServer {
return handler;
}
- public void close(Handler<AsyncResult<Void>> done) {
+ public void close(Promise<Void> done) {
if (!listening) {
if (done != null) {
vertx.runOnContext((v) -> done.handle(Future.succeededFuture()));
diff --git a/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java b/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java
index 1eb26f7..c15d6c1 100644
--- a/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java
+++ b/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java
@@ -83,7 +83,7 @@ public class ReceiptTest {
List<AsyncResult<?>> receipts = new CopyOnWriteArrayList<>();
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.subscribe("/queue", frames::add).onComplete(receipts::add);
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue"));
@@ -93,7 +93,7 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add);
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> !frames.isEmpty());
@@ -108,25 +108,25 @@ public class ReceiptTest {
AtomicReference<StompClientConnection> client = new AtomicReference<>();
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.subscribe("/queue", frames::add).onComplete(receipts::add);
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a));
}));
client((ar -> {
final StompClientConnection connection = ar.result();
client.set(connection);
- connection.subscribe("/queue", frames::add).onComplete(receipts::add);
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue"));
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add);
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() >= 2);
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 3);
- client.get().unsubscribe("/queue").onComplete(receipts::add);
+ client.get().unsubscribe("/queue").onComplete(a -> receipts.add(a));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 4);
}
@@ -136,14 +136,14 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
connection.subscribe("/queue", Headers.create(Frame.ACK, "client"),
- frame -> connection.ack(frame.getAck())).onComplete(receipts::add);
+ frame -> connection.ack(frame.getAck())).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue"));
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add);
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 2);
@@ -155,14 +155,14 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
connection.subscribe("/queue", Headers.create(Frame.ACK, "client"),
- frame -> connection.nack(frame.getAck())).onComplete(receipts::add);
+ frame -> connection.nack(frame.getAck())).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue"));
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add);
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 2);
@@ -175,7 +175,7 @@ public class ReceiptTest {
List<Frame> errors = new CopyOnWriteArrayList<>();
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.subscribe("/queue", frames::add).onComplete(receipts::add);
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 1);
@@ -183,14 +183,14 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
connection.errorHandler(errors::add);
- connection.beginTX("my-tx").onComplete(receipts::add);
+ connection.beginTX("my-tx").onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx")
- .setBody(Buffer.buffer("Hello"))).onComplete(receipts::add);
+ .setBody(Buffer.buffer("Hello"))).onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(
- Buffer.buffer("World"))).onComplete(receipts::add);
+ Buffer.buffer("World"))).onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx")
- .setBody(Buffer.buffer("!!!"))).onComplete(receipts::add);
- connection.commit("my-tx").onComplete(receipts::add);
+ .setBody(Buffer.buffer("!!!"))).onComplete(a -> receipts.add(a));
+ connection.commit("my-tx").onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() == 3 && errors.isEmpty()
@@ -205,7 +205,7 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
- connection.subscribe("/queue", frames::add).onComplete(receipts::add);
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 1);
@@ -213,14 +213,14 @@ public class ReceiptTest {
client((ar -> {
final StompClientConnection connection = ar.result();
connection.errorHandler(errors::add);
- connection.beginTX("my-tx").onComplete(receipts::add);
+ connection.beginTX("my-tx").onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx")
- .setBody(Buffer.buffer("Hello"))).onComplete(receipts::add);
+ .setBody(Buffer.buffer("Hello"))).onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(
- Buffer.buffer("World"))).onComplete(receipts::add);
+ Buffer.buffer("World"))).onComplete(a -> receipts.add(a));
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx")
- .setBody(Buffer.buffer("!!!"))).onComplete(receipts::add);
- connection.abort("my-tx").onComplete(receipts::add);
+ .setBody(Buffer.buffer("!!!"))).onComplete(a -> receipts.add(a));
+ connection.abort("my-tx").onComplete(a -> receipts.add(a));
}));
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() == 0 && errors.isEmpty()
diff --git a/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java b/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java
index 1a43043..039a9e0 100644
--- a/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java
+++ b/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java
@@ -58,9 +58,9 @@ public class SMTPConnection {
private boolean inuse;
private boolean quitSent;
- private Handler<AsyncResult<String>> commandReplyHandler;
+ private Promise<String> commandReplyHandler;
private Handler<Throwable> exceptionHandler;
- private Handler<AsyncResult<Void>> closeHandler;
+ private Promise<Void> closeHandler;
private Capabilities capa = new Capabilities();
private final ContextInternal context;
private long expirationTimestamp;
@@ -104,7 +104,7 @@ public class SMTPConnection {
log.error("dropping reply arriving after we stopped processing the buffer.");
} else {
// make sure we only call the handler once
- Handler<AsyncResult<String>> currentHandler = commandReplyHandler;
+ Promise<String> currentHandler = commandReplyHandler;
commandReplyHandler = null;
if (currentHandler != null) {
currentHandler.handle(Future.succeededFuture(buffer.toString()));
@@ -265,7 +265,7 @@ public class SMTPConnection {
private void handleError(Throwable t) {
context.emit(roc -> {
- Handler<AsyncResult<String>> currentHandler = commandReplyHandler;
+ Promise<String> currentHandler = commandReplyHandler;
if (currentHandler != null) {
commandReplyHandler = null;
currentHandler.handle(Future.failedFuture(t));
diff --git a/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java b/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java
index 19e4573..356369e 100644
--- a/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java
+++ b/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java
@@ -171,23 +171,25 @@ public class VertxServer extends Server {
@Override
public VertxServer start() throws IOException {
- return start(ar -> {});
+ start();
+ return this;
}
- public VertxServer start(Handler<AsyncResult<Void>> completionHandler) {
+ public Future<Void> start2() {
if (id.port > 0) {
actual = map.computeIfAbsent(id, id -> new ActualServer(context.owner(), id, options, builder, commandDecorator));
} else {
actual = new ActualServer(context.owner(), id, options, builder, commandDecorator);
}
+ Promise<Void> p = Promise.promise();
actual.start(context, ar1 -> {
if (ar1.succeeded()) {
hook = this::shutdown;
context.addCloseHook(hook);
}
- completionHandler.handle(ar1);
+ p.handle(ar1);
});
- return this;
+ return p.future();
}
@Override
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java
index 2b76932..098e9e7 100644
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java
@@ -38,7 +38,7 @@ public class CommandDecoratorTest extends GrpcTestBase {
}
}).build();
- server.start(ar -> {
+ server.start2().onComplete(ar -> {
if (ar.succeeded()) {
if (server.getRawServer() == null) {
should.fail("The underlying server not exposed (server.getRawServer())");
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java
index 2222711..b7588b2 100644
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java
@@ -61,13 +61,10 @@ public abstract class GrpcTestBase {
}
Future<Void> startServer(BindableService service, VertxServerBuilder builder) {
- Promise<Void> promise = Promise.promise();
server = builder
.addService(service)
- .build()
- .start(promise);
-
- return promise.future();
+ .build();
+ return server.start2();
}
Future<Void> startServer(ServerServiceDefinition service) {
@@ -89,7 +86,8 @@ public abstract class GrpcTestBase {
void startServer(ServerServiceDefinition service, VertxServerBuilder builder, Handler<AsyncResult<Void>> completionHandler) {
server = builder
.addService(service)
- .build()
- .start(completionHandler);
+ .build();
+ server.start2()
+ .onComplete(completionHandler);
}
}
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java
index ab93434..0ad3650 100644
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java
@@ -31,7 +31,8 @@ public class NativeTransportTest {
VertxServerBuilder.forPort(vertx, 0)
.addService(new GreeterGrpc.GreeterImplBase() { })
.build()
- .start(ctx.asyncAssertSuccess());
+ .start2()
+ .onComplete(ctx.asyncAssertSuccess());
}
private void assumeNativeTransport() {
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java
index 007eace..95e1c53 100644
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java
@@ -139,8 +139,9 @@ public class RpcTest extends GrpcTestBase {
};
server = VertxServerBuilder.forPort(vertx, port)
.addService(ServerInterceptors.intercept(service, BlockingServerInterceptor.wrap(vertx, blockingInterceptor)))
- .build()
- .start(ar -> {
+ .build();
+ server
+ .start2().onComplete(ar -> {
if (ar.failed()) {
should.fail(ar.cause());
return;
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java
index 6c80f77..4753f0b 100644
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java
@@ -64,7 +64,7 @@ public class VerticleTest {
}
};
server = VertxServerBuilder.forPort(vertx, port).addService(service).build();
- server.start(startFuture);
+ server.start2().onComplete(startFuture);
}
@Override
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java
index 07fccca..470e7e7 100644
--- a/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java
+++ b/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java
@@ -68,7 +68,7 @@ public class AmqpClientImpl implements AmqpClient {
return future;
}
- public void close(Handler<AsyncResult<Void>> handler) {
+ public void close(Promise<Void> handler) {
List<Future<Void>> actions = new ArrayList<>();
for (AmqpConnection connection : connections) {
actions.add(connection.close());
@@ -104,7 +104,7 @@ public class AmqpClientImpl implements AmqpClient {
}
public AmqpClient createReceiver(String address,
- Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ Promise<AmqpReceiver> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
@@ -121,7 +121,7 @@ public class AmqpClientImpl implements AmqpClient {
return promise.future();
}
- public AmqpClient createReceiver(String address, AmqpReceiverOptions receiverOptions, Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ public AmqpClient createReceiver(String address, AmqpReceiverOptions receiverOptions, Promise<AmqpReceiver> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
@@ -138,7 +138,7 @@ public class AmqpClientImpl implements AmqpClient {
return promise.future();
}
- public AmqpClient createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) {
+ public AmqpClient createSender(String address, Promise<AmqpSender> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
@@ -156,7 +156,7 @@ public class AmqpClientImpl implements AmqpClient {
}
public AmqpClient createSender(String address, AmqpSenderOptions options,
- Handler<AsyncResult<AmqpSender>> completionHandler) {
+ Promise<AmqpSender> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java
index 139d943..6d2afbf 100644
--- a/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java
+++ b/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java
@@ -210,7 +210,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
return this;
}
- public AmqpConnection close(Handler<AsyncResult<Void>> done) {
+ public AmqpConnection close(Promise<Void> done) {
context.runOnContext(ignored -> {
ProtonConnection actualConnection = connection.get();
if (actualConnection == null || closed.get() || (!isLocalOpen() && !isRemoteOpen())) {
@@ -268,7 +268,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
receivers.remove(receiver);
}
- public AmqpConnection createDynamicReceiver(Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ public AmqpConnection createDynamicReceiver(Promise<AmqpReceiver> completionHandler) {
return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
}
@@ -279,7 +279,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
return promise.future();
}
- public AmqpConnection createReceiver(String address, Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ public AmqpConnection createReceiver(String address, Promise<AmqpReceiver> completionHandler) {
ProtonLinkOptions opts = new ProtonLinkOptions();
runWithTrampoline(x -> {
@@ -305,7 +305,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
}
public AmqpConnection createReceiver(String address, AmqpReceiverOptions receiverOptions,
- Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ Promise<AmqpReceiver> completionHandler) {
ProtonLinkOptions opts = new ProtonLinkOptions();
AmqpReceiverOptions recOpts = receiverOptions == null ? new AmqpReceiverOptions() : receiverOptions;
opts
@@ -370,7 +370,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
}
}
- public AmqpConnection createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) {
+ public AmqpConnection createSender(String address, Promise<AmqpSender> completionHandler) {
Objects.requireNonNull(address, "The address must be set");
return createSender(address, new AmqpSenderOptions(), completionHandler);
}
@@ -383,7 +383,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
}
public AmqpConnection createSender(String address, AmqpSenderOptions options,
- Handler<AsyncResult<AmqpSender>> completionHandler) {
+ Promise<AmqpSender> completionHandler) {
if (address == null && !options.isDynamic()) {
throw new IllegalArgumentException("Address must be set if the link is not dynamic");
}
@@ -431,7 +431,7 @@ public class AmqpConnectionImpl implements AmqpConnection {
return promise.future();
}
- public AmqpConnection createAnonymousSender(Handler<AsyncResult<AmqpSender>> completionHandler) {
+ public AmqpConnection createAnonymousSender(Promise<AmqpSender> completionHandler) {
Objects.requireNonNull(completionHandler, "The completion handler must be set");
runWithTrampoline(x -> {
ProtonConnection conn = connection.get();
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java
index 8d2f559..40faa62 100644
--- a/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java
+++ b/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java
@@ -68,7 +68,7 @@ public class AmqpReceiverImpl implements AmqpReceiver {
AmqpConnectionImpl connection,
AmqpReceiverOptions options,
ProtonReceiver receiver,
- Handler<AsyncResult<AmqpReceiver>> completionHandler) {
+ Promise<AmqpReceiver> completionHandler) {
this.address = address;
this.receiver = receiver;
this.connection = connection;
@@ -310,10 +310,10 @@ public class AmqpReceiverImpl implements AmqpReceiver {
return connection;
}
- public void close(Handler<AsyncResult<Void>> handler) {
- Handler<AsyncResult<Void>> actualHandler;
+ public void close(Promise<Void> handler) {
+ Promise<Void> actualHandler;
if (handler == null) {
- actualHandler = x -> { /* NOOP */ };
+ actualHandler = Promise.promise();
} else {
actualHandler = handler;
}
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java
index 9d32620..f3d2c9e 100644
--- a/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java
+++ b/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java
@@ -40,7 +40,7 @@ public class AmqpSenderImpl implements AmqpSender {
private long remoteCredit = 0;
private AmqpSenderImpl(ProtonSender sender, AmqpConnectionImpl connection,
- Handler<AsyncResult<AmqpSender>> completionHandler) {
+ Promise<AmqpSender> completionHandler) {
this.sender = sender;
this.connection = connection;
@@ -86,7 +86,7 @@ public class AmqpSenderImpl implements AmqpSender {
* @param completionHandler the completion handler
*/
static void create(ProtonSender sender, AmqpConnectionImpl connection,
- Handler<AsyncResult<AmqpSender>> completionHandler) {
+ Promise<AmqpSender> completionHandler) {
new AmqpSenderImpl(sender, connection, completionHandler);
}
@@ -137,17 +137,13 @@ public class AmqpSenderImpl implements AmqpSender {
return doSend(message, null);
}
- private AmqpSender doSend(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgmentHandler) {
+ private AmqpSender doSend(AmqpMessage message, Promise<Void> acknowledgmentHandler) {
Handler<ProtonDelivery> ack = delivery -> {
DeliveryState remoteState = delivery.getRemoteState();
- Handler<AsyncResult<Void>> handler = acknowledgmentHandler;
+ Promise<Void> handler = acknowledgmentHandler;
if (acknowledgmentHandler == null) {
- handler = ar -> {
- if (ar.failed()) {
- LOGGER.warn("Message rejected by remote peer", ar.cause());
- }
- };
+ handler = Promise.promise();
}
if (remoteState == null) {
@@ -233,7 +229,7 @@ public class AmqpSenderImpl implements AmqpSender {
return this;
}
- public AmqpSender sendWithAck(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgementHandler) {
+ public AmqpSender sendWithAck(AmqpMessage message, Promise<Void> acknowledgementHandler) {
return doSend(message, acknowledgementHandler);
}
@@ -244,10 +240,10 @@ public class AmqpSenderImpl implements AmqpSender {
return promise.future();
}
- public void close(Handler<AsyncResult<Void>> handler) {
- Handler<AsyncResult<Void>> actualHandler;
+ public void close(Promise<Void> handler) {
+ Promise<Void> actualHandler;
if (handler == null) {
- actualHandler = x -> { /* NOOP */ };
+ actualHandler = Promise.promise();
} else {
actualHandler = handler;
}
diff --git a/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java b/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java
index 7f5b89d..ebe2472 100644
--- a/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java
+++ b/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java
@@ -15,10 +15,7 @@
*/
package io.vertx.kafka.client.common.impl;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Closeable;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
+import io.vertx.core.*;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
@@ -34,9 +31,9 @@ public class CloseHandler {
private Closeable closeable;
private Runnable closeableHookCleanup;
- private final BiConsumer<Long, Handler<AsyncResult<Void>>> close;
+ private final BiConsumer<Long, Promise<Void>> close;
- public CloseHandler(BiConsumer<Long, Handler<AsyncResult<Void>>> close) {
+ public CloseHandler(BiConsumer<Long, Promise<Void>> close) {
this.close = close;
}
@@ -80,15 +77,15 @@ public class CloseHandler {
public void close() {
unregisterCloseHook();
- close.accept(0L, ar -> {});
+ close.accept(0L, Promise.promise());
}
- public void close(Handler<AsyncResult<Void>> completionHandler) {
+ public void close(Promise<Void> completionHandler) {
unregisterCloseHook();
close.accept(0L, completionHandler);
}
- public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
+ public void close(long timeout, Promise<Void> completionHandler) {
unregisterCloseHook();
close.accept(timeout, completionHandler);
}
diff --git a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
index 0cc9b98..1386792 100644
--- a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
+++ b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java
@@ -111,12 +111,12 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {
this.tracer = ConsumerTracer.create(ctxInt.tracer(), options);
}
- private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
+ private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Promise<T> handler) {
this.worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement()));
this.submitTaskWhenStarted(task, handler);
}
- private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
+ private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Promise<T> handler) {
if (worker == null) {
throw new IllegalStateException();
}
@@ -264,7 +264,7 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> {
}
protected <T> void submitTask(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task,
- Handler<AsyncResult<T>> handler) {
+ Promise<T> handler) {
if (this.closed.compareAndSet(true, false)) {
this.start(task, handler);
} else {
diff --git a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java
index 50b4fff..5f1fafe 100644
--- a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java
+++ b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java
@@ -203,7 +203,8 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener {
if (handler.queue().isCancelled()) {
return;
}
- restartConnect(0, rh -> {
+ Future.<Void>future(p -> restartConnect(0, p))
+ .onComplete(rh -> {
forChannel(chan -> {
RabbitMQConsumer q = handler.queue();
chan.basicConsume(q.queueName(), options.isAutoAck(), options.getConsumerTag(),
@@ -234,7 +235,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener {
* @param attempts number of attempts
* @param resultHandler handler called when operation is done with a result of the operation
*/
- private void restartConnect(int attempts, Handler<AsyncResult<Void>> resultHandler) {
+ private void restartConnect(int attempts, Promise<Void> resultHandler) {
if (retries == 0) {
log.error("Retries disabled. Will not attempt to restart");
resultHandler.handle(Future.failedFuture("Retries disabled. Will not attempt to restart"));
@@ -261,7 +262,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener {
}
- private void execRestart(int attempts, Handler<AsyncResult<Void>> resultHandler) {
+ private void execRestart(int attempts, Promise<Void> resultHandler) {
stop().onComplete(ar -> {
if (ar.succeeded()) {
if (attempts >= retries) {
@@ -659,7 +660,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener {
}
log.info("RabbitMQ connection shutdown! The client will attempt to reconnect automatically", cause);
//Make sure to perform reconnection
- restartConnect(0, rh -> {
+ Future.<Void>future(p -> restartConnect(0, p)).onComplete(rh -> {
log.info("reconnect success");
});
}
diff --git a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java
index 5f83683..85a05f4 100644
--- a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java
+++ b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java
@@ -63,13 +63,13 @@ public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<Rabb
private final String routingKey;
private final BasicProperties properties;
private final Buffer message;
- private final Handler<AsyncResult<Void>> publishHandler;
- private final Handler<AsyncResult<Long>> confirmHandler;
+ private final Promise<Void> publishHandler;
+ private final Promise<Long> confirmHandler;
private volatile long deliveryTag;
MessageDetails(String exchange, String routingKey, BasicProperties properties, Buffer message,
- Handler<AsyncResult<Void>> publishHandler,
- Handler<AsyncResult<Long>> confirmHandler) {
+ Promise<Void> publishHandler,
+ Promise<Long> confirmHandler) {
this.exchange = exchange;
this.routingKey = routingKey;
this.properties = properties;
@@ -113,7 +113,7 @@ public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<Rabb
return promise.future();
}
- private void stop(Handler<AsyncResult<Void>> resultHandler) {
+ private void stop(Promise<Void> resultHandler) {
stopped = true;
sendQueue.pause();
if (sendQueue.isEmpty()) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment