Created
December 23, 2015 14:04
-
-
Save vaughnd/22fe0670a296ebcb436c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vaughndickson.elasticsearch | |
import groovy.util.logging.Slf4j | |
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus | |
import org.elasticsearch.client.Client | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.settings.ImmutableSettings | |
import org.elasticsearch.common.settings.Settings | |
import org.elasticsearch.common.transport.InetSocketTransportAddress | |
import org.springframework.beans.factory.DisposableBean | |
import org.springframework.beans.factory.FactoryBean | |
import org.springframework.beans.factory.InitializingBean | |
import sun.net.spi.nameservice.dns.DNSNameService | |
@Slf4j | |
public class ElasticsearchClient implements FactoryBean<Client>, InitializingBean, DisposableBean { | |
public enum ELASTIC_SEARCH_TYPE {normal,found} | |
// ES configuration properties | |
String elasticsearchType // normal or found | |
String elasticsearchClientName // identifiable name, e.g. contentStoreEsClient for debugging | |
// local: localhost:9300 | |
// found: xyz.us-east-1.aws.found.io:9343 | |
String elasticsearchTransportUri | |
// local: cluster | |
// found: blank (calculated from TransportUri hash above, e.g. xyz) | |
String elasticsearchClustername | |
// local: blank | |
// found: shield configured username | |
String elasticsearchUsername | |
// local: blank | |
// found: shield configured password | |
String elasticsearchPassword | |
// instance variables | |
protected TransportClient client | |
protected int defaultPort | |
protected Timer dnsRefresherTimer | |
protected final static long DNS_REFRESH_TIME = 60 * 1000L // refresh DNS every minute, as the Found ELB timeout is 5min | |
/** | |
* Build shield client over SSL and authentication. | |
* @return Client client | |
*/ | |
protected TransportClient buildShieldClient(int defaultPort) { | |
String host = elasticsearchTransportUri.substring(0, elasticsearchTransportUri.indexOf(":")) | |
String foundClusterId = host.split("\\.", 2)[0] | |
// Build the settings for our client. | |
Settings settings = ImmutableSettings.settingsBuilder() | |
.put("transport.ping_schedule", "5s") | |
.put("cluster.name", foundClusterId) | |
.put("action.bulk.compress", false) | |
.put("shield.transport.ssl", "true") | |
.put("request.headers.X-Found-Cluster", foundClusterId) | |
.put("shield.user", elasticsearchUsername + ":" + elasticsearchPassword) | |
.build() | |
// Instantiate a TransportClient and add the cluster to the list of addresses to connect to. | |
// Only port 9343 (SSL-encrypted) is currently supported. | |
Client client = new TransportClient(settings) | |
.addTransportAddress(toAddress(elasticsearchTransportUri, defaultPort)) | |
ClusterHealthStatus status = client.admin().cluster().prepareHealth().get().getStatus() | |
log.info("Connected to Elasticsearch instance: " + elasticsearchTransportUri + ", status: " + status + ", client: " + elasticsearchClientName) | |
return client | |
} | |
/** | |
* Build normal elasticsearch transport client using only host and cluster name | |
* @return Client client | |
*/ | |
protected TransportClient buildNormalClient(int defaultPort) { | |
// Build the settings for our client. | |
Settings settings = ImmutableSettings.settingsBuilder() | |
.put("transport.ping_schedule", "5s") | |
.put("cluster.name", elasticsearchClustername) | |
.put("action.bulk.compress", false) | |
.put("shield.enabled", false) | |
.build() | |
TransportClient client = new TransportClient(settings) | |
client.addTransportAddress(toAddress(elasticsearchTransportUri, defaultPort)) | |
ClusterHealthStatus status = client.admin().cluster().prepareHealth().get().getStatus() | |
log.info("Connected to Elasticsearch instance: " + elasticsearchTransportUri + ", status: " + status + ", client: " + elasticsearchClientName) | |
return client | |
} | |
private def toAddress(String address, defaultPort) { | |
if (address == null) return null | |
String[] splitted = address.split(":") | |
int port = defaultPort | |
if (splitted.length > 1) { | |
port = Integer.parseInt(splitted[1]) | |
} | |
return new InetSocketTransportAddress(splitted[0], port) | |
} | |
// Factory bean methods | |
void afterPropertiesSet() throws Exception { | |
log.info("Starting ElasticSearch client " + elasticsearchClientName) | |
if(elasticsearchTransportUri != null) { | |
if(elasticsearchType as ELASTIC_SEARCH_TYPE == ELASTIC_SEARCH_TYPE.normal) { | |
defaultPort = 9300 | |
client = buildNormalClient(defaultPort) | |
} else { | |
defaultPort = 9343 | |
client = buildShieldClient(defaultPort) | |
startDnsRefresherTask() | |
} | |
} else { | |
throw new IllegalArgumentException("No ES transport URI specified, please configure!") | |
} | |
} | |
Client getObject() throws Exception { | |
log.info("ELASTICSEARCH CLIENT: Getting " + elasticsearchClientName + " " + client + " of type " + elasticsearchType) | |
return client | |
} | |
Class<Client> getObjectType() { | |
return Client.class | |
} | |
boolean isSingleton() { | |
return true | |
} | |
void destroy() throws Exception { | |
try { | |
log.info("Closing ElasticSearch client " + elasticsearchClientName) | |
if (client != null) { | |
client.close() | |
} | |
} catch(Exception e) { | |
log.error("Error closing ElasticSearch client: ", e) | |
} | |
} | |
void startDnsRefresherTask() { | |
dnsRefresherTimer = new Timer() | |
dnsRefresherTimer.scheduleAtFixedRate(new DnsRefresherTask(), 0, DNS_REFRESH_TIME) | |
log.info("Started elasticsearch DNS refresh timer task") | |
} | |
/** | |
* Task to check for new IPs attached to our Found hostname every minute and add them to the transport client, | |
* to prevent NoNodeAvailableExceptions. | |
* | |
* We got this exception because we looked up a single IP once on boot, and used it for the lifetime of the service. While | |
* Found says they change the IPs over time, so old ones will eventually stop working without warning. | |
* | |
* The TransportClient checks for duplicates, so we just keep adding all IPs we find. A reachability check is done | |
* by the client too, so we don't check that ourselves. | |
* | |
* We use our own copy of DnsNameService, so that cached IPs don't get used. And we don't have to change the system-level | |
* caching which might break other services. | |
* | |
* Discussion about this: https://discuss.elastic.co/t/nonodeavailableexception-with-java-transport-client/37702 | |
*/ | |
class DnsRefresherTask extends TimerTask { | |
// we try to use our own name service first, so we can skip caching without breaking the rest of the system | |
// by setting the dns ttl too low | |
DNSNameService nameService = new DNSNameService() | |
@Override | |
void run() { | |
try { | |
String[] splitted = elasticsearchTransportUri.split(":") | |
String host = splitted[0] | |
int port = defaultPort | |
if (splitted.length > 1) { | |
port = Integer.parseInt(splitted[1]) | |
} | |
Set<InetAddress> addresses = new HashSet<>() | |
try { | |
// don't try use DNS if we're looking for localhost or already have an IP | |
if(host != "localhost" && Character.digit(host.charAt(0), 16) == -1) { | |
addresses.addAll(nameService.lookupAllHostAddr(host)) | |
} | |
} catch(Exception ex) { | |
log.error("Internal name service failed, trying system one", ex) | |
} | |
if(addresses.isEmpty()) { | |
addresses.addAll(InetAddress.getAllByName(host)) | |
} | |
for(InetAddress addr : addresses) { | |
if(addr instanceof Inet4Address) { | |
log.debug("ES DNS REFRESH (" + client + "): adding " + addr) | |
client.addTransportAddress(new InetSocketTransportAddress(addr, port)) | |
} | |
} | |
} catch(Exception ex) { | |
log.error("Failed to refresh elasticsearch DNS, will retry", ex) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The
DnsRefresherTask
is a nice idea!although
DNSNameService nameService = new DNSNameService()
seems to be an issue with Java8, I'm not able to compile it.