DB-GPT项目其实我们一直都走的比较坚定。 在今年4月初我们定下来整个技术架构到现在,都没有多大的变化。 我们一直坚持想做一件事就是: “Revolutionizing Database Interactions with Private LLM Technology”。对”用大模型技术定义数据库下一代交互方式”。
1. 大模型交互三层架构
通用模型能解决所有问题吗? 需不需要领域模型? 面向未来,多模型之间是怎么协作交互的? 这几个问题其实我们一直都在思考。但截至目前,跟我们5个月之前的认知也没太大的变化。 那就是以后应用落地需要三层的大模型交互架构。 如下图所示,从上往下依次是- 通用大语言模型(> 100B)
- 领域大语言模型(10~70B)
- 工具类模型(<10B)


2. DB-GPT的架构与特点
下图是DB-GPT的架构图,整体结构比较简单。左侧是知识(RAG),右侧是工具(Agents), 中间是多模型管理(SMMF),同时增加了向量存储这样的大模型记忆体,以及各类数据源,在往上是一层通用的交互层面。
关键特性
- 私域问答&数据处理&RAG
- 多数据源&GBI
- 多模型管理
- 自动化微调
- Data-Driven Multi-Agents&Plugins
- 隐私安全
3.面向未来
经过这一年的折腾与抽象,我们将DB-GPT未来的架构进行了分层。如下图所示,主要分为以下7层,自上而下以此为:- 可视化层: 可视化层主要的工作是对话、交互、图表显示、可视化编排等能力。
- 应用层: 基于底层能力的应用构建,如GBI应用、ChatDB类应用、ChatData类应用、ChatExcel类应用等。
- 服务层: 服务层主要是对外暴露的服务,比如LLMServer、APIServer、RAGServer、dbgptserver等
- 核心模块层: 核心模块主要有三个分别是,SMMF、RAGs、Agents
- 协议层:协议层主要是指AWEL(Agentic Workflow Expression Language), 即智能体编排语言,是专门为大模型应用开发设计的智能体工作流表达式语言。
- 训练层: 训练层会主要关注Text2SQL、Text2DSL、Text2API方向的微调,提供标准的微调脚手架。
- 运行环境: 运行环境是指整个框架的运行在什么环境当中,我们后期会优先支持基于Ray与Kubernetes的环境。

3.1 多模型管理(SMMF)
前面我们有提到说面向未来是通用大模型、领域模型、小模型一起交互协作的。 那就特别需要对对模型进行管理,而且模型需要是一个个随时可调用的服务。在我们的框架里面叫Service-oriented Multi-model Management Framework(SMMF) 服务化多模型管理框架。 多模型管理框架的本质其实就是模型服务的Serverless化。
关于此部分内容更详细的介绍,可以参考文档:多模型介绍 ### 3.2 MS-RAG
import openaiopenai.api_key = "EMPTY"openai.api_base = "http://127.0.0.1:8100/api/v1"model = "vicuna-13b-v1.5"completion = openai.ChatCompletion.create(model=model,messages=[{"role": "user", "content": "hello"}])# print the completionprint(completion.choices[0].message.content)
MS-RAG指的是多文档检索增强能力,这部分也是DB-GPT中的一个基础核心模块。 当前MS-RAG实现了基本的增强检索的操作,并对多文档、多源数据检索场景做了非常多的定制优化。整个流程中涵盖了知识构建、知识检索、答案生成全链路的能力。关于此部分更细节的内容,我这里暂且按下不表,后面会有专门的文章进行介绍。
3.3 Agents



3.4 AWEL
AWEL(Agentic Workflow Expression Language)是一套专为大模型应用开发设计的智能体工作流表达语言,它提供了强大的功能和灵活性。通过 AWEL API 您可以专注于大模型应用业务逻辑的开发,而不需要关注繁琐的模型和环境细节,AWEL 采用分层 API 的设计, AWEL 的分层 API 设计架构如下图所示:
AWEL分层设计
AWEL在设计上分为三个层次,依次为算子层、AgentFrame层以及DSL层,以下对三个层次做简要介绍。- 算子层
- AgentFream层
- DSL层
使用案例
AWEL初步的版本也已经在V0.4.2发布,我们内置提供了一些使用样例。算子层API-RAG例子
源码在项目中位置 examples/awel/simple_rag_example.py
位运算会将整个过程以DAG的形式编排。
with DAG("simple_rag_example") as dag:trigger_task = HttpTrigger("/examples/simple_rag", methods="POST", request_body=ConversationVo)req_parse_task = RequestParseOperator()# TODO should register prompt template firstprompt_task = PromptManagerOperator()history_storage_task = ChatHistoryStorageOperator()history_task = ChatHistoryOperator()embedding_task = EmbeddingEngingOperator()chat_task = BaseChatOperator()model_task = ModelOperator()output_parser_task = MapOperator(lambda out: out.to_dict()["text"])(trigger_task>> req_parse_task>> prompt_task>> history_storage_task>> history_task>> embedding_task>> chat_task>> model_task>> output_parser_task)

