Python提取极客时间-阿里云HLS加密视频解密过程分析

Python提取极客时间-阿里云HLS加密视频解密过程分析

经验文章nimo972025-05-06 14:52:504A+A-


最近在看极客的视频课,想把视频离线到本地看,发现下载下来的ts流打开均无法播放,后面又下载了极客APP可以离线下载视频,但是离线的视频只能在极客APP里播放,这哪天不用APP了,视频也看不了了,所以视频一定是被加密过的,需要进行解密。在网上找了很多方法均没有找到一个完美的解密方法,看了好几天的博客,现在终于有眉目了,好了不说那么多了,下面我们直接进入正题。


一、准备工作

本文只针对极客加密视频课程的分析处理,专栏的略过。

需一定的Chrome开发者工具调试能力和代码能力,需要准备的工具:极客时间里购买的视频课程、Chrome F12、Pycharm、Python,我使用的是Python,当然也可以用Java等语言。

阿里云视频加密方案包含两部分:加密转码 + 解密播放,我们主要分析解密播放部分。


二、获取视频课程列表

首先登录极客时间首页,按F12打开调试工具-Network,选择Fetch/XHR,点击个人头像-我的课程,可以看到product接口,此接口返回了所有的课程信息,需要我们过滤出视频课程类型(type=c3)

示例代码如下:

def _product(self, _type='c3'):
    """ 商品列表(就是课程)的接口)方法 """
    log.info("请求获取课程列表接口:")
    url = "https://time.geekbang.org/serv/v3/learn/product"
    method = "POST"
    headers = deepcopy(self.common_headers)
    headers["Host"] = "time.geekbang.org"
    headers["Origin"] = "https://time.geekbang.org"
    headers["Cookie"] = self.cookie.cookie_string
    params = {
        "desc": 'true',
        "expire": 1,
        "last_learn": 0,
        "learn_status": 0,
        "prev": 0,
        "size": 20,
        "sort": 1,
        "type": "",
        "with_learn_count": 1
    }

    log.info(f"接口请求参数:{params}")
    res = requests.request(method, url, headers=headers, json=params)

    if res.status_code != 200:
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"课程列表接口请求出错,返回内容为:{res.content.decode()}")
        raise RequestError(f"课程列表接口请求出错,返回内容为:{res.content.decode()}")
    data = res.json().get('data', {})
    self.cookie.load_set_cookie(res.headers['Set-Cookie'])

    if data:
        self.products += self._parser_products(data, _type)
    else:
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"课程列表接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
        raise NotValueError(f"课程列表接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
    log.info('-' * 40)

def _parser_products(self, data, _type='c3'):
    """
    解析课程列表内容的方法(从中提取部分数据)
    Args:
        data: 课程相关信息,一般为接口返回的数据
        _type: 课程类型,c1 代表专栏,c3 代表视频课,all 代表全部, 默认只获取c3的内容
    Returns:
        解析后的结果,以列表形式
    """
    result = []
    keys = ['title', 'type', 'id']  # 定义要拿取的字段
    products = data.get('products', [])
    lists = data.get('list', [])
    for product in products:
        # 如果课程标题在需要排除的列表中,则跳过该课程
        if product.get('title', '') in self.exclude:
            continue

        new_product = {key: value for key, value in product.items() if key in keys}
        new_product['articles'] = []  # 定义章节列表(用来存储文章信息)
        new_product['article_ids'] = []  # 定义章节 ID 列表(用来存储文章 ID 信息) )
        for pro in lists:
            if new_product['id'] == pro['pid']:
                new_product['aid'] = pro['aid']
        if _type.lower() == 'all' or new_product['type'] == _type:
            result.append(new_product)
    return result


三、遍历获取视频课程章节信息

点击视频课程,此时调用了articles接口,此为获取该视频课章节列表接口,cid为对应课程id

def _articles(self, cid, pro):
    """ 获取视频课程章节列表接口方法 """
    global ALL_ARTICLES
    log.info("请求获取视频课程章节列表接口:")
    url = "https://time.geekbang.org/serv/v1/column/articles"
    method = "POST"
    headers = deepcopy(self.common_headers)
    headers["Host"] = "time.geekbang.org"
    headers["Origin"] = "https://time.geekbang.org"
    headers["Cookie"] = self.cookie.cookie_string
    params = {
        "cid": cid,
        "size": 500,
        "prev": 0,
        "order": "earliest",
        "sample": "false"
    }

    log.info(f"接口请求参数:{params}")
    res = requests.request(method, url, headers=headers, json=params)

    if res.status_code != 200:
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"获取章节列表接口请求出错,返回内容为:{res.json()}")
        raise RequestError(f"获取章节列表接口请求出错,返回内容为:{res.json()}")
    data = res.json().get('data', {})
    self.cookie.load_set_cookie(res.headers['Set-Cookie'])

    if data:
        ids = []
        article_list = data.get('list', [])
        for article in article_list:
            ids.append(article['id'])
        ALL_ARTICLES += ids
        pro['article_ids'] += ids
    else:
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"获取章节列表接口没有获取到内容,请检查请求。返回结果为:{res.json()}")
        raise NotValueError(f"获取章节列表接口没有获取到内容,请检查请求。返回结果为:{res.json()}")
    log.info('-' * 40)


