zio-archive/zio-akka-cluster
{ "createdAt": "2019-07-06T06:43:51Z", "defaultBranch": "series/2.x", "description": "ZIO wrapper for Akka Cluster", "fullName": "zio-archive/zio-akka-cluster", "homepage": "https://zio.dev/zio-akka-cluster", "language": "Scala", "name": "zio-akka-cluster", "pushedAt": "2024-08-20T01:42:31Z", "stargazersCount": 163, "topics": [ "akka", "functional-programming", "scala", "zio" ], "updatedAt": "2025-03-18T21:49:02Z", "url": "https://github.com/zio-archive/zio-akka-cluster"}[//] !: # (This file was autogenerated using zio-sbt-website plugin via sbt generateReadme command.)
[//] !: # (So please do not edit it manually. Instead, change “docs/index.md” file or sbt setting keys)
[//] !: # (e.g. “readmeDocumentation” and “readmeSupport”.)
ZIO Akka Cluster
Section titled “ZIO Akka Cluster”The ZIO Akka Cluster library is a ZIO wrapper on Akka Cluster. We can use clustering features of the Akka toolkit without the need to use the actor model.
Introduction
Section titled “Introduction”This library provides us following features:
-
Akka Cluster — This feature contains two Akka Cluster Membership operations called
joinandleaveand also it has some methods to retrieve Cluster State and Cluster Events. -
Akka Distributed PubSub — Akka has a Distributed Publish Subscribe facility in the cluster. It helps us to send a message to all actors in the cluster that have registered and subscribed for a specific topic name without knowing their physical address or without knowing which node they are running on.
-
Akka Cluster Sharding — Cluster sharding is useful when we need to distribute actors across several nodes in the cluster and want to be able to interact with them using their logical identifier without having to care about their physical location in the cluster, which might also change over time. When we have many stateful entities in our application that together they consume more resources (e.g. memory) than fit on one machine, it is useful to use Akka Cluster Sharding to distribute our entities to multiple nodes.
Installation
Section titled “Installation”In order to use this library, we need to add the following line in our build.sbt file:
libraryDependencies += "dev.zio" %% "zio-akka-cluster" % "0.3.0"Example
Section titled “Example”In the following example, we are using all these three features. We have a distributed counter application that lives in the Akka Cluster using Akka Cluster Sharding feature. So the location of LiveUsers and TotalRequests entities in the cluster is transparent for us. We send the result of each entity to the Distributed PubSub. So every node in the cluster can subscribe and listen to those results. Also, we have created a fiber that is subscribed to the cluster events. All the new events will be logged to the console:
import akka.actor.ActorSystemimport com.typesafe.config.{ Config, ConfigFactory }import zio._import zio.akka.cluster.Clusterimport zio.akka.cluster.sharding.{ Entity, Sharding }
sealed trait Counter extends Product with Serializablecase object Inc extends Countercase object Dec extends Counter
case class CounterApp(port: String) { val config: Config = ConfigFactory.parseString(s""" |akka { | actor { | provider = "cluster" | } | remote { | netty.tcp { | hostname = "127.0.0.1" | port = $port | } | } | cluster { | seed-nodes = ["akka.tcp://CounterApp@127.0.0.1:2551"] | } |} |""".stripMargin)
val actorSystem: ZLayer[Any, Nothing, ActorSystem] = ZLayer.scoped( ZIO.acquireRelease(ZIO.succeed(ActorSystem("CounterApp", config)))(sys => ZIO.fromFuture(_ => sys.terminate()).either ) )
val counterApp: ZIO[Scope, Throwable, Unit] = (for { queue <- Cluster.clusterEvents(true) pubsub <- zio.akka.cluster.pubsub.PubSub.createPubSub[Int] liveUsersLogger <- pubsub .listen("LiveUsers") .flatMap( _.take.tap(u => Console.printLine(s"Number of live users: $u")).forever ) .fork totalRequestLogger <- pubsub .listen("TotalRequests") .flatMap( _.take.tap(r => Console.printLine(s"Total request until now: $r")).forever ) .fork
clusterEvents <- queue.take .tap(x => Console.printLine("New event in cluster: " + x.toString)) .forever .fork
counterEntityLogic = (c: Counter) => for { entity <- ZIO.environment[Entity[Int]] newState <- c match { case Inc => entity.get.state.updateAndGet(s => Some(s.getOrElse(0) + 1)) case Dec => entity.get.state.updateAndGet(s => Some(s.getOrElse(0) - 1)) } _ <- pubsub.publish(entity.get.id, newState.getOrElse(0)).orDie } yield () cluster <- Sharding.start("CounterEntity", counterEntityLogic)
_ <- cluster.send("LiveUsers", Inc) _ <- cluster.send("TotalRequests", Inc) _ <- cluster.send("LiveUsers", Dec) _ <- cluster.send("LiveUsers", Inc) _ <- cluster.send("LiveUsers", Inc) _ <- cluster.send("TotalRequests", Inc) _ <- cluster.send("TotalRequests", Inc)
_ <- clusterEvents.join zipPar liveUsersLogger.join zipPar totalRequestLogger.join } yield ()).provide(actorSystem)}Now, let’s create a cluster comprising two nodes:
object CounterApp1 extends ZIOAppDefault { override def run = CounterApp("2551").counterApp}
object CounterApp2 extends ZIOAppDefault { override def run = CounterApp("2552").counterApp}Documentation
Section titled “Documentation”Learn more on the ZIO Akka Cluster homepage!
Contributing
Section titled “Contributing”For the general guidelines, see ZIO contributor’s guide.
Code of Conduct
Section titled “Code of Conduct”See the Code of Conduct
Support
Section titled “Support”Come chat with us on [![Badge-Discord]][Link-Discord].
[Badge-Discord] !: https://img.shields.io/discord/629491597070827530?logo=discord “chat on discord” [Link-Discord] !: https://discord.gg/2ccFBr4 “Discord”
License
Section titled “License”[License]!(LICENSE)