美文网首页Akka
Akka 2.5.12 Using TCP

Akka 2.5.12 Using TCP

作者: mango_knight | 来源:发表于2018-04-17 16:10 被阅读0次

    本部分内容都需要引用:

        import akka.actor.{ Actor, ActorRef, Props }
        import akka.io.{ IO, Tcp }
        import akka.util.ByteString
        import java.net.InetSocketAddress
    

    所有的 Akka I/O API都需要通过manager访问,当使用 I/O API时,第一步就是对manager的引用。

        import akka.io.{ IO, Tcp }
        import context.system // implicitly used by IO(Tcp)
    
        val manager = IO(Tcp)
    

    manager是处理底层I/O资源(选择器、通道)并为特定任务(如监听传入连接)实例化worker的actor。

    连接(client side)

        object Client {
          def props(remote: InetSocketAddress, replies: ActorRef) =
            Props(classOf[Client], remote, replies)
        }
    
        class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
    
          import Tcp._
          import context.system
          //连接远程的第一步给TCP manag发送Connect
          IO(Tcp) ! Connect(remote)
    
          def receive = {
            //Tcp manager会返回 两种消息
            case CommandFailed(_: Connect) ⇒
              listener ! "connect failed"
              context stop self
    
            case c @ Connected(remote, local) ⇒
              listener ! c
              //激活连接,给connection actor 发送注册消息。来通知谁该接收socket数据
              val connection = sender()
              connection ! Register(self)
              context become {
                case data: ByteString ⇒
                  connection ! Write(data)
                case CommandFailed(w: Write) ⇒
                  // O/S buffer was full
                  listener ! "write failed"
                case Received(data) ⇒
                  listener ! data
                case "close" ⇒
                  connection ! Close
                case _: ConnectionClosed ⇒
                  listener ! "connection closed"
                  context stop self
              }
          }
        }
    

    连接远程地址的第一步就是给TCP manager 发送一个Connect消息。
    然后TCP manager 会回复一个 CommandFailed消息或者产生一个内部actor来表示新的connection 。这个 connection actor会发送一个Connected消息给Client。
    为了激活新连接,需要发送给 connection actor 注册消息来通知谁该接收socket中的数据。并且要等注册消息传递过程完成以后连接才可以使用。而且这段过程如果超时connection actor 会自己终止。
    connection actor一直监视注册处理程序,如果它停止, connection actor会关闭连接,清理所有的连接相关的资源。
    client 使用become 方法演示从未连接到连接状态中观察到的命令和事件。

    接受连接(server side)

          class Server extends Actor {
    
          import Tcp._
          import context.system
    
          IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
    
          def receive = {
            case b @ Bound(localAddress) ⇒
              context.parent ! b
    
            case CommandFailed(_: Bind) ⇒ context stop self
    
            case c @ Connected(remote, local) ⇒
              val handler = context.actorOf(Props[SimplisticHandler])
              val connection = sender()
              connection ! Register(handler)
          }
    
        }
    

    要创建TCP服务器并监听入站连接,必须将Bind命令发送给TCP manager。TCP manager监听特定InetSocketAddress上的TCP连接,端口为0表示连接到随机端口。

    发送bind消息的actor将收到bound消息,表明服务器已经准备好接受传入的连接。bound消息里包含InetSocketAddress(解析IP地址和端口)
    处理连接内容 与外部连接过程一样。将需要读取的内容交给一个处理程序(SimplisticHandler)完成操作。

        class SimplisticHandler extends Actor {
          import Tcp._
          def receive = {
            case Received(data) ⇒ sender() ! Write(data)
            case PeerClosed     ⇒ context stop self
          }
        }
    

    关闭连接

    CloseConfirmedCloseAbort 这是三个命令发送给connection actor 都可以关闭连接。

    • Close是通过 FIN消息关闭连接,不用等待远程端点确认消息。listener会收到Closed
    • ConfirmedClose也是通过 FIN消息关闭连接,但是发送FIN消息后仍然持续运行,知道收到远程的确认消息。listener会收到ConfirmedClosed
    • Abort将通过向远程端点发送RST消息立即终止连接。listener会收到Aborted

    如果远程端点关闭连接,会发送PeerClosed给listener。为了支持半关闭连接,将注册消息的keepOpenOnPeerClosed成员设置为true,在这种情况下,连接保持打开状态,直到接收到上述关闭命令之一为止。

    当发生错误导致连接被关闭时,ErrorClosed将被发送给侦听器。

    Write to a connection

    一旦连接建立,任何actor通过Tcp.WriteCommand形式发送数据,Tcp.WriteCommand只是个抽象类,有三种具体的实现方法。

    • Tcp.Write

      最简单的WriteCommand实现,它封装了一个ByteString实例和一个“ack”事件。
    • Tcp.WriteFile

      高效地发送文件中的“原始”数据。
    • Tcp.CompoundWrite

      将几个Tcp.Write和(或)Tcp.WriteFile放进一个命令中。优点如下:
      1.TCP连接actor一次只能处理一个write命令。通过将几个Write放到一个CompoundWrite中,可以让它们在最小开销的情况下通过连接发送,而无需通过基于ack-base的消息协议将它们发送给连接actor。
      2.因为WriteCommand是原子写入,所以当结合进CompoundWrite后,其他actor不能注入其他写入。
      3.因为CompoundWrite的子写入是Write和WriteFile,它们可以请求Ack消息。当这些子写入完成后发送Ack。通过connection actor在任意点发送ack可以看到CompoundWrite的过程进展。

    读写节流

    Tcp connection actor 的基本模型是没有内部缓冲器的,因为一次只能处理一个写入。无论写入还是读取都需要在用户级别控制拥塞。

    关于背压 写入有三种操作模式:
    1. ACK-based:每个写命令都带有一个任意对象,如果这个对象不是Tcp.NoAck。在成功地将所有包含的数据写入套接字后,它将被返回给发送方。如果在收到此确认之前没有其他的写操作,那么由于缓冲区溢出而不会发生故障。
    2.NACK-based:当下个写入到达,但上个写入还未完成时,将回复一个包含失败写入的CommandFailed消息。

    相关文章

      网友评论

        本文标题:Akka 2.5.12 Using TCP

        本文链接:https://www.haomeiwen.com/subject/ffiukftx.html