1. A few Spark websites
The official site is http://spark.apache.org/; the download, documentation, and build pages referenced below are all linked from it.
2. Downloading and building the Spark source
We will install Spark on the node 2 machine.
We download Spark 2.2.0 from the official site:
http://spark.apache.org/downloads.html
Build reference:
http://spark.apache.org/docs/2.2.0/building-spark.html
When building, note what the official documentation states:
Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+
There are three ways to build Spark (per the build guide): with Maven (build/mvn), with SBT, or with the make-distribution.sh script.
make-distribution.sh both compiles Spark and packages it into a deployable tarball, so that is the method we use.
Download JDK 8
Download address:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Edit the environment variables in /etc/profile:
# JAVA_HOME
export JAVA_HOME=/opt/modules/jdk1.8.0_162
export PATH=$PATH:$JAVA_HOME/bin
Reload the profile:
source /etc/profile
Download Maven (version 3.3.9 here)
Add the following to /etc/profile:
# MAVEN_HOME
export MAVEN_HOME=/opt/modules/maven-3.3.9
export PATH=$PATH:$MAVEN_HOME/bin
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=1024M -XX:ReservedCodeCacheSize=1024M"
Reload the profile: source /etc/profile
Verify: mvn -v
Add a Maven mirror (for example, a domestic mirror such as Aliyun's in the <mirrors> section of Maven's settings.xml) so that dependency downloads do not stall.
Edit make-distribution.sh and hard-code the version variables below, so the script does not have to invoke Maven to detect them:
VERSION=2.2.0
SCALA_VERSION=2.11.8
SPARK_HADOOP_VERSION=2.5.0
SPARK_HIVE=1
Switch the machine's network to an external (Internet-connected) mode so the build can download dependencies.
Run the build command.
Reference:
http://spark.apache.org/docs/2.2.0/building-spark.html
./dev/make-distribution.sh --name custom-spark --tgz -Phadoop-2.5 -Phive -Phive-thriftserver -Pyarn
After the build succeeds, a new file, spark-2.2.0-bin-custom-spark.tgz, appears in the Spark source directory. This tarball is the final artifact we need.
Extract spark-2.2.0-bin-custom-spark.tgz into the /opt/modules directory.
3. Installing Scala and environment setup
The Scala version must match the Spark version; see the Spark website for the recommended pairing. Spark 2.2.0 recommends Scala 2.11.x, so we use 2.11.8.
Scala download address: http://www.scala-lang.org/download/2.11.8.html
Download and extract it into /opt/modules.
Configure the environment variables: vim /etc/profile
#SCALA_HOME
export SCALA_HOME=/opt/modules/...
export PATH=$PATH:$SCALA_HOME/bin
4. Basic Spark usage
Start the shell from the Spark installation directory:
cd /opt/modules/spark-2.2.0-bin-custom
bin/spark-shell
If the exception below is thrown, the machine's network mode has not been switched back after the build and it is still on the external (Internet-connected) network, so the driver cannot bind to the expected address. Switch the network back, or set the binding address explicitly (spark.driver.bindAddress, as the message suggests), and restart the shell.
java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:127)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:501)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1218)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:496)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:481)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:965)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:210)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
<console>:14: error: not found: value spark
On a successful start, the console prints:
Spark context Web UI available at http://192.168.199.152:4040
Spark context available as 'sc' (master = local[*], app id = local-1519697608442).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
Both sc and spark are ready to use: sc is the SparkContext and spark is the SparkSession. The SparkSession is not a subclass of the SparkContext; it wraps it (spark.sparkContext) and exposes the richer DataFrame/Dataset and SQL API.
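For example (a minimal sketch, not from the recorded session; expected results are shown as comments), the two objects expose different API levels:
sc.parallelize(1 to 5).reduce(_ + _)   // RDD API via the SparkContext -> 15
spark.range(5).count()                 // Dataset API via the SparkSession -> 5
spark.sparkContext eq sc               // the session wraps this same context -> true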
Below we use the spark object to read a file.
# Read the file into a Dataset (each line becomes a row of a single-column table)
scala> val textFile = spark.read.textFile("file:/opt/modules/spark-2.2.0-bin-custom/README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
# Count the number of lines in the file
scala> textFile.count()
res0: Long = 103
# Get the first line of the file
scala> textFile.first()
res1: String = # Apache Spark
# Take the first 4 lines and wrap them in an array
scala> textFile.take(4)
res2: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that)
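The Dataset returned by spark.read.textFile supports further transformations as well. As a small sketch in the same spirit (not part of the recorded session; the result depends on the file's contents), the lines that mention Spark can be filtered and counted:
textFile.filter(line => line.contains("Spark")).count()   // number of lines containing "Spark"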
Word count
1. In the /opt/datas directory, create a file stu.txt with the following content:
java python scala
hadoop scala java
python javascript hue
hadoop java java
scala css sql
mysql oracle java
spark hue spark
2. Read the file with spark:
scala> val rdd = spark.read.textFile("file:///opt/datas/stu.txt")
rdd: org.apache.spark.sql.Dataset[String] = [value: string]
3. Split the file content into an array of words:
scala> val lines = rdd.flatMap(x => x.split(" ")).collect
lines: Array[String] = Array(java, python, scala, hadoop, scala, java, python, javascript, hue, hadoop, java, java, scala, css, sql, mysql, oracle, java, spark, hue, spark)
4. Turn the word array into (word, 1) pairs:
scala> val lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).collect
lines: Array[(String, Int)] = Array((java,1), (python,1), (scala,1), (hadoop,1), (scala,1), (java,1), (python,1), (javascript,1), (hue,1), (hadoop,1), (java,1), (java,1), (scala,1), (css,1), (sql,1), (mysql,1), (oracle,1), (java,1), (spark,1), (hue,1), (spark,1))
5. Aggregate the counts per word:
scala> val lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).collect
lines: Array[(String, Int)] = Array((scala,3), (spark,2), (hadoop,2), (python,2), (mysql,1), (oracle,1), (css,1), (java,5), (hue,2), (javascript,1), (sql,1))
6. To sort by frequency, first swap each pair's key and value, then sort by the new key (sortByKey() is ascending by default, as the output below shows; use sortByKey(false) for descending):
scala> val lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).collect
lines: Array[(Int, String)] = Array((3,scala), (2,spark), (2,hadoop), (2,python), (1,mysql), (1,oracle), (1,css), (5,java), (2,hue), (1,javascript), (1,sql))
scala> val lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).sortByKey().collect
lines: Array[(Int, String)] = Array((1,mysql), (1,oracle), (1,css), (1,javascript), (1,sql), (2,spark), (2,hadoop), (2,python), (2,hue), (3,scala), (5,java))
7. Swap the key and value back (a single-chain alternative to the whole job is sketched after this step):
scala> val lines = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).rdd.reduceByKey((a,b) => (a+b)).map(x => (x._2,x._1)).sortByKey().map(x => (x._2,x._1)).collect
lines: Array[(String, Int)] = Array((mysql,1), (oracle,1), (css,1), (javascript,1), (sql,1), (spark,2), (hadoop,2), (python,2), (hue,2), (scala,3), (java,5))
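The seven steps above can also be written as a single chain. Below is a minimal equivalent sketch (not from the recorded session) that sorts by count in descending order with sortBy instead of the two swaps:
val wordCounts = rdd.flatMap(x => x.split(" "))
  .map(x => (x, 1))
  .rdd                               // drop to the underlying RDD for reduceByKey
  .reduceByKey(_ + _)                // sum the 1s for each word
  .sortBy(_._2, ascending = false)   // order by count, highest first
  .collect
With the stu.txt data this should put (java,5) first, then (scala,3), then the words that appear twice, and finally the single occurrences (ties appear in no particular order).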