Introduction to Spark Streaming
Spark Streaming is Spark's component for stream computation over real-time data: a high-throughput, fault-tolerant stream processing system. It can ingest data from sources such as Kafka and Flume; once data is received, it can be processed with high-level functions such as map(), reduce(), join(), and window(), and the results can be written to file systems or databases, or pushed to a live dashboard.
![](https://img.haomeiwen.com/i22206660/124cb94ca2e8569d.png)
Spark Streaming represents a stream with the abstraction of a discretized stream, called a DStream. A DStream is the sequence of data arriving over time: the data received in each time interval is stored as an RDD, and the DStream is the sequence formed by these RDDs.
DStreams support two kinds of operations: transformations, which produce a new DStream, and output operations, which write data out to external systems.
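For example, in a minimal sketch (assuming a StreamingContext named `ssc`, created as in the examples below), the first statement is a transformation and the second an output operation:

```scala
// Transformation: lazily builds a new DStream holding each line's length
val lengths = ssc.socketTextStream("localhost", 9999).map(_.length)
// Output operation: triggers execution and prints the first elements of every batch
lengths.print()
```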
Spark Streaming WordCount
Add the Spark Streaming dependency:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
    <scope>compile</scope>
</dependency>
```
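If the build uses sbt instead of Maven, the equivalent dependency (a sketch, assuming Scala 2.11 is configured) would be:

```scala
// assumes scalaVersion := "2.11.x", so %% resolves to spark-streaming_2.11
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1"
```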
WordCount implementation:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // Batch interval of 1 second: each RDD in the DStream covers 1s of data
    val ssc = new StreamingContext(conf, Seconds(1))
    // Read lines from the TCP socket served by netcat (started below)
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()             // start the computation
    ssc.awaitTermination()  // block until the job is stopped
  }
}
```
Start the netcat tool in a shell and type some input:
```shell
root@amarsoft-H81M-S1:~/# nc -lk 9999
aa a aa bbb c
```
Program output:
```
-------------------------------------------
Time: 1593756750000 ms
-------------------------------------------
(aa,2)
(a,1)
(bbb,1)
(c,1)
```
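The introduction mentioned window(); as a hedged extension of the example above (reusing its `pairs` DStream, with window and slide durations chosen only for illustration), the counts can also be computed over a sliding window:

```scala
// Count words seen in the last 30 seconds, recomputed every 10 seconds;
// both durations must be multiples of the 1-second batch interval.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()
```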
Integrating Kafka with Spark Streaming
Add the dependencies:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
```
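The producer below assumes the topic `test_gp` already exists on the broker. If it does not, one option is the AdminClient that ships in the same kafka-clients dependency; this is only a sketch, and the partition count and replication factor are assumptions for a single-broker setup:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.61.97:9092")
    val admin = AdminClient.create(props)
    // 1 partition, replication factor 1: assumed values for a single broker
    admin.createTopics(Collections.singleton(new NewTopic("test_gp", 1, 1.toShort)))
      .all().get()
    admin.close()
  }
}
```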
The producer below sends a random digit to Kafka every second; Spark Streaming then consumes the stream.
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ProducerFastStart {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        Random random = new Random();
        while (true) {
            try {
                // Send one random digit in [0, 9] per second
                String msg = String.valueOf(random.nextInt(10));
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                producer.send(record).get();  // block until the broker acknowledges
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```
The Spark Streaming job pulls the data from each 2-second batch and sums the values:
```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWithKafka {
  private val brokers = "192.168.61.97:9092"
  private val topic = "test_gp"
  private val group = "group.demo"
  private val checkpointDir = "/opt/kafka/checkpoint"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWithKafka")
    // Batch interval of 2 seconds: each batch sums the values received in that window
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpoint directory (needed for stateful/window operations and recovery)
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    // Direct stream: each Kafka partition maps to one Spark partition
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](List(topic), kafkaParams))

    val value = stream.map(record => {
      val iniVal = Integer.valueOf(record.value())
      println("current value: " + iniVal.toString)
      iniVal
    }).reduce(_ + _)

    value.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
Program output:
```
-------------------------------------------
Time: 1593759210000 ms
-------------------------------------------
5
current value: 6
current value: 7
-------------------------------------------
Time: 1593759212000 ms
-------------------------------------------
13
current value: 6
-------------------------------------------
Time: 1593759214000 ms
-------------------------------------------
6
current value: 1
-------------------------------------------
```
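One caveat: the job disables auto-commit (ENABLE_AUTO_COMMIT_CONFIG -> false) but never commits offsets, so after a restart it simply resumes from the latest records. If consumed offsets should survive restarts, spark-streaming-kafka-0-10 exposes CanCommitOffsets for committing them back to Kafka; the following is a hedged sketch of that pattern, replacing the map/reduce pipeline above with an equivalent foreachRDD:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before transforming the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val sum = rdd.map(record => record.value().toInt).fold(0)(_ + _)
  println(s"batch sum: $sum")
  // Commit the offsets back to Kafka once the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```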