Last active
June 28, 2017 18:41
-
-
Save choffmeister/bd3522bd22bb361eb5eee049afceb3f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
akka.cluster.seed.zookeeper { | |
url = ${AKKA_CLUSTER_SEED_ZOOKEEPER_URL} | |
path = "/akka/cluster/seed/documents" | |
} | |
lagom { | |
discovery { | |
zookeeper { | |
server-hostname = ${LAGOM_DISCOVERY_ZOOKEEPER_SERVER_HOSTNAME} | |
server-port = 2181 | |
server-port = ${?LAGOM_DISCOVERY_ZOOKEEPER_SERVER_PORT} | |
uri-scheme = "http" | |
uri-scheme = ${?LAGOM_DISCOVERY_ZOOKEEPER_URI_SCHEME} | |
routing-policy = "round-robin" | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
libraryDependencies += Seq( | |
"com.sclasen" %% "akka-zk-cluster-seed" % "0.1.8", | |
"org.apache.curator" % "curator-x-discovery" % "2.11.0" | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.cluster.seed.ZookeeperClusterSeed | |
trait MyAkkaSeedComponent { | |
def actorSystem: ActorSystem | |
val extension = ZookeeperClusterSeed(actorSystem) | |
extension.join() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Closeable | |
import java.net.{InetAddress, URI} | |
import java.util.concurrent.ConcurrentHashMap | |
import com.lightbend.lagom.scaladsl.api.Descriptor.Call | |
import com.lightbend.lagom.scaladsl.api.ServiceLocator | |
import com.typesafe.config.Config | |
import com.typesafe.config.ConfigException.BadValue | |
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} | |
import org.apache.curator.retry.ExponentialBackoffRetry | |
import org.apache.curator.utils.CloseableUtils | |
import org.apache.curator.x.discovery.{ServiceDiscovery, ServiceDiscoveryBuilder, ServiceInstance} | |
import play.api.Logger | |
import scala.collection.concurrent.Map | |
import scala.collection.convert.decorateAsScala._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.Random | |
class MyServiceLocator(config: Config) extends ServiceLocator with Closeable { | |
import MyServiceLocator._ | |
lazy val log = Logger(getClass) | |
val locatorConfig = config.getConfig("lagom.discovery.zookeeper") | |
val serverHostname = locatorConfig.getString("server-hostname") | |
val serverPort = locatorConfig.getString("server-port") | |
val scheme = locatorConfig.getString("uri-scheme") | |
val routingPolicy = locatorConfig.getString("routing-policy") | |
val zookeeperUri = s"$serverHostname:$serverPort" | |
val zookeeperServicesPath = "/lagom/services" | |
private val zookeeperClient: CuratorFramework = | |
CuratorFrameworkFactory.newClient(zookeeperUri, new ExponentialBackoffRetry(1000, 3)) | |
private val serviceDiscovery: ServiceDiscovery[String] = | |
ServiceDiscoveryBuilder | |
.builder(classOf[String]) | |
.client(zookeeperClient) | |
.basePath(zookeeperServicesPath) | |
.build() | |
zookeeperClient.start() | |
serviceDiscovery.start() | |
override def doWithService[T](name: String, serviceCall: Call[_, _])(block: (URI) => Future[T])(implicit ec: ExecutionContext): Future[Option[T]] = { | |
locate(name).flatMap { uriOpt => | |
uriOpt.fold(Future.successful(Option.empty[T])) { uri => | |
block.apply(uri).map(Option.apply) | |
} | |
} | |
} | |
override def locate(name: String): Future[Option[URI]] = { | |
val instances = serviceDiscovery.queryForInstances(name).asScala.toList | |
Future.successful( | |
instances.size match { | |
case 0 => None | |
case 1 => toURIs(instances).headOption | |
case _ => instancePicker(name, instances) | |
}) | |
} | |
override def locate(name: String, serviceCall: Call[_, _]): Future[Option[URI]] = | |
locate(name) | |
override def close(): Unit = { | |
CloseableUtils.closeQuietly(serviceDiscovery) | |
CloseableUtils.closeQuietly(zookeeperClient) | |
} | |
private val roundRobinIndexFor: Map[String, Int] = new ConcurrentHashMap[String, Int]().asScala | |
private val instancePicker: InstancePicker[String] = routingPolicy match { | |
case "first" => | |
(_, instances) => Some(pickFirstInstance(instances)) | |
case "random" => | |
(_, instances) => Some(pickRandomInstance(instances)) | |
case "round-robin" => | |
(name, instances) => Some(pickRoundRobinInstance(name, instances)) | |
case unknown => | |
throw new BadValue("lagom.discovery.zookeeper.routing-policy", s"'$unknown' is not a valid routing algorithm") | |
} | |
private def pickFirstInstance(services: List[ServiceInstance[String]]): URI = { | |
assert(services.size > 1) | |
toURIs(services).sortWith(_.toString < _.toString).head | |
} | |
private def pickRandomInstance(services: List[ServiceInstance[String]]): URI = { | |
assert(services.size > 1) | |
toURIs(services).sortWith(_.toString < _.toString).apply(Random.nextInt(services.size - 1)) | |
} | |
private def pickRoundRobinInstance(name: String, services: List[ServiceInstance[String]]): URI = { | |
assert(services.size > 1) | |
roundRobinIndexFor.putIfAbsent(name, 0) | |
val sortedServices = toURIs(services).sortWith(_.toString < _.toString) | |
val currentIndex = roundRobinIndexFor(name) | |
val nextIndex = | |
if (sortedServices.size > currentIndex + 1) currentIndex + 1 | |
else 0 | |
roundRobinIndexFor += (name -> nextIndex) | |
sortedServices(currentIndex) | |
} | |
private def toURIs(services: List[ServiceInstance[String]]): List[URI] = | |
services.map { service => | |
val address = service.getAddress | |
val serviceAddress = | |
if (address == "" || address == "localhost") InetAddress.getLoopbackAddress.getHostAddress | |
else address | |
new URI(s"$scheme://$serviceAddress:${service.getPort}") | |
} | |
} | |
object MyServiceLocator { | |
type InstancePicker[T] = (String, List[ServiceInstance[T]]) => Option[URI] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.lightbend.lagom.scaladsl.api.ServiceInfo | |
import org.apache.curator.x.discovery.{ServiceInstance, UriSpec} | |
import play.api.Configuration | |
import play.api.inject.ApplicationLifecycle | |
import scala.concurrent.Future | |
trait MyServiceLocatorComponent { | |
def applicationLifecycle: ApplicationLifecycle | |
def configuration: Configuration | |
def serviceInfo: ServiceInfo | |
def serviceLocator = new MyServiceLocator(configuration.underlying) | |
// docker injects this env variable and makes sure that other services | |
// can reach this container under this name | |
val hostname = System.getenv("HOSTNAME") | |
val name = serviceInfo.serviceName | |
val id = s"$name-$hostname" | |
val registry = new MyServiceRegistry(serviceLocator.zookeeperUri, serviceLocator.zookeeperServicesPath) | |
val serviceInstance = ServiceInstance.builder[String]() | |
.name(name) | |
.id(id) | |
.address(hostname) | |
.port(9000) | |
.uriSpec(new UriSpec("{scheme}://{serviceAddress}:{servicePort}")) | |
.build() | |
registry.register(serviceInstance) | |
applicationLifecycle.addStopHook(() => { | |
registry.unregister(serviceInstance) | |
Future.successful() | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Closeable | |
import org.apache.curator.framework.CuratorFrameworkFactory | |
import org.apache.curator.retry.ExponentialBackoffRetry | |
import org.apache.curator.utils.CloseableUtils | |
import org.apache.curator.x.discovery.{ServiceDiscoveryBuilder, ServiceInstance} | |
class MyServiceRegistry(val zookeeperUrl: String, val zookeeperServicesPath: String) extends Closeable { | |
val zkClient = CuratorFrameworkFactory.newClient(zookeeperUrl, new ExponentialBackoffRetry(1000, 3)) | |
zkClient.start() | |
val serviceDiscovery = ServiceDiscoveryBuilder.builder(classOf[String]).client(zkClient).basePath(zookeeperServicesPath).build | |
serviceDiscovery.start() | |
def register(serviceInstance: ServiceInstance[String]): Unit = { | |
serviceDiscovery.registerService(serviceInstance) | |
} | |
def unregister(serviceInstance: ServiceInstance[String]): Unit = { | |
serviceDiscovery.unregisterService(serviceInstance) | |
} | |
override def close(): Unit = { | |
CloseableUtils.closeQuietly(serviceDiscovery) | |
CloseableUtils.closeQuietly(zkClient) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment