Last active
March 25, 2020 14:53
-
-
Save daschl/5715418e6801c5684144079b74bda879 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testcontainers.couchbase; | |
import com.github.dockerjava.api.command.InspectContainerResponse; | |
import com.github.dockerjava.api.model.ContainerNetwork; | |
import org.testcontainers.containers.GenericContainer; | |
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; | |
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode; | |
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; | |
import org.testcontainers.shaded.okhttp3.Credentials; | |
import org.testcontainers.shaded.okhttp3.FormBody; | |
import org.testcontainers.shaded.okhttp3.OkHttpClient; | |
import org.testcontainers.shaded.okhttp3.Request; | |
import org.testcontainers.shaded.okhttp3.RequestBody; | |
import org.testcontainers.shaded.okhttp3.Response; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.EnumSet; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Optional; | |
import java.util.Set; | |
import java.util.stream.Collectors; | |
public class CouchbaseContainer extends GenericContainer<CouchbaseContainer> { | |
private static final ObjectMapper MAPPER = new ObjectMapper(); | |
private static final OkHttpClient HTTP_CLIENT = new OkHttpClient() | |
.newBuilder() | |
.readTimeout(Duration.ofMinutes(1)) | |
.build(); | |
private static final int MGMT_PORT = 8091; | |
private static final int MGMT_SSL_PORT = 18091; | |
private static final int VIEW_PORT = 8092; | |
private static final int VIEW_SSL_PORT = 18092; | |
private static final int QUERY_PORT = 8093; | |
private static final int QUERY_SSL_PORT = 18093; | |
private static final int SEARCH_PORT = 8094; | |
private static final int SEARCH_SSL_PORT = 18094; | |
private static final int KV_PORT = 11210; | |
private static final int KV_SSL_PORT = 11207; | |
public static final String VERSION = "6.5.0"; | |
public static final String DOCKER_IMAGE_NAME = "couchbase/server:"; | |
private String username = "Administrator"; | |
private String password = "password"; | |
private Set<Service> enabledServices = EnumSet.allOf(Service.class); | |
private List<BucketDefinition> buckets = new ArrayList<>(); | |
public CouchbaseContainer() { | |
this(DOCKER_IMAGE_NAME + VERSION); | |
} | |
public CouchbaseContainer(final String imageName) { | |
super(imageName); | |
} | |
public String username() { | |
return username; | |
} | |
public CouchbaseContainer withUsername(final String username) { | |
checkNotRunning(); | |
this.username = username; | |
return this; | |
} | |
public String password() { | |
return password; | |
} | |
public CouchbaseContainer withPassword(final String password) { | |
checkNotRunning(); | |
this.password = password; | |
return this; | |
} | |
public CouchbaseContainer withBucket(final BucketDefinition bucketDefinition) { | |
checkNotRunning(); | |
this.buckets.add(bucketDefinition); | |
return this; | |
} | |
public CouchbaseContainer enabledServices(final Service... enabled) { | |
checkNotRunning(); | |
this.enabledServices = EnumSet.copyOf(Arrays.asList(enabled)); | |
return this; | |
} | |
private void checkNotRunning() { | |
if (isRunning()) { | |
throw new IllegalStateException("Setter can only be called before the container is running"); | |
} | |
} | |
@Override | |
protected void containerIsStarted(final InspectContainerResponse containerInfo) { | |
waitUntilNodeIsOnline(); | |
renameNode(); | |
initializeServices(); | |
addAdminUser(); | |
configureExternalPorts(); | |
if (enabledServices.contains(Service.INDEX)) { | |
configureIndexer(); | |
} | |
waitUntilNodeIsReady(); | |
createBuckets(); | |
} | |
private void configureIndexer() { | |
Response response = doHttpRequest(MGMT_PORT, "/settings/indexes", "POST", new FormBody.Builder() | |
.add("storageMode", "memory_optimized") | |
.build(), true | |
); | |
System.err.println(response); | |
} | |
private void waitUntilNodeIsOnline() { | |
new HttpWaitStrategy().forPort(MGMT_PORT).forPath("/pools").forStatusCode(200).waitUntilReady(this); | |
} | |
private void renameNode() { | |
Response response = doHttpRequest(MGMT_PORT, "/node/controller/rename", "POST", new FormBody.Builder() | |
.add("hostname", getInternalIpAddress()) | |
.build(), false | |
); | |
System.err.println(response); | |
} | |
private String getInternalIpAddress() { | |
final Map<String, ContainerNetwork> networks = getContainerInfo().getNetworkSettings().getNetworks(); | |
for (ContainerNetwork network : networks.values()) { | |
return network.getIpAddress(); | |
} | |
throw new IllegalStateException("No network available to extract the internal IP from!"); | |
} | |
private void initializeServices() { | |
final String services = enabledServices.stream().map(s -> { | |
switch (s) { | |
case KV: return "kv"; | |
case QUERY: return "n1ql"; | |
case INDEX: return "index"; | |
case SEARCH: return "fts"; | |
default: throw new IllegalStateException("Unknown service!"); | |
} | |
}).collect(Collectors.joining(",")); | |
Response response = doHttpRequest(MGMT_PORT, "/node/controller/setupServices", "POST", new FormBody.Builder() | |
.add("services", services) | |
.build(), false | |
); | |
System.err.println(response); | |
} | |
private void addAdminUser() { | |
Response response = doHttpRequest(MGMT_PORT, "/settings/web", "POST", new FormBody.Builder() | |
.add("username", username()) | |
.add("password", password()) | |
.add("port", Integer.toString(MGMT_PORT)) | |
.build(), false); | |
System.err.println(response); | |
} | |
private void configureExternalPorts() { | |
final FormBody.Builder builder = new FormBody.Builder(); | |
builder.add("hostname", getContainerIpAddress()); | |
builder.add("mgmt", Integer.toString(getMappedPort(MGMT_PORT))); | |
builder.add("mgmtSSL", Integer.toString(getMappedPort(MGMT_SSL_PORT))); | |
if (enabledServices.contains(Service.KV)) { | |
builder.add("kv", Integer.toString(getMappedPort(KV_PORT))); | |
builder.add("kvSSL", Integer.toString(getMappedPort(KV_SSL_PORT))); | |
builder.add("capi", Integer.toString(getMappedPort(VIEW_PORT))); | |
builder.add("capiSSL", Integer.toString(getMappedPort(VIEW_SSL_PORT))); | |
} | |
if (enabledServices.contains(Service.QUERY)) { | |
builder.add("n1ql", Integer.toString(getMappedPort(QUERY_PORT))); | |
builder.add("n1qlSSL", Integer.toString(getMappedPort(QUERY_SSL_PORT))); | |
} | |
if (enabledServices.contains(Service.SEARCH)) { | |
builder.add("fts", Integer.toString(getMappedPort(SEARCH_PORT))); | |
builder.add("ftsSSL", Integer.toString(getMappedPort(SEARCH_SSL_PORT))); | |
} | |
final Response response = doHttpRequest( | |
MGMT_PORT, | |
"/node/controller/setupAlternateAddresses/external", | |
"PUT", | |
builder.build(), | |
true | |
); | |
System.err.println(response); | |
} | |
private void waitUntilNodeIsReady() { | |
new HttpWaitStrategy() | |
.forPath("/pools/default") | |
.forPort(MGMT_PORT) | |
.withBasicCredentials(username(), password()) | |
.forStatusCode(200) | |
.forResponsePredicate(response -> { | |
try { | |
return Optional.of(MAPPER.readTree(response)) | |
.map(n -> n.at("/nodes/0/status")) | |
.map(JsonNode::asText) | |
.map("healthy"::equals) | |
.orElse(false); | |
} catch (IOException e) { | |
logger().error("Unable to parse response {}", response); | |
return false; | |
} | |
}).waitUntilReady(this); | |
} | |
private void createBuckets() { | |
for (BucketDefinition bucket : buckets) { | |
Response response = doHttpRequest(MGMT_PORT, "/pools/default/buckets", "POST", new FormBody.Builder() | |
.add("name", bucket.name) | |
.add("ramQuotaMB", Integer.toString(bucket.quota)) | |
.build(), true); | |
System.err.println(response); | |
if (bucket.queryPrimaryIndex) { | |
Response queryResponse = doHttpRequest(QUERY_PORT, "/query/service", "POST", new FormBody.Builder() | |
.add("statement", "CREATE PRIMARY INDEX on `" + bucket.name + "`") | |
.build(), true); | |
System.err.println(queryResponse); | |
} | |
} | |
} | |
private Response doHttpRequest(int port, final String path, final String method, final RequestBody body, | |
final boolean auth) { | |
try { | |
Request.Builder requestBuilder = new Request.Builder() | |
.url("http://" + getContainerIpAddress() + ":" + getMappedPort(port) + path); | |
if (auth) { | |
requestBuilder = requestBuilder.header("Authorization", Credentials.basic(username(), password())); | |
} | |
if (body == null) { | |
requestBuilder = requestBuilder.get(); | |
} else { | |
requestBuilder = requestBuilder.method(method, body); | |
} | |
return HTTP_CLIENT.newCall(requestBuilder.build()).execute(); | |
} catch (Exception ex) { | |
throw new RuntimeException("Could not perform request", ex); | |
} | |
} | |
public enum Service { | |
KV, | |
QUERY, | |
SEARCH, | |
INDEX, | |
} | |
public static class BucketDefinition { | |
private final String name; | |
private boolean queryPrimaryIndex = true; | |
private int quota = 100; | |
public BucketDefinition(String name) { | |
this.name = name; | |
} | |
public BucketDefinition withQuota(final int quota) { | |
if (quota < 100) { | |
throw new IllegalArgumentException("Bucket quota cannot be less than 100MB!"); | |
} | |
this.quota = quota; | |
return this; | |
} | |
public BucketDefinition createQueryPrimaryIndex(final boolean create) { | |
this.queryPrimaryIndex = create; | |
return this; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment