示例:获取剧情图所有节点

用途:下载所有节点视频信息、获取剧情图结构。

需要一定水平才能看懂。

  1. from bilibili_api import interactive_video, sync
  2. import json
  3. BVID = 'BV1Dt411N7LY'
  4. async def main():
  5. # 获取剧情图版本号
  6. graph_version = await interactive_video.get_graph_version(BVID)
  7. # 存储顶点信息
  8. edges_info = {}
  9. # 使用队列来遍历剧情图,初始为 None 是为了从初始顶点开始
  10. queue = [None]
  11. def createEdge(edge_id: int):
  12. """
  13. 创建节点信息到 edges_info
  14. """
  15. edges_info[edge_id] = {
  16. "title": None,
  17. "cid": None,
  18. "option": None
  19. }
  20. while queue:
  21. # 出队
  22. edge_id = queue.pop()
  23. if edge_id in edges_info and edges_info[edge_id]['title'] is not None and edges_info[edge_id]['cid'] is not None:
  24. # 该情况为已获取到所有信息,说明是跳转到之前已处理的顶点,不作处理
  25. continue
  26. # 获取顶点信息,最大重试 3 次
  27. retry = 3
  28. while True:
  29. try:
  30. node = await interactive_video.get_edge_info(BVID, graph_version, edge_id)
  31. # 打印当前顶点信息
  32. print(node['edge_id'], node['title'])
  33. break
  34. except Exception as e:
  35. retry -= 1
  36. if retry < 0:
  37. raise e
  38. # 检查节顶点是否在 edges_info 中,本次步骤得到 title 信息
  39. if node['edge_id'] not in edges_info:
  40. # 不在,新建
  41. createEdge(node['edge_id'])
  42. # 设置 title
  43. edges_info[node['edge_id']]['title'] = node['title']
  44. # 起始顶点,需要用额外技巧获得 cid
  45. if edge_id is None:
  46. for s in node['story_list']:
  47. if s['edge_id'] == 1:
  48. edges_info[node['edge_id']]['cid'] = s['cid']
  49. # 无可达顶点,即不能再往下走了,类似树的叶子节点
  50. if 'questions' not in node['edges']:
  51. continue
  52. # 遍历所有可达顶点
  53. for q in node['edges']['questions']:
  54. for c in q['choices']:
  55. # 该步骤获取顶点的 cid(视频分 P 的 ID)
  56. if c['id'] not in edges_info:
  57. createEdge(c['id'])
  58. edges_info[c['id']]['cid'] = c['cid']
  59. edges_info[c['id']]['option'] = c['option']
  60. # 所有可达顶点 ID 入队
  61. queue.insert(0, c['id'])
  62. print(json.dumps(edges_info, indent=2, ensure_ascii=False))
  63. sync(main())