算子层API调用模型+缓存例子

AgentFream层API样例
af = AgentFream(HttpSource("/examples/run_code", method = "post"))result = (af.text2vec(model="text2vec").filter(vstore, store = "chromadb", db="default").llm(model="vicuna-13b", temperature=0.7).map(code_parse_func).map(run_sql_func).reduce(lambda a, b: a + b))result.write_to_sink(type='source_slink')
DSL层API样例
DSL 采用ANTLR4 / Lark解析器
CREATE WORKFLOW RAG ASBEGINDATA requestData = RECEIVE REQUEST FROMhttp_source("/examples/rags", method = "post");DATA processedData = TRANSFORM requestData USING embedding(model = "text2vec");DATA retrievedData = RETRIEVE DATAFROM vstore(database = "chromadb", key = processedData)ON ERROR FAIL;DATA modelResult = APPLY LLM "vicuna-13b"WITH DATA retrievedData AND PARAMETERS (temperature = 0.7)ON ERROR RETRY 2 TIMES;RESPOND TO http_source WITH modelResultON ERROR LOG "Failed to respond to request";END;
3.5 Train
作为DB-GPT项目的一部分,我们提供了Text2SQL微调相关的代码,目前已经按照独立的pypi包发布,可以直接安装进行使用。
pip install dbgpt_hub
查看模型基线
from dbgpt_hub.baseline import show_scoresshow_scores()

微调
更多信息可以参考我们的开源代码 ### 3.6 Benchmark 简介:所有用户,相同 prompt (内容、input length 和 output length 相同),同一时间发起推理请求,在同一硬件条件下的首字延迟、推理延迟和吞吐量。
from dbgpt_hub.data_process import preprocess_sft_datafrom dbgpt_hub.train import start_sftfrom dbgpt_hub.predict import start_predictfrom dbgpt_hub.eval import start_evaluatedata_folder = "dbgpt_hub/data"data_info = [{"data_source": "spider","train_file": ["train_spider.json", "train_others.json"],"dev_file": ["dev.json"],"tables_file": "tables.json","db_id_name": "db_id","is_multiple_turn": False,"train_output": "spider_train.json","dev_output": "spider_dev.json",}]train_args = {"model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf","do_train": True,"dataset": "example_text2sql_train","max_source_length": 2048,"max_target_length": 512,"finetuning_type": "lora","lora_target": "q_proj,v_proj","template": "llama2","lora_rank": 64,"lora_alpha": 32,"output_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora","overwrite_cache": True,"overwrite_output_dir": True,"per_device_train_batch_size": 1,"gradient_accumulation_steps": 16,"lr_scheduler_type": "cosine_with_restarts","logging_steps": 50,"save_steps": 2000,"learning_rate": 2e-4,"num_train_epochs": 8,"plot_loss": True,"bf16": True,}predict_args = {"model_name_or_path": "codellama/CodeLlama-13b-Instruct-hf","template": "llama2","finetuning_type": "lora","checkpoint_dir": "dbgpt_hub/output/adapter/CodeLlama-13b-sql-lora","predict_file_path": "dbgpt_hub/data/eval_data/dev_sql.json","predict_out_dir": "dbgpt_hub/output/","predicted_out_filename": "pred_sql.sql",}evaluate_args = {"input": "./dbgpt_hub/output/pred/pred_sql_dev_skeleton.sql","gold": "./dbgpt_hub/data/eval_data/gold.txt","gold_natsql": "./dbgpt_hub/data/eval_data/gold_natsql2sql.txt","db": "./dbgpt_hub/data/spider/database","table": "./dbgpt_hub/data/eval_data/tables.json","table_natsql": "./dbgpt_hub/data/eval_data/tables_for_natsql2sql.json","etype": "exec","plug_value": True,"keep_distict": False,"progress_bar_for_each_datapoint": False,"natsql": False,}preprocess_sft_data(data_folder = data_folder,data_info = data_info)start_sft(train_args)start_predict(predict_args)start_evaluate(evaluate_args)
指标
1. 首字延迟(First Token Latency)
单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到推理框架得到第一个 token 的时间(prefill 时间 + 第一个decoding token 的时间)延迟(**Latency**)
单位:毫秒 从 DB-GPT 模型部署框架接收到模型推理请求到生成完整的响应的时间。2. 吞吐量(**Throughput**)
单位:tokens 每秒 DB-GPT 模型部署框架每秒中处理的所有用户和所有请求的 token 数量。硬件配置
GPU: NVIDIA A100-PCIE-40GB * 1
CPU: Intel Xeon Processor (Skylake, IBRS) 40核80线程 2992.953 MHz
内存:629 GB
硬盘: 1T HDD
推理框架和模型
推理框架:vllm 和 huggingface transformers 模型:Qwen-7b-Chat、Qwen-14b-ChatVLLM

