Kafka 里面保存的是字节数组，使用 Archive::Libarchive
模块解压缩出其中的文本文件，读取文本内容作为 POST 请求的参数，并将返回的结果入库：
use PKafka::Consumer;
use PKafka::Message;
use PKafka::Producer;
use Archive::Libarchive;
use Archive::Libarchive::Constants;
use Cro::HTTP::Client;
use JSON::Fast;
use JSON::Path;
use DBIish;
# Entry point: consume partition 0 of the Kafka topic from the beginning.
# Each message payload is opened as an archive; the "_log.txt" and
# "_result.txt" entries are extracted, posted to ADS, and the parsed reply
# is written to the database.
# (Plain comments are used here on purpose: a declarator comment on MAIN
# would be picked up by the generated usage message.)
sub MAIN ()
{
    my $brokers  = "127.0.0.1";
    my $consumer = PKafka::Consumer.new( topic=>"mytopic", brokers=>$brokers);
    $consumer.messages.tap(-> $msg
    {
        given $msg
        {
            when PKafka::Message
            {
                say "got offset: {$msg.offset}";
                # The message payload is handed to libarchive as the archive to read.
                my $archive = Archive::Libarchive.new: operation => LibarchiveRead, file => $msg.payload;
                my Archive::Libarchive::Entry $entry .= new;
                my ($taskid, $log, $result) = ('', '', '');
                while $archive.next-header($entry) {
                    $log = get-log($archive, $entry) if $entry.pathname.ends-with('_log.txt');
                    ($taskid, $result) = get-result($archive, $entry) if $entry.pathname.ends-with('_result.txt');
                }
                # Release the reader once all entries are consumed; without this
                # every processed message leaves an open libarchive handle behind.
                $archive.close;
                my $json = request_ads($taskid, $log, $result); # fetch the JSON reply
                my @values = parse-json($json);                 # extract the SQL values from the JSON
                write2db(@values);                              # store them in the database
            }
            when PKafka::EOF
            {
                say "Messages Consumed { $msg.total-consumed}";
            }
            when PKafka::Error
            {
                say "Error {$msg.what}";
                # Stop consuming on the first reported error.
                $consumer.stop;
            }
        }
    });
    my $t1 = $consumer.consume-from-beginning(partition=>0);
    # Block until the consumer finishes (end of stream or stop after an error).
    await $t1;
}
#| Read the current archive entry and return its content as text.
# The bytes are decoded with UTF8-C8 rather than plain UTF-8; see
# https://stackoverflow.com/questions/50674498/perl6-malformed-utf-8-causes-program-crash
# $a is the archive reader, $e the entry whose content is read.
sub get-log($a, $e) {
    my $raw-bytes = $a.read-file-content($e);
    my $text      = $raw-bytes.decode('UTF8-C8');
    return $text;
}
#| Extract the task id and the result text from the current archive entry.
# The entry is decoded with UTF8-C8. The first line supplies the task id and
# the second line the result; in both cases the value is whatever follows the
# first ':' on the line (presumably "key:value" lines - confirm against the
# producer of these files).
# $a is the archive reader, $e the entry whose content is read.
# Returns the list ($taskid, $result).
sub get-result($a, $e) {
    # Split into lines once; a List is kept so that indexing a missing line
    # still yields Nil, exactly as indexing the original .lines sequence did.
    my $lines = $a.read-file-content($e).decode('UTF8-C8').lines.List;
    # Limit the split to two pieces so a value that itself contains ':'
    # is returned whole instead of being cut at its first colon.
    my $taskid = $lines[0].split(":", 2)[1];
    my $result = $lines[1].split(":", 2)[1];
    return ($taskid, $result);
}
#| Post the task data to the ADS endpoint and return the reply body as JSON text.
# The request body is a map with the keys taskId, log and result, sent with
# content type application/json.
# Returns the response body text, which the caller decodes with from-json.
sub request_ads($taskid, $log, $result) {
    my $client = Cro::HTTP::Client.new: content-type => 'application/json';
    my %rds = taskId => $taskid, :$log, :$result;
    my $resp = await $client.post: 'http://10.10.10.25/test', body => %rds;
    # body-text is already the JSON document as text. Passing it through
    # to-json would encode that text again as a single JSON string literal,
    # so a later from-json would give back a string instead of the structure.
    return await $resp.body-text;
}
#| Decode the JSON reply and pick out the values that are stored in the database.
# $json is JSON text. Returns the list
# ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc), where ecuid, dtcnum and
# dtc come from the first element of $.data.dtcs.ecu and the remaining values
# from top-level keys.
sub parse-json($json) {
    my $oj = from-json($json);
    # Evaluate the ECU path once and read all three fields from that match.
    my $ecu    = JSON::Path.new('$.data.dtcs.ecu[0]').values($oj)[0];
    my $ecuid  = $ecu.{'ecuid'};
    my $dtcnum = $ecu.{'dtcnum'};
    my $dtc    = $ecu.{'dtc'};
    my $taskid = JSON::Path.new('$.taskId').values($oj)[0];
    my $vtype  = JSON::Path.new('$.vtype').values($oj)[0];
    my $funid  = JSON::Path.new('$.funid').values($oj)[0];
    return ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc);
}
#| Insert one row into the test table, or update it when the key already exists.
# @values must hold, in order: taskid, vtype, funid, ecuid, dtcnum, dtc.
# A new MySQL connection is opened for every call. With :RaiseError set,
# database failures are raised as exceptions to the caller.
sub write2db(@values) {
    my $dbh = DBIish.connect("mysql", :database<xxx>, :host<127.0.0.1>, :user<root>, :password<xxx>, :port<6606>, :RaiseError);
    # Always release the connection, including when prepare or execute throws.
    LEAVE { .dispose with $dbh }
    my ($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc) = @values;
    my $sth = $dbh.prepare(q:to/STATEMENT/);
        insert into test (taskid, vtype, funid, ecuid, dtcnum, dtc)
        values (?,?,?,?,?,?)
        ON DUPLICATE KEY
        UPDATE vtype=?, funid=?, ecuid=?, dtcnum=?, dtc=?
        STATEMENT
    # LEAVE phasers run in reverse order, so the statement is finished before
    # the connection is disposed; the guard covers a failed prepare.
    LEAVE { .finish with $sth }
    # The non-key columns are bound twice: once for the insert, once for the update.
    $sth.execute($taskid, $vtype, $funid, $ecuid, $dtcnum, $dtc,$vtype, $funid, $ecuid, $dtcnum, $dtc);
}
网友评论