jiexray 2014-11-04
Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用
1.基于Actor的模型,我们大家都知道,在多线程领域,最困难的莫过于并发了,我们一般情况下处理并发会涉及到共享变量的问题,这样很容易造成死锁,模型如下:
而Akka是基于消息的Actor模型,Actor是具有属性和操作的,所有的操作都在Actor自身来完成,所以不存共享数据的问题
这里Actor A和B之间的通信是以消息的方式进行通信,每一个操作都是在Actor的内部完成的,所以不存在资源竞争的问题。
另外,Actor是为分布式的应用服务了
下面看一个例子
首先编写服务端代码:
import akka.actor.ActorSystem import akka.actor.Actor import akka.actor.Props import com.typesafe.config.ConfigFactory import akka.actor.ActorRef object HelloRemote extends App { var akkaConf = ConfigFactory.parseString( s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "127.0.0.1" |akka.remote.netty.tcp.port = 5160 """.stripMargin) val actorSystem = ActorSystem("remotetest", akkaConf) val remoteActor = actorSystem.actorOf(Props[RemoteActor], name = "remoteActor") println(remoteActor) remoteActor.tell("START", ActorRef.noSender) } class RemoteActor extends Actor { def receive = { case "START" => println("RemoteActor started ") case msg: String => println(s"RemoteActor received message '$msg'") sender ! "reply from the RemoteActor" } }
这里RomoteActor是Actor的定义,所有的Actor都要继承Actor
receive 方法为需要自己实现的方法,这个方法内部定义了接收到消息后该做如何的处理
(这里是使用Scala语言实现的,Scala语言和java语言的不同点之一,就是case语句执行后不会继续往下执行)
这里我们定义了两类消息:
消息一:字符串等于START,这样是为了方便启动这个Actor
消息二:字符串类型消息,这样就接收所有的消息,然后进行回复。
sender是发送者actor的句柄,这样就能像发送者返回消息
我们启动这个remoteActor:
可以看到这个Actor已经启动,其中/user/remoteActor是这个actor的访问路径
下面我们编写一个本地端Actor
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import com.typesafe.config.ConfigFactory import java.io.File import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.ActorRef object Local extends App { var akkaConf = ConfigFactory.parseString( s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "127.0.0.1" |akka.remote.netty.tcp.port = 0 """.stripMargin) val actorSystem = ActorSystem("local", akkaConf) val localActor1 = actorSystem.actorOf((Props(new LocalActor("localActor1"))), name = "LocalActor1") // the local actor val localActor2 = actorSystem.actorOf((Props(new LocalActor("localActor2"))), name = "LocalActor2") // the local actor val localActor3 = actorSystem.actorOf((Props(new LocalActor("localActor3"))), name = "LocalActor3") // the local actor val localActor4 = actorSystem.actorOf((Props(new LocalActor("localActor4"))), name = "LocalActor4") // the local actor localActor1 ! "START" // start the action localActor2 ! "START" // start the action localActor3 ! "START" // start the action localActor4 ! "START" // start the action } case object Tick class LocalActor(name: String) extends Actor { // create the remote actor val remote = context.actorSelection("akka.tcp://[email protected]:5160/user/remoteActor") var counter = 0 //var tiker = context.system.scheduler.schedule(1 millis, 100 millis)(println("hehheh")) def receive = { case "START" => remote ! "Hello from the LocalActor:" + name case msg: String => println(s"LocalActor received message: '$msg'") if (counter < 5) { sender ! "The " + counter + " message from the LocalActor:" + name counter += 1 } } }
val remote = context.actorSelection("akka.tcp://[email protected]:5160/user/remoteActor")
就是连接远端的actor
我们启动local端,会看到连接信息