01-01 hello-sql执行流程
该示例提供了一个将Sql解析为语法树并返回select 'hello';
中字符串的逻辑
其核心逻辑如下:
pub fn run(&self, sql: &str) -> Result<Vec<String>, Error> {
// parse -- 借用开源的PostgreSqlDialect进行Sql的解析
// 来自 sqlparser-0.13.0 解析为Statement类型
// 返回的是一个list的原因是一个Sql可能包含多条语句
let stmts = parse(sql)?;
let mut outputs = vec![];
// 这里直接对每条Sql的输出进行打印. 没有做跨Sql语句的变量什么的定义
for stmt in stmts {
debug!("execute: {:#?}", stmt);
// 实际的执行函数
let output = execute(&stmt);
// 插入结果列表
outputs.extend(output);
}
Ok(outputs)
}
/*
pub use sqlparser::ast::*;
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
pub use sqlparser::parser::ParserError;
*/
pub fn parse(sql: &str) -> Result<Vec<Statement>, ParserError> {
let dialect = PostgreSqlDialect {};
Parser::parse_sql(&dialect, sql)
}
// 将AST中的对应字符串解析为output
pub fn execute(stmt: &Statement) -> Result<String, ExecuteError> {
match stmt {
// 匹配Query类型Sql
Statement::Query(query) => match &query.body {
// 匹配Select类型Sql
SetExpr::Select(select) => {
let mut output = String::new();
// projection字段指的是 select xxx 中的字段名或者常量
for item in &select.projection {
// 为每个字段间添加空格
write!(output, " ").unwrap();
match item {
SelectItem::UnnamedExpr(Expr::Value(v)) => match v {
// 支持输出为单引号的字符串
Value::SingleQuotedString(s) => write!(output, "{}", s).unwrap(),
// 支持输出数值类型
Value::Number(s, _) => write!(output, "{}", s).unwrap(),
_ => todo!("not supported statement: {:#?}", stmt),
},
_ => todo!("not supported statement: {:#?}", stmt),
}
}
return Ok(output.trim().to_string());
}
_ => todo!("not supported statement: {:#?}", stmt),
},
_ => todo!("not supported statement: {:#?}", stmt),
}
}
# 对于文件结构的理解
.
├── Cargo.toml
└── src
├── db.rs # DB对象处理输入Sql字符串到生产结果字符串的大致流程 sql -> AST -> res
├── executor.rs # 执行流程 AST -> res
├── lib.rs # 维护依赖, 在lib.rs中创建了引用的mod以及Database, Error类型, 效果类似C中的header文件
├── main.rs # 主函数, 用于维护死循环和创建DB实例
├── parser.rs # 负责使用sqlparser组件将sql String -> AST的组件
└── test.rs # 自动测试脚本, 初始化脚本, 读取sqllogictest脚本, 使用sqllogictest进行Sql逻辑的测试
01-02 catalog定义
该任务是定义table, view, column, type, index, schema
的数据结构
# 项目结构分析
.
├── Cargo.toml
└── src
├── catalog # 定义DB中存储内容的基本逻辑的包
│ ├── column.rs # 两个公共类型:
# ColumnDesc 描述一个列的信息包括DataType和是否为主键
# ColumnCatalog 描述列Id(序号)列名称和ColumnDesc的结构, 其维护了这些信息之间的绑定
│ ├── database.rs # 一个公共类型:
# DatabaseCatalog 描述一个DB的元信息, 其是对于Inner对象的加锁.
# 相当于维护了一个syncrolized的变量, 用于存放 Schema 信息
# (1) 维护了三个字段:
# schema名称到index的映射: schema_idxs: HashMap<String, SchemaId>,
# schemaId到Schema内容的映射: schemas: HashMap<SchemaId, Arc<SchemaCatalog>>,
# DataBase粒度自增的下一个SchemaId: next_schema_id: SchemaId,
# (2) 特殊类型:
# std::sync::mutex::Mutex : 用于对于对象加锁的元语
# alloc::sync::Arc : 线程安全的对象共享指针,
# 会统计该对象的被引用和释放的次数,如果为0则会释放对象
# (3) 提供的能力:
# 1> 根据输入的名字创建一个将名字和SchemaId绑定的空Schema
# 2> 删除名字指定的Schema -- 删除Map
# 3> 显示所有的Schema的信息 -- 直接将内部存储的schemaMap复制了一份
# 4> 根据输入的SchemaId信息获取对应的SchemaCatalog信息
# 5> 根据名字获取对应的SchemaCatalog信息
# 6> 根据TableRefId先获取对应Schema再获取对应TableCatalog信息
│ ├── mod.rs # 两个公共类: TableRefId, ColumnRefId
# 这两个公共类是通过存储 SchemaId + TableId 和 SchemaId + TableId + ColumnId 实现的
# 对于Table和Column的全局唯一Id
│ ├── schema.rs # 一个公共类: SchemaCatalog
# 用于描述Schema, 一个Schema中可以有多个Table
# 对于Schema的内部信息是线程加锁了的
# (1) 字段信息:
# schemaId:u32
# name:String
# name -> tableId; tableId -> TableCatalog 的两个Map
# Schema粒度自增的TableId
# (2) 函数:
# new: 创建; id: 当前SchemaId; name: 当前SchemaName;
# add_table, del_table_by_name, del_table,
# all_tables, get_table, get_table_by_name 其他这些函数顾名思义即可
│ └── table.rs # 一个公共类: TableCatalog
# (1) 字段信息
# id, name, next_column_id(Table粒度自增Id)
# Name->ColumnId, ColumnId->ColumnCatalog 两个Map
├── db.rs # 未作任何修改
├── executor.rs # 未作任何修改
├── lib.rs # 添加对于catalog包和types类的依赖
├── main.rs # 未作任何修改
├── parser.rs # 未作任何修改
├── test.rs # 未作任何修改
└── types.rs # 对于sqlparser::ast::DataType的扩展, 为其添加了nullable配置, 用于表示DB支持的数据类型
-
重要实现:
-
数据加锁:
pub struct DatabaseCatalog { inner: Mutex<Inner>, // 定义了锁 } pub fn add_schema(&self, name: &str) -> Result<SchemaId, CatalogError> { // 获取对应的锁 let mut inner = self.inner.lock().unwrap(); if inner.schema_idxs.contains_key(name) { return Err(CatalogError::Duplicated("schema", name.into())); } let id = inner.next_schema_id; inner.next_schema_id += 1; let schema_catalog = Arc::new(SchemaCatalog::new(id, name.into())); inner.schema_idxs.insert(name.into(), id); inner.schemas.insert(id, schema_catalog); Ok(id) // 当对象被释放的时候锁自动释放 }
-
Map数据结构的使用
schema_idxs.insert(name.into(), id); // 串行操作, 从schema_idxs中获取schemaId并查询对应Schema. schema_idxs.get(name).and_then(|id| inner.schemas.get(id));
-
Lambda使用
|id| inner.schemas.get(id) // 等效于以下函数 fn funct(id:&u32) -> Option<&Arc<SchemaCatalog>>{ return inner.schemas.get(id); }
-
Option的使用
let a:Option<&Arc<SchemaCatalog>> = xxx; let b:Option<Arc<SchemaCatalog>> = a.cloned() // 将Option中的变量拷贝一份后输出
-
Arc的使用
// 创建 let schema_catalog = Arc::new(SchemaCatalog::new(id, name.into())); // 使用 -- 直接和正常对象执行的方式一致 schema_catalog.get_table(table_ref_id.table_id)
-
01-03 创建表
.
├── Cargo.toml
└── src
├── binder
│ ├── mod.rs # 两个公共类和一个异常类型
# BoundStatement: 定义了现在可以被绑定的Sql类型的枚举: CreateTable, Select
# Binder: 绑定DataBaseCatalog的类型, 是statement中使用catalog数据的句柄
# BindError: 绑定过程中出现的异常
│ └── statement
│ ├── create_table.rs # 一个公共类和一个Binder函数
# BoundCreateTable 定义了Create操作后的返回值类型
# Binder::bind_create_table 定义了如何将sqlparser中的参数创建
# 对应Catalog并绑定到当前Binder对应DataBase上的逻辑
# ColumnDesc::from 定义了如何拷贝ColumnDesc结构用于进行对象复制
│ ├── mod.rs # 引入create_table函数和select函数
│ └── select.rs # 和之前的executor的逻辑一致, 定义了对于Binder对象执行select操作的返回值
├── catalog # 定义基础数据结构
│ ├── column.rs # 无变化
│ ├── database.rs # 无变化
│ ├── mod.rs # 无变化
│ ├── schema.rs # 无变化
│ └── table.rs # 无变化
├── db.rs # 添加了Binder逻辑, 输入的Statement需要和Binder绑定后进行执行
├── executor
│ ├── create.rs # 定义了Executor的execute_create_table函数, 执行BoundCreateTable并
# 输出格式化好的字符串结果
│ ├── mod.rs # 定义了Executor, 绑定一个Database的catalog数据后可以执行BoundStatement只是做一个路由
│ └── select.rs # 定义了Executor的execute_select函数, 执行BoundSelect并格式化结果
├── lib.rs # 添加了binder依赖
├── main.rs # 无变化
├── parser.rs # 无变化
├── test.rs # 添加了对于01-03的测试脚本
└── types.rs # 无变化
-
主体逻辑梳理:
-
初始化流程中创建Binder对象, 并和全局唯一的DataBaseCatalog进行了绑定
-
sql输入Parser转化为Statement
-
Statement输入Binder和特定类型的BoundStatement进行绑定, 每个BoundStatement绑定了一个处理函数
-
将BoundStatement输入Executor, 调用该BoundStatement对应的处理函数, 并对于结果进行format
-
将结果返回给控制台输出
01-04 内存存储
. ├── Cargo.toml └── src ├── array # 定义内存存储的对应数据结构和操作 │ ├── data_chunk.rs │ ├── iter.rs │ ├── mod.rs │ ├── primitive_array.rs │ └── utf8_array.rs ├── binder │ ├── mod.rs │ └── statement │ ├── create_table.rs │ ├── mod.rs │ └── select.rs ├── catalog │ ├── column.rs │ ├── database.rs │ ├── mod.rs │ ├── schema.rs │ └── table.rs ├── db.rs ├── executor │ ├── create.rs │ ├── mod.rs │ └── select.rs ├── lib.rs ├── main.rs ├── parser.rs ├── storage.rs ├── test.rs └── types.rs
-
01-05 Insert语句的执行
.
├── Cargo.toml
└── src
├── array
│ ├── data_chunk.rs
│ ├── iter.rs
│ ├── mod.rs
│ ├── primitive_array.rs
│ └── utf8_array.rs
├── binder
│ ├── expression
│ │ └── mod.rs
│ ├── mod.rs
│ └── statement
│ ├── create_table.rs
│ ├── insert.rs
│ ├── mod.rs
│ └── select.rs
├── catalog
│ ├── column.rs
│ ├── database.rs
│ ├── mod.rs
│ ├── schema.rs
│ └── table.rs
├── db.rs
├── executor
│ ├── create.rs
│ ├── insert.rs
│ ├── mod.rs
│ ├── select.rs
│ └── values.rs
├── lib.rs
├── main.rs
├── parser.rs
├── storage.rs
├── test.rs
└── types.rs