开发文章

基于Actor的并发方案

在前面几篇关于如何利用Scala类型系统大幅提升灵活性和编译期安全性的文章之后,我们现在重新回到这个系列早期探讨过的一个话题:Scala如何处理并发。

我们前面介绍过组合使用Future进行异步处理的方案,这个方案能很好地应对诸多问题,但是,它不是Scala提供的唯一方案,Scala处理并发的另一块基石是“Actor模型”,它提供了一种基于进程间消息传递的并发方案。

Actor并不是新概念,最知名的实现是在Erlang上,Scala核心库很早就有自己的Actor实现了,但是在Scala2.11版本之后,它面临被废弃的命运,因为它将被Akka提供的Actor实现所取代,Akka作为Scala事实上的Actor标准已经很久了。

本文你将了解到Akka Actor模型的理念并学习如何使用Akka工具箱进行基础编程,我们不打算深入地讨论Akka Actor的所有内容,所以,和本系列前面的多数文章不同,本文是为了让你了解一下Akka的理念,激发你对它的兴趣。

共享可变状态的问题

当前主流的并发解决方案是“共享可变状态”( Shared Mutable State),即:大量有状态的对象,它们的状态可以被应用程序的很多地方改变,每处修改都是在自己的线程上进行的。这类方案下代码通常会遍布读/写锁,防止多个线程同时修改,以保证对象状态的改变是可控的。同时,我们还要尽量避免锁住太大的代码块,因为这会大幅度的削弱程序的性能。

Actor模型

广泛使用的“共享可变状态”方案需要你时刻谨记你的代码要在并发场景下运行,你必须从头到尾用并发的方式去设计和编写你的应用,以后你将很难再为其添加支持。Actor编程模型致力于避免上述的所有问题,它允许你编写易读的高性能并发代码。

Actor模型的思想是:把你的应用程序看作是由许多轻量的被称之为“Actor”的实体组成的,每个Actor只负责一个很小的任务,职责单一且清晰,复杂的业务逻辑会通过多个Actor之间相互协作来完成,比如委派任务给其他的Actor或者传递消息给协作者。

Actor系统

Actor是种可怜的“生物”,它们不能独自存活。Akka中的每一个Actor都是由一个Actor系统(Actor System)来创建和维护的。一个Actor系统会提供一整套辅助功能,但是现在我们先不用关心这些。

让我们从示例代码开始,要运行它们需要在基于Scala 2.10的SBT项目里添加如下的resolver和dependency:

复制内容到剪贴板
  1. resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"  
  2. libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3"  

现在,让我们来创建一个ActorSystem,它将是所有用户定义的Actor的宿主环境。

复制内容到剪贴板
  1. import akka.actor.ActorSystem  
  2. object Barista extends App {  
  3.   val system = ActorSystem("Barista")  
  4.   system.shutdown()  
  5. }  

我们创建了一个ActorSystem实例,把它命名为“Barista”(咖啡师),我们将以”购买和制作咖啡”这个场景来讲解,这是我们在本系列较早一篇文章 composable futures中提到过的例子。

程序的最后,作为一个好市民,当我们不再使用这个ActorSystem时,我们要记得关闭它。

定义Actor

你的应用程序是由几十个还是几百万个Actor来组成取决于你的用例,但是Akka完全有能力处理百万级别数量的Actor,你可能觉得创建这么多Actor一定是疯了,但实际上,Actor和线程之间并不是一一对应的关系,这一点非常重要,否则系统的内存很快就会被耗光的。由于Actor的非阻塞特性,一个线程可以为不同的Actor服务,Akka(译者注:具体地说是Dispatcher)会让线程在不同的Actor之间切换,依据是当前谁有消息需要被处理就分配线程给它。

为了了解实际发生了什么,让我们首先创建一个简单的Actor:Barista,它负责接收咖啡订单制作咖啡,我们只是简单地让它打印一条消息来表示它完成了咖啡订单的处理:

