Comment créer une application simple avec Akka Cluster

Si vous lisez mon histoire précédente sur Scalachain, vous avez probablement remarqué que c'est loin d'être un système distribué. Il manque toutes les fonctionnalités pour fonctionner correctement avec d'autres nœuds. Ajoutez à cela qu'une blockchain composée d'un seul nœud est inutile. Pour cette raison, j'ai décidé qu'il était temps de travailler sur la question.

Puisque Scalachain est propulsé par Akka, pourquoi ne pas saisir l'occasion de jouer avec Akka Cluster? J'ai créé un projet simple pour bricoler un peu avec Akka Cluster, et dans cette histoire je vais partager mes apprentissages. Nous allons créer un cluster de trois nœuds, en utilisant des routeurs Cluster Aware pour équilibrer la charge entre eux. Tout fonctionnera dans un conteneur Docker et nous utiliserons docker-compose pour un déploiement facile.

Ok, allons-y! ?

Introduction rapide à Akka Cluster

Akka Cluster fournit un excellent support pour la création d'applications distribuées. Le meilleur cas d'utilisation est celui où vous avez un nœud que vous souhaitez répliquer N fois dans un environnement distribué. Cela signifie que tous les N nœuds sont des pairs exécutant le même code. Akka Cluster vous offre la découverte immédiate des membres du même cluster. En utilisant des routeurs sensibles au cluster, il est possible d'équilibrer les messages entre les acteurs de différents nœuds. Il est également possible de choisir la politique d'équilibrage, faisant de l'équilibrage un jeu d'enfant!

En fait, vous pouvez choisir entre deux types de routeurs:

Routeur de groupe - Les acteurs auxquels envoyer les messages - les routes appelées - sont spécifiés en utilisant leur chemin d'acteur. Les routeurs partagent les routes créées dans le cluster. Nous utiliserons un routeur de groupe dans cet exemple.

Pool Router - Les routes sont créées et déployées par le routeur, ce sont donc ses enfants dans la hiérarchie des acteurs. Les routes ne sont pas partagées entre les routeurs. C'est idéal pour un scénario de réplique principal, où chaque routeur est le principal et achemine les répliques.

Ce n'est que la pointe de l'iceberg, je vous invite donc à lire la documentation officielle pour plus d'informations.

Un cluster pour les calculs mathématiques

Imaginons un scénario de cas d'utilisation. Supposons de concevoir un système pour exécuter des calculs mathématiques sur demande. Le système est déployé en ligne, il a donc besoin d'une API REST pour recevoir les demandes de calcul. Un processeur interne gère ces requêtes, exécutant le calcul et renvoyant le résultat.

Pour le moment, le processeur ne peut calculer que le nombre de Fibonacci. Nous décidons d'utiliser un cluster de nœuds pour répartir la charge entre les nœuds et améliorer les performances. Akka Cluster gérera la dynamique du cluster et l'équilibrage de charge entre les nœuds. Ça a l'air bien!

Hiérarchie des acteurs

Tout d'abord, nous devons définir notre hiérarchie d'acteurs. Le système peut être divisé en trois parties fonctionnelles: la logique métier , la gestion du cluster et le nœud lui-même. Il y a aussi le serveur mais ce n'est pas un acteur, et nous y travaillerons plus tard.

Logique métier

L'application doit faire des calculs mathématiques. On peut définir un Processoracteur simple pour gérer toutes les tâches de calcul. Chaque calcul que nous prenons en charge peut être implémenté dans un acteur spécifique, qui sera un enfant de celui- Processorci. De cette manière, l'application est modulaire et plus facile à étendre et à maintenir. À l'heure actuelle, le seul enfant de Processorsera l' ProcessorFibonacciacteur. Je suppose que vous pouvez deviner quelle est sa tâche. Cela devrait suffire pour commencer.

Gestion de cluster

Pour gérer le cluster, nous avons besoin d'un fichier ClusterManager. Cela semble simple, non? Cet acteur gère tout ce qui concerne le cluster, comme renvoyer ses membres quand on le lui demande. Il serait utile de consigner ce qui se passe à l'intérieur du cluster, nous définissons donc un ClusterListeneracteur. Il s'agit d'un enfant de ClusterManager, et s'abonne aux événements de cluster en les journalisant.

Nœud

L' Nodeacteur est la racine de notre hiérarchie. C'est le point d'entrée de notre système qui communique avec l'API. Le Processoret le ClusterManagersont ses enfants, avec l' ProcessorRouteracteur. Il s'agit de l'équilibreur de charge du système, répartissant la charge entre les Processors. Nous allons le configurer comme un routeur compatible avec les clusters, afin que chacun ProcessorRouterpuisse envoyer des messages aux Processors sur chaque nœud.

Implémentation des acteurs

Il est temps de mettre en œuvre nos acteurs! Nous mettons tout d'abord en œuvre les acteurs liés à la logique métier du système. On passe ensuite aux acteurs pour la gestion du cluster et à l'acteur racine ( Node) à la fin.

ProcesseurFibonacci

Cet acteur exécute le calcul du nombre de Fibonacci. Il reçoit un Computemessage contenant le nombre à calculer et la référence de l'acteur auquel répondre. La référence est importante, car il peut y avoir différents acteurs demandeurs. N'oubliez pas que nous travaillons dans un environnement distribué!

Une fois le Computemessage reçu, la fibonaccifonction calcule le résultat. Nous l'enveloppons dans un ProcessorResponseobjet pour fournir des informations sur le nœud qui a exécuté le calcul. Cela sera utile plus tard pour voir la politique de tourniquet en action.

Le résultat est ensuite envoyé à l'acteur auquel nous devons répondre. Peasy facile.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Processeur

L' Processoracteur gère les sous-processeurs spécifiques, comme celui de Fibonacci. Il doit instancier les sous-processeurs et leur transmettre les requêtes. En ce moment , nous avons seulement un sous-processeur, de sorte que le Processorreçoit une sorte de message: ComputeFibonacci. Ce message contient le nombre de Fibonacci à calculer. Une fois reçu, le nombre à calculer est envoyé à a FibonacciProcessor, avec la référence du sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Nous aimerions enregistrer des informations utiles sur ce qui se passe dans le cluster. Cela pourrait nous aider à déboguer le système si nécessaire. C'est le but de l' ClusterListeneracteur. Avant de commencer, il s'abonne aux messages d'événement du cluster. L'acteur réagit à des messages tels que MemberUp, UnreachableMember, ou MemberRemoved, la journalisation de l'événement correspondant. Lorsqu'il ClusterListenerest arrêté, il se désabonne des événements du cluster.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

L'acteur responsable de la gestion du cluster est ClusterManager. Il crée l' ClusterListeneracteur et fournit la liste des membres du cluster sur demande. Il pourrait être étendu pour ajouter plus de fonctionnalités, mais pour le moment, cela suffit.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcesseurRouter

The load-balancing among processors is handled by the ProcessorRouter. It is created by the Node actor, but this time all the required information are provided in the configuration of the system.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Let’s analyse the relevant part in the application.conf file.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

The first thing is to specify the path to the router actor, that is /node/processorRouter. Inside that property we can configure the behaviour of the router:

  • router: this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?