Jean's Blog

一个专注软件测试开发技术的个人博客

0%

RAG组件--向量数据库集合

向量数据库基本处理流程

以Milvus向量数据库为例

image-20250818180534338

  • 核心指标:向量数据库处理需平衡三个关键指标:准确率(逼近模型上限)、检索速度、内存消耗
  • 完整流程:
    • 数据准备:游戏数据/医疗数据等原始数据
    • 向量化处理:使用OpenAI Embedding Model等模型生成向量
    • 数据库操作:
      • 创建Collection并定义Schema
      • 创建索引(索引类型直接影响检索效率)
      • 批量插入向量数据形成实体
    • 检索功能:
      • 相似度搜索
      • 条件过滤
      • 向量距离计算
      • 属性筛选
  • 关键因素:
    • 模型能力:Embedding模型质量决定准确率上限
    • 索引优化:需根据数据量和内存情况选择最优索引参数
    • 工程考量:不能简单使用AUTOINDEX,需了解不同索引方式的适用场景

数据库

Milvus官方网址:https://milvus.io/

在 Milvus 中,数据库是组织和管理数据的逻辑单元。多租户创建多个数据库,为不同的应用程序或租户从逻辑上隔离数据。例如,创建一个数据库用于存储用户 A 的数据,另一个数据库用于存储用户 B 的数据。

  • 逻辑单元:数据库是组织管理数据的逻辑单元,支持多租户隔离

  • 创建方法,参考代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    from pymilvus import MilvusClient

    client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
    )

    client.create_database(
    db_name="my_database_1"
    )

  • 版本要求:需使用Standalone版(Local版功能受限,不支持量化索引等高级功能)

数据库操作

  1. 连接Milvus客户端

    • uri: 协议+地址+端口,默认为 http://localhost:19530

    • token: “用户名:密码”,默认 root:Milvus (注:也可以为空)

      1
      2
      3
      4
      5
      6
      from pymilvus import MilvusClient, exceptions

      client = MilvusClient(
      uri="http://localhost:19530",
      # token="root:Milvus"
      )
  1. 创建数据库

    1
    2
    3
    4
    5
    6
    7
    from pymilvus import exceptions

    try:
    client.create_database(db_name="test_vector_db")
    print("✓ test_vector_db创建成功")
    except exceptions.AlreadyExistError:
    print("ℹ test_vector_db 已存在")
  1. 列出所有数据库

    1
    2
    db_list = client.list_databases()
    print("当前所有数据库:", db_list)
  1. 查看数据库详情

    1
    2
    default_info = client.describe_database(db_name="default")
    print("默认数据库详情:", default_info)
  1. 修改数据库属性:限制最大集合数为10

    1
    2
    3
    4
    5
    client.alter_database_properties(
    db_name="test_vector_db",
    properties={"database.max.collections": 10}
    )
    print("✓ 已为 test_vector_db 限制最大集合数为 10")
  1. 删除数据最大集合限制

    1
    2
    3
    4
    5
    client.drop_database_properties(
    db_name="test_vector_db",
    property_keys=["database.max.collections"]
    )
    print("✓ 已移除 test_vector_db 的最大集合数限制")
  1. 切换数据库

    1
    2
    client.use_database(db_name="dev_vector_db")
    print("✓ 已切换当前数据库为 dev_vector_db")
  1. 删除数据库

    • 注意:如果库内有 Collection,需先 client.drop_collection() 将其清空
    1
    2
    client.drop_database(db_name="test_vector_db")
    print("✓ test_vector_db 已删除")

集合:Collection

  • 基本概念:Collection是管理数据的容器,可以创建多个Collections来组织数据,类似于关系数据库中的表。
  • 数据结构:采用二维表结构,具有固定的列(字段)和变化的行(实体),每列代表一个字段,每行代表一个实体。
  • 类比关系:Collections对应数据库表,实体(Entity)对应表中的记录

image-20250819110839640