hg

Benchmark代码地址: https://github.com/eosphoros-ai/DB-GPT/blob/main/dbgpt/util/benchmarks
async def run_model(wh: WorkerManager) -> None:global result_csv_fileif not result_csv_file:result_csv_file = get_result_csv_file()if os.path.exists(result_csv_file):now = datetime.now()now_str = now.strftime("%Y-%m-%d")os.rename(result_csv_file, f"{result_csv_file}.bak_{now_str}.csv")for parallel_num in parallel_nums:for input_len, output_len in zip(input_lens, output_lens):try:await run_batch(wh, input_len, output_len, parallel_num, result_csv_file)except Exception:msg = traceback.format_exc()logging.error(f"Run benchmarks error, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, error message: {msg}")if "torch.cuda.OutOfMemoryError" in msg:returnsys.exit(0)async def run_batch(wh: WorkerManager,input_len: int,output_len: int,parallel_num: int,output_file: str,):tasks = []prompt = read_prompt_from_file("11k")if model_type == "vllm":max_input_str_len = input_lenif "baichuan" in model_name:# TODO prompt handle firstmax_input_str_len *= 2prompt = prompt[-max_input_str_len:]# Warmup firstparams = build_param(input_len, output_len, prompt, system_prompt="")await wh.generate(params)for _ in range(parallel_num):params = build_param(input_len, output_len, prompt, system_prompt="")tasks.append(wh.generate(params))print(f"Begin run benchmarks, model name: {model_name}, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}")start_time_ms = time.time_ns() // 1_000_000results: List[ModelOutput] = await asyncio.gather(*tasks)end_time_ms = time.time_ns() // 1_000_000test_time_cost_ms = end_time_ms - start_time_mstest_total_tokens = 0first_token_latency_ms = 0latency_ms = 0gpu_nums = 0avg_gpu_mem = 0rows = []for r in results:metrics = r.metricsif isinstance(metrics, dict):metrics = ModelInferenceMetrics(**metrics)print(r)test_total_tokens += metrics.total_tokensfirst_token_latency_ms += metrics.first_token_time_ms - metrics.start_time_mslatency_ms += metrics.end_time_ms - metrics.start_time_msrow_data = metrics.to_dict()del row_data["collect_index"]if "avg_gpu_infos" in row_data:avg_gpu_infos = row_data["avg_gpu_infos"]gpu_nums = len(avg_gpu_infos)avg_gpu_mem = (sum(i["allocated_memory_gb"] for i in avg_gpu_infos) / gpu_nums)del row_data["avg_gpu_infos"]del row_data["current_gpu_infos"]rows.append(row_data)avg_test_speed_per_second = test_total_tokens / (test_time_cost_ms / 1000.0)avg_first_token_latency_ms = first_token_latency_ms / len(results)avg_latency_ms = latency_ms / len(results)with open(output_file, "a", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=METRICS_HEADERS)if f.tell() == 0:# Fist timewriter.writeheader()for row in rows:row["model_name"] = model_namerow["parallel_nums"] = parallel_numrow["input_length"] = input_lenrow["output_length"] = output_lenrow["test_time_cost_ms"] = test_time_cost_msrow["test_total_tokens"] = test_total_tokensrow["avg_test_speed_per_second(tokens/s)"] = avg_test_speed_per_secondrow["avg_first_token_latency_ms"] = avg_first_token_latency_msrow["avg_latency_ms"] = avg_latency_msrow["gpu_nums"] = gpu_numsrow["gpu_mem(GiB)"] = avg_gpu_memwriter.writerow(row)print(f"input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}")
