33-Scala-Akka

发布时间 2024-01-11 09:51:37作者: tree6x7

1. Akka 概述

Akka 是 Java 虚拟机 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时,你可以理解成 Akka 是编写并发程序的框架。Akka 用 Scala 语言写成,同时提供了Scala 和 Java 的开发接口。

Akka 基于 Actor 模型,它提供了一种轻量级的并发抽象,称为 Actor,以及处理并发和分布式通信的工具。在 Akka 中,Actor 是并发执行的基本单位,它可以接收消息、处理消息和发送消息给其他 Actor。每个 Actor 都有自己的状态和行为,并且可以通过消息来进行通信和协调。

Actor 模型用于解决什么问题?

处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。但是当我们对关键代码加入同步条件 synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有很大影响。若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。 你可以这里理解:Actor 模型是一种处理并发问题的解决方案。

2. Actor 模型

2.1 模型概述

https://www.cnblogs.com/gabry/p/9264083.html

Akka 处理并发的方法基于 Actor 模型。

在 Actor 模型中,actor 是一个并发原语,简单的说,一个 actor 就是一个工人,与进程或线程一样都能够工作或处理任务。其实这还有点不好理解,我们可以把它想象成面向对象编程语言中的一个对象实例。在 OOP 中一个对象可以访问或修改另一个对象的属性,也可以直接调用另一个对象的方法。例如下图,person1 给 person2 发送了一个消息,直接调用方法就行了。深入底层执行逻辑的话,结果就是 JVM 转到 sayHello 的代码区,一步步执行。

public class HelloWorld {
    private String name = "";
    public HelloWorld(String name){
        this.name = name;
    }
    public String getName(){
        return this.name;
    }
    public void sayHello(HelloWorld to, String msg){
        System.out.println(to.getName()+" 收到 "+name+" 的消息:"+ msg);
    }
}

public class OOPInvoke {
    public static void main( String[] args ) {
        HelloWorld person1 = new HelloWorld("Person1");
        HelloWorld person2 = new HelloWorld("Person2");
        person1.sayHello(person2,"Hello world");
    }
}

在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。Actor 模型是作为一个并发模型设计和架构的。

Actor 和对象的不同之处在于,Actor 的状态不能直接读取、修改,Actor 的方法不能直接调用。Actor 只能通过「消息传递」的方式与外界通信。每个对象都有一个 this 指针,代表对象的地址,可以通过该地址调用方法或存取状态;与此类似,Actor 也有一个代表本身的地址,但只能向该地址发送消息。

Actor 是并发执行的最小单位,它封装了状态和行为,并且只能通过消息进行通信(Actor 之间只能通过消息通信)。Actor 之间的通信是异步的,不会阻塞或等待响应。

  • Actor 通过消息传递的方式与外界通信。消息传递是异步的。每个 Actor 都有一个邮箱,该邮箱接收并缓存其他 Actor 发过来的消息,Actor 一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
  • Actor 模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以 Actor 内部可以安全的处理状态,而不用考虑锁机制

那么读者可能会问,每次只处理一个消息,这不是会严重的影响性能么?废话,一次处理一个消息当然影响性能了。不过,如果你恰当的运用 Akka 和 Actor 模型,完全可以不必关心性能的问题。下面是 Actor 模型的几个基本原则:

  • 所有的计算都是在 Actor 中执行的
  • Actor 之间只能通过消息进行通信交流
  • 为了响应消息,Actor 可以进行下列操作:
    • 更改状态或行为
    • 发消息给其他 Actor
    • 创建有限数量的子 Actor

看了上面几个基本原则,你是不是更加鉴定的认为 Actor 模型没啥用?嗯,这就对了,因为我当初也是这么认为的。

一次处理一个消息,没有并发,怎么提高性能?如果 Actor 只能更改状态或行为,发消息给其他 Actor,创建有限数量的子 Actor,那我的业务逻辑在哪里?Actor之间只能通过消息通信,我怎么知道另外一个 Actor 的地址?