复制内容到剪贴板
  1. sealed trait CoffeeRequest  
  2. case object CappuccinoRequest extends CoffeeRequest  
  3. case object EspressoRequest extends CoffeeRequest  
  4.   
  5. import akka.actor.Actor  
  6. class Barista extends Actor {  
  7.   def receive = {  
  8.     case CappuccinoRequest => println("I have to prepare a cappuccino!")  
  9.     case EspressoRequest => println("Let's prepare an espresso.")  
  10.   }  
  11. }  

首先,我们要定义Actor能理解的消息类型。如果消息有各种参数,通常我们会使用case class来封装消息并在Actor之间传递它们。如果消息没有参数,就简单地使用case object就可以了,就像我们这里写的一样。

任何情况下,要确保你的消息是不可变的,否则出现糟糕的后果。

接下来,我们看一下Barista类,它是一个具体类,继承自Actor特质,这个特质定义了一个方法receive,它返回一个Receive类型的值。Receive是类型PartialFunction[Any, Unit]的一个别名。

消息处理

receive方法的含义是什么呢?它的返回类型PartialFunction[Any, Unit]可能会让你觉得古怪。

简单地说,receive方法返回的这个偏函数代表着这个Actor对传递给它的所有消息的处理逻辑。无论什么时候,当你系统的其他部分(另外一个Actor或者其他什么东西)给这个Actor发送消息时,Akka最终都会通过调用这个Actor的receive方法所返回的这个偏函数来处理消息,在调用时会把这条消息作为这个偏函数的参数传递它。

副作用

在处理消息的时候,你可以让Actor做任何你想做的事情,但是,它就是不能有返回值

为什么?

因为偏函数限定了返回类型只能是Unit,我们曾经强调在函数式编程里应该总是尽量地使用纯函数,所以这里可能会让你感到有些意外。对于一个并发编程模型来说,这其实是合理的,Actor接收的每一条消息都是隔离处理的,是一条一条进行的,并不需要同步或锁机制。那些可能出现副作用的地方都会以一种可控的方式去处理。

非类型化

偏函数的另一个副作用是它所期望的参数类型Any,也就是消息是非类型化的,这在拥有如此强大类型系统的Scala里看上去让人困惑。

非类型化配合一些重要的设计决策可以让我们做很多事情,比如转发消息给其他的Actor,负载均衡或者代理Actor从而避免发送者了解过多的细节。

从实践来看,偏函数里代表消息的参数没有类型化通常不是问题,如果你使用的是强类型化的消息,你就使用模式匹配处理对应类型的消息就可以了,就像我们前面代码里做的那样。

但是有些时候,弱类型的Actor确实会导致糟糕的Bug。如果你习惯并高度依赖强类型系统,你可以看一下Akka还处于实验阶段的特性:Typed Channels。

异步和非阻塞

我前面提到Akka“最终”会让你的Actor去处理发送给它的消息。这句话的意思是:消息的发送和处理完成是一个异步的非阻塞的过程。发送方不会一直被阻塞到接收方处理完消息,它会直接继续它自己的工作,发送方可能会期望从处理方那里得到一个反馈消息,但也许它根本不关心消息的处理结果。

当某些组件发送一条消息给一个Actor时,真正发生的事情是:这个消息被投递给了这个Actor的Mailbox, 我们基本上可以把Mailbox当成一个对列。把一条消息放到一个Actor的Mailbox的过程也是非阻塞的,比如:发送方不会一直等待消息进入接收方的Mailbox对列。

Dispatcher会通知Actor它的Mailbox有一条新消息,如果这个Actor正在处理着手头上的消息,Dispatcher会从当前的执行上下文里选一个可用的线程,一旦Actor处理完前面的消息,它会让Actor在这个准备好的线程上从Mailbox里取出这条消息去处理。

Actor会阻塞分配给它的线程直到它开始处理一条消息,但这并不会阻塞消息的发送方,这意味着一个耗时较长的操作会影响整体的性能,因为所有其他的Actor不得不被安排从剩余线程中选取一个去处理消息(译者注:增加了线程调度的开销)。

