美文网首页
使用 Python 解压缩 gzip 数据流

使用 Python 解压缩 gzip 数据流

作者: 焉知非鱼 | 来源:发表于2018-07-17 21:21 被阅读707次

有一个使用场景:将 tar.gz 文件读成字节数组(byte array),然后发往 Kafka。我要查看 Kafka 里面的消息,直接查看的话就是二进制乱码。首先想到的是把 Kafka 里保存的字节数组存储到本地,每条消息存成一个 .tar.gz 文件:

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import gzip

def run(args):
    """Dump the payload of every consumed Kafka message to its own file.

    Args:
        args: argv-style sequence. ``args[1]`` is the broker list
            (``bootstrap.servers``), ``args[2]`` the consumer group id and
            ``args[3]`` the topic to subscribe to.

    Each payload is written to ``/data/app/tar/<n>.tar.gz`` where ``n`` is
    the number of files written so far, starting at 0. The loop ends only
    when ``poll`` reports an error other than partition EOF.
    """
    c = Consumer({
        "bootstrap.servers": args[1],  # broker
        'group.id': args[2],           # group id, different on every run
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # topic
        i = 0
        while True:
            msg = c.poll(1.0)

            if msg is None:
                # Nothing arrived within the poll timeout.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            try:
                # open() replaces the Python-2-only file(); the with-block
                # closes the handle even when write() raises.
                with open('/data/app/tar/' + str(i) + ".tar.gz", 'wb') as out_f:
                    out_f.write(msg.value())
                i += 1
                print(i)
            except Exception as e:
                # Best effort per message: report the failure and keep going.
                print(e)
    finally:
        # Release the consumer even on Ctrl-C or an unexpected error.
        c.close()
       

if __name__ == '__main__':
    # Echo the raw command line (works on both Python 2 and Python 3).
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; give a usage hint instead of
    # letting a short command line fail with an IndexError.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

这样就能打开压缩文件查看了,但是这样也不方便。所以改为直接在内存中解压字节数组,并把解压后的内容打印出来:

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import gzip,io
import zlib

def run(args):
    """Consume Kafka messages and print each payload after decompressing it.

    Args:
        args: argv-style sequence. ``args[1]`` is the broker list
            (``bootstrap.servers``), ``args[2]`` the consumer group id and
            ``args[3]`` the topic to subscribe to.

    The loop ends only when ``poll`` reports an error other than partition
    EOF. A payload that fails to decompress is reported and skipped.
    """
    c = Consumer({
        "bootstrap.servers": args[1],
        'group.id': args[2],           # group id
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # topic
        while True:
            msg = c.poll(1.0)

            if msg is None:
                # Nothing arrived within the poll timeout.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            try:
                # wbits = MAX_WBITS | 32 enables automatic zlib/gzip header
                # detection.
                decompressed_data = zlib.decompress(msg.value(), zlib.MAX_WBITS | 32)
                print(decompressed_data)
            except Exception as e:
                # Best effort per message: report the failure and keep going.
                print(e)
    finally:
        # Release the consumer even on Ctrl-C or an unexpected error.
        c.close()
       

if __name__ == '__main__':
    # Echo the raw command line (works on both Python 2 and Python 3).
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; give a usage hint instead of
    # letting a short command line fail with an IndexError.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

解压缩字节数组并读取文件内容

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import tarfile, io

def run(args):
    """Consume Kafka messages, open each payload as a tar archive in memory
    and print every member together with its content.

    Args:
        args: argv-style sequence. ``args[1]`` is the broker list
            (``bootstrap.servers``, e.g. "10.10.20.11:9092"), ``args[2]`` the
            consumer group id and ``args[3]`` the topic to subscribe to
            (e.g. dc-diagnostic-report).

    The loop ends only when ``poll`` reports an error other than partition
    EOF. A payload that cannot be read as an archive is reported and skipped.
    """
    c = Consumer({
        "bootstrap.servers": args[1],  # "10.10.20.11:9092"
        'group.id': args[2],           # group id, different on every run
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # dc-diagnostic-report
        while True:
            msg = c.poll(1.0)

            if msg is None:
                # Nothing arrived within the poll timeout.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            try:
                file_like_object = io.BytesIO(msg.value())
                # The with-block closes the archive even if a member fails
                # to read (TarFile is a context manager since Python 2.7).
                with tarfile.open(fileobj=file_like_object) as tar:
                    for member in tar.getmembers():
                        print(member)
                        f = tar.extractfile(member)
                        if f is None:
                            # extractfile() returns None for members with no
                            # file data (e.g. directories); skip them rather
                            # than aborting the rest of the archive.
                            continue
                        content = f.read()
                        print(content)
            except Exception as e:
                # Best effort per message: report the failure and keep going.
                print(e)
    finally:
        # Release the consumer even on Ctrl-C or an unexpected error.
        c.close()
       

if __name__ == '__main__':
    # Echo the raw command line (works on both Python 2 and Python 3).
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; give a usage hint instead of
    # letting a short command line fail with an IndexError.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

参考链接

相关文章

网友评论

      本文标题:使用 Python 解压缩 gzip 数据流

      本文链接:https://www.haomeiwen.com/subject/paagpftx.html