其实 Actor 模型出现的很早,而 20 世纪 80 年代,爱立信在 Erlang 中实现了 Actor 模型,用于嵌入式电信应用程序。该实现中引入了监督机制提供的容错性概念。爱立信使用 Erlang 和 Actor 模型实现了一款日后经常被提及的应用:AXD301。这玩意儿能提供 99.9999999% 的可用性,看到没,7个9!!!绝对可以亮瞎人们的狗眼,这意味着在 100 年的时间中,AXD301 只有 3.1s 的时间会宕机。

Actor 模型的另一个重要的特性就是容错,它通过监督机制提供容错。这跟 Java 中的 throw Exception 有点类似,都是把处理响应错误的责任交给出错对象以外的实体。但在 Java 中如果一个程序或者线程抛出了一个异常,你敢放心的恢复对应的程序或线程吗?你确保恢复之后还能正常的运行吗?毕竟需要很多资源需要重新创建,但 Actor 模型可以!

如上图所示 Actor 之间是有层级关系的,子 Actor 如果出现了异常会抛给父 Actor,父 Actor 会根据情况重新构建子 Actor,子 Actor 从出现异常,到恢复之后正常运行,这段时间内的所有消息都不会丢失,等恢复之后又可以处理下一个消息。也就是说如果一个 Actor 抛出了异常,除了导致发生异常的消息外,任何消息都不会丢失。这容错性当然好了。当然了,为了实现这种特性,Akka 或 Erlang 需要做很多工作的。

Akka 中的 Actor 模型还有另外一个比较重要的两个特性:分布式与位置透明性。其实可以认为这是一个特性。Actor 模型中一个很重要的概念就是 Actor 地址,因为其他 Actor 需要通过这个地址与 Actor 进行通信。Akka 考虑到分布式的网络环境,对 Actor 地址进行了抽象,屏蔽了本地地址和远程地址的差异,对于开发者来说基本上是透明的。由于 Actor 地址是透明的,那么 Actor 又引入了集群。当然了,基于 Actor 模型和位置透明性,Akka 还有其他很多有用的组件,这里就不介绍了。

2.2 示例说明

(1)测试代码

class SayHelloActor extends Actor {
  
  /**
   * 收到消息之后处理消息的入口函数
   * 1. 该方法会被该 Actor 的 MailBox(其实现了 Runnable)调用
   * 2. 当该 Actor 的 MailBox 接收到消息,就会调用 receive
   * 3. Receive 底层:
   * -> trait Actor { type Receive = Actor.Receive }
   * -> object Actor { type Receive = PartialFunction[Any, Unit] }
   *
   * @return
   */
  override def receive: Receive = {
    case "hello" => println("Hello Back!")
    case "ok" => println("Ok Back!")
    case "exit" =>
      println("EXIT ActorSystem ...")
      // 停止当前ActorRef
      context.stop(self)
      // 退出ActorSystem
      context.system.terminate()
    case _ => println("NOTHING MATCH!")
  }

}


object MainApp {

  // 1. 先创建一个 ActorSystem,专门用于创建 Actor
  private val actorFactory = ActorSystem("firstActor")

  // 2. 创建 Actor,会返回关联的 ActorRef
  private val sayHelloActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")

  def main(args: Array[String]): Unit = {
    // 给 sayHelloActor(的邮箱) 发消息
    // sayHelloActorRef -["hello"]-> DispatcherMessage -["hello"]-> {SayHelloActor#MailBox:队列&线程} -> MailBox.Actor.receive()
    sayHelloActorRef ! "hello"
    sayHelloActorRef ! "ok"
    sayHelloActorRef ! "2"
    sayHelloActorRef ! "exit"
  }

}

这是基础模式的最基本形式,给 Actor 发送消息,Actor 对消息进行响应,发送和响应是异步的,同一个 Actor 对所有的消息都是按照邮箱队列的顺序,串行调用的。

Hello Back!
Ok Back!
NOTHING MATCH!
EXIT ActorSystem ...

(2)结合示例再来说明 Actor 模型的核心概念

Actor 模型是一种并发计算模型,它将并发系统中的实体抽象为独立的 Actor,并通过消息传递进行通信。每个 Actor 都是独立的、可执行的单元,具有自己的状态和行为。Actor 之间相互解耦,通过异步方式发送和接收消息,从而实现并发和并行处理。