集合的Schema

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pymilvus import MilvusClient, DataType

client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus"
)

# Create schema
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
)

# Add fields to schema
schema.add_field(field_name="my_id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="my_vector", datatype=DataType.FLOAT_VECTOR, dim=5)
schema.add_field(field_name="my_varchar", datatype=DataType.VARCHAR, max_length=512)

  • 定义:Schema规定了集合的数据结构,在创建集合时必须明确设计。
  • 核心字段:
    • ID字段:必须存在的主键字段,支持整数或字符串类型
    • Vector字段:存储向量数据的核心字段
    • Text字段:与向量对应的原始文本字段
  • 动态字段:可通过设置enable_dynamic_field=True允许后续添加未定义的字段

集合的字段架构

image-20250819111346793

  • 字段限制:
    • 必须包含1个主键字段
    • 最多支持4个向量字段
  • 多向量应用:
    • 支持混合检索(密集向量+稀疏向量)
    • 支持多模态检索(文本+图像等)

主键和Auto ID

与关系数据库中的主字段类似,Collection 也有一个主字段,用于将实体与其他实体区分开来。主字段中的每个值都是全局唯一的,并与一个特定实体相对应。

名为id的字段是主字段。主字段只接受整数或字符串。

  • 插入实体时,默认情况下应包含主字段值。
  • 如果在创建 Collections 时启用了AutoId,Milvus 将在插入数据时生成这些值。此时,从要插入的实体中排除主字段值。

总结:

  • 主键特性:
    • 每个值全局唯一
    • 支持整数或字符串类型
    • 用于精确查询而非向量检索
  • Auto ID:
    • 创建集合时可启用自动生成主键功能
    • 启用后插入数据时无需指定主键值
    • 系统会自动分配唯一ID值

集合的操作

1
2
3
4
5
6
7
from pymilvus import MilvusClient

# 连接Miilvus
client = MilvusClient(
uri="http://localhost:19530",
# token="root:Milvus"
)
  1. 创建 Collection

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # 检查并删除已存在的集合
    collection_name = "quick_setup"

    if collection_name in client.list_collections():
    client.drop_collection(collection_name=collection_name)
    print(f"✓ 已删除已存在的集合 {collection_name}")

    # 创建新集合,
    client.create_collection(
    collection_name=collection_name,
    dimension=5
    )
    print(f"✓ {collection_name} 已创建") # 没有指定数据库创建集合,则集合会创建在default下

    使用默认设置立即创建集合时,将应用以下设置:

    • 将主要字段和向量字段添加到模式(id和矢量)中。
    • 主字段接受整数并禁用AutoId。
    • 向量字段接受浮点向量嵌入。
    • AUTOINDEX用于在矢量字段上创建索引。
    • COSINE用于测量向量嵌入之间的相似性。
    • 启用名为$meta的保留动态字段,以键值对的形式保存非模式定义的字段及其值。
    • 该集合在创建时自动加载。

创建了包含索引参数的集合,Milvus 会在创建时自动加载该集合。在这种情况下,索引参数中提到的所有字段都会被索引。

1
2
3
4
5
client.create_collection(
collection_name="customized_setup_1",
schema=schema,
index_params=index_params
)
  1. 列出所有 Collections

    1
    2
    cols = client.list_collections()
    print("当前所有集合:", cols)
  1. 查看 Collection 详情

    1
    2
    info = client.describe_collection(collection_name=collection_name)
    print(f"{collection_name} 详情:", info)
  1. 重命名 Collection

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    new_collection_name = "quick_renamed"

    if new_collection_name in client.list_collections():
    client.drop_collection(collection_name=new_collection_name)
    print(f"✓ 已删除已存在的集合 {new_collection_name}")

    client.rename_collection(
    old_name=collection_name,
    new_name=new_collection_name
    )
    print(f"✓ {collection_name} 已重命名为 {new_collection_name}")
  1. 修改 Collection 属性(设置 TTL 60 秒)

    1
    2
    3
    4
    5
    client.alter_collection_properties(
    collection_name=new_collection_name,
    properties={"collection.ttl.seconds": 60}
    )
    print(f"✓ 已为 {new_collection_name} 设置 TTL=60s")
  1. 删除 Collection 属性(TTL)

    1
    2
    3
    4
    5
    client.drop_collection_properties(
    collection_name=new_collection_name,
    property_keys=["collection.ttl.seconds"]
    )
    print(f"✓ 已删除 {new_collection_name} 的 TTL 属性")
  1. 加载 & 检查加载状态

    1
    2
    3
    client.load_collection(collection_name=new_collection_name)
    state = client.get_load_state(collection_name=new_collection_name)
    print("加载状态:", state)
  1. 释放 & 检查释放状态

    1
    2
    3
    client.release_collection(collection_name=new_collection_name)
    state = client.get_load_state(collection_name=new_collection_name)
    print("释放后状态:", state)
  1. 管理 Partition(分区)

    1
    2
    3
    4
    # 9.1 列出 Partition(默认只有 "_default")

    parts = client.list_partitions(collection_name=new_collection_name)
    print("Partition 列表:", parts)
    1
    2
    3
    4
    5
    6
    7
    8
    # 9.2 创建新 Partition

    client.create_partition(
    collection_name=new_collection_name,
    partition_name="partA"
    )
    print("✓ 已创建 partition partA")
    print("更新后 Partition 列表:", client.list_partitions(new_collection_name))
    1
    2
    3
    4
    5
    6
    7
    # 9.3 检查 Partition 是否存在

    exists = client.has_partition(
    collection_name=new_collection_name,
    partition_name="partA"
    )
    print("partA 存在?", exists)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # 9.4 加载 & 释放 指定 Partition

    client.load_partitions(
    collection_name=new_collection_name,
    partition_names=["partA"]
    )
    print("partA 加载状态:", client.get_load_state(new_collection_name, partition_name="partA"))

    client.release_partitions(
    collection_name=new_collection_name,
    partition_names=["partA"]
    )
    print("partA 释放后状态:", client.get_load_state(new_collection_name, partition_name="partA"))
    1
    2
    3
    4
    5
    6
    7
    8
    # 9.5 删除 Partition(需先 release)

    client.drop_partition(
    collection_name=new_collection_name,
    partition_name="partA"
    )
    print("✓ 已删除 partition partA")
    print("最终 Partition 列表:", client.list_partitions(new_collection_name))
  1. 管理 Alias(别名)

    1
    2
    3
    4
    5
    6
    # 10.1 创建 Alias

    client.create_alias(collection_name=new_collection_name, alias="alias3")
    client.create_alias(collection_name=new_collection_name, alias="alias4")

    print("✓ 已创建 alias3, alias4")
    1
    2
    3
    4
    # 10.2 列出 Alias

    aliases = client.list_aliases(collection_name=new_collection_name)
    print("当前 aliases:", aliases)
    1
    2
    3
    4
    # 10.3 查看 Alias 详情

    desc = client.describe_alias(alias="alias3")
    print("alias3 详情:", desc)
    1
    2
    3
    4
    # 10.4 重新分配 Alias

    client.alter_alias(collection_name=new_collection_name, alias="alias4")
    print("✓ 已将 alias4 重新分配给 quick_renamed")
    1
    2
    3
    4
    5
    # 10.5 删除 Alias

    client.drop_alias(alias="alias4")
    print("✓ 已删除 alias4")
    print("剩余 aliases:", client.list_aliases(new_collection_name))
  1. 删除 Collection

    1
    2
    client.drop_collection(collection_name=new_collection_name) # 注:集合中的Alias也要删干净,否则集合也不会删除成功
    print(f"✓ 集合 {new_collection_name} 已删除")

Schema操作

1
2
3
4
5
6
7
from pymilvus import MilvusClient, DataType

# 连接Miilvus
client = MilvusClient(
uri="http://localhost:19530",
# token="root:Milvus"
)
  1. 创建基本 Schema

    1
    2
    schema = MilvusClient.create_schema()
    print("✓ 已创建空 Schema")
  1. 添加主键字段(Primary Field)

    1
    2
    3
    4
    5
    6
    7
    8
    # 2.1 INT64类型主键(手动指定ID)

    schema.add_field(
    field_name="id",
    datatype=DataType.INT64,
    is_primary=True, # 设置为主键
    auto_id=False # 不自动生成ID
    )
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 2.2 VARCHAR类型主键(自动生成ID)

    schema.add_field(
    field_name="doc_id",
    datatype=DataType.VARCHAR,
    is_primary=True, # 设置为主键
    auto_id=True, # 自动生成ID
    max_length=100 # VARCHAR类型需要指定最大长度
    )
  1. 添加向量字段(Vector Field)

    1
    2
    3
    4
    5
    6
    # 3.1 Dense Vector (浮点向量)
    schema.add_field(
    field_name="text_vector",
    datatype=DataType.FLOAT_VECTOR, # 32位浮点向量
    dim=768 # 向量维度
    )
    1
    2
    3
    4
    5
    6
    # 3.2 Binary Vector (二进制向量)
    schema.add_field(
    field_name="image_vector",
    datatype=DataType.BINARY_VECTOR, # 二进制向量
    dim=256 # 维度必须是8的倍数
    )
  1. 添加标量字段(Scalar Field)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 4.1 字符串字段
    schema.add_field(
    field_name="title",
    datatype=DataType.VARCHAR,
    max_length=200,
    # 可以为空且有默认值
    is_nullable=True,
    default_value="untitled"
    )
    1
    2
    3
    4
    5
    6
    # 4.2 数值字段
    schema.add_field(
    field_name="age",
    datatype=DataType.INT32,
    is_nullable=False # 不可为空
    )
    1
    2
    3
    4
    5
    6
    # 4.3 布尔字段
    schema.add_field(
    field_name="is_active",
    datatype=DataType.BOOL,
    default_value=True # 默认值为True
    )
    1
    2
    3
    4
    5
    # 4.4 JSON字段
    schema.add_field(
    field_name="metadata",
    datatype=DataType.JSON
    )
    1
    2
    3
    4
    5
    6
    7
    8
    # 4.5 数组字段
    schema.add_field(
    field_name="tags",
    datatype=DataType.ARRAY,
    element_type=DataType.VARCHAR, # 数组元素类型
    max_capacity=10, # 数组最大容量
    max_length=50 # 每个元素最大长度
    )
  1. 添加动态字段(Dynamic Field)

    1
    2
    3
    4
    5
    6
    schema.add_field(
    field_name="dynamic_field",
    datatype=DataType.VARCHAR,
    is_dynamic=True, # 设置为动态字段
    max_length=500
    )
  1. 使用Schema创建Collection

    1
    2
    3
    4
    5
    collection_name = "document_store10"
    client.create_collection(
    collection_name=collection_name,
    schema=schema
    )
  1. 修改Collection字段

    1
    2
    3
    4
    5
    6
    7
    8
    # 添加新字段
    client.alter_collection_field(
    collection_name=collection_name,
    field_name="tags",
    field_params={
    "max_capacity": 64
    }
    )
  1. 查看Collection详情

    1
    2
    info = client.describe_collection(collection_name=collection_name)
    print("Collection详情:", info)
  1. 清理(删除集合)

    1
    2
    client.drop_collection(collection_name=collection_name)
    print("✓ 已删除测试集合")

索引

  • 必要性:向量字段必须创建索引才能高效检索
  • 索引参数:
    • 向量字段需指定索引类型和度量类型
    • 标量字段只需指定索引类型
  • 度量类型:包括余弦相似度(COSINE)、内积(IP)、欧式距离(L2)等

创建索引示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
# Prepare index parameters
index_params = client.prepare_index_params()

# Add indexes
index_params.add_index(
field_name="my_id",
index_type="AUTOINDEX"
)
index_params.add_index(
field_name="my_vector",
index_type="AUTOINDEX",
metric_type="COSINE"
)

实体:Entity

在 Milvus 中,Entity是指Collection中共享同一个Schema的数据记录,一行中每个字段的数据构成一个 Entity。因此,同一个 Collection 内的 Entity 具有相同的属性(例如字段名、数据类型和其他约束)。

将实体插入集合 (Collection) 时,只有包含 Schema 中定义的所有字段,插入的实体才能成功添加。插入的实体将按插入顺序进入名为_default的分区 (Partition )。如果存在某个分区 (Partition),您也可以通过在插入请求中指定分区名称,将实体插入到该分区 (Partition) 中。

实体:Upsert

当你需要更新集合中的实体,或者不确定是更新还是插入时,可以尝试使用 Upsert 操作。务必确保 Upsert 请求中包含的实体包含主键,否则会报错。

image-20250819160329230

  1. 检查Collection的主字段是否启用了AutoId。
    • 如果是,Milvus 会将实体中的主键替换为自动生成的主键,并插入数据。
    • 如果没有,Milvus 将使用实体携带的主键来插入数据。
  2. 根据Upsert请求中包含的Entity的主键值执行删除操作。
  • Upsert操作:
    • 结合insert和update功能
    • 检查AutoID设置决定主键处理方式
    • 存在相同主键时先删除再插入

实体Entity操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pymilvus import MilvusClient
import random

# 连接Miilvus
client = MilvusClient(
uri="http://localhost:19530",
# token="root:Milvus"
)

# 创建集合
# 检查集合是否存在,如果存在则删除
if client.has_collection("quick_setup"):
client.drop_collection("quick_setup")

# 创建集合
client.create_collection(
collection_name="quick_setup",
dimension=5, # vector 维度
primary_field_name="id",
vector_field_name="vector",
id_type="int"
)
  1. 插入实体

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    data=[
    {"id": 0, "vector": [0.3580376395471989, -0.6023495712049978, 0.18414012509913835, -0.26286205330961354, 0.9029438446296592], "color": "pink_8682"},
    {"id": 1, "vector": [0.19886812562848388, 0.06023560599112088, 0.6976963061752597, 0.2614474506242501, 0.838729485096104], "color": "red_7025"},
    {"id": 2, "vector": [0.43742130801983836, -0.5597502546264526, 0.6457887650909682, 0.7894058910881185, 0.20785793220625592], "color": "orange_6781"},
    {"id": 3, "vector": [0.3172005263489739, 0.9719044792798428, -0.36981146090600725, -0.4860894583077995, 0.95791889146345], "color": "pink_9298"},
    {"id": 4, "vector": [0.4452349528804562, -0.8757026943054742, 0.8220779437047674, 0.46406290649483184, 0.30337481143159106], "color": "red_4794"},
    {"id": 5, "vector": [0.985825131989184, -0.8144651566660419, 0.6299267002202009, 0.1206906911183383, -0.1446277761879955], "color": "yellow_4222"},
    {"id": 6, "vector": [0.8371977790571115, -0.015764369584852833, -0.31062937026679327, -0.562666951622192, -0.8984947637863987], "color": "red_9392"},
    {"id": 7, "vector": [-0.33445148015177995, -0.2567135004164067, 0.8987539745369246, 0.9402995886420709, 0.5378064918413052], "color": "grey_8510"},
    {"id": 8, "vector": [0.39524717779832685, 0.4000257286739164, -0.5890507376891594, -0.8650502298996872, -0.6140360785406336], "color": "white_9381"},
    {"id": 9, "vector": [0.5718280481994695, 0.24070317428066512, -0.3737913482606834, -0.06726932177492717, -0.6980531615588608], "color": "purple_4976"}
    ]

    res = client.insert(
    collection_name="quick_setup",
    data=data
    )

    print(res)
  1. 更新实体

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    update_data = [
    {"id": 0, "vector": [random.random() for _ in range(5)], "color": "updated_pink_8682"},
    {"id": 1, "vector": [random.random() for _ in range(5)], "color": "updated_red_7025"}
    ]

    res = client.upsert(
    collection_name="quick_setup",
    data=update_data
    )
    print("\n更新结果:", res)
  1. 删除实体

    1
    2
    3
    4
    5
    res = client.delete(
    collection_name="quick_setup",
    ids=[0]
    )
    print("\n删除结果:", res)
  1. 查询实体

    1
    2
    3
    4
    5
    6
    7
    8
    client.flush(collection_name="quick_setup") # 刷新内存

    res = client.query(
    collection_name="quick_setup",
    filter="id in [1,2]",
    output_fields=["id", "color"]
    )
    print("\n查询结果:", res)

其它概念

  • Partition - 分区是集合的子集,与其父集合共享相同的字段集,每个分区包含一个实体子集。通过将实体分配到不同的分区,您可以创建实体组。您可以在特定分区内进行搜索和查询,让Milvus 忽略其他分区中的实体,从而提高搜索效率。
  • Shard - 分片是集合的水平切片。每个分片对应一个数据输入通道。每个集合默认都有一个分片。您可以根据预期吞吐量和要插入到集合中的数据量,在创建集合时设置适当的分片数量。。
  • Alias - 您可以为集合创建别名。一个集合可以有多个别名,但集合之间不能共享一个别名。收到针对某个集合的请求后,Milvus 会根据提供的名称查找该集合。
  • Function - 您可以在 Milvus 创建集合时设置函数来导出字段。例如,全文搜索功能使用用户自定义函数从特定的 varchar 字段导出稀疏向量字段。
  • Consistency Level - 分布式数据库系统通常使用一致性级别来定义跨数据节点和副本的数据相同性。可以在创建集合或在集合内进行相似性搜索时设置单独的一致性级别。适用的一致性级别包括Strong, Bounded Staleness, Session, Eventually

数据一致性

一致性级别 含义 特性 典型场景
Strong(一致性) 任何读操作都能立刻看到所有已完成写操作的最新数据,保证“线性化” • 最高读写一致性
• 延迟和吞吐可能受限于副本同步速度
• 对数据准确性要求极高的业务,如金融交易、订单支付
• 写入与读取强耦合的场景
Bounded Staleness(有界时滞一致性) 读操作可能看到最新写操作之前的旧数据,但“落后量”可控 • 最长时延或版本差可配置(如最多滞后5秒或3个版本)
• 在可接受延迟范围内兼顾性能与一致性
• 社交媒体时序数据显示,只要误差在几秒以内可接受
• 仪表盘、监控类场景
Session(会话一致性) 同一客户端会话内,读写操作按照客户端顺序一致;跨会话可见性无保证 • 保证“读已所写”、“读后写”、“写后读”
• 不同客户端之间可能看到不同版本
• 用户个性化操作:自己写入的数据自己即刻可见
• 聊天、个人设置等场景
Eventually(最终一致性) 写操作会被异步传播到所有副本,短期内读到旧数据,最终收敛到一致 • 写入延迟最低、吞吐最高
• 不保证任何时刻的一致性,只保证最终一致
• 海量日志收集、埋点数据、离线分析场景
• 缓存回源、CDN数据分发
  • Strong(强一致性):
    • 任何读都能看到最新写结果
    • 适用于金融交易等高要求场景
  • Bounded Staleness(有界时滞):
    • 默认级别,落后量可控(如5秒内)
    • 兼顾性能与一致性
  • Session(会话一致性):
    • 保证同一会话内的读写顺序
    • 适用于个性化操作场景
  • Eventually(最终一致性):
    • 写入延迟最低,最终收敛一致
    • 适用于日志收集等场景