Python提取极客时间-阿里云HLS加密视频解密过程分析
最近在看极客的视频课,想把视频离线到本地看,发现下载下来的ts流打开均无法播放,后面又下载了极客APP可以离线下载视频,但是离线的视频只能在极客APP里播放,这哪天不用APP了,视频也看不了了,所以视频一定是被加密过的,需要进行解密。在网上找了很多方法均没有找到一个完美的解密方法,看了好几天的博客,现在终于有眉目了,好了不说那么多了,下面我们直接进入正题。
一、准备工作
本文只针对极客加密视频课程的分析处理,专栏的略过。
需一定的Chrome开发者工具调试能力和代码能力,需要准备的工具:极客时间里购买的视频课程、Chrome F12、Pycharm、Python,我使用的是Python,当然也可以用Java等语言。
阿里云视频加密方案包含两部分:加密转码 + 解密播放,我们主要分析解密播放部分。
二、获取视频课程列表
首先登录极客时间首页,按F12打开调试工具-Network,选择Fetch/XHR,点击个人头像-我的课程,可以看到product接口,此接口返回了所有的课程信息,需要我们过滤出视频课程类型(type=c3)
示例代码如下:
def _product(self, _type='c3'):
""" 商品列表(就是课程)的接口)方法 """
log.info("请求获取课程列表接口:")
url = "https://time.geekbang.org/serv/v3/learn/product"
method = "POST"
headers = deepcopy(self.common_headers)
headers["Host"] = "time.geekbang.org"
headers["Origin"] = "https://time.geekbang.org"
headers["Cookie"] = self.cookie.cookie_string
params = {
"desc": 'true',
"expire": 1,
"last_learn": 0,
"learn_status": 0,
"prev": 0,
"size": 20,
"sort": 1,
"type": "",
"with_learn_count": 1
}
log.info(f"接口请求参数:{params}")
res = requests.request(method, url, headers=headers, json=params)
if res.status_code != 200:
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"课程列表接口请求出错,返回内容为:{res.content.decode()}")
raise RequestError(f"课程列表接口请求出错,返回内容为:{res.content.decode()}")
data = res.json().get('data', {})
self.cookie.load_set_cookie(res.headers['Set-Cookie'])
if data:
self.products += self._parser_products(data, _type)
else:
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"课程列表接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
raise NotValueError(f"课程列表接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
log.info('-' * 40)
def _parser_products(self, data, _type='c3'):
"""
解析课程列表内容的方法(从中提取部分数据)
Args:
data: 课程相关信息,一般为接口返回的数据
_type: 课程类型,c1 代表专栏,c3 代表视频课,all 代表全部, 默认只获取c3的内容
Returns:
解析后的结果,以列表形式
"""
result = []
keys = ['title', 'type', 'id'] # 定义要拿取的字段
products = data.get('products', [])
lists = data.get('list', [])
for product in products:
# 如果课程标题在需要排除的列表中,则跳过该课程
if product.get('title', '') in self.exclude:
continue
new_product = {key: value for key, value in product.items() if key in keys}
new_product['articles'] = [] # 定义章节列表(用来存储文章信息)
new_product['article_ids'] = [] # 定义章节 ID 列表(用来存储文章 ID 信息) )
for pro in lists:
if new_product['id'] == pro['pid']:
new_product['aid'] = pro['aid']
if _type.lower() == 'all' or new_product['type'] == _type:
result.append(new_product)
return result
三、遍历获取视频课程章节信息
点击视频课程,此时调用了articles接口,此为获取该视频课章节列表接口,cid为对应课程id
def _articles(self, cid, pro):
""" 获取视频课程章节列表接口方法 """
global ALL_ARTICLES
log.info("请求获取视频课程章节列表接口:")
url = "https://time.geekbang.org/serv/v1/column/articles"
method = "POST"
headers = deepcopy(self.common_headers)
headers["Host"] = "time.geekbang.org"
headers["Origin"] = "https://time.geekbang.org"
headers["Cookie"] = self.cookie.cookie_string
params = {
"cid": cid,
"size": 500,
"prev": 0,
"order": "earliest",
"sample": "false"
}
log.info(f"接口请求参数:{params}")
res = requests.request(method, url, headers=headers, json=params)
if res.status_code != 200:
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"获取章节列表接口请求出错,返回内容为:{res.json()}")
raise RequestError(f"获取章节列表接口请求出错,返回内容为:{res.json()}")
data = res.json().get('data', {})
self.cookie.load_set_cookie(res.headers['Set-Cookie'])
if data:
ids = []
article_list = data.get('list', [])
for article in article_list:
ids.append(article['id'])
ALL_ARTICLES += ids
pro['article_ids'] += ids
else:
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"获取章节列表接口没有获取到内容,请检查请求。返回结果为:{res.json()}")
raise NotValueError(f"获取章节列表接口没有获取到内容,请检查请求。返回结果为:{res.json()}")
log.info('-' * 40)
点击任意章节进行播放时,我们发现调用了article接口,此为章节信息接口,aid是课程章节id。在此接口中,我们需要拿到m3u8下载地址,章节title及章节id。然后开始处理m3u8文件。
def _article(self, aid, pro, file_type=None, get_comments=False):
""" 通过课程 ID 获取视频课程章节信息接口方法 """
global FINISH_ARTICLES
log.info("请求获取视频课程章节信息接口:")
url = "https://time.geekbang.org/serv/v1/article"
method = "POST"
headers = deepcopy(self.common_headers)
headers["Host"] = "time.geekbang.org"
headers["Origin"] = "https://time.geekbang.org"
headers["Cookie"] = self.cookie.cookie_string
params = {
"id": aid,
"include_neighbors": "true",
"is_freelyread": "true"
}
log.info(f"接口请求参数:{params}")
res = requests.request(method, url, headers=headers, json=params)
if res.status_code != 200:
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"获取章节信息接口请求出错,返回内容为:{res.content.decode()}")
raise RequestError(f"获取章节信息接口请求出错,返回内容为:{res.content.decode()}")
data = res.json().get('data', {})
self.cookie.load_set_cookie(res.headers['Set-Cookie'])
if data:
# comments = self._comments(aid) if get_comments else None
keys = ['hls_videos', 'article_title', 'id'] # 定义要拿取的字段
article = {key: value for key, value in data.items() if key in keys}
m3u8_url = article['hls_videos']['ld']['url']
log.info("【开始下载】课程章节:%s" % article['article_title'])
parse_m3u8_url(m3u8_url, article['article_title'])
log.info("【下载完成】课程章节:%s" % article['article_title'])
FINISH_ARTICLES.append(article['id']) # 将该章节 ID 加入到遍历完成的列表中
pro['cid'] = data['cid']
else:
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"获取章节信息接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
raise NotValueError(f"获取j 信息接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
log.info('-' * 40)
首先需要把m3u8文件内容提取出来,可以看到m3u8内容中,视频流是有加密的,ts文件没有解密是无法播放的
四、遍历视频课程章节m3u8文件
我们发现这个m3u8里只有一个解密地址,应该是获取解密key的,即所有ts文件都用同一个key进行解密,但是没有iv,后面发现iv就是16位16进制的0即可。
注意:
- 每个m3u8里的解密地址是不一样的,所以每遍历一个新的章节时,需要重新获取key进行解密ts
- 后面分析发现这个解密地址只能访问一次,再次访问就会失效,代码中需要控制下次数
提取出解密地址以及所有ts流地址:
def parse_m3u8(self, m3u8_content, url_path):
self.m3u8 = m3u8_content
self.ts_url_list = ['{}/{}'.format(url_path, ts_name) for ts_name in re.findall(r'.*\.ts', self.m3u8)]
key_url_list = re.findall(r'EXT-X-KEY:METHOD=AES-128,URI="http.*"', self.m3u8) # 提取ts解密地址
iv_list = re.findall(r'IV=0x.{32}', self.m3u8)
self.key_url_dealt = []
for key in key_url_list:
key = key[30:-1]
self.key_url_dealt.append(key)
self.iv_dealt = []
for iv in iv_list:
iv = iv[5:]
self.iv_dealt.append(iv)
五、获取课程章节m3u8对应解密key
获取解密key方法,返回bytes类型:
def parse_key(self, key_url):
try:
req = requests.get(key_url, self.headers)
req.raise_for_status()
req.encoding = req.apparent_encoding
key = req.content
return key
except:
traceback.print_exc()
六、下载ts流解密并保存至本地
遍历ts地址列表并解密ts流,保存到章节临时列表:
def decoding(self):
key = ""
for i in range(0, len(self.ts_url_list)):
ts_url = self.ts_url_list[i]
print("No", i, "file\t", ts_url)
ts = self.save_ts_url(ts_url)
key_name = ts_url.split("/")[-1].split(".ts")[0] + ".key"
iv_name = ts_url.split("/")[-1].split(".ts")[0] + ".iv"
ts_name = ts_url.split("/")[-1].split(".ts")[0] + "_convert.ts"
# 只可获取一次key,第二次及之后无效
if i <= 0:
# key = self.get(self.key_url_dealt[0])
key = self.parse_key(self.key_url_dealt[0])
iv = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print("key_url:", self.key_url_dealt[0])
print("key:", key)
print("iv:", iv)
self.save_content(key_name, key, self.path)
self.save_content(iv_name, iv, self.path)
pc = PrpCrypt(key, iv)
result = pc.decrypt(ts)
with open(self.ts_path + "\\" + ts_name, 'wb') as f:
f.write(result)
self.ts_list.append(result) # 章节解密的ts流列表
七、合并课程章节已解密的ts流
最后把解密的ts流列表写入到本地文件,即可正常播放了:
def merge_ts(self, title):
print("【开始合并】课程章节 ==> {}.ts".format(title))
out_file = open(self.result_path + os.path.sep + self.check_filename("{}.ts".format(title)), "wb")
for i in range(0, len(self.ts_list)):
in_file = self.ts_list[i]
out_file.write(in_file)
out_file.close()
print("【合并完成】课程章节 ==> {}.ts".format(title))
def check_filename(self, file_name):
"""
校验文件名称的方法,在 windows 中文件名不能包含('\','/','*','?','<','>','|') 字符
Args:
file_name: 文件名称
Returns:
修复后的文件名称
"""
return file_name.replace('\\', '') \
.replace('/', '') \
.replace('*', 'x') \
.replace('?', '') \
.replace('<', '《') \
.replace('>', '》') \
.replace('|', '_') \
.replace('\n', '') \
.replace('\b', '') \
.replace('\f', '') \
.replace('\t', '') \
.replace('\r', '')
总体思路,先遍历过滤出的视频课程列表,再遍历课程章节信息,再遍历m3u8解密保存:
for pro in geek.products:
geek._articles(pro['id'], pro) # 获取章节列表
article_ids = pro['article_ids']
for aid in article_ids:
if set(ALL_ARTICLES) == set(FINISH_ARTICLES):
import sys
log.info("正常抓取完成啦,不用再继续跑脚本了。")
sys.exit(1)
if str(aid) in FINISH_ARTICLES:
continue
geek._article(aid, pro, file_type=file_type, get_comments=get_comments) # 获取单个章节的信息
以上整个过程中,涉及到的几个接口都是验证登录的,所以需要写个登录接口,事先获取cookie使用。
登录接口不可频繁调用,否则会验证码拦截,严重也可能封IP:
def _login(self):
""" 登录接口方法 """
log.info("请求登录接口:")
url = "https://account.geekbang.org/account/ticket/login"
method = "POST"
headers = deepcopy(self.common_headers)
headers["Host"] = "account.geekbang.org"
headers["Origin"] = "https://account.geekbang.org"
headers["Cookie"] = self.cookie.cookie_string
params = {
"country": 86,
"cellphone": self.cellphone,
"password": self.password,
"captcha": "",
"remember": 1,
"platform": 3,
"appid": 1,
"source": ""
}
log.info(f"接口请求参数:{params}")
res = requests.request(method, url, headers=headers, json=params)
if (res.status_code != 200) or (str(res.json().get('code', '')) == '-1'):
_save_finish_article_id_to_file()
log.info(f"此时 products 的数据为:{self.products}")
log.error(f"登录接口请求出错,返回内容为:{res.content.decode()}")
raise RequestError(f"登录接口请求出错,返回内容为:{res.content.decode()}")
self.cookie.load_set_cookie(res.headers['Set-Cookie'])
log.info('-'*40)
忙了一晚上,终于大功告成,成果图:
注意:
- 此文仅为学习交流分享,请勿用于非法及商业用途,否则后果自负,与本人无关。
- 对Python或爬虫有兴趣的同学,可分享转发交流。