核心概念 简单说明
Actor Actor 是并发系统中的基本单位。每个 Actor 都有一个唯一的标识符,并且具有状态和行为。Actor 可以接收消息、处理消息和发送消息给其他 Actor。在 Akka 中,Actor 由 Actor 类表示,可以通过创建 Actor 类的实例来创建真实的 Actor。
消息传递 在 Actor 模型中,消息是 Actor 之间进行通信的基本单位。消息可以是任何对象,用于传递数据、指令或请求。消息是不可变的,且 Actor 之间只能通过消息进行通信。消息传递是异步的,这意味着发送消息的 Actor 不会阻塞等待响应(发送消息 Actor 也可以选择等待回复)。
邮箱(Mailbox) 每个 Actor 都有一个与之关联的 Mailbox,用于存放接收到的消息。Mailbox 是线程安全的队列,保证消息按顺序被处理。当 Actor 接收到消息时,消息会被放入 Mailbox 中等待处理。
行为和状态 Actor 的行为由其接收消息时执行的代码逻辑定义。每个 Actor 可以根据不同的消息采取不同的行为。Actor 还可以通过修改自己的状态来响应不同的消息。由于 Actor 之间是相互独立的,因此 Actor 的状态在不同的 Actor 之间是隔离的,不会相互影响。
ActorRef ActorRef 是对 Actor 的引用,是向特定 Actor 发送消息的句柄。通过 ActorRef,可以发送消息给指定的 Actor,而无需知道 Actor 的具体标识符。ActorRef 提供了一种封装,使得 Actor 的行为和状态对外部是隐藏的(e.g. A-Actor 如果想给自己发消息,就通过 A-ActorRef;A-Actor 想给 B-Actor 发消息,就需要持有 B-ActorRef,通过 B-ActorRef 发)。
生命周期管理 Akka 提供了对 Actor 生命周期的管理机制。Actor 在创建、启动、停止和重启等不同阶段都有相应的生命周期方法,可以在这些方法中执行初始化、清理、监控等操作。
监督机制 Akka 中的 Actor 可以通过监督机制对其他 Actor 的异常进行监控和处理。每个 Actor 都有一个监督者(Supervisor),它负责管理和监控受管 Actor 的行为。当受管 Actor 出现异常时,监督者可以根据事先定义的策略来决定如何处理异常,例如重启 Actor、停止 Actor 或进行其他操作。
ActorSystem ActorSystem 是 Akka 中的顶级抽象,代表着整个 Actor 系统。它是一个容器,负责创建和管理 Actor 的生命周期,以及提供配置、线程池等系统级的资源。是构建基于 Actor 模型的系统的入口点。每个应用程序通常只需要一个 ActorSystem 实例,但在某些情况下,可以创建多个 ActorSystem 实例以满足特定的需求。

通过使用 Actor 模型,Akka 实现了高度可扩展、高并发和容错的并发编程。每个 Actor 是独立的,它们之间通过消息传递进行通信,避免了传统并发编程中的锁竞争和共享状态的问题。同时,Akka 提供了监督机制和生命周期管理,帮助开发人员构建健壮的分布式系统。

2.3 补充说明

a. ActorSystem

  1. 创建 Actor:通过 ActorSystem,可以使用 system.actorOf 方法创建 Actor 的实例。ActorSystem 负责创建和管理所有的 Actor,并分配 Mailbox 和 Dispatcher。
  2. Actor 层级结构:ActorSystem 可以创建和管理多个 Actor,这些 Actor 可以根据业务需求形成层级结构。Actors 在层级结构中有不同的角色和职责,通过 ActorRef 可以在不同的 Actors 之间进行消息传递。
  3. 配置管理:ActorSystem 提供了配置机制,允许开发人员自定义和调整 ActorSystem 的行为。可以通过配置文件(比如 application.conf)或编程方式配置 ActorSystem 的各种参数,如线程池大小、超时时间等。
  4. 线程池管理:ActorSystem 实现了对线程池的管理和分配。它会根据系统负载和配置进行调度,决定将消息分发给哪个 Actor 并在哪个线程上执行。
  5. 监督机制和容错:ActorSystem 充当着顶级监督者的角色,负责监督整个 Actor 系统中的 Actor,并决定如何处理异常。当 Actor 异常终止时,ActorSystem 可以根据事先定义的策略(如重启、停止等)来管理和恢复 Actor 的状态。
  6. 生命周期管理:ActorSystem 提供了生命周期钩子方法,用于在启动和停止 ActorSystem 时执行一些初始化和清理操作。

