美文网首页
Flume 采集 kafka 数据实时写入 Kudu

Flume 采集 kafka 数据实时写入 Kudu

作者: lei_charles | 来源:发表于2020-05-17 19:12 被阅读0次
    1. 创建 JsonKuduOperationsProducer.java，用于解析 JSON 字符串并写入 Kudu
      import com.alibaba.fastjson.JSON;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.FlumeException;
      import org.apache.flume.annotations.InterfaceAudience;
      import org.apache.flume.annotations.InterfaceStability;
      import org.apache.kudu.ColumnSchema;
      import org.apache.kudu.Schema;
      import org.apache.kudu.Type;
      import org.apache.kudu.client.KuduTable;
      import org.apache.kudu.client.Operation;
      import org.apache.kudu.client.PartialRow;
      import org.apache.kudu.flume.sink.KuduOperationsProducer;
      import org.apache.kudu.shaded.com.google.common.base.Preconditions;
      import org.apache.kudu.shaded.com.google.common.collect.Lists;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.math.BigDecimal;
      import java.nio.charset.Charset;
      import java.util.*;
      
      
      /**
       * Kudu Flume sink operations producer that maps a flat JSON object to one Kudu row.
       *
       * <p>The event body is decoded with the configured charset and parsed as a JSON object. For
       * every column of the target table the entry with the same name is looked up, converted to the
       * column type and set on a single insert or upsert operation.
       *
       * <p>Error handling is driven by three policies (each {@code WARN}, {@code IGNORE} or
       * {@code REJECT}):
       * <ul>
       *   <li>{@code unmatchedRowPolicy} - the body cannot be parsed as a JSON object;</li>
       *   <li>{@code badColumnValuePolicy} - a numeric value is absent or not parseable;</li>
       *   <li>{@code missingColumnPolicy} - a value is rejected with an
       *       {@link IllegalArgumentException}, e.g. an absent BINARY value.</li>
       * </ul>
       * An absent STRING value is written as the empty string and an absent BOOL value as
       * {@code false}.
       */
      @InterfaceAudience.Public
      @InterfaceStability.Evolving
      public class JsonKuduOperationsProducer implements KuduOperationsProducer {

          private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class);
          private static final String INSERT = "insert";
          private static final String UPSERT = "upsert";
          private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
          public static final String ENCODING_PROP = "encoding";
          public static final String DEFAULT_ENCODING = "utf-8";
          public static final String OPERATION_PROP = "operation";
          public static final String DEFAULT_OPERATION = UPSERT;

          @Deprecated
          public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
          @Deprecated
          public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
          @Deprecated
          public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
          @Deprecated
          public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
          @Deprecated
          public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
          @Deprecated
          public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;

          public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
          public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY =
                  ParseErrorPolicy.REJECT;
          public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
          public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY =
                  ParseErrorPolicy.REJECT;
          public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
          public static final JsonKuduOperationsProducer.ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY =
                  ParseErrorPolicy.WARN;

          private KuduTable table;
          private Charset charset;
          private String operation;

          private JsonKuduOperationsProducer.ParseErrorPolicy missingColumnPolicy;
          private JsonKuduOperationsProducer.ParseErrorPolicy badColumnValuePolicy;
          private JsonKuduOperationsProducer.ParseErrorPolicy unmatchedRowPolicy;


          /** Creates an unconfigured producer; {@link #configure(Context)} must be called before use. */
          public JsonKuduOperationsProducer() {

          }


          /**
           * Reads the charset, the operation type and the three error policies from the context.
           *
           * @param context producer configuration
           * @throws FlumeException if the configured charset is invalid or unsupported
           * @throws IllegalArgumentException if the operation or a policy name is not recognized, or
           *         if a deprecated property and its replacement are both set
           */
          @Override
          public void configure(Context context) {

              String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
              try {
                  this.charset = Charset.forName(charsetName);
              } catch (IllegalArgumentException e) {
                  throw new FlumeException(
                          String.format("Invalid or unsupported charset %s", charsetName), e);
              }
              // Locale.ROOT keeps the normalization independent of the JVM default locale
              // (e.g. "INSERT" would not lower-case to "insert" under a Turkish locale).
              this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(Locale.ROOT);
              Preconditions.checkArgument(validOperations.contains(this.operation),
                      "Unrecognized operation '%s'", this.operation);

              this.missingColumnPolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                      context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
                      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY);
              this.badColumnValuePolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                      context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
                      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY);
              this.unmatchedRowPolicy = this.getParseErrorPolicyCheckingDeprecatedProperty(
                      context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
                      ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY);
          }

          /**
           * Stores the table that operations are created for.
           *
           * @param kuduTable target table
           */
          @Override
          public void initialize(KuduTable kuduTable) {
              this.table = kuduTable;
          }

          /**
           * Converts one event into at most one Kudu operation.
           *
           * @param event event whose body holds a JSON object
           * @return a list with one operation, or an empty list when the body is empty or when it is
           *         not a JSON object and {@code unmatchedRowPolicy} does not reject it
           * @throws FlumeException if a policy is {@code REJECT} and the corresponding error occurs,
           *         or if setting a column fails unexpectedly
           */
          @Override
          public List<Operation> getOperations(Event event) throws FlumeException {
              List<Operation> ops = Lists.newArrayList();
              byte[] body = event.getBody();
              if (body == null || body.length == 0) {
                  return ops;
              }
              String raw = new String(body, charset);
              if (raw.isEmpty()) {
                  return ops;
              }
              Map<String, Object> fields = this.parseBody(raw);
              if (fields == null) {
                  return ops;
              }

              Operation op = this.newOperation();
              PartialRow row = op.getRow();
              for (ColumnSchema col : this.table.getSchema().getColumns()) {
                  // JSON numbers and booleans arrive as non-String objects; normalize them to text so
                  // that every column goes through the same coercion path.
                  Object value = fields.get(col.getName());
                  String text = value == null ? null : String.valueOf(value);
                  try {
                      this.coerceAndSet(text, col.getName(), col.getType(), row);
                  } catch (NumberFormatException e) {
                      String msg = String.format(
                              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
                              text, col.getType(), col.getName());
                      this.logOrThrow(this.badColumnValuePolicy, msg, e);
                  } catch (IllegalArgumentException e) {
                      String msg = String.format(
                              "Column '%s' has no matching group in '%s'", col.getName(), raw);
                      this.logOrThrow(this.missingColumnPolicy, msg, e);
                  } catch (Exception e) {
                      throw new FlumeException("Failed to create Kudu operation", e);
                  }
              }
              ops.add(op);
              return ops;
          }

          /**
           * Parses the body as a JSON object, applying {@code unmatchedRowPolicy} when that fails.
           *
           * @param raw decoded, non-empty event body
           * @return the parsed entries, or {@code null} if the body is not a JSON object and the
           *         policy allowed the event to be skipped
           */
          @SuppressWarnings("unchecked") // HashMap.class cannot carry the type arguments
          private Map<String, Object> parseBody(String raw) {
              Map<String, Object> fields = null;
              Exception failure = null;
              try {
                  fields = JSON.parseObject(raw, HashMap.class);
              } catch (RuntimeException e) {
                  // The parser reports malformed input with unchecked exceptions.
                  failure = e;
              }
              if (fields == null) {
                  this.logOrThrow(this.unmatchedRowPolicy,
                          String.format("Event body '%s' is not a JSON object", raw), failure);
              }
              return fields;
          }

          /** Creates an empty insert or upsert according to the configured operation. */
          private Operation newOperation() {
              switch (operation) {
                  case UPSERT:
                      return this.table.newUpsert();
                  case INSERT:
                      return this.table.newInsert();
                  default:
                      throw new FlumeException(
                              String.format("Unrecognized operation type '%s' in getOperations(): " +
                                      "this should never happen!", this.operation));
              }
          }

          /**
           * Converts the textual value to the column type and sets it on the row.
           *
           * @param rawVal textual value, or {@code null} when the JSON object has no such entry
           * @param colName column name
           * @param type column type
           * @param row row to populate
           * @throws NumberFormatException if a numeric value is absent or cannot be parsed
           * @throws IllegalArgumentException if a BINARY value is absent
           */
          private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row) throws NumberFormatException {
              switch (type) {
                  case BOOL:
                      row.addBoolean(colName, Boolean.parseBoolean(rawVal));
                      break;
                  case INT8:
                      row.addByte(colName, Byte.parseByte(rawVal));
                      break;
                  case INT16:
                      row.addShort(colName, Short.parseShort(rawVal));
                      break;
                  case INT32:
                      row.addInt(colName, Integer.parseInt(rawVal));
                      break;
                  case INT64:
                  case UNIXTIME_MICROS:
                      row.addLong(colName, Long.parseLong(rawVal));
                      break;
                  case FLOAT:
                      row.addFloat(colName, Float.parseFloat(requireNumericText(rawVal)));
                      break;
                  case DOUBLE:
                      row.addDouble(colName, Double.parseDouble(requireNumericText(rawVal)));
                      break;
                  case DECIMAL:
                      row.addDecimal(colName, new BigDecimal(requireNumericText(rawVal)));
                      break;
                  case BINARY:
                      if (rawVal == null) {
                          throw new IllegalArgumentException(
                                  "No value present for binary column '" + colName + "'");
                      }
                      row.addBinary(colName, rawVal.getBytes(this.charset));
                      break;
                  case STRING:
                      row.addString(colName, rawVal == null ? "" : rawVal);
                      break;
                  default:
                      logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
              }

          }

          /**
           * Rejects an absent numeric value the same way the integer parsers do.
           *
           * <p>{@code Integer.parseInt(null)} throws {@link NumberFormatException}, whereas
           * {@code Float.parseFloat(null)}, {@code Double.parseDouble(null)} and
           * {@code new BigDecimal((String) null)} throw {@link NullPointerException}; this guard makes
           * all numeric types subject to the bad-column-value policy.
           */
          private static String requireNumericText(String rawVal) {
              if (rawVal == null) {
                  throw new NumberFormatException("null");
              }
              return rawVal;
          }

          /**
           * Applies an error policy: rethrows for {@code REJECT}, logs for {@code WARN}, and does
           * nothing for {@code IGNORE}.
           *
           * @param policy policy to apply
           * @param msg description of the problem
           * @param e underlying cause, may be {@code null}
           * @throws FlumeException if the policy is {@code REJECT}
           */
          private void logOrThrow(JsonKuduOperationsProducer.ParseErrorPolicy policy, String msg, Exception e) throws FlumeException {
              switch (policy) {
                  case REJECT:
                      throw new FlumeException(msg, e);
                  case WARN:
                      logger.warn(msg, e);
                      break;
                  case IGNORE:
                  default:
                      break;
              }
          }

          /** Nothing to release. */
          @Override
          public void close() {

          }

          /**
           * Resolves a policy from either its deprecated boolean property or its replacement.
           *
           * @param context producer configuration
           * @param deprecatedPropertyName name of the deprecated boolean property
           * @param newPropertyName name of the policy property
           * @param trueValue policy used when the deprecated property is {@code true}
           * @param falseValue policy used when the deprecated property is {@code false}
           * @param defaultValue policy used when neither property is set
           * @return the resolved policy
           * @throws IllegalArgumentException if both properties are set or the policy name is unknown
           */
          private JsonKuduOperationsProducer.ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
                  Context context, String deprecatedPropertyName, String newPropertyName,
                  JsonKuduOperationsProducer.ParseErrorPolicy trueValue,
                  JsonKuduOperationsProducer.ParseErrorPolicy falseValue,
                  JsonKuduOperationsProducer.ParseErrorPolicy defaultValue) {
              if (context.containsKey(deprecatedPropertyName)) {
                  logger.info("Configuration property {} is deprecated. Use {} instead.", deprecatedPropertyName, newPropertyName);
                  // Guava's Preconditions substitutes %s placeholders, not SLF4J-style {}.
                  Preconditions.checkArgument(!context.containsKey(newPropertyName),
                          "Both %s and %s specified. Use only one of them, preferably %s.",
                          deprecatedPropertyName, newPropertyName, newPropertyName);
                  return context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
              }
              String policyString = context.getString(newPropertyName, defaultValue.name());
              try {
                  return ParseErrorPolicy.valueOf(policyString.toUpperCase(Locale.ROOT));
              } catch (IllegalArgumentException e) {
                  throw new IllegalArgumentException("Unknown policy '" + policyString
                          + "'. Use one of the following: " + Arrays.toString(ParseErrorPolicy.values()), e);
              }
          }

          /** What to do when an event or one of its values cannot be converted. */
          private enum ParseErrorPolicy {
              WARN,
              IGNORE,
              REJECT
          }
      }
      
    2. 编写处理复杂（嵌套）JSON 格式的拦截器 JsonInterceptor.java
      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONObject;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.nio.charset.Charset;
      import java.util.HashMap;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Map;
      
      public class JsonInterceptor implements Interceptor {
      
          private static final Logger logger = LoggerFactory.getLogger(JsonInterceptor.class);
          private boolean preserveExisting;
          private String sourceKeys;
          private String targetKeys;
      
          private JsonInterceptor() {
      
          }
      
          private JsonInterceptor(boolean preserveExisting, String sourceKeys, String targetKeys) {
              this.preserveExisting = preserveExisting;
              this.sourceKeys = sourceKeys;
              this.targetKeys = targetKeys;
          }
      
          public void initialize() {
          }
      
          public Event intercept(Event event) {
              byte[] body = event.getBody();
              String bodyStr = new String(body);
              String[] sourceArray = sourceKeys.trim().split(",", -1);
              String[] targetArray = targetKeys.trim().toLowerCase().split(",", -1);
              Map resultMap = new HashMap<String, String>();
              if (sourceArray.length == targetArray.length) {
                  JSONObject jsonObject = JSONObject.parseObject(bodyStr);
                  JSONObject jsonObjectTemp = null;
                  String[] arrayTemp = null;
                  for (int i = 0; i < sourceArray.length; i++) {
                      if (sourceArray[i].contains(".")) {
                          arrayTemp = sourceArray[i].trim().split("\\.", -1);
                          jsonObjectTemp = jsonObject;
                          for (int j = 0; j < arrayTemp.length - 1; j++) {
                              if (jsonObjectTemp != null) {
                                  jsonObjectTemp = jsonObjectTemp.getJSONObject(arrayTemp[j].trim());
                              }else {
                                  break;
                              }
                          }
                          if (jsonObjectTemp != null){
                              resultMap.put(targetArray[i].trim(), String.valueOf(jsonObjectTemp.getOrDefault(arrayTemp[arrayTemp.length - 1], "")));
                          }else {
                              resultMap.put(targetArray[i].trim(), "");
                          }
                      } else {
                          resultMap.put(targetArray[i].trim(), String.valueOf(jsonObject.getOrDefault(sourceArray[i], "")));
                      }
                  }
              } else {
                  logger.error("The sourceKeys and targetkeys lengths do not match");
              }
              event.setBody(JSON.toJSONString(resultMap).getBytes(Charset.forName("UTF-8")));
              return event;
          }
      
          public List<Event> intercept(List<Event> events) {
              Iterator i$ = events.iterator();
              while (i$.hasNext()) {
                  Event event = (Event) i$.next();
                  this.intercept(event);
              }
              return events;
          }
      
          public void close() {
          }
      
          public static class Constants {
              public static String PRESERVE = "preserveExisting";
              public static String SOURCE_KEYS = "sourceKeys";
              public static String TARGET_KEYS = "targetKeys";
              public static boolean PRESERVE_DFLT = false;
      
              public Constants() {
              }
          }
      
          public static class Builder implements Interceptor.Builder {
              private boolean preserveExisting;
              private String sourceKeys;
              private String targetKeys;
      
              public Builder() {
                  this.preserveExisting = JsonInterceptor.Constants.PRESERVE_DFLT;
              }
      
              public Interceptor build() {
      
                  return new JsonInterceptor(this.preserveExisting, this.sourceKeys, this.targetKeys);
              }
      
              public void configure(Context context) {
                  this.preserveExisting = context.getBoolean(JsonInterceptor.Constants.PRESERVE, JsonInterceptor.Constants.PRESERVE_DFLT);
                  this.sourceKeys = context.getString(Constants.SOURCE_KEYS);
                  this.targetKeys = context.getString(Constants.TARGET_KEYS);
              }
          }
      }
      
    3. pom.xml
      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>com.cloudera</groupId>
          <artifactId>flume-demo</artifactId>
          <version>1.0-SNAPSHOT</version>
          <name>flume-demo</name>
      
      
          <repositories>
              <!-- Cloudera repository (hosts the CDH-versioned artifacts) -->
              <repository>
                  <id>cloudera</id>
                  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
                  <name>Cloudera Repositories</name>
                  <releases>
                      <enabled>true</enabled>
                  </releases>
                  <snapshots>
                      <enabled>false</enabled>
                  </snapshots>
              </repository>
              <!-- Aliyun mirror of the public Maven repositories.
                   HTTPS is required: Maven 3.8.1 and later block plain http:// repositories by default. -->
              <repository>
                  <id>aliyun</id>
                  <url>https://maven.aliyun.com/repository/public</url>
                  <name>Maven2 Repositories</name>
                  <snapshots>
                      <enabled>false</enabled>
                  </snapshots>
              </repository>
          </repositories>
      
          <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <maven.compiler.source>1.8</maven.compiler.source>
              <maven.compiler.target>1.8</maven.compiler.target>
              <avro.version>1.8.2-cdh6.3.2</avro.version>
              <flume.version>1.9.0-cdh6.3.2</flume.version>
              <hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
              <kudu.version>1.10.0-cdh6.3.2</kudu.version>
              <!-- 1.2.83 is the last fastjson 1.x release; earlier versions are affected by the
                   published autoType deserialization vulnerabilities. -->
              <fastjson.version>1.2.83</fastjson.version>
              <slf4j.version>1.7.12</slf4j.version>
          </properties>
      
          <dependencies>
              <!-- Flume APIs: scope "provided", so they are compiled against but not packaged;
                   presumably supplied by the Flume installation at runtime. -->
              <dependency>
                  <groupId>org.apache.flume</groupId>
                  <artifactId>flume-ng-core</artifactId>
                  <version>${flume.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flume</groupId>
                  <artifactId>flume-ng-sdk</artifactId>
                  <version>${flume.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flume</groupId>
                  <artifactId>flume-ng-configuration</artifactId>
                  <version>${flume.version}</version>
                  <scope>provided</scope>
              </dependency>
              <!-- No scope given (default "compile"): fastjson and the two Kudu artifacts below are
                   therefore included in the shaded jar built by maven-shade-plugin. -->
              <dependency>
                  <groupId>com.alibaba</groupId>
                  <artifactId>fastjson</artifactId>
                  <version>${fastjson.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.kudu</groupId>
                  <artifactId>kudu-client</artifactId>
                  <version>${kudu.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.kudu</groupId>
                  <artifactId>kudu-flume-sink</artifactId>
                  <version>${kudu.version}</version>
              </dependency>
              <!-- Avro, Hadoop client and the SLF4J API are "provided" as well. -->
              <dependency>
                  <groupId>org.apache.avro</groupId>
                  <artifactId>avro</artifactId>
                  <version>${avro.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-client</artifactId>
                  <version>${hadoop.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
                  <version>${slf4j.version}</version>
                  <scope>provided</scope>
              </dependency>
          </dependencies>
      
      
          <build>
              <defaultGoal>install</defaultGoal>
              <sourceDirectory>src/main/java</sourceDirectory>
              <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
              <resources>
                  <!-- Everything under src/main/resources except the per-environment folders. -->
                  <resource>
                      <directory>src/main/resources/</directory>
                      <excludes>
                          <exclude>env/*/*</exclude>
                      </excludes>
                      <includes>
                          <include>**/*</include>
                      </includes>
                  </resource>
                  <!-- Per-environment resources. NOTE(review): profile.active is not defined anywhere
                       in this POM; it has to be supplied by a profile or on the command line. -->
                  <resource>
                      <directory>src/main/resources/env/${profile.active}</directory>
                      <includes>
                          <include>**/*.properties</include>
                          <include>**/*.xml</include>
                      </includes>
                  </resource>
              </resources>
              <!-- Plugin versions and shared configuration. Each plugin is declared exactly once:
                   Maven reports a duplicate groupId:artifactId declaration as a malformed model. -->
              <pluginManagement>
                  <plugins>
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>3.8.0</version>
                          <configuration>
                              <source>1.8</source>
                              <target>1.8</target>
                          </configuration>
                      </plugin>
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-resources-plugin</artifactId>
                          <version>3.0.2</version>
                          <configuration>
                              <encoding>UTF-8</encoding>
                          </configuration>
                      </plugin>
                      <plugin>
                          <groupId>net.alchim31.maven</groupId>
                          <artifactId>scala-maven-plugin</artifactId>
                          <version>3.2.2</version>
                          <executions>
                              <execution>
                                  <goals>
                                      <goal>compile</goal>
                                      <goal>testCompile</goal>
                                  </goals>
                              </execution>
                          </executions>
                      </plugin>
                  </plugins>
              </pluginManagement>
              <plugins>
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <executions>
                          <execution>
                              <id>scala-compile-first</id>
                              <phase>process-resources</phase>
                              <goals>
                                  <goal>add-source</goal>
                                  <goal>compile</goal>
                              </goals>
                          </execution>
                          <execution>
                              <id>scala-test-compile</id>
                              <phase>process-test-resources</phase>
                              <goals>
                                  <goal>testCompile</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>

                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <executions>
                          <execution>
                              <phase>compile</phase>
                              <goals>
                                  <goal>compile</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>

                  <!-- Builds a single jar containing the compile-scope dependencies. -->
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.2.1</version>
                      <configuration>
                          <createDependencyReducedPom>false</createDependencyReducedPom>
                      </configuration>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <filters>
                                      <!-- Drop signature files of signed dependencies from the merged jar. -->
                                      <filter>
                                          <artifact>*:*</artifact>
                                          <excludes>
                                              <exclude>META-INF/*.SF</exclude>
                                              <exclude>META-INF/*.DSA</exclude>
                                              <exclude>META-INF/*.RSA</exclude>
                                          </excludes>
                                      </filter>
                                  </filters>
                                  <transformers>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      </project>
      
    4. flume 配置文件
      ## Flume agent "agent": one Kafka source (r1), one file channel (c1), one Kudu sink (k1)
      agent.sources  = r1
      agent.channels = c1
      agent.sinks = k1
      
      ## source   r1
      agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
      agent.sources.r1.batchSize = 5000
      agent.sources.r1.batchDurationMillis = 2000
      agent.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
      agent.sources.r1.kafka.topics = test_topic
      agent.sources.r1.kafka.consumer.group.id = test_consumer
      agent.sources.r1.kafka.consumer.auto.offset.reset = earliest
      
      ## interceptor   i1
      agent.sources.r1.interceptors = i1
      agent.sources.r1.interceptors.i1.type = com.cloudera.flume.interceptor.JsonInterceptor$Builder
      ## Comma-separated attributes to extract; a dot separates the levels of a nested attribute
      agent.sources.r1.interceptors.i1.sourceKeys = $id,$name,$properties.$age,$properties.$address.addrDetail
      ## Attribute names after conversion, matched one-to-one (by position) with sourceKeys
      agent.sources.r1.interceptors.i1.targetKeys = id,name,age,addrDetail
      
      ## channel   c1
      agent.channels.c1.type = file
      agent.channels.c1.checkpointDir = /data/flume-app/channel/checkpointDir/test_topic/checkpoint
      agent.channels.c1.dataDirs = /data/flume-app/channel/checkpointDir/test_topic/data
      agent.channels.c1.maxFileSize = 2146435071
      agent.channels.c1.transactionCapacity = 10000
      agent.channels.c1.capacity = 1000000
      agent.channels.c1.keep-alive = 6
      
      ## sinks   k1
      agent.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
      agent.sinks.k1.masterAddresses = node01,node02,node03
      agent.sinks.k1.tableName = default.test
      agent.sinks.k1.batchSize = 5000
      ## Custom operations producer that turns the flattened JSON body into Kudu operations
      agent.sinks.k1.producer = com.cloudera.flume.sink.JsonKuduOperationsProducer
      
      ## Wiring: bind the source and the sink to the channel
      agent.sources.r1.channels = c1
      agent.sinks.k1.channel= c1
      
    5. flume 启动脚本
      #! /bin/bash
      # Starts or stops the Flume agent defined in kudu-flume-sink.conf.
      # Usage: <script> start|stop
      # The configuration file is resolved relative to the current working directory.

      CONF_FILE=kudu-flume-sink.conf
      LOG_FILE=/data/flume-app/logs/kudu-flume-sink.log

      case "$1" in
      start)
          echo " --------启动 kudu-flume-sink 采集 flume-------"
          # Run detached; stdout and stderr are appended to the log file.
          nohup flume-ng agent --conf-file "$CONF_FILE" --name agent -Xmx2048m -Dflume.root.logger=INFO,LOGFILE >> "$LOG_FILE" 2>&1 &
          ;;
      stop)
          echo " --------停止 kudu-flume-sink 采集flume-------"
          # Locate the agent by its configuration file name on the command line.
          pids=$(ps -ef | grep "$CONF_FILE" | grep -v grep | awk '{print $2}')
          if [ -n "$pids" ]; then
              # Intentionally unquoted: there may be several PIDs.
              kill $pids
          else
              # Avoid calling kill without arguments when nothing is running.
              echo "no running agent found for $CONF_FILE"
          fi
          ;;
      *)
          echo "Usage: $0 {start|stop}" >&2
          exit 1
          ;;
      esac
      

    相关文章

      网友评论

          本文标题:Flume 采集 kafka 数据实时写入 Kudu

          本文链接:https://www.haomeiwen.com/subject/dkupohtx.html