点击任意章节进行播放时,我们发现调用了article接口,此为章节信息接口,aid是课程章节id。在此接口中,我们需要拿到m3u8下载地址,章节title及章节id。然后开始处理m3u8文件。

def _article(self, aid, pro, file_type=None, get_comments=False):
    """ 通过课程 ID 获取视频课程章节信息接口方法 """
    global FINISH_ARTICLES
    log.info("请求获取视频课程章节信息接口:")
    url = "https://time.geekbang.org/serv/v1/article"
    method = "POST"
    headers = deepcopy(self.common_headers)
    headers["Host"] = "time.geekbang.org"
    headers["Origin"] = "https://time.geekbang.org"
    headers["Cookie"] = self.cookie.cookie_string
    params = {
        "id": aid,
        "include_neighbors": "true",
        "is_freelyread": "true"
    }

    log.info(f"接口请求参数:{params}")
    res = requests.request(method, url, headers=headers, json=params)

    if res.status_code != 200:
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"获取章节信息接口请求出错,返回内容为:{res.content.decode()}")
        raise RequestError(f"获取章节信息接口请求出错,返回内容为:{res.content.decode()}")
    data = res.json().get('data', {})
    self.cookie.load_set_cookie(res.headers['Set-Cookie'])

    if data:
        # comments = self._comments(aid) if get_comments else None
        keys = ['hls_videos', 'article_title', 'id']  # 定义要拿取的字段
        article = {key: value for key, value in data.items() if key in keys}
        m3u8_url = article['hls_videos']['ld']['url']

        log.info("【开始下载】课程章节:%s" % article['article_title'])
        parse_m3u8_url(m3u8_url, article['article_title'])
        log.info("【下载完成】课程章节:%s" % article['article_title'])

        FINISH_ARTICLES.append(article['id'])  # 将该章节 ID 加入到遍历完成的列表中
        pro['cid'] = data['cid']
    else:
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"获取章节信息接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
        raise NotValueError(f"获取j 信息接口没有获取到内容,请检查请求。返回结果为:{res.content.decode()}")
    log.info('-' * 40)


首先需要把m3u8文件内容提取出来,可以看到m3u8内容中,视频流是有加密的,ts文件没有解密是无法播放的


四、遍历视频课程章节m3u8文件

我们发现这个m3u8里只有一个解密地址,应该是获取解密key的,即所有ts文件都用同一个key进行解密,但是没有iv,后面发现iv就是16位16进制的0即可。

注意:

  • 每个m3u8里的解密地址是不一样的,所以每遍历一个新的章节时,需要重新获取key进行解密ts
  • 后面分析发现这个解密地址只能访问一次,再次访问就会失效,代码中需要控制下次数

提取出解密地址以及所有ts流地址:

def parse_m3u8(self, m3u8_content, url_path):
    self.m3u8 = m3u8_content
    self.ts_url_list = ['{}/{}'.format(url_path, ts_name) for ts_name in re.findall(r'.*\.ts', self.m3u8)]
    key_url_list = re.findall(r'EXT-X-KEY:METHOD=AES-128,URI="http.*"', self.m3u8)  # 提取ts解密地址
    iv_list = re.findall(r'IV=0x.{32}', self.m3u8)

    self.key_url_dealt = []
    for key in key_url_list:
        key = key[30:-1]
        self.key_url_dealt.append(key)

    self.iv_dealt = []
    for iv in iv_list:
        iv = iv[5:]
        self.iv_dealt.append(iv)


五、获取课程章节m3u8对应解密key

获取解密key方法,返回bytes类型:

def parse_key(self, key_url):
    try:
        req = requests.get(key_url, self.headers)
        req.raise_for_status()
        req.encoding = req.apparent_encoding
        key = req.content
        return key
    except:
        traceback.print_exc()


六、下载ts流解密并保存至本地