因此,你的Receive偏函数要遵从一个核心的准则:尽可能地缩短它的执行时间(译者注:可以把任务分解为更小的单元委派更粒度更细的Actor去执行)。更重要的是,在你的消息处理代码里尽量地避免调用会造成阻塞的代码。

当然,有些事情是你无法完全避免的,比如,当今主流的数据库驱动基本都是阻塞的,如果你想让你的程序基于Actor去持久化或查询数据时,你就会面临这样的问题。现在已经有一些应对这类问题的方案了,但是作为一偏介绍性的文章,本文先不涉及。

创建一个Actor

定义好Actor之后,我们如何实际地使用我们的Barista呢?为此,我需要为创建Barista的一个实例。你可能会用下面这种常规的做法,即通过调用构造函数来实例化一个Actor:

复制内容到剪贴板
  1. val barista = new Barista // will throw exception  

这样不行!Akka会用一个ActorInitializationException异常回敬你。事情是这样的:为了能让Actor良好地工作,你的Actor必须交给ActorSystem和它的组件来托管。因此,你需要通过ActorSystem来帮你创建这个实例:

复制内容到剪贴板
  1. import akka.actor.{ActorRef, Props}  
  2. val barista: ActorRef = system.actorOf(Props[Barista], "Barista")  

定义在AcotorSysytem上actorOf方法期望一个Props实例,针对新建的Actor的配置信息都会封装到这个Props实例里,另外,方法也给这个Actor起了一个名称。我们使用的是创建Props实例的最简单的方式,也就是通过调用Props的伴生对象的apply方法来实现的,同时指定它的类型参数。Akka随后将根据给定的类型调用它的构造函数来创建一个新的Actor实例。

注意:actorOf返回的对象类型不是Barista而是ActorRef(译者注:Akka对Actor的托管体现在很多地方,前面提到的你不能直接创建一个Actor实例是一方面,此处,创建之后你得到的也不是Actor实例本身而是一个ActorRef)。Actor从不会与其他Actor直接通信,因此没有必要获取一个Actor实例的直接引用,而是让Actor或其他组件获取那些需要发送消息的Actor的ActorRef

因此,ActorRef扮演了Actor实例代理的角色,这样做会带来很多好处,因为一个ActorRef可以被序列化,我们可以让它代理一个远程机器上的Actor,至于ActorRef后面的这个Actor是本地JVM里的还是一个远程机器上的,这对使用者来说都是透明的,我们把这种特性称作“位置透明”。

请记住,ActorRef不是类型参数化的,任何ActorRef都可以互相交换,以便可以让你发送任意的消息给任意的ActorRef。这种设计我们前面也提到了,它让你可以简单地修改你的Actor系统的网络拓扑而不需要对发送方做任何修改。

发送消息

现在我们已经创建了一个Barista的actor实例并获得了指向它的引用ActorRef。我们现在可以给它发送消息了。这是通过调用ActorRef!方法实现的:

复制内容到剪贴板
  1. barista ! CappuccinoRequest  
  2. barista ! EspressoRequest  
  3. println("I ordered a cappuccino and an espresso")  

调用!方法是一个“调用之后不管”的操作:你告诉Barista你想要一杯卡布奇诺,但是你不会一直等着它响应。这是Akka里面Actor最常见的交互方式。通过调用!,你告诉Akka把你的消息加入到收信人的邮箱队列里,如前面所述,这是非阻塞的,作为接收方的Actor最终一定会处理你的消息。

由于异步的特性,上述代码的执行循序是不确定的,它可能会是这样一种结果:

复制内容到剪贴板
  1. I have to prepare a cappuccino!  
  2. I ordered a cappuccino and an espresso  
  3. Let's prepare an espresso.  

即使我们是先给Barista的Mailbox连续发送了两条消息,但是描述顾客点完咖啡信息去先于espresso的制作完成前打印到了控制台。

