Spring boot暴力集成Scala Akka

作者: SamHxm | 来源:发表于2017-04-26 19:10 被阅读0次

    在<a href="http://www.jianshu.com/p/f786b6ff91c7">之前的文章</a>中介绍了spring boot简化集成Akka,最近通过在项目中的实践,又有的新的想法。

    首先定义Akka配置类

    import akka.actor.ActorSystem
    import akka.actor.Props
    import org.springframework.context.annotation.Bean
    import org.springframework.stereotype.Component
    import com.sam.demo.akka.scala.facade.ProcessManagers
    
    @Component
    class AkkaConfig {
    
      val system = ActorSystem("ReactiveEnterprise")
    
      val processManagersRef = system.actorOf(Props[ProcessManagers].withDispatcher("my-thread-pool-dispatcher"), "processManagers")
      
      @Bean
      def processManagers = {
        processManagersRef
      }
      
      @Bean
      def worker = {
        system.actorSelection("/user/processManagers/worker")
      }
    
    }
    

    在配置类中,首先定义了ActorRef对象processManagersRef,作用是作为其他业务相关Actor的监控者,避免在akka user节点下创建过多的Actor。 注意worker,是通过路径查询获取。worker的创建放在processManagers中。当AkkaConfig实例化时,processManagersRef随之实例化。

    ProcessManagers如下:

    import org.slf4j.LoggerFactory
    import akka.actor.Actor
    import akka.actor.Props
    import com.sam.demo.akka.scala.Worker
    
    
    class ProcessManagers extends Actor {
      
      val logger = LoggerFactory.getLogger(getClass)
    
      val worker = context.actorOf(Props[Worker].withDispatcher("my-thread-pool-dispatcher"), "worker")
      
      def receive = {
        case x: Any =>
      }
    
    }
    

    该类无任务业务方法,其作用仅仅是创建了另一个Actor(worker),将来可扩展ProcessManagers作为其他业务Actor的监控者。

    在该类实例化过程中,同时也创建了worker的ActorRef

    Worker定义如下:

    import akka.actor.Actor
    import org.slf4j.LoggerFactory
    import com.sam.demo.BeanFactory
    import com.sam.demo.service.DomainService
    
    class Worker extends Actor {
      
      val logger = LoggerFactory.getLogger(getClass)
      
      lazy val domainService = BeanFactory.buildFactory().getBean(classOf[DomainService])
      
      def receive = {
        case x: String => {
          logger.info("x:{}", x)
          val textMsg = new TextMessage(domainService.toUpper(x))
          sender ! textMsg
        }
      }
    }
    

    通过BeanFactory获得spring IOC容器中的业务Bean,从而实现业务方法的调用。

    关键点来了,如何向Actor发消息? 给哪个Actor发消息? 之前的做法是发消息给ProcessManagers,ProcessManagers透传消息给Worker,之所以这样做,是因为无法将Worker的ActorRef引用注入到Spring IOC容器中的Bean中。

    现在,在AkkaConfig中已经将Worker的引用(注意: 通过system.actorSelection的返回类型是ActorSelection,不是ActorRef)暴露为一个Spring Bean,是需要将Worker的引用注入给调用方即可。

    调用方如下:

    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.sam.demo.akka.scala.TextMessage;
    
    import akka.actor.ActorSelection;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.Duration;
    
    @RestController
    @RequestMapping(value={"/users"})
    public class UserController {
        
        @Autowired
        private ActorSelection worker;
        
        @SuppressWarnings({"rawtypes","unchecked"})
        @RequestMapping(value={"/find"})
        public String find(String id) throws Exception {
    
            String uuid = UUID.randomUUID().toString();
            Future future = Patterns.ask(worker, uuid, Timeout.apply(10L, TimeUnit.SECONDS));
            TextMessage o = (TextMessage)Await.result(future, Duration.create(10, TimeUnit.SECONDS));
            return o.msg();
        }
    }
    

    现在实现了调用方直接将消息发给业务Actor,由原来2次消息发送变成了1次。

    代码其他部分保持不变,其他细节参考<a href="http://www.jianshu.com/p/f786b6ff91c7">Spring非常规集成Akka</a>

    相关文章

      网友评论

        本文标题:Spring boot暴力集成Scala Akka

        本文链接:https://www.haomeiwen.com/subject/sorwzttx.html