示例:提交情节图

  • best_story.py
  1. from bilibili_storytree import StoryGraph, ScriptNode
  2. class Oscar:
  3. def __init__(self, videos: dict, aid: int):
  4. self.videos = videos
  5. # Create graph
  6. self.graph = StoryGraph(aid=aid)
  7. def gen_simple_graph(self, root_title: str):
  8. # 建情节树:只有一个分支剧情模块
  9. root_video = self.videos[root_title]
  10. n_root = ScriptNode(node_type="videoNode", isRoot=True)
  11. n_root.set_video(video=root_video)
  12. self.graph._update_script_nodes([n_root]) # update graph["script"]["nodes"]
  13. self.graph._sync_nodes() # create graph["nodes"]
  14. self.graph._sync_vars() # create graph["regional_vars"]
  • ivideo_submit.py
  1. import os
  2. import asyncio
  3. from bilibili_api import video, interactive_video, Credential
  4. import re
  5. from best_story import Oscar
  6. import json
  7. import time
  8. SESSDATA = "记得填"
  9. BILI_JCT = "记得填"
  10. BUVID3 = "记得填"
  11. def reform_videos(videos):
  12. video_objs = {}
  13. for v in videos:
  14. video_objs[v["title"]] = v
  15. return video_objs
  16. async def save_tree(bvid):
  17. # 实例化 Credential 类
  18. credential = Credential(sessdata=SESSDATA, bili_jct=BILI_JCT, buvid3=BUVID3)
  19. # 查询交互视频信息
  20. info = await interactive_video.up_get_ivideo_pages(bvid=bvid, credential=credential)
  21. vobjs = reform_videos(info["videos"])
  22. aid = info["videos"][0]["aid"]
  23. # 自定义一个情节树
  24. g = Oscar(videos=vobjs, aid=aid)
  25. g.gen_simple_graph(root_title="root") # 记得改 pick one video title
  26. g.graph.pretty_print()
  27. story = json.dumps({"graph": g.graph._serialize()})
  28. #print(story)
  29. # 上传情节树
  30. graph_result = await interactive_video.up_submit_story_tree(story_tree=story, credential=credential)
  31. return graph_result
  32. UPLOAD_CONFIG = {
  33. "copyright": 1, #"1 自制,2 转载。",
  34. "source": "", #"str, 视频来源。投稿类型为转载时注明来源,为原创时为空。",
  35. "desc": "", #"str, 视频简介。",
  36. "desc_format_id": 0,
  37. "dynamic": "", #"str, 动态信息。",
  38. "interactive": 1,
  39. "open_elec": 0, #"int, 是否展示充电信息。1 为是,0 为否。",
  40. "no_reprint": 1, #"int, 显示未经作者授权禁止转载,仅当为原创视频时有效。1 为启用,0 为关闭。",
  41. "subtitles": {
  42. "lan": "", #"字幕语言,不清楚作用请将该项设置为空",
  43. "open": 0
  44. },
  45. "tag": "学习,测试", #"str, 视频标签。使用英文半角逗号分隔的标签组。示例:标签1,标签2,标签3",
  46. "tid": 208, #"int, 分区ID。可以使用 channel 模块进行查询。",
  47. #"title": "jump jump jump", #"视频标题",
  48. "up_close_danmaku": False, #"bool, 是否关闭弹幕。",
  49. "up_close_reply": False, #"bool, 是否关闭评论。",
  50. }
  51. async def upload_videos(video_dir, cover_img, title):
  52. # 上传多P视频到互动视频
  53. cover = open(cover_img, "r+b")
  54. videos = []
  55. for filename in os.listdir(video_dir):
  56. if filename.endswith(".mp4"):
  57. video_file = os.path.join(video_dir, filename)
  58. print(video_file)
  59. videos.append(video.VideoUploaderPageObject(video_stream=open(video_file, "r+b"), title=filename.split(".")[0]))
  60. config = UPLOAD_CONFIG
  61. config["title"] = title
  62. credential = Credential(sessdata=SESSDATA, bili_jct=BILI_JCT, buvid3=BUVID3)
  63. uploader = video.VideoUploader(cover=cover, cover_type="jpg", pages=videos, config=config, credential=credential)
  64. info = await uploader.start()
  65. return info
  66. if __name__ == '__main__':
  67. folder = "directory contains video files" # 记得填
  68. img = "image file path" # 记得填
  69. title = "interactive video name" # 记得填
  70. info = asyncio.get_event_loop().run_until_complete(upload_videos(folder, img, title))
  71. print(info) # {"bvid": "ssssss", "aid": ddddddd}
  72. bvid = info["bvid"]
  73. time.sleep(120) # 为了 B 站视频编码的时间, 躺平 2 分钟
  74. graph_result = asyncio.get_event_loop().run_until_complete(save_tree(bvid))
  75. print(graph_result) # 成功之后,编辑树会延迟,不超过5分钟。 如果觉得有问题,就在执行一次上个存树的命令, 这时多半立即执行,推测是队列机制。