消息应答

有时候,你可能想通过给消息发送者发送一个应答消息。为了让你能那样做,Actor有一个叫sender的方法,它返回最后一条消息的发送方的ActorRef

但是它是怎么知道发送者是谁的呢?答案可以从!方法的函数签名看出来,它的第二个参数是一个隐式参数:

复制内容到剪贴板
  1. def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit  

在调用某个Actor的!方法时,这个Actor的ActorRef会作为隐士参数sender传递过去。

让我们改变一下Barista以便于它在工作前先发送一条账单消息BillCoffeeRequest的发送者:

复制内容到剪贴板
  1. case class Bill(cents: Int)  
  2. case object ClosingTime  
  3. class Barista extends Actor {  
  4.   def receive = {  
  5.     case CappuccinoRequest =>  
  6.       sender ! Bill(250)  
  7.       println("I have to prepare a cappuccino!")  
  8.     case EspressoRequest =>  
  9.       sender ! Bill(200)  
  10.       println("Let's prepare an espresso.")  
  11.     case ClosingTime => context.system.shutdown()  
  12.   }  
  13. }  

上述代码里我们引入了一个新的消息ClosingTime, Barista应对这条消息的做法就是关闭ActorSystem,所有的Actor都可以从ActorContext获取ActorSystem。

现在,让我们引入第二个Actor,它代表“顾客”。

复制内容到剪贴板
  1. case object CaffeineWithdrawalWarning  
  2. class Customer(caffeineSource: ActorRef) extends Actor {  
  3.   def receive = {  
  4.     case CaffeineWithdrawalWarning => caffeineSource ! EspressoRequest  
  5.     case Bill(cents) => println(s"I have to pay $cents cents, or else!")  
  6.   }  
  7. }  

这是一个咖啡成瘾的顾客,一旦咖啡因摄入量减少,他就要买咖啡喝。我们给Customer的构造函数传递一个ActorRef,即caffeineSourceCustomer并不知道这个ActorRef指向一个Barista,但它知道它可以发送一个CoffeeRequest消息给它,仅此而已。

最后,让我们实例化这两个Actor,并给Customer发送一个CaffeineWithdrawalWarning消息(译者注:这条消息的意思是:咖啡因摄入量减少,发出警告,言下之意就是顾客的身体告诉顾客:“你该喝咖啡了!”),让程序跑起来:

复制内容到剪贴板
  1. val barista = system.actorOf(Props[Barista], "Barista")  
  2. val customer = system.actorOf(Props(classOf[Customer], barista), "Customer")  
  3. customer ! CaffeineWithdrawalWarning  
  4. barista ! ClosingTime  

在这里,对于这个Customer,我们使用了一个不同的工厂方法来创建一个Props实例:我们传入Actor的类型以及它的构造函数所需要的参数。我们这样做是因为我们想把BaristaActorRef传递给Customer 构造函数。

发送CaffeineWithdrawalWarning消息给咖啡因成瘾的顾客,顾客的反应是立即买一杯浓咖啡,即发送一条EspressoRequest消息给咖啡师,后者则会发送一个账单消息给顾客。相应的输出可能是这样的:

复制内容到剪贴板
  1. Let's prepare an espresso.  
  2. I have to pay 200 cents, or else!  

首先,在处理EspressoRequest消息的时候,Barista会发一个消息给sender,也就是Customer,但是,这个操作并不会阻塞它后面的操作。Barista可以继续处理EspressoRequest,就如像控制台打印的那样。很快, Customer会处理Bill消息,然后打印到控制台。

问询(Ask)机制

有时候,给一个Actor发送消息然后期待返回一个响应消息这种模式并不适用于某些场景,最常见的例子是当某些组件并不是Actor但又需要和Actor交互时,它们就无法接收来自Actor的消息。

对于这种情况,Akka有一种Ask机制,它在基于Actor的并发和基于Future的并发之间提供一座桥梁,从客户端的角度看,它是这样工作的:

