id: timetravel.md related_key: Time Travel

summary: Learn how to search with Time Travel in Milvus.

Search with Time Travel

This topic describes how to use the Time Travel feature during vector search.

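Milvus maintains a timeline for all data insert and delete operations. Users can specify a timestamp in a search to retrieve the data view at that point in time, without spending significant resources on maintaining a cluster to roll the data back.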
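
To make the idea of a data view concrete, here is a minimal conceptual sketch in plain Python. It is not how Milvus implements Time Travel internally. The `Entity` class, the `visible_at` helper, and the small integer timestamps are all hypothetical, and the sketch assumes that an operation stamped exactly at the travel timestamp already belongs to the view.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Entity:
    """Hypothetical record: a primary key plus the timestamps of its operations."""
    pk: int
    insert_ts: int
    delete_ts: Optional[int] = None  # None means the entity was never deleted


def visible_at(entities: List[Entity], travel_ts: int) -> List[int]:
    """Return the primary keys that belong to the data view at travel_ts.

    Assumption: an insert or delete stamped exactly at travel_ts is
    already reflected in the view.
    """
    return [
        e.pk
        for e in entities
        if e.insert_ts <= travel_ts
        and (e.delete_ts is None or e.delete_ts > travel_ts)
    ]


timeline = [
    Entity(pk=0, insert_ts=100),
    Entity(pk=1, insert_ts=100, delete_ts=300),
    Entity(pk=2, insert_ts=200),
]

print(visible_at(timeline, 100))  # [0, 1]
print(visible_at(timeline, 250))  # [0, 1, 2]
print(visible_at(timeline, 300))  # [0, 2]
```

The stored records never change in this model. Only the timestamp used for filtering moves, which is why looking at an earlier view needs no rollback.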

By default, Milvus allows a Time Travel span of 432,000 seconds (120h0m0s). You can configure this span with the common.retentionDuration parameter.
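
The arithmetic behind the default span can be checked with a short sketch. `RETENTION_SECONDS`, `earliest_travel_time`, and `within_retention` are hypothetical names for illustration only. How Milvus treats a timestamp that falls exactly on the boundary is not stated here, so the inclusive comparison below is an assumption.

```python
from datetime import datetime, timedelta, timezone

# Default Time Travel span stated in the text (common.retentionDuration).
RETENTION_SECONDS = 432_000


def earliest_travel_time(now: datetime, retention_seconds: int = RETENTION_SECONDS) -> datetime:
    """Oldest point in time that still falls inside the retention span."""
    return now - timedelta(seconds=retention_seconds)


def within_retention(target: datetime, now: datetime,
                     retention_seconds: int = RETENTION_SECONDS) -> bool:
    """Assumed inclusive check that target lies inside the span ending at now."""
    return earliest_travel_time(now, retention_seconds) <= target <= now


window = timedelta(seconds=RETENTION_SECONDS)
print(window)                         # 5 days, 0:00:00
print(window.total_seconds() / 3600)  # 120.0

now = datetime.now(timezone.utc)
print(within_retention(now - timedelta(days=1), now))  # True
print(within_retention(now - timedelta(days=6), now))  # False
```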

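Preparations

The following example code demonstrates the steps to take before inserting data.

If you work with your own dataset in an existing Milvus instance, you can move on to the next step.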

{{fragments/multiple_code.md}}

    from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
    connections.connect("default", host='localhost', port='19530')
    collection_name = "test_time_travel"
    # Schema: an INT64 primary key and a 2-dimensional float vector field.
    schema = CollectionSchema([
        FieldSchema("pk", DataType.INT64, is_primary=True),
        FieldSchema("example_field", dtype=DataType.FLOAT_VECTOR, dim=2)
    ])
    collection = Collection(collection_name, schema)

    const { MilvusClient } = require("@zilliz/milvus2-sdk-node");
    const milvusClient = new MilvusClient("localhost:19530");
    const params = {
      collection_name: "test_time_travel",
      fields: [{
          name: "example_field",
          description: "",
          data_type: 101, // DataType.FloatVector
          type_params: {
            dim: "2",
          },
        },
        {
          name: "pk",
          data_type: 5, // DataType.Int64
          is_primary_key: true,
          description: "",
        },
      ],
    };
    await milvusClient.collectionManager.createCollection(params);

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    connect -h localhost -p 19530 -a default
    create collection -c test_time_travel -f pk:INT64:primary_field -f example_field:FLOAT_VECTOR:2 -p pk

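Insert the first batch of data

Insert random data to simulate the original data (the Milvus CLI example uses a pre-built, remote CSV file containing similar data).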

{{fragments/multiple_code.md}}

    import random
    # Column-based data: 10 primary keys (0-9) and 10 random 2-dimensional vectors.
    data = [
        [i for i in range(10)],
        [[random.random() for _ in range(2)] for _ in range(10)],
    ]
    batch1 = collection.insert(data)

    const entities1 = Array.from({ length: 10 }, (v, k) => ({
      "example_field": Array.from({ length: 2 }, () => Math.random()),
      "pk": k,
    }));
    const batch1 = await milvusClient.dataManager.insert({
      collection_name: "test_time_travel",
      fields_data: entities1,
    });

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    import -c test_time_travel https://raw.githubusercontent.com/zilliztech/milvus_cli/main/examples/user_guide/search_with_timetravel_1.csv
    Reading file from remote URL.
    Reading csv rows... [####################################] 100%
    Column names are ['pk', 'example_field']
    Processed 11 lines.
    Inserted successfully.
    -------------------------- ------------------
    Total insert entities: 10
    Total collection entities: 10
    Milvus timestamp: 430390410783752199
    -------------------------- ------------------

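Check the timestamp of the first batch

Check the timestamp of the first data batch so that you can search with Time Travel. Data inserted within the same batch share an identical timestamp.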

    batch1.timestamp
    428828271234252802

    batch1.timestamp
    428828271234252802

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    # Milvus CLI automatically returns the timestamp as shown in the previous step.

Milvus adopts a combination of physical clock and logical counter as a hybrid timestamp. The 64-bit timestamp consists of a 46-bit physical part (high-order bits) and an 18-bit logical part (low-order bits). The physical part is the number of milliseconds that have elapsed since January 1, 1970 (midnight UTC/GMT).
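
The bit layout described above can be unpacked with plain integer operations. The sketch below only follows that description and does not call any Milvus API. `split_hybrid_ts` and `hybrid_ts_to_datetime` are hypothetical helper names, and the input is the first batch timestamp shown in the Python example.

```python
from datetime import datetime, timezone
from typing import Tuple

LOGICAL_BITS = 18                       # low-order bits: logical counter
LOGICAL_MASK = (1 << LOGICAL_BITS) - 1  # 0x3FFFF


def split_hybrid_ts(ts: int) -> Tuple[int, int]:
    """Split a 64-bit hybrid timestamp into (physical_ms, logical)."""
    return ts >> LOGICAL_BITS, ts & LOGICAL_MASK


def hybrid_ts_to_datetime(ts: int) -> datetime:
    """Convert the physical part (milliseconds since the epoch) to a UTC datetime."""
    physical_ms, _ = split_hybrid_ts(ts)
    return datetime.fromtimestamp(physical_ms / 1000, tz=timezone.utc)


batch1_ts = 428828271234252802
print(split_hybrid_ts(batch1_ts))        # (1635850033700, 2)
print(hybrid_ts_to_datetime(batch1_ts))  # a UTC datetime on 2021-11-02
```

Because the logical counter occupies only the low 18 bits, comparing two hybrid timestamps as plain integers orders them by physical time first and by the counter second.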

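Insert the second batch of data

Insert the second batch of data to simulate dirty data. One entity with primary key 19 and vector value [1.0,1.0] is appended as the target data for the search in a later step (the Milvus CLI example uses a pre-built, remote CSV file containing similar data).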

{{fragments/multiple_code.md}}

    data = [
        [i for i in range(10, 20)],
        [[random.random() for _ in range(2)] for _ in range(9)],
    ]
    # Append the target vector so that the entity with pk 19 is [1.0, 1.0].
    data[1].append([1.0,1.0])
    batch2 = collection.insert(data)

    const entities2 = Array.from({
      length: 9
    }, (v, k) => ({
      "example_field": Array.from({
        length: 2
      }, () => Math.random()),
      "pk": k + 10,
    }));
    entities2.push({
      "pk": 19,
      "example_field": [1.0, 1.0],
    });
    const batch2 = await milvusClient.dataManager.insert({
      collection_name: "test_time_travel",
      fields_data: entities2,
    });

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    import -c test_time_travel https://raw.githubusercontent.com/zilliztech/milvus_cli/main/examples/user_guide/search_with_timetravel_2.csv
    Reading file from remote URL.
    Reading csv rows... [####################################] 100%
    Column names are ['pk', 'example_field']
    Processed 11 lines.
    Inserted successfully.
    -------------------------- ------------------
    Total insert entities: 10
    Total collection entities: 20
    Milvus timestamp: 430390435713122310
    -------------------------- ------------------

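Search with a specified timestamp

Load the collection and search for the target data with the timestamp of the first data batch. With a timestamp specified, Milvus retrieves only the data view at the point in time that the timestamp indicates.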

{{fragments/multiple_code.md}}

    collection.load()
    search_param = {
        "data": [[1.0, 1.0]],
        "anns_field": "example_field",
        "param": {"metric_type": "L2"},
        "limit": 10,
        "travel_timestamp": batch1.timestamp,
    }
    res = collection.search(**search_param)
    res[0].ids

    await milvusClient.collectionManager.loadCollection({
      collection_name: "test_time_travel",
    });
    const res1 = await milvusClient.dataManager.search({
      collection_name: "test_time_travel",
      vectors: [
        [1.0, 1.0]
      ],
      travel_timestamp: batch1.timestamp,
      search_params: {
        anns_field: "example_field",
        topk: "10",
        metric_type: "L2",
        params: JSON.stringify({
          nprobe: 10
        }),
      },
      vector_type: 101, // DataType.FloatVector,
    });
    console.log(res1.results)

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    search
    Collection name (test_collection_query, test_time_travel): test_time_travel
    The vectors of search data (the length of data is number of query (nq), the dim of every vector in data must be equal to vector fields of collection. You can also import a CSV file without headers): [[1.0, 1.0]]
    The vector field used to search of collection (example_field): example_field
    The specified number of decimal places of returned distance [-1]:
    The max number of returned record, also known as topk: 10
    The boolean expression used to filter attribute []:
    The names of partitions to search (split by "," if multiple) ['_default'] []:
    Timeout []:
    Guarantee Timestamp(It instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date) [0]:
    Travel Timestamp(Specify a timestamp in a search to get results based on a data view) [0]: 430390410783752199

As shown below, neither the target data itself nor any other data inserted later is returned in the results.

    [8, 7, 4, 2, 5, 6, 9, 3, 0, 1]

    [8, 7, 4, 2, 5, 6, 9, 3, 0, 1]

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    Search results:
    No.1:
    +---------+------+------------+-----------+
    |   Index |   ID |   Distance |     Score |
    +=========+======+============+===========+
    |       0 |    2 |  0.0563737 | 0.0563737 |
    +---------+------+------------+-----------+
    |       1 |    5 |   0.122474 |  0.122474 |
    +---------+------+------------+-----------+
    |       2 |    3 |   0.141737 |  0.141737 |
    +---------+------+------------+-----------+
    |       3 |    8 |   0.331008 |  0.331008 |
    +---------+------+------------+-----------+
    |       4 |    0 |   0.618705 |  0.618705 |
    +---------+------+------------+-----------+
    |       5 |    1 |   0.676788 |  0.676788 |
    +---------+------+------------+-----------+
    |       6 |    9 |    0.69871 |   0.69871 |
    +---------+------+------------+-----------+
    |       7 |    6 |   0.706456 |  0.706456 |
    +---------+------+------------+-----------+
    |       8 |    4 |   0.956929 |  0.956929 |
    +---------+------+------------+-----------+
    |       9 |    7 |    1.19445 |   1.19445 |
    +---------+------+------------+-----------+

If you do not specify a timestamp, or if you specify the timestamp of the second data batch, Milvus returns results from both batches.

{{fragments/multiple_code.md}}

    batch2.timestamp
    428828283406123011
    search_param = {
        "data": [[1.0, 1.0]],
        "anns_field": "example_field",
        "param": {"metric_type": "L2"},
        "limit": 10,
        "travel_timestamp": batch2.timestamp,
    }
    res = collection.search(**search_param)
    res[0].ids
    [19, 10, 8, 7, 4, 17, 2, 5, 13, 15]

    batch2.timestamp
    428828283406123011
    const res2 = await milvusClient.dataManager.search({
      collection_name: "test_time_travel",
      vectors: [
        [1.0, 1.0]
      ],
      travel_timestamp: batch2.timestamp,
      search_params: {
        anns_field: "example_field",
        topk: "10",
        metric_type: "L2",
        params: JSON.stringify({
          nprobe: 10
        }),
      },
      vector_type: 101, // DataType.FloatVector,
    });
    console.log(res2.results)

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    search
    Collection name (test_collection_query, test_time_travel): test_time_travel
    The vectors of search data (the length of data is number of query (nq), the dim of every vector in data must be equal to vector fields of collection. You can also import a CSV file without headers): [[1.0, 1.0]]
    The vector field used to search of collection (example_field): example_field
    The specified number of decimal places of returned distance [-1]:
    The max number of returned record, also known as topk: 10
    The boolean expression used to filter attribute []:
    The names of partitions to search (split by "," if multiple) ['_default'] []:
    Timeout []:
    Guarantee Timestamp(It instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date) [0]:
    Travel Timestamp(Specify a timestamp in a search to get results based on a data view) [0]:
    Search results:
    No.1:
    +---------+------+------------+------------+
    |   Index |   ID |   Distance |      Score |
    +=========+======+============+============+
    |       0 |   19 |          0 |          0 |
    +---------+------+------------+------------+
    |       1 |   12 | 0.00321393 | 0.00321393 |
    +---------+------+------------+------------+
    |       2 |    2 |  0.0563737 |  0.0563737 |
    +---------+------+------------+------------+
    |       3 |    5 |   0.122474 |   0.122474 |
    +---------+------+------------+------------+
    |       4 |    3 |   0.141737 |   0.141737 |
    +---------+------+------------+------------+
    |       5 |   10 |   0.238646 |   0.238646 |
    +---------+------+------------+------------+
    |       6 |    8 |   0.331008 |   0.331008 |
    +---------+------+------------+------------+
    |       7 |   18 |   0.403166 |   0.403166 |
    +---------+------+------------+------------+
    |       8 |   13 |   0.508617 |   0.508617 |
    +---------+------+------------+------------+
    |       9 |   11 |   0.531529 |   0.531529 |
    +---------+------+------------+------------+

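Generate a timestamp for search

If you did not record a timestamp beforehand, Milvus lets you generate one from an existing timestamp, a Unix Epoch time, or a date time.

The following example simulates an accidental deletion and shows how to generate a timestamp from before the deletion and search with it.

Generate a timestamp based on the date time or Unix Epoch time before the deletion.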

    import datetime
    from pymilvus import utility
    # Use a separate name so that the datetime module is not shadowed.
    now = datetime.datetime.now()
    pre_del_timestamp = utility.mkts_from_datetime(now)

    const { datetimeToHybrids } = require("@zilliz/milvus2-sdk-node/milvus/utils/Format");
    const datetime = new Date().getTime()
    const pre_del_timestamp = datetimeToHybrids(datetime)

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    calc mkts_from_unixtime -e 1641809375
    430390476800000000
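
The CLI result above can be reproduced by hand from the hybrid timestamp layout (physical milliseconds in the high-order bits, an 18-bit logical counter in the low-order bits). The sketch below is plain arithmetic and not the pymilvus implementation. `hybrid_ts_from_unixtime` and `hybrid_ts_from_datetime` are hypothetical helper names, and leaving the logical counter at 0 is an assumption that happens to match the CLI output shown.

```python
from datetime import datetime, timezone

LOGICAL_BITS = 18  # width of the logical counter in the low-order bits


def hybrid_ts_from_unixtime(epoch_seconds: float) -> int:
    """Build a hybrid timestamp from Unix Epoch seconds, with logical part 0."""
    physical_ms = int(epoch_seconds * 1000)
    return physical_ms << LOGICAL_BITS


def hybrid_ts_from_datetime(dt: datetime) -> int:
    """Build a hybrid timestamp from a timezone-aware datetime."""
    return hybrid_ts_from_unixtime(dt.timestamp())


# Same epoch value as the CLI example: 1641809375 s = 2022-01-10 10:09:35 UTC.
print(hybrid_ts_from_unixtime(1641809375))  # 430390476800000000
print(hybrid_ts_from_datetime(
    datetime(2022, 1, 10, 10, 9, 35, tzinfo=timezone.utc)))  # 430390476800000000
```

Passing a timezone-aware datetime avoids depending on the local timezone of the machine that runs the conversion.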

Delete part of the data to simulate an accidental deletion.

    expr = "pk in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]"
    collection.delete(expr)

    const expr = "pk in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]"
    await milvusClient.dataManager.deleteEntities({
      collection_name: "test_time_travel",
      expr: expr,
    });

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    delete entities -c test_time_travel
    The expression to specify entities to be deleted, such as "film_id in [ 0, 1 ]": pk in [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    You are trying to delete the entities of collection. This action cannot be undone!
    Do you want to continue? [y/N]: y
    (insert count: 0, delete count: 10, upsert count: 0, timestamp: 430390494161534983)

As shown below, the deleted entities are not returned in the results if you search without specifying a timestamp.

    search_param = {
        "data": [[1.0, 1.0]],
        "anns_field": "example_field",
        "param": {"metric_type": "L2"},
        "limit": 10,
    }
    res = collection.search(**search_param)
    res[0].ids

    const res3 = await milvusClient.dataManager.search({
      collection_name: "test_time_travel",
      vectors: [
        [1.0, 1.0]
      ],
      search_params: {
        anns_field: "example_field",
        topk: "10",
        metric_type: "L2",
        params: JSON.stringify({
          nprobe: 10
        }),
      },
      vector_type: 101, // DataType.FloatVector,
    });
    console.log(res3.results)

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    search
    Collection name (test_collection_query, test_time_travel): test_time_travel
    The vectors of search data (the length of data is number of query (nq), the dim of every vector in data must be equal to vector fields of collection. You can also import a CSV file without headers): [[1.0, 1.0]]
    The vector field used to search of collection (example_field): example_field
    The specified number of decimal places of returned distance [-1]:
    The max number of returned record, also known as topk: 10
    The boolean expression used to filter attribute []:
    The names of partitions to search (split by "," if multiple) ['_default'] []:
    Timeout []:
    Guarantee Timestamp(It instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date) [0]:
    Travel Timestamp(Specify a timestamp in a search to get results based on a data view) [0]:
    Search results:
    No.1:
    +---------+------+------------+----------+
    |   Index |   ID |   Distance |    Score |
    +=========+======+============+==========+
    |       0 |   19 |          0 |        0 |
    +---------+------+------------+----------+
    |       1 |    5 |   0.122474 | 0.122474 |
    +---------+------+------------+----------+
    |       2 |    3 |   0.141737 | 0.141737 |
    +---------+------+------------+----------+
    |       3 |   13 |   0.508617 | 0.508617 |
    +---------+------+------------+----------+
    |       4 |   11 |   0.531529 | 0.531529 |
    +---------+------+------------+----------+
    |       5 |   17 |   0.593702 | 0.593702 |
    +---------+------+------------+----------+
    |       6 |    1 |   0.676788 | 0.676788 |
    +---------+------+------------+----------+
    |       7 |    9 |    0.69871 |  0.69871 |
    +---------+------+------------+----------+
    |       8 |    7 |    1.19445 |  1.19445 |
    +---------+------+------------+----------+
    |       9 |   15 |    1.53964 |  1.53964 |
    +---------+------+------------+----------+

Search with the timestamp from before the deletion. Milvus retrieves entities from the data view as it was before the deletion.

    search_param = {
        "data": [[1.0, 1.0]],
        "anns_field": "example_field",
        "param": {"metric_type": "L2"},
        "limit": 10,
        "travel_timestamp": pre_del_timestamp,
    }
    res = collection.search(**search_param)
    res[0].ids

    const res4 = await milvusClient.dataManager.search({
      collection_name: "test_time_travel",
      vectors: [
        [1.0, 1.0]
      ],
      travel_timestamp: pre_del_timestamp,
      search_params: {
        anns_field: "example_field",
        topk: "10",
        metric_type: "L2",
        params: JSON.stringify({
          nprobe: 10
        }),
      },
      vector_type: 101, // DataType.FloatVector,
    });
    console.log(res4.results)

    // This function is under active development on the GO client.

    // Java User Guide will be ready soon.

    search
    Collection name (test_collection_query, test_time_travel): test_time_travel
    The vectors of search data (the length of data is number of query (nq), the dim of every vector in data must be equal to vector fields of collection. You can also import a CSV file without headers): [[1.0, 1.0]]
    The vector field used to search of collection (example_field): example_field
    The specified number of decimal places of returned distance [-1]:
    The max number of returned record, also known as topk: 10
    The boolean expression used to filter attribute []:
    The names of partitions to search (split by "," if multiple) ['_default'] []:
    Timeout []:
    Guarantee Timestamp(It instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date) [0]:
    Travel Timestamp(Specify a timestamp in a search to get results based on a data view) [0]: 430390476800000000
    Search results:
    No.1:
    +---------+------+------------+------------+
    |   Index |   ID |   Distance |      Score |
    +=========+======+============+============+
    |       0 |   19 |          0 |          0 |
    +---------+------+------------+------------+
    |       1 |   12 | 0.00321393 | 0.00321393 |
    +---------+------+------------+------------+
    |       2 |    2 |  0.0563737 |  0.0563737 |
    +---------+------+------------+------------+
    |       3 |    5 |   0.122474 |   0.122474 |
    +---------+------+------------+------------+
    |       4 |    3 |   0.141737 |   0.141737 |
    +---------+------+------------+------------+
    |       5 |   10 |   0.238646 |   0.238646 |
    +---------+------+------------+------------+
    |       6 |    8 |   0.331008 |   0.331008 |
    +---------+------+------------+------------+
    |       7 |   18 |   0.403166 |   0.403166 |
    +---------+------+------------+------------+
    |       8 |   13 |   0.508617 |   0.508617 |
    +---------+------+------------+------------+
    |       9 |   11 |   0.531529 |   0.531529 |
    +---------+------+------------+------------+
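
One way to see what the deletion removed is to compare the ID column of the two CLI result tables: the search without a travel timestamp and the search with the pre-deletion timestamp. The sketch below hard-codes those two ID lists from the tables. `missing_ids` is a hypothetical helper, and because it only compares top-10 results it reveals deleted entities that ranked in the top 10, not every deleted entity.

```python
from typing import List

# ID columns copied from the two CLI result tables in this section.
ids_before_delete = [19, 12, 2, 5, 3, 10, 8, 18, 13, 11]  # travel timestamp set
ids_after_delete = [19, 5, 3, 13, 11, 17, 1, 9, 7, 15]    # no travel timestamp


def missing_ids(before: List[int], after: List[int]) -> List[int]:
    """IDs present in the earlier view but absent from the current one, in rank order."""
    remaining = set(after)
    return [pk for pk in before if pk not in remaining]


deleted_in_top10 = missing_ids(ids_before_delete, ids_after_delete)
print(deleted_in_top10)  # [12, 2, 10, 8, 18]

# Every recovered ID appears in the delete expression used earlier.
deleted_expr_ids = [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(all(pk in deleted_expr_ids for pk in deleted_in_top10))  # True
```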

What's next