This walkthrough uses Spark 2.1.0, Scala 2.11.0, and Hadoop 2.7.3. Open IDEA and create a new Maven project, then look up the Maven coordinates for the Spark modules on the Spark website. For streaming sources such as Kafka, the relevant artifacts are:
Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]
The resulting pom.xml configuration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.kason.spark</groupId>
  <artifactId>spark_platform_learn</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.1.0</spark.version>
    <!-- Scala binary version; must match the _2.11 suffix of the Spark artifacts -->
    <scala.version>2.11</scala.version>
    <hadoop.version>2.7.3</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <!-- use the property so this stays in sync with the 2.7.3 cluster -->
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.39</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
  </dependencies>

  <!-- Maven central: http://repo1.maven.org/maven2/ or http://repo2.maven.org/maven2/ (lower latency) -->
  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Repository Switchboard</name>
      <layout>default</layout>
      <url>http://repo2.maven.org/maven2</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- JDK version used by the Maven compilation -->
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <!-- compiles the Scala sources; without this plugin src/main/scala is never built -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
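One pitfall worth checking up front: every Spark 2.1.0 artifact above carries the _2.11 suffix, so the Scala binary version on the classpath must be 2.11 as well, or jobs fail at runtime with NoSuchMethodError. A minimal sketch of that check (ScalaVersionCheck and its helper are illustrative names, not part of Spark):

```scala
// Sanity-check the Scala binary version against the _2.11 artifact suffix.
object ScalaVersionCheck {
  // Extracts the binary version, e.g. "version 2.11.0" -> "2.11"
  def binaryVersion(full: String): String =
    full.split("\\s+").last.split("\\.").take(2).mkString(".")

  def main(args: Array[String]): Unit =
    // scala.util.Properties.versionString reports the running Scala version
    println(binaryVersion(scala.util.Properties.versionString))
}
```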
SparkStreaming Demo code:
package com.scala.action.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by kason_zhang on 4/7/2017.
 */
object SparkStreaming {
  def main(args: Array[String]): Unit = {
    // local[3]: one thread is taken by the socket receiver, the rest do the processing
    val conf = new SparkConf().setMaster("local[3]").setAppName("BasicStreamingExample")
    val ssc = new StreamingContext(conf, Seconds(5))   // 5-second micro-batches

    // Read lines from the nc server started below
    val lines = ssc.socketTextStream("10.64.24.78", 9999)
    val words = lines.flatMap(_.split(" "))
    val wc = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

    wc.print()
    // saveAsTextFiles treats this as a prefix: one directory per batch,
    // named word.txt-<batch timestamp>
    wc.saveAsTextFiles("D:\\work\\cloud\\log\\word.txt")

    println("pandas: sscstart")
    ssc.start()                // start receiving and processing
    println("pandas: awaittermination")
    ssc.awaitTermination()     // block until the job is stopped
    println("pandas: done!")
  }
}
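The flatMap → map → reduceByKey pipeline above can be sketched on plain Scala collections, treating one micro-batch as an in-memory sequence of lines (WordCountSketch is an illustrative helper, not Spark API):

```scala
// Plain-Scala sketch of the streaming word count: same flatMap -> map ->
// reduce-by-key shape, applied to a single in-memory batch of lines.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                               // lines -> words
      .map(w => (w, 1))                                    // word -> (word, 1)
      .groupBy(_._1)                                       // group pairs by key
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // sum the 1s per key

  def main(args: Array[String]): Unit =
    println(countWords(Seq("hell o kkl")))
}
```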
Install nc (netcat) on CentOS:
yum install nc
Start a server listening on the port:
nc -lk 9999
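If nc is not available, a bare-bones Scala stand-in can serve lines over the same socket using only java.net (a sketch; LineServer is an illustrative name, not Spark API):

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Minimal stand-in for `nc -lk <port>`: accepts one client and writes
// the given lines to it, one record per line, then closes the connection.
object LineServer {
  // Binds the requested port (0 = pick a free ephemeral port), serves in a
  // background thread, and returns the port actually bound.
  def start(lines: Seq[String], port: Int = 0): Int = {
    val server = new ServerSocket(port)
    val t = new Thread(new Runnable {
      def run(): Unit = {
        val client = server.accept()                       // wait for one connection
        val out = new PrintWriter(client.getOutputStream, true)
        lines.foreach(out.println)                         // one record per line
        client.close()
        server.close()
      }
    })
    t.setDaemon(true)
    t.start()
    server.getLocalPort
  }
}
```

While testing locally, point socketTextStream at the returned port instead of 9999.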
Open port 9999 in the firewall:
[kason@kason Desktop]$ su
Password:
[root@kason Desktop]# /sbin/iptables -I INPUT -p tcp --dport 9999 -j ACCEPT
[root@kason Desktop]# /etc/init.d/iptables save
iptables: Saving firewall rules to /etc/sysconfig/iptables:[ OK ]
[root@kason Desktop]# service iptables restart
iptables: Setting chains to policy ACCEPT: filter [ OK ]
iptables: Flushing firewall rules: [ OK ]
iptables: Unloading modules: [ OK ]
iptables: Applying firewall rules: [ OK ]
[root@kason Desktop]# vi /etc/init.d/iptables
[root@kason Desktop]# /etc/init.d/iptables status
Then run nc -lk 9999 on the Linux machine and type the lines to send:
[kason@kason Desktop]$ nc -lk 9999
hell owoo
goo ndate
better best
goo ndate
vi /e
hello world
hello world out
hell o
kkl portal
hell o kkl
The output in IDEA is as follows:
-------------------------------------------
Time: 1491875390000 ms
-------------------------------------------
(o,1)
(kkl,1)
(hell,1)
With that, a simple Spark Streaming demo developed in IDEA is complete.