复制内容到剪贴板
  1. import akka.pattern.ask  
  2. import akka.util.Timeout  
  3. import scala.concurrent.duration._  
  4. implicit val timeout = Timeout(2.second)  
  5. implicit val ec = system.dispatcher  
  6. val f: Future[Any] = barista2 ? CappuccinoRequest  
  7. f.onSuccess {  
  8.   case Bill(cents) => println(s"Will pay $cents cents for a cappuccino")  
  9. }  

首先,你要引入对ask语法的支持(即import akka.pattern.ask),同时针对?方法返回的Future创建一个隐式变量timeout,Future还需要一个ExecutionContext。这里我们只简单地使用ActorSystem默认的dispatcher,它同时也是一个ExecutionContext

如你所见,返回的Future是非类型化的,它是一个Future[Any]。这也不奇怪,既然它接收的是来自某个Actor的消息,这些Actor(ActorRef)尚未类型化,返回的Future又怎么能类型化呢?

对于被问询的Actor来说,这和发送消息给被处理的消息的发送方并没有什么不同,所以我们在使用Ask机制从Barista处获取答复时,Barista本身不需要做任何改动。

一旦被询问的Actor发送了反馈消息给消息的发送方,返回的FuturePromise就完成了。

总的来说,主动告知要比问询好,因为它耗费的资源更少,Akka不是给“礼貌”的人准备的。但是,确实有些场景你只能使用问询的方式,但那也没有什么,Akka同样可以工作地很好。

有状态的Actor

每个Actor都可以维护一个内部的状态,但不是必须的。有时候,系统的整体状态很大一部分是由那些在Actor之间传递的不可变的消息所携带的信息组成

的。

一个Actor一次只处理一条消息,这个过程中它可能会修改它的内部状态,这意味着Actor的内部状态是可变的,但是既然每一条消息是互相隔离处理的,那么Actor的内部状态就不会因为并发而被搞乱。

为了说明这一点,让我们把无状态的Barista改造成有状态的,我们给它添加一个订单计数器:

复制内容到剪贴板
  1. class Barista extends Actor {  
  2.   var cappuccinoCount = 0  
  3.   var espressoCount = 0  
  4.   def receive = {  
  5.     case CappuccinoRequest =>  
  6.       sender ! Bill(250)  
  7.       cappuccinoCount += 1  
  8.       println(s"I have to prepare cappuccino #$cappuccinoCount")  
  9.     case EspressoRequest =>  
  10.       sender ! Bill(200)  
  11.       espressoCount += 1  
  12.       println(s"Let's prepare espresso #$espressoCount.")  
  13.     case ClosingTime => context.system.shutdown()  
  14.   }  
  15. }  

我们引入了两个变量,cappuccinoCountexpressoCount分别代表两类咖啡的订单数量。这是我们在本系列文章里第一次使用var,虽然在函数式编程里我们应该尽量地避免使用var,但这是唯一让你的Actor携带”状态”的做法。既然每条消息都是隔离处理的,上述的代码和在非Actor环境里使用AtomicInteger效果上是一样的。

小结

到这里,我们关于使用Actor模型进行并发编程以及如何在Akka中使用这种编程范式的介绍就要结束了。我们只是泛泛地介绍了一下,忽略了Akka里一些重要的概念,我希望这能让你对这种并发编程方法先有一个初步的了解,并激发出你的学习兴趣。

在下一篇文章里,我会增强我们的例子,给它添加一些有意义的行为以便介绍更多关于Akka Actor的东西,比如在一个Actor系统里是如何进行错误处理的。

感谢 bluishglc 支持 磐实编程网 原文地址:
blog.csdn.net/bluishglc/article/details/53155454

上一篇:MFC多线程编程

下一篇:JAVA线程池浅谈

文章信息

发布时间:2016-11-15

作者:bluishglc

发布者:aquwcw

浏览次数: