美文网首页Spark学习之路spark
java spark sql操作cassandar

java spark sql操作cassandar

作者: 我发际线很高 | 来源:发表于2017-12-18 18:20 被阅读96次

    前期准备:

    cassandra集群(可以参考网站 https://cassandrazh.github.io/

    spark集群(可以参考我的文章 http://www.jianshu.com/p/756209fa7078


    1、spark中配置cassandar相应的jar包

    不配置会报如下异常:ClassNotFoundException:com.datastax.spark.connector.rdd.partitioner.CassandraPartition

    怎么配置:

     $SPARK_HOME/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-s_2.10

    1.6.0指的是spark的版本,s_2.10是scala的版本。

    具体版本对应可以参看这个网站:https://spark-packages.org/package/datastax/spark-cassandra-connector

    如果用spark-shell命令提交不成功,可以直接在SPARK的spark-env.sh配置中加入SPARK_CLASSPATH

    export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/spark/spark-cassandra-lib/datastax_spark-cassandra-connector-1.6.0-s_2.10.jar:/opt/spark/spark-cassandra-lib/com.datastax.cassandra_cassandra-driver-core-3.0.0.jar:/opt/spark/spark-cassandra-lib/com.google.guava_guava-16.0.1.jar:/opt/spark/spark-cassandra-lib/com.twitter_jsr166e-1.1.0.jar

    记得集群中的每台spark都要加,具体版本具体下载。

    2、Eclipse新建一个maven项目

    引入相关包(这里引入的spark包版本一定要对应上集群环境安装的spark版本,否则序列化和反序列会失败):

    分别引入:spark-core,spark-sql,spark-cassandra-connector

    groupId:org.apache.spark

    artifactId:spark-core_2.10

    version:1.6.0

    groupId:org.apache.spark

    artifactId:spark-sql_2.10

    version:1.6.0

    groupId:com.datastax.spark

    artifactId:spark-cassandra-connector_2.10

    version:1.6.0

    如果出现因为google guava造成的错误,可以在引用上面3个包的时候去掉google guava,然后单独引用一个版本.

    版本不一样的错误异常:

    Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 18257903091306170

    3、新建一个测试类SparkCassandraConnector(基于spark1.6,注意spark2.x开始不一样,后文有提到

    public class SparkCassandraConnector {

        private static final String APP_NAME = "SPARK_CASSANDRA_TESTI"; //这里随便写

        private static final String MASTER = "spark://172.16.101.60:7077"; //spark集群地址

        private static final String HOST = "172.16.101.60"; //cassandra安装主机地址

        private static SparkConf conf = null;

        private static SparkContext sparkContext = null;

        static {

            conf = new SparkConf();

            conf.setAppName(APP_NAME);

            conf.setMaster(MASTER);

            conf.set("spark.cassandra.connection.host", HOST);

            sparkContext = new SparkContext(conf);  // 初始化一个sparkContext

            /* 这里要注意几个地方:1、1.6用的是sparkcontext,2.x开始用sparksession。

                                                   2、spark集群中每次只能有一个sparkcontext,当然可以通过配置修改

                                                   3、基于以上,所以这里初始化一个sparkcontext,并且不关闭它(缺点也显而易见)

               */

        }

        public static void main(String[] args) {

            CassandraSQLContext csc = new CassandraSQLContext(sparkContext);

            csc.setKeyspace("bi"); // 设置cassandra的keyspace ,keyspace有点类似数据库的意思

            csc.sql("select * from test").show(); // 这里可以写任何spark支持的sql语句

       }

    }


    spark2开始spark开始使用spark session,而且datasetx将spark-cassandra-connector中的CassandraSQLContext类移除了:

    先得到spark session:

    SparkSession spark = SparkSession.builder().appName(APP_NAME).config("spark.cassandra.connection.host",HOST).master(MASTER).getOrCreate();

    然后将spark的配置文件传递给CassandraConnector:

    CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf());

    这里可以直接通过CassandraConnector这个对象去对cassandra建表,删除表:

    Sessionsession=connector.openSession();

    session.execute("CREATE TABLE mykeyspace.people(id UUID PRIMARY KEY, username TEXT, email TEXT)");

    通过spark得到spark的dataset:

    Dataset dataset = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {

    {

    put("keyspace", "bi"); // cassandra keyspace

    put("table", "people"); // cassandra表名

    }

    }).load();

    Dataset dataset2 = spark.read().format("org.apache.spark.sql.cassandra").options(new HashMap() {

    {

    put("keyspace", "bi");

    put("table", "people2");

    }

    }).load();

    上面构造了两个dataset,分别是dataset和dataset2,我假设这2个dataset表结构一样,现在对他做左连接操作:

    dataset.createOrReplaceTempView("usertable");

    dataset2.createOrReplaceTempView("usertable2");

    Dataset dataset3 = spark.sql("select * from usertable left join usertable2 on usertable.id = usertable2.id");

    dataset3.show();


    不用spark集群,用local模式:

    spark是可以不用安装集群,用local模式来写出demo的。

    只要将上面的MASTER参数改为local就行了。

    但是spark2改为local模式的时候要注意,要在配置中设置:config("spark.sql.warehouse.dir", "file:///D://333");

    相关文章

      网友评论

        本文标题:java spark sql操作cassandar

        本文链接:https://www.haomeiwen.com/subject/gnmdixtx.html