美文网首页Spark 应用Java学习笔记spark
利用Akka获取Spark任务的返回结果

利用Akka获取Spark任务的返回结果

作者: SamHxm | 来源:发表于2017-06-01 15:00 被阅读301次

    通过spark-submit提交的任务都需要指定Main类作为程序的入口,Main类执行结束即Spark任务终结。如果需要通过外部程序实时向Spark任务提交数据并获取结果又该如何呢?

    思路很简单,让Spark任务的Main方法不终止,外部程序与Spark任务进行通信,交互数据。

    通信方式很多,比如Socket,netty或者内置Tomcat,Jetty等,不过考虑编码的快捷,通过Akka是比较不错的选择。

    开发分为2部分。1.编写Spark任务,该部分会提交到Spark集群中。2.外部调用代码,该部分模拟客户端代码。2者使用Akka Actor进行通信。

    先看Spark任务部分

    SparkConfig 定义SparkContext对象

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    /** Singleton holding the Spark configuration and the context built from it. */
    object SparkConfig {

      /** Configuration that only sets the application name. */
      val conf: SparkConf = {
        val settings = new SparkConf()
        settings.setAppName("testSpark")
      }

      /** Context created from `conf` when this object is first initialised. */
      val sc: SparkContext = new SparkContext(conf)
    }
    

    DataService 作为调用Spark RDD操作的业务类。

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import com.sam.spark.demo.data.config.SparkConfig
    import scala.collection.mutable.ArrayBuffer
    
    
    /** Business class that runs RDD operations on the context held by SparkConfig. */
    class DataService {

      /**
       * Distributes `list` as an RDD and returns its largest element.
       *
       * @param list strings to hand to Spark
       * @return the maximum string of the RDD
       */
      def handler(list: ArrayBuffer[String]) : String = {
        val rdd = SparkConfig.sc.parallelize(list)
        rdd.max()
      }
    }
    

    Worker Akka的Actor对象,接收外部入参,调用DataService对象,并返回结果

    import akka.actor.Actor
    import org.slf4j.LoggerFactory
    import com.sam.spark.demo.data.service.DataService
    import com.sam.spark.demo.akka.msg.TextMessage
    import java.util.UUID
    import scala.collection.mutable.ArrayBuffer
    
    /**
     * Actor that passes each received string buffer to a DataService and
     * sends the outcome back to the sender wrapped in a TextMessage.
     */
    class Worker extends Actor {

      val dataService = new DataService()

      override def receive: Receive = {
        // Only the ArrayBuffer class is checked at runtime; the String element type is erased.
        case payload: ArrayBuffer[String] =>
          val reply = new TextMessage()
          reply.msg = dataService.handler(payload)
          sender ! reply
      }
    }
    

    TextMessage 作为返回的消息对象

    /** Serializable message object carrying a single text payload. */
    class TextMessage extends Serializable {

      /** Payload text; stays null until it is assigned. */
      var msg: String = _
    }
    

    AkkaConfig Actor配置类,创建Worker对象

    import akka.actor.ActorSystem
    import akka.actor.Props
    import com.typesafe.config.ConfigFactory
    
    /** Singleton that boots the actor system and registers the Worker actor. */
    object AkkaConfig {

      // The "serverSystem" subtree of the default loaded configuration.
      private val serverSettings = ConfigFactory.load().getConfig("serverSystem")

      /** Actor system named "ReactiveEnterprise", configured from the "serverSystem" subtree. */
      val system: ActorSystem = ActorSystem("ReactiveEnterprise", serverSettings)

      /** Reference to the Worker actor, created in `system` under the name "worker". */
      val workerRef = system.actorOf(Props[Worker], "worker")
    }
    

    程序入口类

    import com.sam.spark.demo.akka.AkkaConfig
    import com.sam.spark.demo.data.config.SparkConfig
    import scala.concurrent.duration.Duration
    import scala.concurrent.Await
    import java.util.concurrent.TimeUnit
    
    /**
     * Entry point of the Spark job.
     *
     * Initialises the SparkContext and the actor system, then keeps the main
     * thread blocked until the actor system terminates. This makes the
     * "main must not finish" requirement explicit instead of relying on the
     * actor system's threads to keep the JVM alive after main has returned.
     */
    object AppStart {
      def main(args: Array[String]): Unit = {
        // Reading a member forces initialisation of each singleton, in the
        // same order as before: Spark first, then Akka (which also creates the worker).
        val sc = SparkConfig.sc
        val system = AkkaConfig.system

        try {
          // Blocks until the actor system has been shut down.
          system.awaitTermination()
        } finally {
          // Release the Spark resources once the job is no longer serving requests.
          sc.stop()
        }
      }
    }
    

    pom.xml

    <!-- Compile-time dependencies of the Spark job module. -->
    <dependencies>
        <!-- Spark core built for Scala 2.10; "provided" scope, so it is expected to be supplied at runtime. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.3</version>
            <scope>provided</scope>
        </dependency>
    
        <!-- Scala standard library, also "provided" scope. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.5</version>
            <scope>provided</scope>
        </dependency>
    
        <!-- JUnit, test scope only. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
        <!-- Build setup: Java compiler level, Scala compiler plugin, and a shaded jar. -->
        <build>
        <plugins>
            <!-- Java sources are compiled at source/target level 1.7. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin; no executions are declared here, so its goals must be invoked explicitly. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- Shade plugin bound to the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Manifest entry for the main class; the value below is a placeholder for the real class name. -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>Main类名</mainClass>
                                </transformer>
                                <!-- Appends all reference.conf resources into one file instead of letting one overwrite another. -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <!-- Excludes signature files of all artifacts from the shaded jar. -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    Akka配置文件

    # Settings for the server-side actor system; read with getConfig("serverSystem").
    serverSystem {
        akka {
            actor {
                # Actor reference provider with remoting support.
                provider = "akka.remote.RemoteActorRefProvider"
                default-dispatcher {
                    throughput = 2
                }
                
                # Registers Java serialization under the alias "java".
                serializers {
                    java = "akka.serialization.JavaSerializer"
                }
                
                serialization-bindings {
                    # Placeholder key: replace with the name of the message class to serialize.
                    "需要序列化的消息类名" = java
                }
            }
            remote { 
                enabled-transports = ["akka.remote.netty.tcp"] 
                netty.tcp { 
                    # Placeholders: replace with the address and port of the Akka remote service.
                    hostname = "Akka Remote服务地址" 
                    port = Akka Remote端口
                } 
            }
        }
    }
    

    打包

    mvn clean scala:compile package -DskipTests=true
    

    发布到Spark集群

    ./spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class Main类名 --master spark://Spark Master地址 ./spark.demo-0.0.1-SNAPSHOT.jar
    
    再看客户端实现

    本地Actor 获取远程ActorRef并发送消息

    import akka.actor.Actor
    import akka.actor.ActorSelection
    import com.sam.spark.demo.akka.msg.TextMessage
    import scala.collection.mutable.ArrayBuffer
    
    /**
     * Local actor that relays string buffers to the remote worker and prints
     * the text of every TextMessage it receives.
     */
    class Client extends Actor {

      // The worker is created on the server with system.actorOf(Props[Worker], "worker"),
      // so it lives directly under /user. Selecting any other path (the previous
      // /user/processManagers/worker) matches no actor and the requests are lost.
      val remoteActor : ActorSelection = context.actorSelection("akka.tcp://ReactiveEnterprise@10.16.64.146:2555/user/worker")

      override def receive: Receive = {
        // Only the ArrayBuffer class is checked at runtime; the String element type is erased.
        case msg: ArrayBuffer[String] =>
          // Sent from inside the actor, so this actor is the sender the worker replies to.
          remoteActor ! msg
        case msg: TextMessage =>
          println(msg.msg)
      }
    }
    

    本地Main类 模拟向远端Actor发送消息

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory
    import akka.actor.Props
    import akka.pattern.Patterns
    import scala.concurrent.duration.Duration
    import scala.concurrent.Await
    import akka.util.Timeout
    import java.util.concurrent.TimeUnit
    import java.util.UUID
    import scala.collection.mutable.ArrayBuffer
    
    /** Local entry point that keeps sending batches of random UUID strings to the Client actor. */
    object ClientStart {
      def main(args: Array[String]): Unit = {

        val clientSettings = ConfigFactory.load().getConfig("clientSystem")
        val actorSystem = ActorSystem("clientSystem", clientSettings)
        val clientRef = actorSystem.actorOf(Props[Client], "client")

        // Endless producer: a batch of 100 UUID strings, then a 500 ms pause.
        while (true) {
          val batch = ArrayBuffer.fill(100)(UUID.randomUUID().toString())
          clientRef ! batch
          Thread.sleep(500)
        }

        // Ask-pattern variant, kept disabled as in the original:
        //    val future = Patterns.ask(clientRef, "world", Timeout.apply(10L, TimeUnit.SECONDS));
        //    val result = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
        //    println(result)
      }
    }
    

    Akka配置文件

    # Settings for the client-side actor system; read with getConfig("clientSystem").
    clientSystem {
        akka {
            actor {
                # Actor reference provider with remoting support.
                provider = "akka.remote.RemoteActorRefProvider"
                default-dispatcher {
                    throughput = 2
                }
                
                # Registers Java serialization under the alias "java".
                serializers {
                    java = "akka.serialization.JavaSerializer"
                }
                
                serialization-bindings {
                    # Placeholder key: replace with the name of the message class to serialize.
                    "需要序列化的消息类名" = java
                }   
            }
        }
    }
    

    相关文章

      网友评论

      • 5e21eb17bc1e:请教一下,如何让运行中spark streaming executor 接受命令,动态执行一些加载资源的action?

      本文标题:利用Akka获取Spark任务的返回结果

      本文链接:https://www.haomeiwen.com/subject/likefxtx.html