美文网首页
从kafka获取信息写入redis集群

从kafka获取信息写入redis集群

作者: pingforever | 来源:发表于2017-05-06 21:29 被阅读0次

    1. 环境

    • kafka kafka-0.10.2.1-src.tgz (asc, md5)
    • redis集群版本: 3.2.1
    • redis服务器:6台, CPU信息 : 8 Intel(R) Xeon(R) CPU E5410 @ 2.33GHz 内存:32G

    2. 问题说明

    接收GW网关设备发送的radius报文,量级在2000TPS左右,本来想用flume接收udp报文。测试时发现只能使用syslogudp方式,但是解析的报文会丢失很多字段排查后发现flume入库时报文信息被截断了。从git上查到几个udp插件,奈何不搞java了还要编译成jar包遂决定自己写。目前测试了单机从kafka获取数据并解析的效率在3W/s, 加上redis入库后降到1.5w/s

    2. kafka连接

    本来是想使用travisjeffery/jocko但是发现不支持group,所以使用的confluentinc/confluent-kafka-go 虽然星星不是太多,但是看到这个Confluent's Apache Kafka Golang client你懂的。

    使用该模块前,需要安装librdkafka,在安装时遇到

    make[1]: 离开目录“/slview/librdkafka-master/src-cpp”
    make -C examples
    make[1]: 进入目录“/slview/librdkafka-master/examples”
    gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -I../src rdkafka_example.c -o rdkafka_example  \
            ../src/librdkafka.a -lpthread -lz -lssl -lrt
    /bin/ld: ../src/librdkafka.a(rdkafka_sasl_scram.o): undefined reference to symbol 'BIO_read'
    /bin/ld: note: 'BIO_read' is defined in DSO /usr/local/ssl/lib/libcrypto.so.1.0.0 so try adding it to the linker command line
    /usr/local/ssl/lib/libcrypto.so.1.0.0: could not read symbols: 无效的操作
    collect2: 错误:ld 返回 1
    make[1]: *** [rdkafka_example] 错误 1
    make[1]: 离开目录“/slview/librdkafka-master/examples”
    

    google好半天没发现解决方法,最后还是先避开吧。不编译ssl可以成功

    ./configure --disable-ssl
    

    连接方式:

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":    broker,
            "group.id":             group,
            "session.timeout.ms":   6000,
            "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
    

    3. redis连接

    redis 3.2在建立集群是使用的内网地址,使用对应的外网地址无法连接,不清是否是配置问题

    client := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs:        []string{"172.17.50.73:6379", "172.17.50.74:6379", "172.17.50.75:6379", "172.17.50.76:6379", "172.17.50.77:6379", "172.17.50.78:6379"},
    ReadOnly:    false,
    DialTimeout:  10 * time.Second,
    ReadTimeout:  30 * time.Second,
    WriteTimeout: 30 * time.Second,
    PoolSize:    6000,
    PoolTimeout:  30 * time.Second,
    })
    
    if client == nil {
    return client, ConnectError
    }
    

    4. radius报文解析

    使用bronze1man/radius模块,代码如下:

    radinfo := make(map[string]interface{})
    
    pac, err := radius.DecodePacket(secret, p)
    if err != nil {
        log.Println("[pac.Decode]", err)
        return
    }
    
    for i := range pac.AVPs {
        Type := pac.AVPs[i].Type
        if Type == radius.EventTimestamp {
            Value := pac.AVPs[i].Decode(pac).([]uint8)
            timestamp := binary.BigEndian.Uint32([]byte(Value))
            timestr := time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05")
            Typestr := fmt.Sprintf("%s", Type)
            radinfo[Typestr] = timestr
            //fmt.Printf("Type: %s    Value: %s\n", Type, timestr)
        } else {
            Typestr := fmt.Sprintf("%s", Type)
    
            if _, ok := Item[Typestr]; ok {
                Value := pac.AVPs[i].GetValue()
                radinfo[Typestr] = Value
            }
        }
    }
    

    5. 入库后结果

    172.17.50.76:6379> hgetall 86xxxxxxxxxx
     1) "SessionID"
     2) "73aa0e5b1fca2b97"
     3) "APN"
     4) "ctnet"
     5) "Status"
     6) "Stop"
     7) "StartTime"
     8) ""
     9) "MDN"
    10) "xxxxxxxxx"
    11) "IPAddr"
    12) "x.x.x.x"
    13) "StopTime"
    14) "2017-05-06 14:45:44"
    15) "TerminateCause"
    16) "UserRequest"
    17) "Duration"
    18) "2846"
    19) "OutputOctets"
    20) "0"
    21) "InputOctets"
    22) "0"
    

    相关文章

      网友评论

          本文标题:从kafka获取信息写入redis集群

          本文链接:https://www.haomeiwen.com/subject/ttsytxtx.html