遍历ts地址列表并解密ts流,保存到章节临时列表:

def decoding(self):
    key = ""
    for i in range(0, len(self.ts_url_list)):
        ts_url = self.ts_url_list[i]
        print("No", i, "file\t", ts_url)
        ts = self.save_ts_url(ts_url)

        key_name = ts_url.split("/")[-1].split(".ts")[0] + ".key"
        iv_name = ts_url.split("/")[-1].split(".ts")[0] + ".iv"
        ts_name = ts_url.split("/")[-1].split(".ts")[0] + "_convert.ts"

        # 只可获取一次key,第二次及之后无效
        if i <= 0:
            # key = self.get(self.key_url_dealt[0])
            key = self.parse_key(self.key_url_dealt[0])

        iv = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        print("key_url:", self.key_url_dealt[0])
        print("key:", key)
        print("iv:", iv)

        self.save_content(key_name, key, self.path)
        self.save_content(iv_name, iv, self.path)

        pc = PrpCrypt(key, iv)
        result = pc.decrypt(ts)
        with open(self.ts_path + "\\" + ts_name, 'wb') as f:
            f.write(result)
        self.ts_list.append(result) # 章节解密的ts流列表


七、合并课程章节已解密的ts流

最后把解密的ts流列表写入到本地文件,即可正常播放了:

def merge_ts(self, title):
    print("【开始合并】课程章节 ==> {}.ts".format(title))
    out_file = open(self.result_path + os.path.sep + self.check_filename("{}.ts".format(title)), "wb")

    for i in range(0, len(self.ts_list)):
        in_file = self.ts_list[i]
        out_file.write(in_file)
    out_file.close()
    print("【合并完成】课程章节 ==> {}.ts".format(title))

def check_filename(self, file_name):
    """
    校验文件名称的方法,在 windows 中文件名不能包含('\','/','*','?','<','>','|') 字符
    Args:
        file_name: 文件名称
    Returns:
        修复后的文件名称
    """
    return file_name.replace('\\', '') \
        .replace('/', '') \
        .replace('*', 'x') \
        .replace('?', '') \
        .replace('<', '《') \
        .replace('>', '》') \
        .replace('|', '_') \
        .replace('\n', '') \
        .replace('\b', '') \
        .replace('\f', '') \
        .replace('\t', '') \
        .replace('\r', '')


总体思路,先遍历过滤出的视频课程列表,再遍历课程章节信息,再遍历m3u8解密保存:

for pro in geek.products:
    geek._articles(pro['id'], pro)  # 获取章节列表

    article_ids = pro['article_ids']
    for aid in article_ids:
        if set(ALL_ARTICLES) == set(FINISH_ARTICLES):
            import sys
            log.info("正常抓取完成啦,不用再继续跑脚本了。")
            sys.exit(1)

        if str(aid) in FINISH_ARTICLES:
            continue
        geek._article(aid, pro, file_type=file_type, get_comments=get_comments)  # 获取单个章节的信息


以上整个过程中,涉及到的几个接口都是验证登录的,所以需要写个登录接口,事先获取cookie使用。

登录接口不可频繁调用,否则会验证码拦截,严重也可能封IP:

def _login(self):
    """ 登录接口方法 """
    log.info("请求登录接口:")
    url = "https://account.geekbang.org/account/ticket/login"
    method = "POST"
    headers = deepcopy(self.common_headers)
    headers["Host"] = "account.geekbang.org"
    headers["Origin"] = "https://account.geekbang.org"
    headers["Cookie"] = self.cookie.cookie_string
    params = {
        "country": 86,
        "cellphone": self.cellphone,
        "password": self.password,
        "captcha": "",
        "remember": 1,
        "platform": 3,
        "appid": 1,
        "source": ""
    }

    log.info(f"接口请求参数:{params}")
    res = requests.request(method, url, headers=headers, json=params)

    if (res.status_code != 200) or (str(res.json().get('code', '')) == '-1'):
        _save_finish_article_id_to_file()
        log.info(f"此时 products 的数据为:{self.products}")
        log.error(f"登录接口请求出错,返回内容为:{res.content.decode()}")
        raise RequestError(f"登录接口请求出错,返回内容为:{res.content.decode()}")
    self.cookie.load_set_cookie(res.headers['Set-Cookie'])
    log.info('-'*40)


忙了一晚上,终于大功告成,成果图:




注意:

  1. 此文仅为学习交流分享,请勿用于非法及商业用途,否则后果自负,与本人无关。
  2. 对Python或爬虫有兴趣的同学,可分享转发交流。
点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7