示例:获取视频信息并点赞

  1. import asyncio
  2. from bilibili_api import video, Credential
  3. SESSDATA = ""
  4. BILI_JCT = ""
  5. BUVID3 = ""
  6. async def main():
  7. # 实例化 Credential 类
  8. credential = Credential(sessdata=SESSDATA, bili_jct=BILI_JCT, buvid3=BUVID3)
  9. # 实例化 Video 类
  10. v = video.Video(bvid="BVxxxxxxxx", credential=credential)
  11. # 获取视频信息
  12. info = await v.get_info()
  13. # 打印视频信息
  14. print(info)
  15. # 给视频点赞
  16. await v.like(True)
  17. if __name__ == '__main__':
  18. # 主入口
  19. asyncio.get_event_loop().run_until_complete(main())

示例:视频在线人数监测

  1. from bilibili_api import video
  2. import asyncio
  3. # 实例化
  4. v = video.VideoOnlineMonitor(bvid="BV1AV411x7Gs")
  5. @v.on('ONLINE')
  6. async def on_online_update(event):
  7. """
  8. 在线人数更新
  9. """
  10. print(event)
  11. @v.on('DANMAKU')
  12. async def on_danmaku(event):
  13. """
  14. 收到实时弹幕
  15. """
  16. print(event)
  17. if __name__ == '__main__':
  18. # 主入口,v.connect() 为连接服务器
  19. asyncio.get_event_loop().run_until_complete(v.connect())

示例:获取视频弹幕

  1. from bilibili_api import video, sync
  2. v = video.Video(bvid='BV1AV411x7Gs')
  3. dms = sync(v.get_danmakus(0))
  4. for dm in dms:
  5. print(dm)

示例:下载视频

  1. import asyncio
  2. from bilibili_api import video, Credential
  3. import aiohttp
  4. import os
  5. SESSDATA = ""
  6. BILI_JCT = ""
  7. BUVID3 = ""
  8. # FFMPEG 路径,查看:http://ffmpeg.org/
  9. FFMPEG_PATH = "ffmpeg"
  10. async def main():
  11. # 实例化 Credential 类
  12. credential = Credential(sessdata=SESSDATA, bili_jct=BILI_JCT, buvid3=BUVID3)
  13. # 实例化 Video 类
  14. v = video.Video(bvid="BV1AV411x7Gs", credential=credential)
  15. # 获取视频下载链接
  16. url = await v.get_download_url(0)
  17. # 视频轨链接
  18. video_url = url["dash"]["video"][0]['baseUrl']
  19. # 音频轨链接
  20. audio_url = url["dash"]["audio"][0]['baseUrl']
  21. HEADERS = {
  22. "User-Agent": "Mozilla/5.0",
  23. "Referer": "https://www.bilibili.com/"
  24. }
  25. async with aiohttp.ClientSession() as sess:
  26. # 下载视频流
  27. async with sess.get(video_url, headers=HEADERS) as resp:
  28. length = resp.headers.get('content-length')
  29. with open('video_temp.m4s', 'wb') as f:
  30. process = 0
  31. while True:
  32. chunk = await resp.content.read(1024)
  33. if not chunk:
  34. break
  35. process += len(chunk)
  36. print(f'下载视频流 {process} / {length}')
  37. f.write(chunk)
  38. # 下载音频流
  39. async with sess.get(audio_url, headers=HEADERS) as resp:
  40. length = resp.headers.get('content-length')
  41. with open('audio_temp.m4s', 'wb') as f:
  42. process = 0
  43. while True:
  44. chunk = await resp.content.read(1024)
  45. if not chunk:
  46. break
  47. process += len(chunk)
  48. print(f'下载音频流 {process} / {length}')
  49. f.write(chunk)
  50. # 混流
  51. print('混流中')
  52. os.system(f'{FFMPEG_PATH} -i video_temp.m4s -i audio_temp.m4s -vcodec copy -acodec copy video.mp4')
  53. # 删除临时文件
  54. os.remove("video_temp.m4s")
  55. os.remove("audio_temp.m4s")
  56. print('已下载为:video.mp4')
  57. if __name__ == '__main__':
  58. # 主入口
  59. asyncio.get_event_loop().run_until_complete(main())