美文网首页
ffmpeg开发——如何解析MP4,AAC,H.264码流

ffmpeg开发——如何解析MP4,AAC,H.264码流

作者: 拉丁吴 | 来源:发表于2024-05-11 23:59 被阅读0次

    前言

    我们在前面的几篇文章中介绍了MP4的视频文件格式,以及MP4文件内封装的AAC音频数据格式和H.264视频码流。

    FFmpeg开发——深入理解MP4文件格式

    FFmpeg开发——AAC音频格式解析

    FFmpeg开发——初探H.264

    但是上述分析主要停留在知识层面,因此本文主要讲述如何通过代码对上述格式的数据(文件)进行正确的解析,从而获得一些有效的信息。

    目前代码已经上传到github上,有兴趣可以自取。

    读取MP4文件信息

    本次的解析功能主要使用python来实现(主要比较简单用起来顺手,其他编程语言比如Java,kotlin,c++等效果是一样的 )。

    
    
    class Box(object):
        """Base reader for a single ISO-BMFF (MP4) box.

        Reads the 8-byte box header (32-bit big-endian size followed by a
        4-character type) and prints the fields common to every box.
        Subclasses call ``super().printSelf(file)`` first and then decode
        their own payload from the same file object.
        """

        # Commonly used field widths, in bytes.
        one_bytes = 1
        two_bytes = 2
        three_bytes = 3
        four_bytes = 4
        eight_bytes = 8

        # Value of the 32-bit size field of the current box.
        box_size = 0
        # Header bytes consumed so far (8 = size + type); printSelf grows it
        # when it also consumes a largesize or the FullBox version/flags.
        box_header_size = 8
        box_type = ''
        # FullBox version field (int), filled in by printSelf.
        box_version = 0
        # FullBox 24-bit flags field (int), filled in by printSelf.
        box_flags = 0
        # Whether the current box is a FullBox (carries version + flags).
        isFullBox = False

        # True when the caller has already consumed the 8-byte header.
        header_read_already = False

        def __init__(self, box_type=None, size=None):
            """Create a box, optionally from a header the caller already read.

            Args:
                box_type: 4-character type from an already-consumed header.
                size: value of the size field from that header.
            """
            self.box_size = size
            self.box_type = box_type
            if box_type is not None and size is not None:
                self.header_read_already = True

        def findBoxHeader(self, file):
            """Read the next 8-byte box header from *file*.

            Returns:
                ``(box_type, box_size)``. At end of file both reads return
                nothing, which yields ``('', 0)``.
            """
            # MP4 integers are big-endian; stating it explicitly also keeps
            # this working on Python versions older than 3.11, where
            # int.from_bytes has no default byte order.
            self.box_size = int.from_bytes(file.read(self.four_bytes), 'big')
            # latin-1 maps every byte to one character, so a type containing
            # non-ASCII bytes cannot raise UnicodeDecodeError.
            self.box_type = file.read(self.four_bytes).decode('latin-1')
            return self.box_type, self.box_size

        def print_origin_for_test(self, file):
            """Debug helper: consume the box payload and print it raw."""
            if not self.header_read_already:
                self.findBoxHeader(file)
            # The header bytes are already consumed at this point, so only
            # the remainder of the box belongs to the payload.
            payload_size = max(self.box_size - self.box_header_size, 0)
            last = file.read(payload_size)
            print("{0} box,size:{1} last byte:{2}".format(self.box_type, self.box_size, last))

        def printSelf(self, file):
            """Print the common header fields, reading the header if needed.

            Also consumes the 64-bit largesize (when the size field is 1) and
            the FullBox version/flags (when ``isFullBox`` is set), storing the
            latter in ``self.box_version`` / ``self.box_flags`` so subclasses
            can pick the matching field layout.

            Returns:
                ``(None, None)``: this reader never consumes the header of a
                following box. Container subclasses return the
                ``(type, size)`` of a header they read ahead instead.
            """
            if not self.header_read_already:
                self.findBoxHeader(file)
            print(
                "\n============================================== {0} box ==========================================".format(
                    self.box_type))
            print("type:{0}  ".format( self.box_type))
            print("size:{0}  ".format( self.box_size))
            if self.box_size == 1:
                box_large_size = file.read(self.eight_bytes)
                print("large_size:{0}  ".format(box_large_size))
                self.box_header_size = self.box_header_size + 8
            if self.isFullBox:
                # Store on the instance: subclasses branch on self.box_version.
                self.box_version = int.from_bytes(file.read(self.one_bytes), 'big')
                self.box_flags = int.from_bytes(file.read(self.three_bytes), 'big')
                print("box_version:{0}  ".format(self.box_version))
                print("box_flags:{0}  ".format(self.box_flags))
                self.box_header_size = self.box_header_size + 4

            # No header of a following box was consumed here.
            return None, None
    
    class FtypBox(Box):
        """File Type box ('ftyp'): major brand, minor version, compatible brands."""

        # Bytes that precede the compatible-brands list:
        # 8 (box header) + 4 (major_brand) + 4 (minor_version).
        used_bytes = 16

        def printSelf(self, file):
            """Print the ftyp payload.

            Returns:
                ``(None, None)``: no header of a following box is consumed.
            """
            super().printSelf(file)

            # The first payload field is the *major* brand; the 32-bit
            # integer after it is the minor version.
            major_brand = file.read(self.four_bytes).decode()
            minor_version = int.from_bytes(file.read(self.four_bytes), 'big')

            # The rest of the box is a list of 4-character brands.
            brands_size = self.box_size - self.used_bytes
            compatible_brand = [
                file.read(self.four_bytes).decode()
                for _ in range(0, brands_size, self.four_bytes)
            ]

            print("major_brand:{0}  ".format(major_brand))
            print("minor_version:{0}  ".format(minor_version))
            print("compatible_brand:{0}  ".format(str(compatible_brand)))
            return None, None
    
    class MoovBox(Box):
        """Movie box ('moov'): a container whose mvhd/trak children are printed."""

        def printSelf(self, file):
            """Print the moov header, then each consecutive mvhd/trak child.

            Returns:
                The ``(type, size)`` of the first header read that is neither
                'mvhd' nor 'trak'. That header is already consumed, so the
                caller has to continue from it.
            """
            super().printSelf(file)
            child_type, child_size = Box().findBoxHeader(file)

            while child_type in ('mvhd', 'trak'):
                child_cls = MvhdBox if child_type == 'mvhd' else TrakBox
                child_type, child_size = child_cls(child_type, child_size).printSelf(file)

                # (None, None) means the child did not read ahead, so the
                # next header still has to be fetched here.
                if child_type is None and child_size is None:
                    child_type, child_size = Box().findBoxHeader(file)

            return child_type, child_size
    
    class MvhdBox(Box):
        """Movie Header box ('mvhd'): a FullBox with movie-wide timing data."""

        def __init__(self, box_type=None, size=None):
            # Flag as FullBox so the base class consumes version + flags.
            self.isFullBox = True
            super().__init__(box_type, size)

        def printSelf(self, file):
            """Read and print every mvhd field.

            The time fields are 64-bit when ``self.box_version`` is 1 and
            32-bit otherwise; ``timescale`` is always 32-bit.

            Returns:
                ``(None, None)``: no header of a following box is consumed.
            """
            super().printSelf(file)

            def read_uint(width):
                # MP4 integers are big-endian.
                return int.from_bytes(file.read(width), 'big')

            time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
            creation_time = read_uint(time_width)
            modification_time = read_uint(time_width)
            timescale = read_uint(self.four_bytes)
            duration = read_uint(time_width)

            # rate is 16.16 fixed point and volume is 8.8 fixed point.
            # Dividing gives the real value; printing "integer.numerator"
            # would show 1.5 as "1.32768".
            rate = read_uint(self.four_bytes) / 0x10000
            volume = read_uint(self.two_bytes) / 0x100

            reserved = file.read(self.two_bytes)
            reserved_1 = file.read(self.four_bytes)
            reserved_2 = file.read(self.four_bytes)

            matrix = [read_uint(self.four_bytes) for _ in range(9)]
            pre_define = [read_uint(self.four_bytes) for _ in range(6)]
            next_track_ID = read_uint(self.four_bytes)

            print("creation_time:{0}  ".format(creation_time))
            print("modification_time:{0}  ".format(modification_time))
            print("timescale:{0}  ".format(timescale))
            print("duration:{0}  ".format(duration))
            print("rate:{0}  ".format(rate))
            print("volume:{0}  ".format(volume))
            print("reserved:{0}  ".format(reserved))
            print("reserved[] :{0} , {1} ".format(reserved_1, reserved_2))
            print("matrix:{0}  ".format(matrix))
            print("pre_define:{0}  ".format(pre_define))
            print("next_track_ID:{0}  ".format(next_track_ID))
            return None, None
    
    
    class TrakBox(Box):
        """Track box ('trak'): prints its tkhd child and skips the other children."""

        def printSelf(self, file):
            """Print the trak header and its tkhd, then skip the rest of the trak.

            Returns:
                The ``(type, size)`` of the next header read from the file.
                That header is already consumed, so the caller has to
                continue from it.

            NOTE(review): when the first child is not 'tkhd' nothing is
            skipped and that child's header is returned as-is.
            """
            super().printSelf(file)
            # Header of the first child box inside this trak.
            type, size = Box().findBoxHeader(file)
            while type == 'tkhd':
                tkhd_box = TkhdBox(type, size)
                type, size = tkhd_box.printSelf(file)

                # Bytes of this trak still unread: the total size minus the
                # trak header already consumed and minus the tkhd child.
                # Leaving the header out of the subtraction would swallow
                # the 8-byte header of the box that follows this trak.
                remain_size = max(
                    self.box_size - self.box_header_size - tkhd_box.box_size, 0)
                print("\n{0} box remain size======> {1}".format(tkhd_box.box_type, remain_size))
                # Skip the remaining children in one read.
                file.read(remain_size)

                if type is None and size is None:
                    # Read the header of the box that follows this trak.
                    type, size = Box().findBoxHeader(file)

            return type, size  # header of the next box
    
    
    
    class TkhdBox(Box):
        """Track Header box ('tkhd'): a FullBox describing a single track."""

        def __init__(self, box_type=None, size=None):
            # Flag as FullBox so the base class consumes version + flags.
            self.isFullBox = True
            super().__init__(box_type, size)

        def printSelf(self, file):
            """Read and print every tkhd field.

            The time fields are 64-bit when ``self.box_version`` is 1 and
            32-bit otherwise; ``track_ID`` and the reserved word between it
            and ``duration`` are always 32-bit.

            Returns:
                ``(None, None)``: no header of a following box is consumed.
            """
            super().printSelf(file)

            def read_uint(width):
                # MP4 integers are big-endian.
                return int.from_bytes(file.read(width), 'big')

            time_width = self.eight_bytes if self.box_version == 1 else self.four_bytes
            creation_time = read_uint(time_width)
            modification_time = read_uint(time_width)
            track_ID = read_uint(self.four_bytes)
            reserved_32 = read_uint(self.four_bytes)
            duration = read_uint(time_width)

            reserved_1 = read_uint(self.four_bytes)
            reserved_2 = read_uint(self.four_bytes)

            layer = read_uint(self.two_bytes)
            alternate_group = read_uint(self.two_bytes)
            # volume is 8.8 fixed point; divide to get the real value.
            volume = read_uint(self.two_bytes) / 0x100
            reserved_16 = read_uint(self.two_bytes)

            matrix = [read_uint(self.four_bytes) for _ in range(9)]

            # width/height are 16.16 fixed point; keep the integer part only.
            width = read_uint(self.four_bytes) >> 16
            height = read_uint(self.four_bytes) >> 16

            print("creation_time:{0}  ".format(creation_time))
            print("modification_time:{0}  ".format(modification_time))
            print("track_ID:{0}  ".format(track_ID))
            print("reserved_32:{0}  ".format(reserved_32))
            print("duration:{0}  ".format(duration))
            print("reserved[]:{0} {1}  ".format(reserved_1, reserved_2))
            print("layer:{0}  ".format(layer))
            print("alternate_group:{0}  ".format(alternate_group))
            print("volume:{0}  ".format(volume))
            print("reserved_16:{0}  ".format(reserved_16))
            print("matrix:{0}  ".format(matrix))
            print("width:{0}  ".format(width))
            print("height:{0}  ".format(height))

            return None, None
    
    

    主要实现了MP4文件的ftyp/moov/mvhd/trak/tkhd这几个box的解析,读取其中存储的信息。

    核心逻辑就是读取box的头8个字节来判断它的类型和大小,然后根据对应的类型进行解析即可。

    入口程序如下

    
    def print_MP4(file_name):
        """Walk the boxes of an MP4 file and print ftyp and moov.

        Boxes of any other type are skipped by their declared size rather
        than ending the walk, so a file whose moov comes after mdat
        (ftyp / free / mdat / moov) is still parsed.

        Args:
            file_name: path of the MP4 file to read.
        """
        header_size = 8       # 32-bit size + 4-character type
        large_size_bytes = 8  # 64-bit largesize that follows when size == 1

        with open(file_name, 'rb') as file:
            box_type, box_size = Box().findBoxHeader(file)
            # An empty type means the header read hit end of file.
            while box_type:
                if box_type == 'ftyp':
                    box_type, box_size = FtypBox(box_type, box_size).printSelf(file)

                elif box_type == 'moov':
                    box_type, box_size = MoovBox(box_type, box_size).printSelf(file)

                else:
                    if box_size == 1:
                        # The real size is the 64-bit largesize after the type.
                        large_size = file.read(large_size_bytes)
                        if len(large_size) < large_size_bytes:
                            break
                        payload = (int.from_bytes(large_size, 'big')
                                   - header_size - large_size_bytes)
                    else:
                        # Size 0 ("box extends to end of file") and corrupt
                        # sizes below 8 both give a negative payload here.
                        payload = box_size - header_size
                    if payload < 0:
                        break
                    file.seek(payload, 1)  # jump over the unhandled box
                    box_type, box_size = None, None

                # (None, None) means no header was read ahead.
                if box_type is None and box_size is None:
                    box_type, box_size = Box().findBoxHeader(file)

            print("\n=======================read end ==============================")
    
    
    if __name__ == "__main__":
        # Script entry point: dump the box structure of the sample file.
        print_MP4("sample.mp4")
    
    

    当然,假如只想读取特定内容则大可不必把box完整读取出来,只需要读取特定位置的内容即可。

    读取AAC格式数据

    aac不仅仅常用于MP4文件中的音频数据存储,它可以作为单独的音频文件被大家消费。

    如果不方便找到一个AAC文件的话,可以从MP4文件中提取出一个AAC文件(使用ffmpeg):

    ffmpeg -i sample.mp4 -acodec aac -vn sample.aac
    

    此时我们已经获得了一个sample.aac文件(从sample.mp4中提取的),那么接下来如何读取它的数据从而获得有效的信息呢?

    class ADTSHeader(object):
        """Decode and print the 7-byte ADTS header at the start of an AAC stream.

        Only the fixed and variable header of the first frame are read; the
        optional CRC and the frame payload are not touched.
        """

        one_byte = 1
        two_byte = 2
        three_byte = 3

        # 2-bit profile field -> description.
        _PROFILES = {
            0: 'Main Profile',
            1: 'Low Complexity profile (LC) ',
            2: 'Scalable Sampling Rate profile (SSR)',
        }

        # 2-bit layer field -> description.
        _LAYERS = {
            1: 'Layer III',
            2: 'Layer II',
            3: 'Layer I',
        }

        # 4-bit sampling_frequency_index -> sampling rate.
        _SAMPLING_RATES = {
            0: '96khz',
            1: '88.2khz',
            2: '64khz',
            3: '48khz',
            4: '44.1khz',
            5: '32khz',
            6: '24khz',
            7: '22.05khz',
            8: '16khz',
            9: '12khz',
            10: '11.025khz',
            11: '8khz',
            12: '7.35khz',
        }

        def getProfile(self, profile):
            """Return the description of the 2-bit profile value."""
            return self._PROFILES.get(profile, 'reserved')

        def getLayer(self, layer):
            """Return the description of the 2-bit layer value."""
            return self._LAYERS.get(layer, 'reserved')

        def getChannelConfiguration(self, chanel):
            """Return the channel layout as text (6 -> '5+1', 7 -> '7+1')."""
            if chanel == 6:
                return '5+1'
            if chanel == 7:
                return '7+1'
            return str(chanel)

        def getSampling(self, sampling_frequency):
            """Return the sampling rate for a sampling_frequency_index.

            Indexes without an entry in the table yield 'reserved'.
            """
            return self._SAMPLING_RATES.get(sampling_frequency, 'reserved')

        def printSelf(self, file):
            """Read 7 bytes from *file* and print every ADTS header field."""
            # Bytes 0-1: syncword(12) id(1) layer(2) protection_absent(1).
            head = int.from_bytes(file.read(self.two_byte), 'big')
            syncword = head >> 4
            mpeg_id = (head & 0x0008) >> 3
            layer = (head & 0x0006) >> 1
            protection_absent = head & 0x0001

            # Bytes 2-3: profile(2) sampling_frequency_index(4) private_bit(1)
            # channel_configuration(3) original_copy(1) home(1), then the
            # variable header starts: copyright_identification_bit(1)
            # copyright_identification_start(1) and the top 2 bits of
            # aac_frame_length.
            middle = int.from_bytes(file.read(self.two_byte), 'big')
            profile = middle >> 14
            sampling_frequency_index = (middle & 0x3c00) >> 10
            private_bit = (middle & 0x0200) >> 9
            channel_configuration = (middle & 0x01c0) >> 6
            original_copy = (middle & 0x0020) >> 5
            home = (middle & 0x0010) >> 4
            copyright_identification_bit = (middle & 0x0008) >> 3
            copyright_identification_start = (middle & 0x0004) >> 2
            frame_length_high = middle & 0x3

            # Bytes 4-6: low 11 bits of aac_frame_length,
            # adts_buffer_fullness(11), number_of_raw_data_blocks_in_frame(2).
            tail = int.from_bytes(file.read(self.three_byte), 'big')
            aac_frame_length = (tail >> 13) | (frame_length_high << 11)
            adts_buffer_fullness = (tail & 0x1ffc) >> 2
            number_of_raw_data_blocks_in_frame = tail & 0x3

            print("================================= adts_fixed_header ==========================")
            print("syncword: {0}".format(hex(syncword)))
            print("id: {0}".format(mpeg_id))
            print("layer: {0} : {1}".format(layer, self.getLayer(layer)))
            print("protection_absent: {0}".format(protection_absent))
            print("profile:  {0} ".format(self.getProfile(profile)))
            print("sampling_frequency_index:  {0} ".format(self.getSampling(sampling_frequency_index)))
            print("private_bit: {0}".format(private_bit))
            print("channel_configuration: {0} ".format(self.getChannelConfiguration(channel_configuration)))
            print("original_copy: {0}".format(original_copy))
            print("home: {0}".format(home))

            print("================================= adts_variable_header ==========================")
            print("copyright_identification_bit: {0}".format(copyright_identification_bit))
            print("copyright_identification_start: {0}".format(copyright_identification_start))
            print("aac_frame_length: {0}".format(aac_frame_length))
            print("adts_buffer_fullness: {0}".format(hex(adts_buffer_fullness)))
            print("number_of_raw_data_blocks_in_frame: {0}".format(number_of_raw_data_blocks_in_frame))
    

    入口程序如下:

    
    def print_AAC(file_name):
        """Open *file_name* in binary mode and print its first ADTS header."""
        with open(file_name, 'rb') as aac_stream:
            parser = ADTSHeader()
            parser.printSelf(aac_stream)
            print("\n=======================read end ==============================")
    
    
    if __name__ == "__main__":
        # Script entry point: print the ADTS header of the sample file.
        print_AAC("sample.aac")
    

    程序执行之后打印的内容如下:

    ================================= adts_fixed_header ==========================
    syncword: 0xfff
    id: 0
    layer: 0 : reserved
    protection_absent: 1
    profile:  Low Complexity profile (LC)  
    sampling_frequency_index:  44.1khz  // 44.1khz
    private_bit: 0
    channel_configuration: 2   // 两个声道
    original_copy: 0
    home: 0
    ================================= adts_variable_header ==========================
    copyright_identification_bit: 0
    copyright_identification_start: 0
    aac_frame_length: 378
    adts_buffer_fullness: 0x7ff
    number_of_raw_data_blocks_in_frame: 0
    
    =======================read end ==============================
    

    然后我们利用ffmpeg打印sample.aac文件的基本信息,对照一下看是否一致

    > ffprobe -show_streams sample.aac
    [STREAM]
    index=0
    codec_name=aac
    codec_long_name=AAC (Advanced Audio Coding)
    profile=LC
    codec_type=audio
    codec_tag_string=[0][0][0][0]
    codec_tag=0x0000
    sample_fmt=fltp
    sample_rate=44100  
    channels=2
    channel_layout=stereo
    ...
    ...
    [/STREAM]
    
    

    采样率,声道数,profile这些都是一致的。

    读取H.264码流

    首先当然是从MP4文件中提取H.264码流数据:

    ffmpeg -i sample.mp4 -codec copy -bsf: h264_mp4toannexb -f h264 sample.264
    

    然后我们就可以按照H.264数据编码格式来读取一些信息了。H.264格式解析见ffmpeg开发——初探H.264

    具体的解析逻辑如下:

    class NALU(object):
        """Plain record for one NAL unit: header fields plus its place in the file."""

        # Fields of the one-byte NAL header; -1 means "not parsed yet".
        forbidden_zero_bit = -1
        nal_ref_idc = -1
        nal_unit_type = -1
        # Readable name for nal_unit_type.
        nal_unit_type_str = ''
        # Offsets and length within the file; -1 means "not located yet".
        start_in_file = -1
        end_in_file = -1
        size = -1

        def copy_from(self, nalu_obj):
            """Copy every field of *nalu_obj* onto this instance."""
            field_names = (
                'forbidden_zero_bit',
                'nal_ref_idc',
                'nal_unit_type',
                'nal_unit_type_str',
                'start_in_file',
                'end_in_file',
                'size',
            )
            for field_name in field_names:
                setattr(self, field_name, getattr(nalu_obj, field_name))

        def parse_data(self, file):
            """Placeholder for payload parsing; currently does nothing."""
            return None
    
    class NaluDataFinder(object):
        """Locate NAL units in an H.264 byte stream by scanning for start codes."""

        BYTE_ONE = 1
        BYTE_TWO = 2
        BYTE_THREE = 3
        BYTE_FOUR = 4
        BYTE_10M = 10*1024*1024

        # nal_unit_type -> readable name; unlisted types are "other-type".
        _NALU_TYPES = {
            0: "unspecified",
            1: "non-IDR slice layer",
            2: "A/B/C slice data",
            3: "A/B/C slice data",
            4: "A/B/C slice data",
            5: "IDR slice layer",
            6: "SEI",
            7: "SPS",
            8: "PPS",
            9: "unit-delimiter",
        }

        def isStartCode(self, file):
            """Check whether a start code begins at the current file position.

            Reads 3 bytes and, when they are all zero, a 4th one.

            Returns:
                ``(found, consumed)``: *found* is True for ``00 00 01`` or
                ``00 00 00 01``; *consumed* is the number of bytes actually
                read. A value below 3 means the end of the file was reached.
            """
            data_byte = file.read(self.BYTE_THREE)
            if len(data_byte) < self.BYTE_THREE:
                # Fewer bytes than requested: the end of the file was reached.
                return False, len(data_byte)

            byte_num = self.BYTE_THREE
            if data_byte == b'\x00\x00\x01':
                return True, byte_num

            if data_byte == b'\x00\x00\x00':
                fourth = file.read(self.BYTE_ONE)
                if not fourth:
                    # End of file: only the first three bytes were consumed.
                    return False, byte_num
                # A 4th byte was consumed whatever its value. Reporting it
                # lets the caller rewind to exactly one byte past the
                # starting position, including when that byte is 0x00.
                byte_num = byte_num + 1
                if fourth == b'\x01':
                    return True, byte_num

            return False, byte_num

        def getNALUType(self, nalu_type):
            """Return a readable name for a 5-bit nal_unit_type."""
            return self._NALU_TYPES.get(nalu_type, "other-type")

        def printSelf(self, file, start_index):
            """Parse the NAL header at the current position and measure the unit.

            The file must be positioned right after a start code. The stream
            is then scanned one byte at a time until the next start code or
            the end of the file.

            Args:
                file: seekable binary file object.
                start_index: file offset of the first byte of this NAL unit.

            Returns:
                ``(isEnd, next_start, nalu_obj)``: whether the end of the
                file was reached, the offset just past the start code that
                follows this unit, and a populated ``NALU``.
            """
            nalu_size = 1  # size of the current NALU; the header byte counts
            byte_data = int.from_bytes(file.read(self.BYTE_ONE), 'big')

            nalu_obj = NALU()
            nalu_obj.forbidden_zero_bit = byte_data >> 7
            # nal_ref_idc is the 2-bit field in bits 6-5.
            nalu_obj.nal_ref_idc = (byte_data & 0x60) >> 5
            nalu_obj.nal_unit_type = byte_data & 0x1f
            nalu_obj.nal_unit_type_str = self.getNALUType(nalu_obj.nal_unit_type)
            nalu_obj.start_in_file = start_index

            is_start_code, read_byte_num = self.isStartCode(file)
            while not is_start_code and read_byte_num >= 3:
                # No start code here: count one payload byte and rewind so
                # the next probe starts one byte after the previous one.
                nalu_size = nalu_size + 1
                file.seek(-(read_byte_num - 1), 1)
                is_start_code, read_byte_num = self.isStartCode(file)

            if not is_start_code and read_byte_num < 3:
                # The trailing bytes at the end of the file belong to this unit.
                nalu_size = nalu_size + read_byte_num

            nalu_obj.end_in_file = start_index + nalu_size
            nalu_obj.size = nalu_size
            next_start = start_index + nalu_size + read_byte_num

            isEnd = read_byte_num < 3
            if isEnd:
                print("read file eof ===")
            return isEnd, next_start, nalu_obj
        
        
        
    class H264Reader(object):
        """Walk an H.264 byte stream and print a summary of every NAL unit."""

        def printSelf(self, file):
            """Locate every NAL unit in *file* and print one line for each.

            Args:
                file: seekable binary file object positioned at the start of
                    the stream.

            Returns:
                True when the file is empty; otherwise None.
            """
            nalu_finder = NaluDataFinder()
            # Consume the start code expected at the beginning of the stream.
            is_start_code, read_byte_num = nalu_finder.isStartCode(file)
            if read_byte_num == 0:  # nothing to read
                return True

            start_index = read_byte_num
            nalu_array = []  # every NALU found, in stream order
            end = False
            while not end:
                end, start_index, nalu_obj = nalu_finder.printSelf(file, start_index)
                nalu_array.append(nalu_obj)

            # Report what was found; without this the list built above would
            # be thrown away and nothing would be printed.
            for number, nalu in enumerate(nalu_array):
                print("NALU #{0}: type={1} ({2}) ref_idc={3} start={4} end={5} size={6}".format(
                    number, nalu.nal_unit_type, nalu.nal_unit_type_str,
                    nalu.nal_ref_idc, nalu.start_in_file, nalu.end_in_file,
                    nalu.size))
            print("NALU count: {0}".format(len(nalu_array)))
    
    

    入口程序也是类似:

    def print_h264(file_name):
        """Open *file_name* in binary mode and run the H.264 reader over it."""
        with open(file_name, 'rb') as h264_stream:
            reader = H264Reader()
            reader.printSelf(h264_stream)
            print("\n======================= read end ==============================")
    
    if __name__ == "__main__":
        # Script entry point: scan the sample elementary stream.
        print_h264("sample.264")
    
    

    对于H.264数据解析目前只解析到NALU的层级,定位了h264码流中的每个NALU所在的位置,后面会补充一些具体的结构比如Slice,SPS,PPS等的解析逻辑,会直接更新在github上。

    总结

    其实无论解析MP4封装文件,还是AAC音频,h264码流,当我们了解了他们的内部结构的定义之后,解析的逻辑可以称得上是按部就班,读取每个字节,甚至每个bit的数据,把他们按照定义标准文档解读出来即可。

    相关文章

      网友评论

          本文标题:ffmpeg开发——如何解析MP4,AAC,H.264码流

          本文链接:https://www.haomeiwen.com/subject/thayfjtx.html