美文网首页
Flume 采集 kafka 数据实时写入 Kudu

Flume 采集 kafka 数据实时写入 Kudu

作者: lei_charles | 来源:发表于2020-05-17 19:12 被阅读0次
  1. 创建 JsonKuduOperationsProducer.java 用于处理 Json 字符串写入Kudu
    import com.alibaba.fastjson.JSON;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.annotations.InterfaceAudience;
    import org.apache.flume.annotations.InterfaceStability;
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.flume.sink.KuduOperationsProducer;
    import org.apache.kudu.shaded.com.google.common.base.Preconditions;
    import org.apache.kudu.shaded.com.google.common.collect.Lists;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.math.BigDecimal;
    import java.nio.charset.Charset;
    import java.util.*;
    
    
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public class JsonKuduOperationsProducer implements KuduOperationsProducer {
    
        private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class);
        private static final String INSERT = "insert";
        private static final String UPSERT = "upsert";
        private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
        public static final String ENCODING_PROP = "encoding";
        public static final String DEFAULT_ENCODING = "utf-8";
        public static final String OPERATION_PROP = "operation";
        public static final String DEFAULT_OPERATION = UPSERT;
    
        @Deprecated
        public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
        @Deprecated
        public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
        @Deprecated
        public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
        @Deprecated
        public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
        @Deprecated
        public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
        @Deprecated
        public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
    
        public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
        public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY;
        public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
        public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY;
        public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
        public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY;
    
        private KuduTable table;
        private Charset charset;
        private String operation;
    
        private JsonKuduOperationsProducer.ParseErrorPolicy missingColumnPolicy;
        private JsonKuduOperationsProducer.ParseErrorPolicy badColumnValuePolicy;
        private JsonKuduOperationsProducer.ParseErrorPolicy unmatchedRowPolicy;
    
    
        public JsonKuduOperationsProducer() {
    
        }
    
    
        @Override
        public void configure(Context context) {
    
            String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
            try {
                charset = Charset.forName(charsetName);
            } catch (IllegalArgumentException e) {
                throw new FlumeException(
                        String.format("Invalid or unsupported charset %s", charsetName), e);
            }
            this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase();
            Preconditions.checkArgument(validOperations.contains(this.operation),
                    "Unrecognized operation '%s'", this.operation);
    
            this.missingColumnPolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                    context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
                    JsonKuduOperationsProducer.ParseErrorPolicy.WARN,
                    JsonKuduOperationsProducer.ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY);
            this.badColumnValuePolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                    context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
                    JsonKuduOperationsProducer.ParseErrorPolicy.WARN,
                    JsonKuduOperationsProducer.ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY);
            this.unmatchedRowPolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                    context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
                    JsonKuduOperationsProducer.ParseErrorPolicy.WARN,
                    JsonKuduOperationsProducer.ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY);
        }
    
        @Override
        public void initialize(KuduTable kuduTable) {
            this.table = kuduTable;
        }
    
        @Override
        public List<Operation> getOperations(Event event) throws FlumeException {
            String raw = new String(event.getBody(), charset);
            Map<String, String> rawMap = JSON.parseObject(raw, HashMap.class);
            Schema schema = this.table.getSchema();
            List<Operation> ops = Lists.newArrayList();
            if (raw != null && !raw.isEmpty()) {
                Operation op;
                switch (operation) {
                    case UPSERT:
                        op = this.table.newUpsert();
                        break;
                    case INSERT:
                        op = this.table.newInsert();
                        break;
                    default:
                        throw new FlumeException(
                                String.format("Unrecognized operation type '%s' in getOperations(): " +
                                        "this should never happen!", this.operation));
                }
                PartialRow row = op.getRow();
    
                Iterator iterator = schema.getColumns().iterator();
                while (iterator.hasNext()) {
                    ColumnSchema col = (ColumnSchema) iterator.next();
    //                logger.error("Column:" + col.getName() + "----" + rawMap.get(col.getName()) + "----" + col.getType());
                    String msg;
                    try {
                        this.coerceAndSet(rawMap.get(col.getName()), col.getName(), col.getType(), row);
                    } catch (NumberFormatException e) {
                        msg = String.format("Raw value '%s' couldn't be parsed to type %s for column '%s'",
                                raw, col.getType(), col.getName());
                        this.logOrThrow(this.badColumnValuePolicy, msg, e);
                    } catch (IllegalArgumentException e) {
                        msg = String.format("Column '%s' has no matching group in '%s'", col.getName(), raw);
                        this.logOrThrow(this.missingColumnPolicy, msg, e);
                    } catch (Exception e) {
                        throw new FlumeException("Failed to create Kudu operation", e);
                    }
                }
                ops.add(op);
            }
            return ops;
        }
    
        private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row) throws NumberFormatException {
            switch (type) {
                case BOOL:
                    row.addBoolean(colName, Boolean.parseBoolean(rawVal));
                    break;
                case INT8:
                    row.addByte(colName, Byte.parseByte(rawVal));
                    break;
                case INT16:
                    row.addShort(colName, Short.parseShort(rawVal));
                    break;
                case INT32:
                    row.addInt(colName, Integer.parseInt(rawVal));
                    break;
                case INT64:
                case UNIXTIME_MICROS:
                    row.addLong(colName, Long.parseLong(rawVal));
                    break;
                case FLOAT:
                    row.addFloat(colName, Float.parseFloat(rawVal));
                    break;
                case DOUBLE:
                    row.addDouble(colName, Double.parseDouble(rawVal));
                    break;
                case DECIMAL:
                    row.addDecimal(colName, new BigDecimal(rawVal));
                    break;
                case BINARY:
                    row.addBinary(colName, rawVal.getBytes(this.charset));
                    break;
                case STRING:
                    row.addString(colName, rawVal == null ? "" : rawVal);
                    break;
                default:
                    logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
            }
    
        }
    
        private void logOrThrow(JsonKuduOperationsProducer.ParseErrorPolicy policy, String msg, Exception e) throws FlumeException {
            switch (policy) {
                case REJECT:
                    throw new FlumeException(msg, e);
                case WARN:
                    logger.warn(msg, e);
                case IGNORE:
                default:
            }
        }
    
        @Override
        public void close() {
    
        }
    
        private JsonKuduOperationsProducer.ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
                Context context, String deprecatedPropertyName, String newPropertyName,
                JsonKuduOperationsProducer.ParseErrorPolicy trueValue,
                JsonKuduOperationsProducer.ParseErrorPolicy falseValue,
                JsonKuduOperationsProducer.ParseErrorPolicy defaultValue) {
            JsonKuduOperationsProducer.ParseErrorPolicy policy;
            if (context.containsKey(deprecatedPropertyName)) {
                logger.info("Configuration property {} is deprecated. Use {} instead.", deprecatedPropertyName, newPropertyName);
                Preconditions.checkArgument(!context.containsKey(newPropertyName), "Both {} and {} specified. Use only one of them, preferably {}.", deprecatedPropertyName, newPropertyName, newPropertyName);
                policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
            } else {
                String policyString = context.getString(newPropertyName, defaultValue.name());
                try {
                    policy = JsonKuduOperationsProducer.ParseErrorPolicy.valueOf(policyString.toUpperCase());
                } catch (IllegalArgumentException var10) {
                    throw new IllegalArgumentException("Unknown policy '" + policyString + "'. Use one of the following: " + Arrays.toString(JsonKuduOperationsProducer.ParseErrorPolicy.values()), var10);
                }
            }
            return policy;
        }
    
        static {
            DEFAULT_MISSING_COLUMN_POLICY = JsonKuduOperationsProducer.ParseErrorPolicy.REJECT;
            DEFAULT_BAD_COLUMN_VALUE_POLICY = JsonKuduOperationsProducer.ParseErrorPolicy.REJECT;
            DEFAULT_UNMATCHED_ROW_POLICY = JsonKuduOperationsProducer.ParseErrorPolicy.WARN;
        }
    
        private static enum ParseErrorPolicy {
            WARN,
            IGNORE,
            REJECT;
    
            private ParseErrorPolicy() {
            }
        }
    }
    
  2. 编写处理复杂 json 格式的拦截器
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    public class JsonInterceptor implements Interceptor {
    
        private static final Logger logger = LoggerFactory.getLogger(JsonInterceptor.class);
        private boolean preserveExisting;
        private String sourceKeys;
        private String targetKeys;
    
        private JsonInterceptor() {
    
        }
    
        private JsonInterceptor(boolean preserveExisting, String sourceKeys, String targetKeys) {
            this.preserveExisting = preserveExisting;
            this.sourceKeys = sourceKeys;
            this.targetKeys = targetKeys;
        }
    
        public void initialize() {
        }
    
        public Event intercept(Event event) {
            byte[] body = event.getBody();
            String bodyStr = new String(body);
            String[] sourceArray = sourceKeys.trim().split(",", -1);
            String[] targetArray = targetKeys.trim().toLowerCase().split(",", -1);
            Map resultMap = new HashMap<String, String>();
            if (sourceArray.length == targetArray.length) {
                JSONObject jsonObject = JSONObject.parseObject(bodyStr);
                JSONObject jsonObjectTemp = null;
                String[] arrayTemp = null;
                for (int i = 0; i < sourceArray.length; i++) {
                    if (sourceArray[i].contains(".")) {
                        arrayTemp = sourceArray[i].trim().split("\\.", -1);
                        jsonObjectTemp = jsonObject;
                        for (int j = 0; j < arrayTemp.length - 1; j++) {
                            if (jsonObjectTemp != null) {
                                jsonObjectTemp = jsonObjectTemp.getJSONObject(arrayTemp[j].trim());
                            }else {
                                break;
                            }
                        }
                        if (jsonObjectTemp != null){
                            resultMap.put(targetArray[i].trim(), String.valueOf(jsonObjectTemp.getOrDefault(arrayTemp[arrayTemp.length - 1], "")));
                        }else {
                            resultMap.put(targetArray[i].trim(), "");
                        }
                    } else {
                        resultMap.put(targetArray[i].trim(), String.valueOf(jsonObject.getOrDefault(sourceArray[i], "")));
                    }
                }
            } else {
                logger.error("The sourceKeys and targetkeys lengths do not match");
            }
            event.setBody(JSON.toJSONString(resultMap).getBytes(Charset.forName("UTF-8")));
            return event;
        }
    
        public List<Event> intercept(List<Event> events) {
            Iterator i$ = events.iterator();
            while (i$.hasNext()) {
                Event event = (Event) i$.next();
                this.intercept(event);
            }
            return events;
        }
    
        public void close() {
        }
    
        public static class Constants {
            public static String PRESERVE = "preserveExisting";
            public static String SOURCE_KEYS = "sourceKeys";
            public static String TARGET_KEYS = "targetKeys";
            public static boolean PRESERVE_DFLT = false;
    
            public Constants() {
            }
        }
    
        public static class Builder implements Interceptor.Builder {
            private boolean preserveExisting;
            private String sourceKeys;
            private String targetKeys;
    
            public Builder() {
                this.preserveExisting = JsonInterceptor.Constants.PRESERVE_DFLT;
            }
    
            public Interceptor build() {
    
                return new JsonInterceptor(this.preserveExisting, this.sourceKeys, this.targetKeys);
            }
    
            public void configure(Context context) {
                this.preserveExisting = context.getBoolean(JsonInterceptor.Constants.PRESERVE, JsonInterceptor.Constants.PRESERVE_DFLT);
                this.sourceKeys = context.getString(Constants.SOURCE_KEYS);
                this.targetKeys = context.getString(Constants.TARGET_KEYS);
            }
        }
    }
    
  3. pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.cloudera</groupId>
        <artifactId>flume-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <name>flume-demo</name>
    
    
        <repositories>
            <!-- cloudera 的仓库 -->
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
                <name>Cloudera Repositories</name>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
            <!-- maven 的仓库 -->
            <repository>
                <id>aliyun</id>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                <name>Maven2 Repositories</name>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <avro.version>1.8.2-cdh6.3.2</avro.version>
            <flume.version>1.9.0-cdh6.3.2</flume.version>
            <hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
            <kudu.version>1.10.0-cdh6.3.2</kudu.version>
            <fastjson.version>1.2.58</fastjson.version>
            <slf4j.version>1.7.12</slf4j.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-sdk</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-configuration</artifactId>
                <version>${flume.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client</artifactId>
                <version>${kudu.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-flume-sink</artifactId>
                <version>${kudu.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>${avro.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <defaultGoal>install</defaultGoal>
            <sourceDirectory>src/main/java</sourceDirectory>
            <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
            <resources>
                <resource>
                    <directory>src/main/resources/</directory>
                    <excludes>
                        <exclude>env/*/*</exclude>
                    </excludes>
                    <includes>
                        <include>**/*</include>
                    </includes>
                </resource>
                <resource>
                    <directory>src/main/resources/env/${profile.active}</directory>
                    <includes>
                        <include>**/*.properties</include>
                        <include>**/*.xml</include>
                    </includes>
                </resource>
            </resources>
            <pluginManagement>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.8.0</version>
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </plugin>
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                    <goal>testCompile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-resources-plugin</artifactId>
                        <version>3.0.2</version>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.1</version>
                    <configuration>
                        <createDependencyReducedPom>false</createDependencyReducedPom>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
  4. flume 配置文件
    agent.sources  = r1
    agent.channels = c1
    agent.sinks = k1
    
    ## source   r1
    agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.r1.batchSize = 5000
    agent.sources.r1.batchDurationMillis = 2000
    agent.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
    agent.sources.r1.kafka.topics = test_topic
    agent.sources.r1.kafka.consumer.group.id = test_consumer
    agent.sources.r1.kafka.consumer.auto.offset.reset = earliest
    
    ## interceptor   i1
    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type = com.cloudera.flume.interceptor.JsonInterceptor$Builder
    ## 以逗号分隔要提取的属性
    agent.sources.r1.interceptors.i1.sourceKeys = $id,$name,$properties.$age,$properties.$address.addrDetail
    ## 配置转换后的属性名,与要提取的属性一一对应
    agent.sources.r1.interceptors.i1.targetKeys = id,name,age,addrDetail
    
    ## channel   c1
    agent.channels.c1.type = file
    agent.channels.c1.checkpointDir = /data/flume-app/channel/checkpointDir/test_topic/checkpoint
    agent.channels.c1.dataDirs = /data/flume-app/channel/checkpointDir/test_topic/data
    agent.channels.c1.maxFileSize = 2146435071
    agent.channels.c1.transactionCapacity = 10000
    agent.channels.c1.capacity = 1000000
    agent.channels.c1.keep-alive = 6
    
    ## sinks   k1
    agent.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
    agent.sinks.k1.masterAddresses = node01,node02,node03
    agent.sinks.k1.tableName = default.test
    agent.sinks.k1.batchSize = 5000
    agent.sinks.k1.producer = com.cloudera.flume.sink.JsonKuduOperationsProducer
    
    ## 拼装
    agent.sources.r1.channels = c1
    agent.sinks.k1.channel= c1
    
  5. flume 启动脚本
    #! /bin/bash
    
    case $1 in
    "start"){
             echo " --------启动 kudu-flume-sink 采集 flume-------"
             nohup flume-ng agent --conf-file kudu-flume-sink.conf --name agent -Xmx2048m -Dflume.root.logger=INFO,LOGFILE >> /data/flume-app/logs/kudu-flume-sink.log 2>&1 &
    };;
    "stop"){
            echo " --------停止 kudu-flume-sink 采集flume-------"
            ps -ef | grep kudu-flume-sink.conf | grep -v grep |awk "{print \$2}" | xargs kill
    };;
    esac
    

相关文章

网友评论

      本文标题:Flume 采集 kafka 数据实时写入 Kudu

      本文链接:https://www.haomeiwen.com/subject/dkupohtx.html