b. Dispatcher

(1)Dispatcher

在 Akka 中,Dispatcher 负责将消息投递给 Actor 并决定使用哪个线程来执行 Actor 的逻辑。Dispatcher 是 ActorSystem 的一部分,可以根据配置和需求创建多个 Dispatcher。每个 Dispatcher 都有一个关联的线程池,用于管理任务的执行 —— 根据调度策略从 Mailbox 中获取消息,并将消息分配给合适的 Actor 执行。

(2)消息派发

当消息被发送给 Actor 时,Dispatcher 负责将消息投递给目标 Actor,并负责选择合适的线程执行 Actor 的处理逻辑。派发决策可以根据配置和调度算法进行,以满足不同的需求,例如公平派发、优先级派发等。

(3)派发器类型

Akka 提供了不同类型的 Dispatcher,每种类型适用于不同的场景和需求。常见的派发器类型包括:

  • ForkJoinDispatcher:基于 Java Fork/Join 框架的 Dispatcher,适用于计算密集型任务。
  • ThreadPoolDispatcher:基于线程池的 Dispatcher,适用于 I/O 密集型任务。
  • DefaultDispatcher:默认的 Dispatcher,在配置中未指定时使用。

(4)配置 Dispatcher

可以通过配置文件或编程方式为每个 Actor 或 ActorSystem 分配特定的 Dispatcher。在配置文件中,可以为 Dispatcher 指定线程池大小、派发策略、优先级等参数。

(5)Dispatcher 和 Mailbox 之间的关系可以简单描述为以下几点:

  • Dispatcher 知道所有的 Mailbox,因为它负责将消息投递给 Actor。
  • Dispatcher 根据配置和调度算法从 Mailbox 中获取消息,然后将其分配给合适的线程执行 Actor 的逻辑。
  • Mailbox 是 Actor 的一部分,负责存储接收到的消息。在获取到消息后,Mailbox 通过 Dispatcher 将消息分发给 Actor。

可以将 Dispatcher 比喻为调度中心,而 Mailbox 则是 Actor 的消息存储区。Dispatcher 从 Mailbox 获取消息,然后将其分派给合适的 Actor 执行。这种分工合作确保了 Actor 之间的消息按顺序和异步方式得到处理。

同时,Akka 还提供了不同类型的 Dispatcher 和 Mailbox 实现,以满足不同的需求和场景。可以根据具体的配置和应用需求选择适合的 Dispatcher 和 Mailbox 类型,以优化系统性能和资源利用。

总结起来,Dispatcher 负责将消息从 Mailbox 中获取,并将其分派给合适的线程执行 Actor 的逻辑。Mailbox 是 Actor 接收到消息时存放消息的缓冲区。它们共同协作,实现了消息的分发和处理。

消息派发和 Dispatcher 的工作机制可以实现 Actor 的并发处理和资源管理。Dispatcher 负责根据不同的调度策略将消息分发给 Actor,并利用线程池来管理执行 Actor 逻辑的线程。这样可以确保 Actor 之间的消息处理是异步和非阻塞的,提升了系统的并发性能和吞吐量。

3. Actor 间通讯案例

3.1 环境配置

<properties>
  <encoding>UTF-8</encoding>
  <scala.version>2.12.18</scala.version>
  <scala.compat.version>2.12</scala.compat.version>
  <akka.version>2.5.21</akka.version>
</properties>

