Pipeline

Pipeline是数据入库时流经的管道,用户可自定义,以便对接其他数据库。

框架已内置mysql及mongo管道,其他管道作为扩展方式提供,可从feapder_pipelines项目中按需安装

项目地址:https://github.com/Boris-code/feapder_pipelines

使用方式

注:item会被聚合成多条一起流经pipeline,方便批量入库

1. 编写pipeline

  1. from feapder.pipelines import BasePipeline
  2. from typing import Dict, List, Tuple
  3. class Pipeline(BasePipeline):
  4. """
  5. pipeline 是单线程的,批量保存数据的操作,不建议在这里写网络请求代码,如下载图片等
  6. """
  7. def save_items(self, table, items: List[Dict]) -> bool:
  8. """
  9. 保存数据
  10. Args:
  11. table: 表名
  12. items: 数据,[{},{},...]
  13. Returns: 是否保存成功 True / False
  14. 若False,不会将本批数据入到去重库,以便再次入库
  15. """
  16. print("自定义pipeline, 保存数据 >>>>", table, items)
  17. return True
  18. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  19. """
  20. 更新数据, 与UpdateItem配合使用,若爬虫中没使用UpdateItem,则可不实现此接口
  21. Args:
  22. table: 表名
  23. items: 数据,[{},{},...]
  24. update_keys: 更新的字段, 如 ("title", "publish_time")
  25. Returns: 是否更新成功 True / False
  26. 若False,不会将本批数据入到去重库,以便再次入库
  27. """
  28. print("自定义pipeline, 更新数据 >>>>", table, items, update_keys)
  29. return True

Pipeline需继承BasePipeline,类名和存放位置随意,需要实现save_items接口。一定要有返回值,返回False表示数据没保存成功,会触发重试逻辑

update_items接口与UpdateItem配合使用,更新数据时使用,若爬虫中没使用UpdateItem,则可不实现此接口

2. 编写配置文件

  1. # 数据入库的pipeline,支持多个
  2. ITEM_PIPELINES = [
  3. "pipeline.Pipeline"
  4. ]

将编写好的pipeline配置进来,值为类的模块路径,需要指定到具体的类名

示例

地址:https://github.com/Boris-code/feapder/tree/master/tests/test-pipeline