Akka(11): 分布式运算:集群-均衡负载

软件设计 2017-07-03

  在上篇讨论里我们主要介绍了Akka-Cluster的基本原理。同时我们也确认了几个使用Akka-Cluster的重点:首先,Akka-Cluster集群构建与Actor编程没有直接的关联。集群构建是ActorSystem层面上的,可以是纯粹的配置和部署行为;分布式Actor程序编程实现了Actor消息地址的透明化,无须考虑目标运行环境是否分布式的,可以按正常的Actor编程模式进行。

  既然分布式的Actor编程无须特别针对集群环境,那么摆在我们面前的就是多个可以直接使用的运算环境(集群节点)了,现在我们的分布式编程方式应该主要聚焦在如何充分使用这些分布的运算环境,即:如何把程序合理分配到各集群节点以达到最优的运算输出效率。我们可以通过人为方式有目的向集群节点分配负载,或者通过某种算法来自动分配就像前面讨论过的Routing那样。

我们首先示范如何手工进行集群的负载分配:目的很简单:把不同的功能分配给不同的集群节点去运算。先按运算功能把集群节点分类:我们可以通过设定节点角色role来实现。然后需要一类节点(可能是多个节点)作为前端负责承接任务,然后根据任务类型来分配。负责具体运算的节点统称后台节点(backend nodes),负责接收和分配任务的节点统称前端节点(frontend nodes)。在编程过程中唯一需要考虑集群环境的部分就是前端节点需要知道处在所有后台节点上运算Actor的具体地址,即ActorRef。这点需要后台节点Node在加人集群时向前端负责分配任务的Actor报备自己的ActorRef。前端Actor是通过一个后台报备的ActorRef清单来分配任务的。假如我们用加减乘除模拟四项不同的运算功能,用手工形式把每一项功能部署到一个集群节点上,然后用一个前端节点按功能分配运算任务。下面是所有节点共享的消息类型:

package clusterloadbalance.messages

object Messages {
  sealed trait MathOps
  case class Add(x: Int, y: Int) extends MathOps
  case class Sub(x: Int, y: Int) extends MathOps
  case class Mul(x: Int, y: Int) extends MathOps
  case class Div(x: Int, y: Int) extends MathOps

  sealed trait ClusterMsg
  case class RegisterBackendActor(role: String) extends ClusterMsg

}

负责运算的后台定义如下:

package clusterloadbalance.backend

import akka.actor._
import clusterloadbalance.messages.Messages._

import scala.concurrent.duration._
import akka.cluster._
import akka.cluster.ClusterEvent._
import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper(role: String) = Props(new CalculatorSupervisor(role))
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor(mathOps: String) extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,classOf[MemberUp])
    super.preStart()
  }

  override def postStop(): Unit =
    cluster.unsubscribe(self)
    super.postStop()

  override def receive: Receive = {
    case MemberUp(m) =>
      if (m.hasRole("frontend")) {
        context.actorSelection(RootActorPath(m.address)+"/user/frontend") !
          RegisterBackendActor(mathOps)
      }
    case msg@ _ => calcActor.forward(msg)
  }

}

object Calculator {
  def create(role: String): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("Backend.akka.cluster.roles = [\""+role+"\"]")
      .withFallback(ConfigFactory.load()).getConfig("Backend")
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
  }
}

注意,这个运算Actor是带监管的,可以自动处理异常。

为了方便表达,我把所有功能都集中在CalcFunctions一个Actor里。在实际情况下应该分成四个不同的Actor,因为它们会被部署到不同的集群节点上。另外,增加了一个supervisor来监管CalcFunctions。这个supervisor也是分布在四个节点上分别监管本节点上的子级Actor。现在所有集群节点之间的功能交流都会通过这个supervisor来进行了,所以需要向前端登记ActorRef和具体负责的功能role。下面是前台程序定义:

package clusterloadbalance.frontend