<dependencies>
  <!-- 添加scala的依赖 -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>

  <!-- 添加akka的actor依赖 -->
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_${scala.compat.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>

  <!-- 多进程之间的Actor通信 -->
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_${scala.compat.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>

<!-- 指定插件-->
<build>
  <!-- 指定源码包和测试包的位置 -->
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <!-- 指定编译scala的插件 -->
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>

    <!-- maven打包的插件 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
              <!-- 指定main方法 -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>xxx</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

3.2 编写代码

a. AActor

class AActor(val bActorRef: ActorRef) extends Actor {

  override def receive: Receive = {
    case "start" =>
      println("A-Actor start!")
      self ! "r u ready?"
    case "kick" =>
      println("A-Actor: 发起攻击")
      bActorRef ! "kick"
  }
}

b. BActor

class BActor extends Actor {

  override def receive: Receive = {
    case "start" =>
      println("B-Actor start!")
      self ! "r u ready?"
    case "kick" =>
      println("B-Actor: 开始反击!")
      sender() ! "kick"
  }
}

c. MainApp

object MainApp {
  def main(args: Array[String]): Unit = {

    val actorFactory = ActorSystem()
    val bActorRef = actorFactory.actorOf(Props[BActor], "bActor")
    val aActorRef = actorFactory.actorOf(Props(new AActor(bActorRef)), "aActor")

    aActorRef ! "start"
    aActorRef ! "kick"
  }
}

两个Actor通讯机制和Actor 自身发消息机制基本一样,只是要注意如下:

  1. 如果 A Actor 在需要给 B Actor 发消息,则需要持有 B Actor 的 ActorRef,可以通过创建时,传入 B Actor 的 ActorRef。
  2. 当 B Actor 在 receive 方法中接收到消息,需要回复时,可以通过 sender() 获取到发送 Actor 的代理对象。

如何理解 Actor 的 receive 方法被调用?

每个 Actor 对应一个 MailBox,MailBox 实现了 Runnable 接口,处于运行的状态。当有消息到达 MailBox,就会去调用 Actor 的 receive 方法。

4. Akka 网络编程

Akka 将复杂的 Actor 通信、Actor 注册、Actor 查找进行了封装。用户在写自己的 Actor 时,只需要实现 akka.actor.Actor 这个接口。

在 Akka 中,每一个 Actor 都有一个唯一的 URL,该 URL 的定义格式和万维网地址的定义格式非常相似。

每一个 Actor 通过 ActorSystem 和 Context 初始化的时候,都会得到自己唯一的路径,路径格式如下。并且可以通过 actorSelection(path) 方法查找对应路径的 Actor 对象,该方法返回该 Actor 的 ActorRef,得到 ActorRef 后就可以发送消息了。

akka.tcp://systemName@ip:port/user/topActorName/otherActorName

4.1 案例:小黄鸡客服

a. MsgProtocol

case class ClientMsg(msg: String)

case class ServerMsg(msg: String)

b. Server

class YellowChickenServer extends Actor {
  override def receive: Receive = {
    case "start" => println("小黄鸡开始工作啦~")
    case ClientMsg(msg) => msg match {
      case "twice" => sender() ! ServerMsg("元气兔兔拯救世界!")
      case "once" => sender() ! ServerMsg("汪汪汪!有九个主人!")
      case "slogan" => sender() ! ServerMsg("三四五代全熬死,还得看我兔瓦斯。")
      case _ => sender() ! ServerMsg("[nayeon,jeongyeon,momo,sana,jihyo,mina,dahyun,chaeyoung,tzuyu]")
    }
  }
}

object ServerApp extends App {
  private val (host, port) = ("127.0.0.1", 9999)
  // 创建 config 对象:指定协议类型、监听的IP和端口
  private val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$host
       |akka.remote.netty.tcp.port=$port
       """.stripMargin)
  private val serverActorSystem = ActorSystem("Server", config)

  private val yellowChickenServer: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "yellowChickenServer")

}

c. Client

class YellowChickenClient(val serverHost: String, val serverPort: Int) extends Actor {

  private var serverActorRef: ActorSelection = _

  // 该方法会在Actor运行前执行,在Akka开发中,通常将初始化工作放在preStart中。
  override def preStart() = {
    // ActorSystem("Server", config)
    // "user" 是固定值
    // serverActorSystem.actorOf(Props[YellowChickenServer], "yellowChickenServer")
    println("init client...")
    serverActorRef = context.actorSelection(s"akka.tcp://Server@$serverHost:$serverPort/user/yellowChickenServer")
  }

  override def receive: Receive = {
    case "start" => println("用户来了")
    case msg: String => serverActorRef ! ClientMsg(msg)
    case ServerMsg(msg) => println(s"小黄鸡客服:$msg")
  }
}

object ClientApp extends App {

  private val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 9990, "127.0.0.1", 9999)
  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$clientHost
       |akka.remote.netty.tcp.port=$clientPort
       """.stripMargin)
  private val clientActorSystem = ActorSystem("Client", config)
  private val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient-01")

  clientActorRef ! "start"

  private var msg = ""
  while (true) {
    msg = StdIn.readLine()
    clientActorRef ! msg
    println("Enter your question: ")
  }

}

4.2 案例:Spark 活性检测

a. MsgProtocol

// Master 向 Worker 返回注册成功
case object RegisteredWorkerInfo

// Worker 向 Master 发送注册信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// Master 存储 Worker 信息的数据结构
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeatTime: Long = _
}

