Game Data Analysis

Author: 飞起的书包 | Published 2019-02-22 13:41
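
The post walks through a small end-to-end pipeline: a Java Kafka producer (GameDataProducer) that pushes mocked user and game events into the "user" and "game" topics, a Spark Streaming job (DirectKafkaWordCount) that consumes both topics through the Kafka 0.10 direct stream and prints word counts every two seconds, a test driver, and the Maven dependencies.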

    package kafka_producer;

    import org.apache.kafka.clients.producer.Callback;

    import org.apache.kafka.clients.producer.KafkaProducer;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;

    public class GameDataProducer {

        public static void main(String[] args) {

            Properties props = new Properties();

// Kafka cluster configuration

            props.put("bootstrap.servers", "tstkj001:6667,tstkj002:6667,tstkj003:6667");

            props.put("acks", "1");

            props.put("retries", 3);

            props.put("batch.size", 16384);

            props.put("buffer.memory", 33554432);

            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// "user" topic -- the payload is deliberately left empty; mock your own data

            ProducerRecord<String, String> msg1 = new ProducerRecord<>("user", "");

// "game" topic -- the payload is deliberately left empty; mock your own data

            ProducerRecord<String, String> msg2 = new ProducerRecord<>("game", "");

send(producer, msg1);

            send(producer, msg2);

            // Close the producer on exit so buffered, asynchronously sent records are flushed;
            // without this, the JVM may exit before the sends ever leave the client.
            producer.close();

        }

        public static void send(KafkaProducer<String, String> producer, ProducerRecord<String, String> msg) {

            producer.send(msg, new Callback() {

                @Override

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {

// A non-null exception means the asynchronous send failed.
                    if (e != null) {

                        e.printStackTrace();

                    }

                }

            });

        }

    }
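
A note not in the original post: the producer assumes the "user" and "game" topics already exist on the brokers, or that the cluster permits automatic topic creation (Kafka's auto.create.topics.enable, which defaults to true in this era of brokers).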

    ___________________________________________________________________________________________________

    import org.apache.kafka.common.serialization.StringDeserializer

    import org.apache.spark.SparkConf

    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DirectKafkaWordCount {

      def main(args: Array[String]) {

        if (args.length < 2) {

          System.err.println(

            s"""

              |Usage: DirectKafkaWordCount <brokers> <topics>

              |  <brokers> is a list of one or more Kafka brokers

              |  <topics> is a list of one or more kafka topics to consume from

              |

            """.stripMargin)

          System.exit(1)

        }


        val Array(brokers, topics) = args

        // Create context with 2 second batch interval

        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")

        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Create direct kafka stream with brokers and topics

        val topicsSet = topics.split(",").toSet

        // The 0-10 integration uses the new consumer API, so it takes standard consumer
        // properties ("metadata.broker.list" was the old 0.8 direct-stream setting).
        val kafkaParams = Map[String, Object](

          "bootstrap.servers" -> brokers,

          "key.deserializer" -> classOf[StringDeserializer],

          "value.deserializer" -> classOf[StringDeserializer],

          "group.id" -> "DirectKafkaWordCount")  // any stable consumer-group id works here

        val messages = KafkaUtils.createDirectStream[String, String](

          ssc,

          LocationStrategies.PreferConsistent,

          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

        // Get the lines, split them into words, count the words and print

        val lines = messages.map(_.value)

        val words = lines.flatMap(_.split(" "))

        val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

        wordCounts.print()

        // Start the computation

        ssc.start()

        ssc.awaitTermination()

      }

    }
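
Since the stream subscribes to both topics, the same DStream can also be broken down by source. A minimal sketch (an addition, not in the original post) of lines that could sit inside main after messages is created, using the topic name carried by each Kafka ConsumerRecord:

        // Count records per topic so "user" and "game" traffic show up separately.
        val countsByTopic = messages.map(record => (record.topic, 1L)).reduceByKey(_ + _)

        countsByTopic.print()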

    _____________________________________________________________________________________________________

    /**

      * Test driver: calls DirectKafkaWordCount with hard-coded arguments.

      */

    object WordCountTest {

      def main(args: Array[String]): Unit = {

        val params = new Array[String](2)

        // Corresponds to args(0) of DirectKafkaWordCount: the Kafka broker list

        params(0) = "tstkj001:6667,tstkj002:6667,tstkj003:6667"

        // Corresponds to args(1) of DirectKafkaWordCount: the topics to consume

        params(1) = "user,game"

        DirectKafkaWordCount.main(params)

      }

    }
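
A caveat when launching through this test class rather than spark-submit (a note not in the original post): DirectKafkaWordCount never sets a Spark master, so a run from the IDE aborts for lack of a master URL. A minimal sketch of the fix inside DirectKafkaWordCount:

        // spark-submit normally supplies the master; set one explicitly for local runs.
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")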

    ___________________________________________________________________________________________________

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->

    <dependencies>

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming_2.11</artifactId>

            <version>2.2.0</version>

            <!--<scope>provided</scope>-->

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

            <version>2.2.0</version>

            <!--<scope>provided</scope>-->

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->

        <dependency>

            <groupId>org.apache.spark</groupId>

            <artifactId>spark-core_2.11</artifactId>

            <version>2.2.0</version>

            <!--<scope>provided</scope>-->

        </dependency>

    </dependencies>
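
A note not in the original post: spark-streaming-kafka-0-10_2.11 2.2.0 already pulls in the org.apache.kafka:kafka-clients artifact transitively, so the GameDataProducer class above compiles without declaring a separate Kafka client dependency.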
