How to Use Incremental Result Co

Author: Kent_Yao | Published: 2018-05-25 16:14

    Kyuubi

    1. Introduction

    Kyuubi is an enhanced edition of Apache Spark's original Thrift JDBC/ODBC Server. It is mainly designed for running SQL directly against a cluster in which all components, including HDFS, YARN, the Hive MetaStore, and Kyuubi itself, are secured.
    Recently, Kyuubi added support for incrementally receiving partial result data from the executor side. The main purpose of this feature is to reduce the risk of OutOfMemoryError in the Kyuubi Server itself. Because Kyuubi supports multiple SparkContexts, it is more exposed to this risk than the Spark Thrift Server, so this feature matters.
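
    To make the memory argument concrete, here is a minimal Python sketch. It is not Kyuubi's actual code: it models a result set as a list of partitions and compares collecting everything at once with pulling one partition at a time. The names collect_all, collect_incrementally, and peak_rows_held, as well as the sample data, are hypothetical and exist only for this illustration.

```python
from typing import Iterator, List

Partition = List[int]


def collect_all(partitions: List[Partition]) -> List[int]:
    """Materialize every row at once: peak memory grows with the whole result."""
    rows: List[int] = []
    for part in partitions:
        rows.extend(part)
    return rows


def collect_incrementally(partitions: List[Partition]) -> Iterator[int]:
    """Yield rows partition by partition: only one partition is held at a time."""
    for part in partitions:
        yield from part


def peak_rows_held(partitions: List[Partition], incremental: bool) -> int:
    """Largest number of rows the toy 'server' holds at any one moment."""
    if incremental:
        return max((len(p) for p in partitions), default=0)
    return sum(len(p) for p in partitions)


# Hypothetical result set split into three partitions.
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

print(collect_all(partitions))
print(list(collect_incrementally(partitions)))
print(peak_rows_held(partitions, incremental=False))
print(peak_rows_held(partitions, incremental=True))
```

    Both paths return the same rows in the same order. In this toy model the difference is only in how many rows the server side holds at its peak: the whole result for a full collect, versus the largest single partition for the incremental path.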

    2. Configurations

    Name | Default | Description
    spark.kyuubi.operation.incremental.collect | false | Whether to use incremental result collection from Spark executor side to Kyuubi server side.

    As the table above shows, a single configuration enables this feature, and it is disabled by default.

    3. How to Configure

    3.1 Server level scope

    spark.kyuubi.operation.incremental.collect is a Kyuubi-type configuration that can simply be treated as a Spark one (see the differences), which means that:

    1. It can be set via --conf spark.kyuubi.operation.incremental.collect=true when running the $KYUUBI_HOME/bin/start-kyuubi.sh script:
    $KYUUBI_HOME/bin/start-kyuubi.sh \
        --master yarn \
        --deploy-mode client \
        --driver-memory 10g \
        --conf spark.kyuubi.operation.incremental.collect=true
    
    2. It can also be put in the properties file (spark-defaults.conf) used by the Spark installation that launches Kyuubi, which can always be found in the $SPARK_HOME/conf directory.

    Configuring Kyuubi in either of these ways applies the setting on the server side, so it affects every KyuubiSession (a.k.a. HiveConnection).
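
    As a rough illustration of the properties-file route, the Python sketch below parses spark-defaults.conf-style text (whitespace-separated key and value, # for comments) and reads the flag, falling back to the documented default of false. The helper names and the sample file contents are hypothetical, and the parser deliberately handles only the whitespace-separated form.

```python
from typing import Dict

KEY = "spark.kyuubi.operation.incremental.collect"


def parse_spark_defaults(text: str) -> Dict[str, str]:
    """Parse 'key value' lines, skipping blank lines and '#' comments."""
    conf: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)  # split on the first run of whitespace
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf


def incremental_collect_enabled(conf: Dict[str, str]) -> bool:
    """Use the documented default (false) when the key is absent."""
    return conf.get(KEY, "false").lower() == "true"


# Hypothetical contents of $SPARK_HOME/conf/spark-defaults.conf.
sample = """
# server-wide settings
spark.master                                 yarn
spark.kyuubi.operation.incremental.collect   true
"""

conf = parse_spark_defaults(sample)
print(incremental_collect_enabled(conf))
print(incremental_collect_enabled({}))
```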

    3.2 Session level scope

    Kyuubi also treats it as a session-level configuration, so we can change it inside one session without affecting the others. This makes Kyuubi more flexible, as the following walkthrough shows.

    Let's say we already have a running Kyuubi server. We use the beeline CLI to connect and test.

    ~/data/apache-spark/spark-2.1.2-bin-hadoop2.7$ bin/beeline -u "jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true"
    Connecting to jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
    18/05/25 14:49:08 INFO Utils: Supplied authorities: kyuubi.server.163.org:10009
    18/05/25 14:49:08 INFO Utils: Resolved authority:kyuubi.server.163.org:10009
    18/05/25 14:49:08 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
    Connected to: Spark SQL (version 2.1.2)
    Driver: Hive JDBC (version 1.2.1.spark2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 1.2.1.spark2 by Apache Hive
    

    As with other Spark/Kyuubi configurations, we can simply put it in the connection string. Let's test with a show tables statement.
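
    The connection string used above has a regular shape: session variables follow the first ";" and Spark/Kyuubi configurations follow the "#", each as ";"-separated key=value pairs. A small Python sketch that assembles such a URL might look like this. The function build_kyuubi_url is a hypothetical helper, and the layout is inferred only from the example URL in this article.

```python
from typing import Dict


def build_kyuubi_url(
    host: str,
    port: int,
    session_vars: Dict[str, str],
    spark_confs: Dict[str, str],
) -> str:
    """Assemble jdbc:hive2://host:port/;<session vars>#<spark confs>."""

    def join(pairs: Dict[str, str]) -> str:
        return ";".join(f"{key}={value}" for key, value in pairs.items())

    url = f"jdbc:hive2://{host}:{port}/"
    if session_vars:
        url += ";" + join(session_vars)
    if spark_confs:
        url += "#" + join(spark_confs)
    return url


# Values copied from the beeline example in this article.
url = build_kyuubi_url(
    "kyuubi.server.163.org",
    10009,
    {
        "principal": "hive/kyuubi.server.163.org@SERVER.163.ORG",
        "hive.server2.proxy.user": "hzyaoqin",
    },
    {
        "spark.yarn.queue": "default",
        "spark.sql.haha": "hehe",
        "spark.scheduler.mode": "FAIR",
        "spark.kyuubi.operation.incremental.collect": "true",
    },
)
print(url)
```

    The resulting string can be passed to beeline with -u, quoted so that the shell does not interpret the ";" and "#" characters.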

    0: jdbc:hive2://kyuubi.server.163.org:> show tables;
    18/05/25 14:49:54 INFO KyuubiOperation: Running query 'show tables' with 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
    18/05/25 14:49:54 INFO SparkSqlParser: Parsing command: show tables
    18/05/25 14:49:54 INFO KyuubiOperation: Executing query in incremental collection mode
    18/05/25 14:49:54 INFO DAGScheduler: Asked to cancel job group 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
    +-----------+-----------------+--------------+--+
    | database  |    tableName    | isTemporary  |
    +-----------+-----------------+--------------+--+
    | default   | src             | false        |
    | default   | src2            | false        |
    | default   | src3            | false        |
    | default   | src_parquet_30  | false        |
    +-----------+-----------------+--------------+--+
    4 rows selected (0.835 seconds)
    

    The log line "KyuubiOperation: Executing query in incremental collection mode" tells us that the results are collected incrementally.

    Next, let's test a select * from src2 statement.

    0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
    18/05/25 14:50:00 INFO KyuubiOperation: Running query 'select * from src2' with ca9f76f5-72f7-4c22-86cd-f31c892070c8
    18/05/25 14:50:00 INFO SparkSqlParser: Parsing command: select * from src2
    18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: int
    18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: string
    18/05/25 14:50:01 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
    18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
    18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
    18/05/25 14:50:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
    18/05/25 14:50:01 INFO SparkContext: Created broadcast 0 from toString at KyuubiOperation.scala:319
    18/05/25 14:50:01 INFO KyuubiOperation: Executing query in incremental collection mode
    18/05/25 14:50:02 INFO GPLNativeCodeLoader: Loaded native gpl library
    18/05/25 14:50:02 WARN LzoCompressor: java.lang.UnsatisfiedLinkError: Cannot load liblzo2.so.2 (liblzo2.so.2: cannot open shared object file: No such file or directory)!
    18/05/25 14:50:02 ERROR LzoCodec: Failed to load/initialize native-lzo library
    18/05/25 14:50:02 INFO FileInputFormat: Total input paths to process : 1
    18/05/25 14:50:02 INFO DAGScheduler: Asked to cancel job group ca9f76f5-72f7-4c22-86cd-f31c892070c8
    +------+----------+--+
    | key  |  value   |
    +------+----------+--+
    | 238  | val_238  |
    | 86   | val_86   |
    497 lines are omitted here......
    | 97   | val_97   |
    +------+----------+--+
    500 rows selected (4.938 seconds)
    18/05/25 14:50:02 INFO DAGScheduler: Got job 0 (toSeq at KyuubiOperation.scala:233) with 1 output partitions
    18/05/25 14:50:02 INFO DAGScheduler: Final stage: ResultStage 0 (toSeq at KyuubiOperation.scala:233)
    18/05/25 14:50:02 INFO DAGScheduler: Parents of final stage: List()
    18/05/25 14:50:02 INFO DAGScheduler: Missing parents: List()
    18/05/25 14:50:02 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
    18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
    18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
    18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
    18/05/25 14:50:02 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
    18/05/25 14:50:02 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
    18/05/25 14:50:02 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
    18/05/25 14:50:02 INFO FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
    18/05/25 14:50:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6005 bytes)
    18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
    18/05/25 14:50:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
    18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3396 ms on hadoop3020.jd.163.org (executor 1) (1/1)
    18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool default
    18/05/25 14:50:05 INFO DAGScheduler: ResultStage 0 (toSeq at KyuubiOperation.scala:233) finished in 3.401 s
    18/05/25 14:50:05 INFO DAGScheduler: Got job 1 (indices at ColumnBasedSet.scala:60) with 1 output partitions
    18/05/25 14:50:05 INFO DAGScheduler: Final stage: ResultStage 1 (indices at ColumnBasedSet.scala:60)
    18/05/25 14:50:05 INFO DAGScheduler: Parents of final stage: List()
    18/05/25 14:50:05 INFO DAGScheduler: Missing parents: List()
    18/05/25 14:50:05 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
    18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
    18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
    18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
    18/05/25 14:50:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
    18/05/25 14:50:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
    18/05/25 14:50:05 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
    18/05/25 14:50:05 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
    18/05/25 14:50:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6005 bytes)
    18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
    18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 98 ms on hadoop3020.jd.163.org (executor 1) (1/1)
    18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool default
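    18/05/25 14:50:05 INFO DAGScheduler: ResultStage 1 (indices at ColumnBasedSet.scala:60) finished in 0.099 s
    

    Again, "KyuubiOperation: Executing query in incremental collection mode" shows that this query is collected incrementally. The jobs in the log are submitted from toLocalIterator, each with a single output partition.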

    3.3 By set command

    As a runtime configuration, it can also be set with SetCommand, as follows:

    set spark.kyuubi.operation.incremental.collect=false;
    

    Let's test this mode in the same beeline client.

    0: jdbc:hive2://kyuubi.server.163.org:> set spark.kyuubi.operation.incremental.collect=false;
    18/05/25 15:58:50 INFO KyuubiOperation: Running query 'set spark.kyuubi.operation.incremental.collect=false' with b63966a7-731c-48d6-9b99-f1a738232bb5
    18/05/25 15:58:50 INFO SparkSqlParser: Parsing command: set spark.kyuubi.operation.incremental.collect=false
    18/05/25 15:58:50 INFO KyuubiOperation: Executing query in incremental collection mode
    18/05/25 15:58:50 INFO DAGScheduler: Asked to cancel job group b63966a7-731c-48d6-9b99-f1a738232bb5
    +---------------------------------------------+--------+--+
    |                     key                     | value  |
    +---------------------------------------------+--------+--+
    | spark.kyuubi.operation.incremental.collect  | false  |
    +---------------------------------------------+--------+--+
    

    As the logs above show, the set statement itself still runs in incremental mode. Then we execute select * from src2.

    0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
    18/05/25 15:58:54 INFO KyuubiOperation: Running query 'select * from src2' with 01e00534-c927-4a45-be3b-9c0491897322
    18/05/25 15:58:54 INFO SparkSqlParser: Parsing command: select * from src2
    18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: int
    18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: string
    18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
    18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
    18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
    18/05/25 15:58:54 INFO SparkContext: Created broadcast 3 from toString at KyuubiOperation.scala:319
    18/05/25 15:58:54 INFO FileInputFormat: Total input paths to process : 1
    18/05/25 15:58:54 INFO SparkContext: Starting job: collect at KyuubiOperation.scala:326
    18/05/25 15:58:54 INFO DAGScheduler: Got job 2 (collect at KyuubiOperation.scala:326) with 2 output partitions
    18/05/25 15:58:54 INFO DAGScheduler: Final stage: ResultStage 2 (collect at KyuubiOperation.scala:326)
    18/05/25 15:58:54 INFO DAGScheduler: Parents of final stage: List()
    18/05/25 15:58:54 INFO DAGScheduler: Missing parents: List()
    18/05/25 15:58:54 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326), which has no missing parents
    18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.0 KB, free 15.8 GB)
    18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
    18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
    18/05/25 15:58:54 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:996
    18/05/25 15:58:54 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326)
    18/05/25 15:58:54 INFO YarnScheduler: Adding task set 2.0 with 2 tasks
    18/05/25 15:58:54 INFO FairSchedulableBuilder: Added task set TaskSet_2.0 tasks to pool default
    18/05/25 15:58:54 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6261 bytes)
    18/05/25 15:58:54 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 3, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6261 bytes)
    18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
    18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
    18/05/25 15:58:54 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 3) in 219 ms on hadoop3020.jd.163.org (executor 1) (1/2)
    18/05/25 15:58:54 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 223 ms on hadoop3020.jd.163.org (executor 1) (2/2)
    18/05/25 15:58:54 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool default
    18/05/25 15:58:54 INFO DAGScheduler: ResultStage 2 (collect at KyuubiOperation.scala:326) finished in 0.219 s
    18/05/25 15:58:54 INFO DAGScheduler: Job 2 finished: collect at KyuubiOperation.scala:326, took 0.240928 s
    18/05/25 15:58:54 INFO DAGScheduler: Asked to cancel job group 01e00534-c927-4a45-be3b-9c0491897322
    +------+----------+--+
    | key  |  value   |
    +------+----------+--+
    | 238  | val_238  |
    | 86   | val_86   |
    497 lines are omitted here......
    | 97   | val_97   |
    +------+----------+--+
    500 rows selected (0.553 seconds)
    

    The line "KyuubiOperation: Executing query in incremental collection mode" no longer appears, and the job is now submitted from collect rather than toLocalIterator. We have switched back to collecting the result as a whole.

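    4. Conclusions

    Kyuubi now supports configuring incremental result collection in three ways: at the server level, at the session level through the connection string, and at runtime with the set command. I hope this feature helps all of you run Spark SQL in production.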
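
    The interaction of the three levels can be sketched as a toy Python model. This is not Kyuubi's implementation: the class ToySession is hypothetical, and the lookup order (runtime set values first, then the connection string, then the server-level value) is an assumption based on the behaviour seen in the beeline session above. The model also decides the mode before a statement runs, which mirrors the log where the set statement itself still executed in incremental mode.

```python
from collections import ChainMap
from typing import Dict

KEY = "spark.kyuubi.operation.incremental.collect"


class ToySession:
    """Hypothetical model of one session's view of the configuration."""

    def __init__(self, server_conf: Dict[str, str], connection_conf: Dict[str, str]):
        self.runtime: Dict[str, str] = {}
        # Assumed lookup order: runtime `set` > connection string > server.
        self.conf = ChainMap(self.runtime, dict(connection_conf), dict(server_conf))

    def incremental(self) -> bool:
        return self.conf.get(KEY, "false").lower() == "true"

    def execute(self, statement: str) -> str:
        # The mode is chosen before the statement runs, so a `set` statement
        # still runs under the previous value and only affects later ones.
        mode = "incremental" if self.incremental() else "full"
        stmt = statement.strip().rstrip(";")
        if stmt.lower().startswith("set ") and "=" in stmt:
            key, value = stmt[4:].split("=", 1)
            self.runtime[key.strip()] = value.strip()
        return mode


server_conf = {KEY: "false"}                  # server-level default
a = ToySession(server_conf, {KEY: "true"})    # enabled in the connection string
b = ToySession(server_conf, {})               # plain session

print(a.execute("select * from src2;"))
print(a.execute(f"set {KEY}=false;"))
print(a.execute("select * from src2;"))
print(b.execute("select * from src2;"))
```

    Session a starts in incremental mode, stays in it for the set statement, and switches to a full collect afterwards, while session b is never affected by what happens in a.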
