美文网首页
使用 Perl 6 解析 Kafka 消息并入库

使用 Perl 6 解析 Kafka 消息并入库

作者: 焉知非鱼 | 来源:发表于2018-07-21 00:27 被阅读27次

    Kafka 里面保存的是字节数组，使用 Archive::Libarchive 模块解压缩出其中的文本文件，读取文本内容作为 POST 请求的参数，并将返回的结果入库：

    use PKafka::Consumer;
    use PKafka::Message;
    use PKafka::Producer;
    use Archive::Libarchive; 
    use Archive::Libarchive::Constants;
    use Cro::HTTP::Client;
    use JSON::Fast;
    use JSON::Path;
    use DBIish;
    
    # Entry point. Consumes partition 0 of the "mytopic" Kafka topic from the
    # beginning. Each message payload is opened as an archive; the entries whose
    # names end in `_log.txt` and `_result.txt` are read, sent to ADS, and the
    # parsed reply is written to the database.
    # (Plain `#` comments are used here on purpose: a `#|` declarator block on
    # MAIN would become part of the generated usage message.)
    sub MAIN ()
    {
        my $brokers = "127.0.0.1";
        my $test = PKafka::Consumer.new( topic=>"mytopic", brokers=>$brokers);

        $test.messages.tap(-> $msg
        {
            given $msg
            {
                when PKafka::Message
                {
                    say "got offset: {$msg.offset}";
                    my $a = Archive::Libarchive.new: operation => LibarchiveRead, file => $msg.payload;
                    # Close the archive reader however this block is left (normal
                    # completion or an exception), so a handle is not leaked for
                    # every consumed message.
                    LEAVE { .close with $a }
                    my Archive::Libarchive::Entry $e .= new;

                    my ($taskid, $log, $result) = ('', '', '');
                    # Walk every entry in the archive and pick out the two files of interest.
                    while $a.next-header($e) {
                        $log = get-log($a,$e) if $e.pathname.ends-with('_log.txt');
                        ($taskid, $result) = get-result($a,$e) if $e.pathname.ends-with('_result.txt');
                    }

                    my $json = request_ads($taskid, $log, $result);   # fetch the JSON reply
                    my @values =  parse-json($json);                  # parse the JSON into the SQL values
                    write2db(@values);                                # write the row to the database
                }
                when PKafka::EOF
                {
                    say "Messages Consumed { $msg.total-consumed}";
                }
                when PKafka::Error
                {
                    # Report the error and stop the consumer.
                    say "Error {$msg.what}";
                    $test.stop;
                }
            }
        });

        my $t1 = $test.consume-from-beginning(partition=>0);

        # Block until the consumer finishes.
        await $t1;
    }
    
    #| Return the text of the archive entry $e read through archive $a.
    #| The bytes are decoded as UTF8-C8; regarding malformed UTF-8 see
    #| https://stackoverflow.com/questions/50674498/perl6-malformed-utf-8-causes-program-crash
    sub get-log($a, $e) {
        my $raw-bytes = $a.read-file-content($e);
        $raw-bytes.decode('UTF8-C8');
    }
    
    #| Extract the task id and the result text from the archive entry $e
    #| (read through archive $a and decoded as UTF8-C8).
    #| The first line carries the task id and the second line the result, each
    #| in the form "label:value". Only the first ":" on a line separates label
    #| from value, so a value that itself contains ":" is returned intact.
    #| Returns the list ($taskid, $result); a value is undefined when its line
    #| is missing or has no ":".
    sub get-result($a, $e) {
        # Decode and split into lines once instead of once per field.
        my @lines = $a.read-file-content($e).decode('UTF8-C8').lines;

        # Limit the split to two parts: an unlimited split would cut the value
        # off at its first embedded ":" (e.g. in a URL or a timestamp).
        my $taskid = (@lines[0] // '').split(":", 2)[1];
        my $result = (@lines[1] // '').split(":", 2)[1];
        return ($taskid, $result);
    }
    
    #| Request ADS: POST the task id, log text and result text as a JSON body
    #| (keys `taskId`, `log`, `result`) to the ADS endpoint.
    #| Returns the response body as text, i.e. the JSON document to be decoded
    #| by the caller.
    sub request_ads($taskid, $log, $result) {
        my $client = Cro::HTTP::Client.new: content-type => 'application/json';
        my %rds = taskId => $taskid, :$log, :$result;
        my $resp = await $client.post: 'http://10.10.10.25/test', body => %rds;
        # body-text already yields the response document as a string. Running
        # it through to-json would encode it a second time as a JSON string
        # literal, and a later from-json would then return that string instead
        # of the decoded data structure.
        return await $resp.body-text;
    }
    
    #| Parse the JSON text $json and extract the values for one database row.
    #| `ecuid`, `dtcnum` and `dtc` come from the first match of
    #| `$.data.dtcs.ecu[0]`; `taskId`, `vtype` and `funid` come from the top
    #| level of the document.
    #| Returns the list ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc).
    sub parse-json($json) {
        my $oj = from-json($json);

        # Evaluate the ECU path once and reuse its first match for all three fields.
        my $ecu    = JSON::Path.new('$.data.dtcs.ecu[0]').values($oj)[0];
        my $ecuid  = $ecu.{'ecuid'};
        my $dtcnum = $ecu.{'dtcnum'};
        my $dtc    = $ecu.{'dtc'};

        my $taskid = JSON::Path.new('$.taskId').values($oj)[0];
        my $vtype  = JSON::Path.new('$.vtype').values($oj)[0];
        my $funid  = JSON::Path.new('$.funid').values($oj)[0];

        return ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc);
    }
    
    #| Write one row to the `test` table of the MySQL database.
    #| @values must hold ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc) in
    #| that order. The row is inserted, or, on a duplicate key, its non-key
    #| columns are updated with the same values.
    #| The connection is opened with :RaiseError, so database failures surface
    #| as exceptions to the caller.
    sub write2db(@values) {
        my $dbh = DBIish.connect("mysql", :database<xxx>, :host<127.0.0.1>, :user<root>, :password<xxx>, :port<6606>, :RaiseError);
        my $sth;
        # Release the statement and the connection however the sub is left, so
        # an exception from prepare/execute does not leak the connection.
        LEAVE {
            .finish  with $sth;
            .dispose with $dbh;
        }

        my ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc) = @values;
        $sth = $dbh.prepare(q:to/STATEMENT/);
            insert into test (taskid, vtype, funid, ecuid, dtcnum, dtc)
            values (?,?,?,?,?,?) 
            ON DUPLICATE KEY 
            UPDATE vtype=?, funid=?, ecuid=?, dtcnum=?, dtc=?
        STATEMENT

        # Six placeholders for the insert, then the five non-key columns again for the update.
        $sth.execute($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc,$vtype, $funid, $ecuid, $dtcnum, $dtc);
    }
    

    相关文章

      网友评论

          本文标题:使用 Perl 6 解析 Kafka 消息并入库

          本文链接:https://www.haomeiwen.com/subject/caycmftx.html