This article builds on the setup from Spark Setup: Standalone Mode.
Parallelized Collections
/opt/services/spark/bin/spark-shell
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.count()
// Long = 5
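parallelize() also accepts a second argument giving the number of partitions; below is a minimal sketch in the same spark-shell session (the val name is only illustrative) that splits the collection into two partitions and sums it with reduce:
val distData2 = sc.parallelize(data, 2)   // explicitly request 2 partitions
distData2.reduce(_ + _)
// Int = 15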
File Systems
Local
/opt/services/spark/bin/spark-shell
val textFile = sc.textFile("/opt/services/spark/README.md")
textFile.count()
// Long = 104
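The RDD can be transformed before running an action; for example, a sketch in the same session that counts the lines mentioning Spark (the exact number depends on the README shipped with your Spark version):
textFile.filter(line => line.contains("Spark")).count()
// Long = <depends on the README contents>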
HDFS
./bin/hdfs dfs -copyFromLocal /opt/services/spark/README.md /
./bin/hdfs dfs -ls /
/opt/services/spark/bin/spark-shell
val textFile = sc.textFile("hdfs://localhost:9000/README.md")
textFile.count()
// Long = 104
For setting up HDFS, see HBase Setup: Pseudo-Distributed Mode.
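An RDD can also be written back to HDFS with saveAsTextFile; a minimal sketch in the same session (the /readme-nonempty output directory is just an example and must not already exist):
textFile.filter(_.nonEmpty).saveAsTextFile("hdfs://localhost:9000/readme-nonempty")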
Database Services
MySQL
docker run --name spark-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17
docker exec -it spark-mysql /bin/bash
mysql -uroot -p123456
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
USE db_spark;
CREATE TABLE users (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(20) DEFAULT NULL COMMENT 'user name',
  PRIMARY KEY (`id`)
);
INSERT INTO users VALUES (1, 'XiaoWang');
INSERT INTO users VALUES (2, 'XiaoMing');
cd /opt/services
wget https://mirror.tuna.tsinghua.edu.cn/mysql/downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
tar xf mysql-connector-java-5.1.49.tar.gz
/opt/services/spark/bin/spark-shell --jars /opt/services/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar
import org.apache.spark.rdd.JdbcRDD
import java.sql.DriverManager
val rdd = new JdbcRDD(sc,
  () => {
    // create a JDBC connection per partition
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/db_spark?useSSL=false&characterEncoding=utf8", "root", "123456")
  },
  "SELECT * FROM users WHERE ? <= id and id <= ?",  // the two ? are bound to each partition's id range
  1,                     // lower bound of id
  2,                     // upper bound of id
  2,                     // number of partitions
  rs => rs.getString(2)  // map each row to its second column (name)
)
rdd.foreach(println)
// XiaoWang
// XiaoMing
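Besides JdbcRDD, the same table can be read through Spark's DataFrame JDBC source; this is a sketch using the spark session that spark-shell provides, with options mirroring the connection settings above:
val df = spark.read.format("jdbc").
  option("url", "jdbc:mysql://127.0.0.1:3306/db_spark?useSSL=false&characterEncoding=utf8").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "users").
  option("user", "root").
  option("password", "123456").
  load()
df.show()   // should list the two rows inserted above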
Mongo
docker run --name spark-mongo -p 27017-27019:27017-27019 -d mongo:4.0.4
docker exec -it spark-mongo /bin/bash
mongo
use db_spark;
db.createCollection("users");
db.users.insert({ name : "XiaoZhang" });
db.users.insert({ name : "XiaoHua" });
/opt/services/spark/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1:27017/db_spark.users" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2
For details, see Local Run Mode Setup.
import com.mongodb.spark._
val rdd = MongoSpark.load(sc)
rdd.foreach(println)
// Document{{_id=60402f30181717e6500d2c11, name=XiaoZhang}}
// Document{{_id=60402f3e181717e6500d2c12, name=XiaoHua}}
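The connector's RDD can push an aggregation pipeline down to MongoDB so filtering happens server-side; a sketch in the same session (the $match condition is just an example):
import org.bson.Document
val matched = rdd.withPipeline(Seq(Document.parse("""{ "$match": { "name": "XiaoZhang" } }""")))
matched.count()
// expected: Long = 1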
Elasticsearch
wget https://mirrors.huaweicloud.com/elasticsearch/7.4.0/elasticsearch-7.4.0-darwin-x86_64.tar.gz
tar xf elasticsearch-7.4.0-darwin-x86_64.tar.gz
mv elasticsearch-7.4.0 es
/opt/services/es/bin/elasticsearch
curl localhost:9200
curl -X PUT -H "Content-Type: application/json" localhost:9200/idx_spark/users/1 -d '{ "name" : "XiaoNiu" }'
curl -X PUT -H "Content-Type: application/json" localhost:9200/idx_spark/users/2 -d '{ "name" : "XiaoMa" }'
curl localhost:9200/idx_spark/users/_search | json
/opt/services/spark/bin/spark-shell --packages org.elasticsearch:elasticsearch-spark-20_2.11:7.4.0
import org.elasticsearch.spark._
val rdd = sc.esRDD("idx_spark/users", "")
rdd.first
// (String, scala.collection.Map[String,AnyRef]) = (1,Map(name -> XiaoNiu))
rdd.count
// Long = 2
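The second argument to esRDD is a query; as a sketch against the same index, a URI-style query can restrict what gets read from Elasticsearch:
val hits = sc.esRDD("idx_spark/users", "?q=name:XiaoNiu")
hits.count
// expected: Long = 1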
HBase
wget http://archive.apache.org/dist/hbase/2.0.0/hbase-2.0.0-bin.tar.gz
tar xf hbase-2.0.0-bin.tar.gz
mv hbase-2.0.0 hbase
/opt/services/hbase/bin/start-hbase.sh
For details, see HBase Setup: Standalone Mode.
/opt/services/hbase/bin/hbase shell
create 'users', 'cf'
put 'users', 'row1', 'cf:name', 'XiaoLi'
put 'users', 'row2', 'cf:name', 'XiaoLin'
scan 'users'
/opt/services/spark/bin/spark-shell --packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1 --repositories http://repo.hortonworks.com/content/groups/public/
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "localhost")          // ZooKeeper quorum used by the HBase client
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")  // ZooKeeper client port
hbaseConf.set(TableInputFormat.INPUT_TABLE, "users")          // table to scan
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
hBaseRDD.foreach { case (_, result) =>
  val key = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
  println("Row key:" + key + "\tcf.Name:" + name)
}
// Row key:row1 cf.Name:XiaoLi
// Row key:row2 cf.Name:XiaoLin
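Instead of printing inside foreach, the scan can also be mapped to plain (rowKey, name) pairs and collected on the driver; a minimal sketch in the same session:
val pairs = hBaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow), Bytes.toString(result.getValue("cf".getBytes, "name".getBytes)))
}
pairs.collect()
// expected: Array((row1,XiaoLi), (row2,XiaoLin))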