import akka.actor._
import clusterloadbalance.messages.Messages._
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {
  var nodes: Map[String,ActorRef] = Map()
  override def receive: Receive = {
    case RegisterBackendActor(role) =>
      nodes += (role -> sender())
      context.watch(sender())
    case add: Add => routeCommand("adder", add)
    case sub: Sub => routeCommand("substractor",sub)
    case mul: Mul => routeCommand("multiplier",mul)
    case div: Div => routeCommand("divider",div)

    case Terminated(ref) =>    //remove from register
      nodes = nodes.filter { case (_,r) => r != ref}

  }
  def routeCommand(role: String, ops: MathOps): Unit = {
    nodes.get(role) match {
      case Some(ref) => ref ! ops
      case None =>
        println(s"$role not registered!")
    }
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load().getConfig("Frontend"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

只有一个前台负责功能分配。这是个seed-node,必须先启动,然后其它后台节点启动时才能进行后台登记。

我们用下面的代码来测试这个程序。由于Akka的集群节点是由不同的ActorSystem实例代表的,所以在这个例子里没有必要使用多层项目结构:

package clusterLoadBalance.demo
import clusterloadbalance.messages.Messages._
import clusterloadbalance.backend.Calculator
import clusterloadbalance.frontend.FrontEnd

object LoadBalancerDemo extends App {
  FrontEnd.create

  Calculator.create("adder")

  Calculator.create("substractor")

  Calculator.create("multiplier")

  Calculator.create("divider")

  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

然后,resources/application.conf是这样定义的: 

Frontend {
  akka {
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }

    cluster {
      roles = [frontend]
      seed-nodes = [
        "akka.tcp://[email protected]:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

Backend {
  akka{
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }

    cluster {
      roles = [backend]
      seed-nodes = [
        "akka.tcp://[email protected]:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

运算结果:

...
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1800460396] with result=21
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1046049287] with result=13
8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-2008880936] with result=4
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#367851617] with result=42
[WARN] [06/30/2017 09:12:39.465] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

结果显示不同的功能在不同的节点运算,而且ArithmeticException并没有造成Restart,说明SupervisorStrategy发挥了作用。 

我们也可以用类似前面介绍的Routing方式来分配负载,原理还是通过前端的Router把任务分配到后台多个Routee上去运算。不同的是这些Routee可以在不同的集群节点上。原则上讲,具体任务分配是通过某种Routing算法自动安排到Routees上去的。但是,我们还是可以用ConsistentHashing-Router模式实现目标Routees的配对。上面例子的加减乘除操作类型可以成为这种Router模式的分类键(Key)。Router还是分Router-Group和Router-Pool两种:Router-Pool更自动化些,它的Routees是由Router构建及部署的,Router-Group的Router则是通过路径查找方式连接已经构建和部署好了的Routees的。如果我们希望有针对性的把任务分配到指定集群节点上的Routee,必须首先构建和部署Routees后再用Router-Group方式连接Routees。还是用上面的例子,使用ConsistentHashing-Router模式。首先调整一下消息类型:

package clusterrouter.messages
import akka.routing.ConsistentHashingRouter._
object Messages {
  class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
    override def consistentHashKey: Any = hashKey
  }
  case class Add(x: Int, y: Int) extends MathOps("adder")
  case class Sub(x: Int, y: Int) extends MathOps("substractor")
  case class Mul(x: Int, y: Int) extends MathOps("multiplier")
  case class Div(x: Int, y: Int) extends MathOps("divider")

}

注意现在Router是在跨系统的集群环境下,消息类型必须实现Serializable。构建Calculator并且完成部署:

package clusterrouter.backend

import akka.actor._
import clusterrouter.messages.Messages._

import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper = Props(new CalculatorSupervisor)
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(port: Int): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"))
      .withFallback(ConfigFactory.load())
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
  }
}

用create函数构建后台节点后再在之上构建部署一个CalculatorSupervisor。

frontend定义如下:

package clusterrouter.frontend

import akka.actor._
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {

  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[CalculatorSupervisor].
  val calcRouter = context.actorOf(FromConfig.props(Props.empty),
    name = "calcRouter")

  override def receive: Receive = {
    case msg@ _ => calcRouter forward msg
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load("hashing"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

注意calcRouter构建,用了FromConfig.props(Props.empty),代表只进行Routees查找,无需部署。hashing.conf文件如下:

include "application"
akka.cluster.roles = [frontend]
akka.actor.deployment {
  /frontend/calcRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = backend
    }
  }
}

application.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

用下面的代码进行测试:

package clusterLoadBalance.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.FrontEnd

object LoadBalancerDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)

  Thread.sleep(2000)

  FrontEnd.create


  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

  Thread.sleep(2000)

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

重复两批次运算后显示: 

8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
[WARN] [07/02/2017 17:35:03.381] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero
[INFO] [07/02/2017 17:35:05.076] [calcClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://calcClusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is moving node [akka.tcp://[email protected]:51304] to [Up]
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
[WARN] [07/02/2017 17:35:05.374] [calcClusterSystem-akka.actor.default-dispatcher-17] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

相同功能的运算被分配到了相同的节点上,这点可以用Actor的地址证明。

Akka-Cluster提供的Adaptive-Group是一种比较智能化的自动Routing模式,它是通过对各集群节点的具体负载情况来分配任务的。用户只需要定义adaptive-group的配置,按情况增减集群节点以及在不同的集群节点上构建部署Routee都是自动的。Adaptive-Group-Router可以在配置文件中设置:

include "application"

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.deployment {
  /frontend/adaptive/calcRouter {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

后台的运算Actor与上面人为分配方式一样不变。前端Router的构建也没变,只不过在构建时目标是cluster-metrics-adaptive-group。这个在测试代码中可以看到: 

package clusterLoadBalance.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.CalcRouter
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.concurrent.duration._
import akka.actor._
import akka.cluster._


class RouterRunner extends Actor {
  val jobs = List(Add,Sub,Mul,Div)
  import context.dispatcher

  val calcRouter = context.actorOf(CalcRouter.props,"adaptive")
  context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

  override def receive: Receive = {
    case  _ => calcRouter ! anyMathJob
  }
  def anyMathJob: MathOps =
    jobs(Random.nextInt(4))(Random.nextInt(100), Random.nextInt(100))
}

object AdaptiveRouterDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node

  Thread.sleep(2000)


  val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load("adaptive"))

  val calcSystem = ActorSystem("calcClusterSystem",config)

  //#registerOnUp
  Cluster(calcSystem) registerOnMemberUp {
    val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")
  }
  //#registerOnUp

  //val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")

}

测试结果显示如下:

...
12 * 9 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=108
27 + 74 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=101
78 + 37 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=115
77 * 33 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2541
32 - 19 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=13
65 * 46 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2990
68 - 99 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=-31
97 + 18 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=115
...

运算已经被部署到不同节点去进行了。

 下面是这次讨论示范的源代码:

例1:

build.sbt:

name := "cluster-load-balance"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaVersion = "2.5.3"
  Seq(
    "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion,
    "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion
  )

}

application.conf:

Frontend {
  akka {
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }

    cluster {
      roles = [frontend]
      seed-nodes = [
        "akka.tcp://[email protected]:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

Backend {
  akka{
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }

    cluster {
      roles = [backend]
      seed-nodes = [
        "akka.tcp://[email protected]:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

messages/Messages.scala

package clusterloadbalance.messages

object Messages {
  sealed trait MathOps
  case class Add(x: Int, y: Int) extends MathOps
  case class Sub(x: Int, y: Int) extends MathOps
  case class Mul(x: Int, y: Int) extends MathOps
  case class Div(x: Int, y: Int) extends MathOps

  sealed trait ClusterMsg
  case class RegisterBackendActor(role: String) extends ClusterMsg

}

backend/Calculator.scala:

package clusterloadbalance.backend

import akka.actor._
import clusterloadbalance.messages.Messages._

import scala.concurrent.duration._
import akka.cluster._
import akka.cluster.ClusterEvent._
import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper(role: String) = Props(new CalculatorSupervisor(role))
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor(mathOps: String) extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,classOf[MemberUp])
    super.preStart()
  }

  override def postStop(): Unit =
    cluster.unsubscribe(self)
    super.postStop()

  override def receive: Receive = {
    case MemberUp(m) =>
      if (m.hasRole("frontend")) {
        context.actorSelection(RootActorPath(m.address)+"/user/frontend") !
          RegisterBackendActor(mathOps)
      }
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(role: String): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("Backend.akka.cluster.roles = ["+role+"]")
      .withFallback(ConfigFactory.load()).getConfig("Backend")
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
  }
}

frontend/FrontEnd.scala:

package clusterloadbalance.frontend

import akka.actor._
import clusterloadbalance.messages.Messages._
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {
  var nodes: Map[String,ActorRef] = Map()
  override def receive: Receive = {
    case RegisterBackendActor(role) =>
      nodes += (role -> sender())
      context.watch(sender())
    case add: Add => routeCommand("adder", add)
    case sub: Sub => routeCommand("substractor",sub)
    case mul: Mul => routeCommand("multiplier",mul)
    case div: Div => routeCommand("divider",div)

    case Terminated(ref) =>    //remove from register
      nodes = nodes.filter { case (_,r) => r != ref}

  }
  def routeCommand(role: String, ops: MathOps): Unit = {
    nodes.get(role) match {
      case Some(ref) => ref ! ops
      case None =>
        println(s"$role not registered!")
    }
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load().getConfig("Frontend"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

loadBalanceDemo.scala

package clusterLoadBalance.demo
import clusterloadbalance.messages.Messages._
import clusterloadbalance.backend.Calculator
import clusterloadbalance.frontend.FrontEnd

object LoadBalancerDemo extends App {
  FrontEnd.create

  Calculator.create("adder")

  Calculator.create("substractor")

  Calculator.create("multiplier")

  Calculator.create("divider")

  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)
  
}

例2:

build.sbt:

name := "cluster-router-demo"

version := "1.0"

scalaVersion := "2.11.9"

sbtVersion := "0.13.7"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaVersion = "2.5.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-remote" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
  "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion)

application.conf+hashing.conf+adaptive.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

include "application"
akka.cluster.roles = [frontend]
akka.actor.deployment {
  /frontend/calcRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = backend
    }
  }
}

include "application"

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka.actor.deployment {
  /frontend/adaptive/calcRouter {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

messages/Messages.scala:

package clusterrouter.messages
import akka.routing.ConsistentHashingRouter._
object Messages {
  class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
    override def consistentHashKey: Any = hashKey
  }
  case class Add(x: Int, y: Int) extends MathOps("adder")
  case class Sub(x: Int, y: Int) extends MathOps("substractor")
  case class Mul(x: Int, y: Int) extends MathOps("multiplier")
  case class Div(x: Int, y: Int) extends MathOps("divider")

}

backend/Calculator.scala:

package clusterrouter.backend

import akka.actor._
import clusterrouter.messages.Messages._

import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper = Props(new CalculatorSupervisor)
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(port: Int): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"))
      .withFallback(ConfigFactory.load("adaptive"))
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
  }
}

frontend/FrontEnd.scala:

package clusterrouter.frontend

import akka.actor._
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {

  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[CalculatorSupervisor].
  val calcRouter = context.actorOf(FromConfig.props(Props.empty),
    name = "calcRouter")

  override def receive: Receive = {
    case msg@ _ => calcRouter forward msg
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load("hashing"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

loadBalancerDemo.scala:

package clusterrouter.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.FrontEnd

object LoadBalancerDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)

  Thread.sleep(2000)

  FrontEnd.create


  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

  Thread.sleep(2000)

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

adaptiveRouterDemo.scala:

package clusterrouter.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.CalcRouter
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.concurrent.duration._
import akka.actor._
import akka.cluster._


class RouterRunner extends Actor {
  val jobs = List(Add,Sub,Mul,Div)
  import context.dispatcher

  val calcRouter = context.actorOf(CalcRouter.props,"adaptive")
  context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

  override def receive: Receive = {
    case  _ => calcRouter ! anyMathJob
  }
  def anyMathJob: MathOps =
    jobs(Random.nextInt(4))(Random.nextInt(100), Random.nextInt(100))
}

object AdaptiveRouterDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node

  Thread.sleep(2000)


  val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load("adaptive"))

  val calcSystem = ActorSystem("calcClusterSystem",config)

  //#registerOnUp
  Cluster(calcSystem) registerOnMemberUp {
    val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")
  }
  //#registerOnUp

  //val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")

}

相关推荐