// Worker 通知自己该发送心跳了
case object SendHeartbeat

// Worker 向 Master 发送的心跳信息
case class Heartbeat(id: String)

// Master 通知自己该做超时检查了
case object StartTimeoutWorker

// Master 通知自己删除超时Worker
case object RemoveTimeoutWorker

b. SparkMaster

class SparkMaster extends Actor {

  private val workerRegister = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {
    case "start" =>
      println("SparkMaster start...")
      self ! StartTimeoutWorker
    case RegisterWorkerInfo(id, cpu, ram) if !workerRegister.contains(id) =>
      val workerInfo = new WorkerInfo(id, cpu, ram)
      workerRegister += (id -> workerInfo)
      println(s"workersCnt=${workerRegister.size}")
      sender() ! RegisteredWorkerInfo
    case Heartbeat(id) =>
      val workerInfo = workerRegister(id)
      workerInfo.lastHeartBeatTime = System.currentTimeMillis
      println(s"Master update workerId=$id's heartbeat...")
    case StartTimeoutWorker =>
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, 10000 millis, self, RemoveTimeoutWorker)
    case RemoveTimeoutWorker =>
      println(s"start to remove timeout worker ... workersCnt=${workerRegister.size}")
      val workerInfoList = workerRegister.values
      val now = System.currentTimeMillis
      workerInfoList.filter(worker => now - worker.lastHeartBeatTime > 6677).foreach(worker => workerRegister.remove(worker.id))
      println(s"remove timeout worker over ... workersCnt=${workerRegister.size}")
  }
}

object MasterApp extends App {
  // private val (host, port, name) = ("127.0.0.1", 10008, "sparkMaster")
  val host = args(0)
  val port = args(1)
  val name = args(2)

  // 创建 config 对象:指定协议类型、监听的IP和端口
  private val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$host
       |akka.remote.netty.tcp.port=$port
       """.stripMargin)

  private val serverActorSystem = ActorSystem("sparkMasterServer", config)

  private val sparkMasterActorRef: ActorRef = serverActorSystem.actorOf(Props[SparkMaster], name)

  sparkMasterActorRef ! "start"

}

c. SparkWorker

class SparkWorker(val masterName: String, val masterHost: String, val masterPort: Int) extends Actor {

  private var masterActorRef: ActorSelection = _

  val id = UUID.randomUUID().toString

  override def preStart() = {
    masterActorRef = context.actorSelection(s"akka.tcp://sparkMasterServer@$masterHost:$masterPort/user/$masterName")
  }

  override def receive: Receive = {
    case "start" =>
      println("SparkWorker start...")
      masterActorRef ! RegisterWorkerInfo(id, 16, 64 * 1024)
    case RegisteredWorkerInfo =>
      println(s"workerId=$id registered successfully!")
      import context.dispatcher
      // 1. 0 millis 不延时,立即执行定时器
      // 2. 3000 millis 表示每隔3s执行一次
      // 3. self 表示发给自己
      // 4. SendHeartbeat 发送的内容
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartbeat)
    case SendHeartbeat =>
      println(s"workerId=$id sends heartbeat to master...")
      masterActorRef ! Heartbeat(id)
  }
}

object WorkerApp extends App {
  private val (workerName, workerHost, workerPort, masterName, masterHost, masterPort) = ("sparkWorker-01", "127.0.0.1", 10001, "sparkMaster", "127.0.0.1", 10008)

  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$workerHost
       |akka.remote.netty.tcp.port=$workerPort
     """.stripMargin)
  private val workerActorSystem = ActorSystem("sparkWorkerServer", config)

  private val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new SparkWorker(masterName, masterHost, masterPort)), workerName)

  workerActorRef ! "start"

}