m3u8文件是一堆小片段的集合,将所有的小片段都保存后,在合成就等于下载完成了,m3u8有固定的格式,根据格式解析出所有小片段的url链接,有可能存在加密的情况,大部分都是AES加密,这里的办法是将加密的key与ts片段都获取保存下来,本地生成一个m3u8文件。最后使用ffmpeg直接转码为mp4 ,ffmpeg转码时会自行解密。
1、解析m3u8
python不愧是超高级语言,好用的三方库特别多,有一个库就叫 m3u8,地址如下:m3u8 · PyPI
image.png可以使用pip安装:pip install m3u8
下面贴出解析的代码:
以下代码是拿到m3u8解析后的对象实例,根据该对象实例去获取片段的链接等
注意:因考虑到m3u8的内容可能是二级的m3u8链接,因此使用递归解析
# 解析得到m3u8实例
def parseM3u8(self, m3u8Url):
playlist = m3u8.load(m3u8Url)
if playlist.is_variant:
playlists = playlist.playlists
if len(playlists) > 0 :
subPlaylist = playlists[0] # 默认取第一个,有多个的情况,其实可以优化(应该是选择分辨率高的)
#subM3u8Url = subPlaylist.base_uri + subPlaylist.uri
subM3u8Url = subPlaylist.absolute_uri
return self.parseM3u8(subM3u8Url)
else :
return None
else :
return playlist
根据m3u8的实例,获取ts片段以及加密key的链接。同时取得文件的下载路径。方便下载时使用
片段的absolute_uri属性,就是该片段的完整的url链接了。
# 从m3u8中,得到ts和key的链接,以及文件的存储全路径
def getFileUrlsAndPaths(self, playlist):
allUrls = []
allFilePaths = []
tempPath = self.tempPath(self.downloadPath, self.fileName)
for seg in playlist.segments:
if seg == None:
break
allUrls.append(seg.absolute_uri)
na = tempPath + '/' + getUrlLastPath(seg.absolute_uri)
allFilePaths.append(na)
for key in playlist.keys:
if key == None:
break
allUrls.append(key.absolute_uri)
na = tempPath + '/' + getUrlLastPath(key.absolute_uri)
allFilePaths.append(na)
return allUrls, allFilePaths
至此,拿到了片段的url,m3u8解析就完成了
2、生成本地的m3u8文件,用于ffmpeg进行转码
因为我们下载的ts片段文件与m3u8文件保存到同一目录下,因此就需要将m3u8中的原ts片段的url替换为本地的路径。因此这里通过修改uri值,来达到这一目地。
修改完成后,使用m3u8库的dump方法,将内容保存到本地文件中
# 创建本地m3u8文件,用于ffmpeg的ts合并以及转码mp4
def createNativeM3u8File(self, playlist, m3u8Path):
for seg in playlist.segments:
if seg == None:
break
seg.uri = getUrlLastPath(seg.absolute_uri)
for key in playlist.keys:
if key == None:
break
key.uri = getUrlLastPath(key.absolute_uri)
playlist.dump(m3u8Path)
3、下载所有片段以及key等文件
这里使用多线程下载,同时支持5个任务的下载,统计下载数量。当全部下载完成后开始转码
# 下载单个文件:url文件的地址,filepath文件的存储全路径
def download_file(self, url, filePath):
fn = getUrlLastPath(filePath)
if os.path.exists(filePath):
self.downloadedCount += 1
cnt = f"({self.downloadedCount}/{self.totalCount})"
print(f"文件 {fn} 已存在{cnt}")
return
response = requests.get(url)
if response.status_code == 200:
with open(filePath, 'wb') as file:
file.write(response.content)
self.downloadedCount += 1
cnt = f"({self.downloadedCount}/{self.totalCount})"
print(f"文件 {fn} 下载完成{cnt}")
else:
# 若下载失败,则开始重试
allKeys = self.repeatTimesDic.keys()
retryTimes = 0
if url in allKeys :
retryTimes = self.repeatTimesDic[url]
print(f"无法下载文件 {fn}: {response.status_code}")
if retryTimes >= self.maxRepeatTimes :
print(f'文件 {fn} 已达到最大下载次数 {self.maxRepeatTimes} 次')
print(f'文件url:{url}')
self.failedCount += 1
else :
retryTimes = retryTimes + 1
self.repeatTimesDic[url] = retryTimes
print(f'开始第{retryTimes}次重试')
self.download_file(url, filePath)
# 多线程下载
def download_files_concurrently(self, url_list, filename_list):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# executor.map(lambda x: self.download_file(*x), zip(url_list, filename_list))
# 将 url_list 和 filename_list 中的对应元素作为参数传递给 download_file 方法
tasks = [executor.submit(self.download_file, url, filename) for url, filename in zip(url_list, filename_list)]
# 等待所有任务完成
for future in concurrent.futures.as_completed(tasks):
# Future.result() 会阻塞并等待任务完成,并返回任务结果(这里是 None)
result = future.result()
if result is not None: # 如果结果不为 None,则说明任务完成了
print("Task completed:", result)
# 所有任务都已完成
printTime("All tasks are done.")
self.downloadComplete()
下载效果图
image.png
4、转码为mp4
当ts片段、key(若存在)、m3u8文件都保存完成后。使用命令行调用ffmpeg进行转码。
转码函数如下:只有两个参数,m3u8的本地路径,输出的文件路径
def m3u8ToMp4(m3u8Path, mp4Path):
ffmpeg_command = f"ffmpeg -allowed_extensions ALL -protocol_whitelist \"file,http,crypto,tcp\" -i {m3u8Path} -c copy {mp4Path}"
print('=== ffmpeg开始转码 ===')
print('m3u8路径: '+m3u8Path)
print('输出mp4路径: '+mp4Path)
# 使用subprocess调用FFmpeg并捕获输出
process = subprocess.Popen(ffmpeg_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
# 检查命令是转码行成功
if process.returncode == 0:
print("=== ffmpeg转码完成 ====")
print(stdout.decode())
return True
else:
print("=== 转换失败!=== ")
print(stderr.decode())
return False
整体来说都很容易,唯一的麻烦就是多线程的下载,需要处理并发的问题。
5、Demo代码
完整代码请到以下链接:
https://download.csdn.net/download/wangkunggxx/88541574
一共有3个文件,m3u.py、m3u8dl.py、transcode.py
transcode.py 用来转码
m3u8dl.py 核心代码,封装了一个M3u8Downloader的类
m3u.py 供直接调用。
调用示例:
python3 m3u.py 'https://hls.cntv.cdn20.com/asp/hls/1200/0303000a/3/default/1049a60b8b914d4ab7066c568ca616fd/1200.m3u8' '小孩子大梦想'
网友评论