Reading and Writing Kafka with Flink

Author: Woople | Published 2019-05-30 08:03

    This article uses examples based on Flink 1.8.0 to show how to read data from and write data to Kafka with Flink.

    Complete example code:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.formats.csv.CsvRowSerializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.Kafka010TableSink;
    import org.apache.flink.streaming.connectors.kafka.Kafka010TableSource;
    import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
    import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.types.Row;
    
    import java.util.Optional;
    import java.util.Properties;
    
    public class FlinkKafkaDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // Schema of the CSV records: three string columns named imsi, lac and cell.
            final TableSchema tableSchema = new TableSchema(new String[]{"imsi", "lac", "cell"}, new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING});
            final TypeInformation<Row> typeInfo = tableSchema.toRowType();

            // Deserializer for the comma-separated records read from Kafka.
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hostA:6667");

            // Table source that reads CSV rows from the Kafka topic "foo" (Kafka 0.10 connector).
            KafkaTableSourceBase kafkaTableSource = new Kafka010TableSource(
                    tableSchema,
                    "foo",
                    properties,
                    deserSchemaBuilder.build());

            // Register the source so it can be queried through the Table API.
            tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource);
    
            // Keep only the rows whose lac column is not '5'.
            Table kafkaCsvTable = tableEnv.scan("KafkaCsvTable");
            Table result = kafkaCsvTable.where("lac != '5'").select("imsi,lac,cell");

            DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfo);

            // Serializer for the records written back to Kafka: '|' as the field delimiter,
            // no quoting ('\0'), and '\r' as the line delimiter.
            final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInfo).setFieldDelimiter('|').setQuoteCharacter('\0').setLineDelimiter("\r");
    
            // Table sink that writes the result rows to the Kafka topic "bar",
            // using Flink's fixed partitioner to assign rows to Kafka partitions.
            KafkaTableSinkBase sink = new Kafka010TableSink(
                    result.getSchema(),
                    "bar",
                    properties,
                    Optional.of(new FlinkFixedPartitioner<>()),
                    serSchemaBuilder.build());

            // Attach the sink to the stream.
            sink.emitDataStream(ds);
    
            env.execute("Flink kafka demo");
        }
    }
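
    The Properties object is handed straight to the underlying Kafka client, so standard Kafka consumer/producer settings can be added to it as well. A minimal sketch, assuming the defaults otherwise (the group id value here is an arbitrary example, not from the original code):

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hostA:6667");
    // Any standard Kafka client setting is passed through unchanged,
    // e.g. a consumer group id for offset tracking (example value):
    properties.setProperty("group.id", "flink-kafka-demo");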
    

    An alternative approach is to use the DataStream-level Kafka consumer and producer directly:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.formats.csv.CsvRowSerializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.*;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import java.util.Properties;
    
    public class FlinkKafkaDemoT {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // Same schema and CSV deserializer as in the first example.
            final TableSchema tableSchema = new TableSchema(new String[]{"imsi","lac","cell"}, new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING});
            final TypeInformation<Row> typeInfo = tableSchema.toRowType();
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hostA:6667");
    
            // DataStream-level consumer reading CSV rows from the Kafka topic "foo".
            FlinkKafkaConsumer010<Row> myConsumer = new FlinkKafkaConsumer010<>(
                    "foo",
                    deserSchemaBuilder.build(),
                    properties);

            // Start reading from the latest offsets instead of the committed group offsets.
            myConsumer.setStartFromLatest();

            // Register the stream as a table and run the same filter query as before.
            DataStream<Row> stream = env.addSource(myConsumer);
            tableEnv.registerDataStream("KafkaCsvTable", stream);
            Table kafkaCsvTable = tableEnv.scan("KafkaCsvTable");
            Table result = kafkaCsvTable.where("lac != '5'").select("imsi,lac,cell");
    
            // Serializer for the output records: comma-separated, '\r' as the line delimiter.
            final CsvRowSerializationSchema.Builder serSchemaBuilder = new CsvRowSerializationSchema.Builder(typeInfo).setFieldDelimiter(',').setLineDelimiter("\r");

            DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfo);
            // DataStream-level producer writing the result rows to the Kafka topic "bar".
            FlinkKafkaProducer010<Row> myProducer = new FlinkKafkaProducer010<>(
                    "hostA:6667",
                    "bar",
                    serSchemaBuilder.build());

            // Write each record's timestamp into the Kafka message timestamp field.
            myProducer.setWriteTimestampToKafka(true);

            ds.addSink(myProducer);
    
            env.execute("Flink kafka demo");
        }
    }
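
    Besides setStartFromLatest(), the consumer offers other start-position options. A short sketch of the common choices (method names as in the Flink 1.8 Kafka connector):

    // Read the topic from the earliest available offset:
    myConsumer.setStartFromEarliest();
    // Or resume from the offsets committed for the consumer group (the connector's default):
    myConsumer.setStartFromGroupOffsets();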
    

    Note that in the code above, resolving the following two classes (provided by the flink-csv artifact) requires configuring the Aliyun Maven repository:

    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.formats.csv.CsvRowSerializationSchema;
    

    The pom.xml file is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.woople.tutorial.flink</groupId>
        <artifactId>flink-examples</artifactId>
        <version>1.0-SNAPSHOT</version>
        <repositories>
            <repository>
                <id>ali</id>
                <name>ali</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
        </dependencies>
        <build>
            <defaultGoal>package</defaultGoal>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-resources-plugin</artifactId>
                    <configuration>
                        <encoding>UTF-8</encoding>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>copy-resources</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <id>eclipse-add-source</id>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile-first</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>attach-scaladocs</id>
                            <phase>verify</phase>
                            <goals>
                                <goal>doc-jar</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <scalaVersion>2.11.8</scalaVersion>
                        <recompileMode>incremental</recompileMode>
                        <useZincServer>true</useZincServer>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <shadedArtifactAttached>false</shadedArtifactAttached>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <finalName>${project.artifactId}-${project.version}-bundle</finalName>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
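
    With this pom.xml, running mvn package builds the job; the shade plugin bundles the dependencies into flink-examples-1.0-SNAPSHOT-bundle.jar (per its finalName setting), which can then be submitted to a cluster with the flink run command.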
    
