risinglightDB tutorial 简单记录

发布时间 2023-08-02 00:29:29作者: NoobSir

01-01 hello-sql执行流程

该示例提供了一个将Sql解析为语法树并返回select 'hello';中字符串的逻辑
其核心逻辑如下:

    pub fn run(&self, sql: &str) -> Result<Vec<String>, Error> {
        // parse -- 借用开源的PostgreSqlDialect进行Sql的解析
        // 来自 sqlparser-0.13.0 解析为Statement类型
        // 返回的是一个list的原因是一个Sql可能包含多条语句
        let stmts = parse(sql)?;

        let mut outputs = vec![];
        // 这里直接对每条Sql的输出进行打印. 没有做跨Sql语句的变量什么的定义
        for stmt in stmts {
            debug!("execute: {:#?}", stmt);
            // 实际的执行函数
            let output = execute(&stmt);
            // 插入结果列表
            outputs.extend(output);
        }
        Ok(outputs)
    }

	/*
	pub use sqlparser::ast::*;
    use sqlparser::dialect::PostgreSqlDialect;
    use sqlparser::parser::Parser;
    pub use sqlparser::parser::ParserError;
	*/
    pub fn parse(sql: &str) -> Result<Vec<Statement>, ParserError> {
        let dialect = PostgreSqlDialect {};
        Parser::parse_sql(&dialect, sql)
    }

	// 将AST中的对应字符串解析为output
	pub fn execute(stmt: &Statement) -> Result<String, ExecuteError> {
        match stmt {
            // 匹配Query类型Sql
            Statement::Query(query) => match &query.body {
			  // 匹配Select类型Sql
                SetExpr::Select(select) => {
                    let mut output = String::new();
                    // projection字段指的是 select xxx 中的字段名或者常量
                    for item in &select.projection {
                        // 为每个字段间添加空格
                        write!(output, " ").unwrap();
                        match item {
                            SelectItem::UnnamedExpr(Expr::Value(v)) => match v {
                                // 支持输出为单引号的字符串
                                Value::SingleQuotedString(s) => write!(output, "{}", s).unwrap(),
							 // 支持输出数值类型
                                Value::Number(s, _) => write!(output, "{}", s).unwrap(),
                                _ => todo!("not supported statement: {:#?}", stmt),
                            },
                            _ => todo!("not supported statement: {:#?}", stmt),
                        }
                    }
                    return Ok(output.trim().to_string());
                }
                _ => todo!("not supported statement: {:#?}", stmt),
            },
            _ => todo!("not supported statement: {:#?}", stmt),
        }
    }
# 对于文件结构的理解
.
├── Cargo.toml
└── src
    ├── db.rs # DB对象处理输入Sql字符串到生产结果字符串的大致流程 sql -> AST -> res
    ├── executor.rs # 执行流程 AST -> res
    ├── lib.rs # 维护依赖, 在lib.rs中创建了引用的mod以及Database, Error类型, 效果类似C中的header文件
    ├── main.rs # 主函数, 用于维护死循环和创建DB实例
    ├── parser.rs # 负责使用sqlparser组件将sql String -> AST的组件
    └── test.rs # 自动测试脚本, 初始化脚本, 读取sqllogictest脚本, 使用sqllogictest进行Sql逻辑的测试

01-02 catalog定义

该任务是定义table, view, column, type, index, schema的数据结构

# 项目结构分析
.
├── Cargo.toml
└── src
    ├── catalog # 定义DB中存储内容的基本逻辑的包
    │   ├── column.rs # 两个公共类型: 
    				# ColumnDesc 描述一个列的信息包括DataType和是否为主键
    				# ColumnCatalog 描述列Id(序号)列名称和ColumnDesc的结构, 其维护了这些信息之间的绑定
    │   ├── database.rs # 一个公共类型: 
    				# DatabaseCatalog 描述一个DB的元信息, 其是对于Inner对象的加锁. 
    				# 相当于维护了一个syncrolized的变量, 用于存放 Schema 信息
    				# (1) 维护了三个字段: 
    				# 	schema名称到index的映射: schema_idxs: HashMap<String, SchemaId>,
    				# 	schemaId到Schema内容的映射: schemas: HashMap<SchemaId, Arc<SchemaCatalog>>,
    				# 	DataBase粒度自增的下一个SchemaId: next_schema_id: SchemaId,
    				# (2) 特殊类型: 
    				# 	std::sync::mutex::Mutex : 用于对于对象加锁的元语
    				# 	alloc::sync::Arc : 线程安全的对象共享指针,
    				# 		会统计该对象的被引用和释放的次数,如果为0则会释放对象
    				# (3) 提供的能力: 
    				# 	1> 根据输入的名字创建一个将名字和SchemaId绑定的空Schema
    				# 	2> 删除名字指定的Schema -- 删除Map
    				# 	3> 显示所有的Schema的信息 -- 直接将内部存储的schemaMap复制了一份
    				# 	4> 根据输入的SchemaId信息获取对应的SchemaCatalog信息
    				# 	5> 根据名字获取对应的SchemaCatalog信息
    				# 	6> 根据TableRefId先获取对应Schema再获取对应TableCatalog信息
    │   ├── mod.rs # 两个公共类: TableRefId, ColumnRefId
    			# 这两个公共类是通过存储 SchemaId + TableId 和 SchemaId + TableId + ColumnId 实现的
    			# 对于Table和Column的全局唯一Id
    │   ├── schema.rs # 一个公共类: SchemaCatalog
    				# 用于描述Schema, 一个Schema中可以有多个Table
    				# 对于Schema的内部信息是线程加锁了的
    				# (1) 字段信息:
    				# 	schemaId:u32
    				#	name:String
    				#	name -> tableId; tableId -> TableCatalog 的两个Map
    				#	Schema粒度自增的TableId
    				# (2) 函数:
    				# 	new: 创建; id: 当前SchemaId; name: 当前SchemaName; 
    				# 	add_table, del_table_by_name, del_table, 
    				#	all_tables, get_table, get_table_by_name 其他这些函数顾名思义即可
    │   └── table.rs # 一个公共类: TableCatalog
    				# (1) 字段信息
    				# 	id, name, next_column_id(Table粒度自增Id)
    				#	Name->ColumnId, ColumnId->ColumnCatalog 两个Map
    				
    ├── db.rs # 未作任何修改
    ├── executor.rs # 未作任何修改
    ├── lib.rs # 添加对于catalog包和types类的依赖
    ├── main.rs # 未作任何修改
    ├── parser.rs # 未作任何修改
    ├── test.rs # 未作任何修改
    └── types.rs # 对于sqlparser::ast::DataType的扩展, 为其添加了nullable配置, 用于表示DB支持的数据类型
  1. 重要实现:

    1. 数据加锁:

      pub struct DatabaseCatalog {
          inner: Mutex<Inner>, // 定义了锁
      }
      
          pub fn add_schema(&self, name: &str) -> Result<SchemaId, CatalogError> {
              // 获取对应的锁
              let mut inner = self.inner.lock().unwrap();
              if inner.schema_idxs.contains_key(name) {
                  return Err(CatalogError::Duplicated("schema", name.into()));
              }
              let id = inner.next_schema_id;
              inner.next_schema_id += 1;
              let schema_catalog = Arc::new(SchemaCatalog::new(id, name.into()));
              inner.schema_idxs.insert(name.into(), id);
              inner.schemas.insert(id, schema_catalog);
              Ok(id)
              // 当对象被释放的时候锁自动释放
          }
      
    2. Map数据结构的使用

      schema_idxs.insert(name.into(), id);
      // 串行操作, 从schema_idxs中获取schemaId并查询对应Schema. 
      schema_idxs.get(name).and_then(|id| inner.schemas.get(id));
      
    3. Lambda使用

      |id| inner.schemas.get(id)
      // 等效于以下函数
      fn funct(id:&u32) -> Option<&Arc<SchemaCatalog>>{
          return inner.schemas.get(id);
      }
      
    4. Option的使用

      let a:Option<&Arc<SchemaCatalog>> = xxx;
      let b:Option<Arc<SchemaCatalog>> = a.cloned() // 将Option中的变量拷贝一份后输出
      
    5. Arc的使用

      // 创建
      let schema_catalog = Arc::new(SchemaCatalog::new(id, name.into()));
      // 使用 -- 直接和正常对象执行的方式一致
      schema_catalog.get_table(table_ref_id.table_id)
      

01-03 创建表

.
├── Cargo.toml
└── src
    ├── binder
    │   ├── mod.rs # 两个公共类和一个异常类型
    			# BoundStatement: 定义了现在可以被绑定的Sql类型的枚举: CreateTable, Select
    			# Binder: 绑定DataBaseCatalog的类型, 是statement中使用catalog数据的句柄
    			# BindError: 绑定过程中出现的异常
    │   └── statement
    │       ├── create_table.rs # 一个公共类和一个Binder函数
    						# BoundCreateTable 定义了Create操作后的返回值类型
    						# Binder::bind_create_table 定义了如何将sqlparser中的参数创建
    						# 	对应Catalog并绑定到当前Binder对应DataBase上的逻辑
    						# ColumnDesc::from 定义了如何拷贝ColumnDesc结构用于进行对象复制
    │       ├── mod.rs # 引入create_table函数和select函数
    │       └── select.rs # 和之前的executor的逻辑一致, 定义了对于Binder对象执行select操作的返回值
    ├── catalog # 定义基础数据结构
    │   ├── column.rs # 无变化
    │   ├── database.rs # 无变化
    │   ├── mod.rs # 无变化
    │   ├── schema.rs # 无变化
    │   └── table.rs # 无变化
    ├── db.rs # 添加了Binder逻辑, 输入的Statement需要和Binder绑定后进行执行
    ├── executor
    │   ├── create.rs # 定义了Executor的execute_create_table函数, 执行BoundCreateTable并
    				# 输出格式化好的字符串结果
    │   ├── mod.rs # 定义了Executor, 绑定一个Database的catalog数据后可以执行BoundStatement只是做一个路由
    │   └── select.rs # 定义了Executor的execute_select函数, 执行BoundSelect并格式化结果
    ├── lib.rs # 添加了binder依赖
    ├── main.rs # 无变化
    ├── parser.rs # 无变化
    ├── test.rs # 添加了对于01-03的测试脚本
    └── types.rs # 无变化
  1. 主体逻辑梳理:

    • 初始化流程中创建Binder对象, 并和全局唯一的DataBaseCatalog进行了绑定

    • sql输入Parser转化为Statement

    • Statement输入Binder和特定类型的BoundStatement进行绑定, 每个BoundStatement绑定了一个处理函数

    • 将BoundStatement输入Executor, 调用该BoundStatement对应的处理函数, 并对于结果进行format

    • 将结果返回给控制台输出

    01-04 内存存储

    .
    ├── Cargo.toml
    └── src
        ├── array # 定义内存存储的对应数据结构和操作
        │   ├── data_chunk.rs
        │   ├── iter.rs
        │   ├── mod.rs
        │   ├── primitive_array.rs
        │   └── utf8_array.rs
        ├── binder
        │   ├── mod.rs
        │   └── statement
        │       ├── create_table.rs
        │       ├── mod.rs
        │       └── select.rs
        ├── catalog
        │   ├── column.rs
        │   ├── database.rs
        │   ├── mod.rs
        │   ├── schema.rs
        │   └── table.rs
        ├── db.rs
        ├── executor
        │   ├── create.rs
        │   ├── mod.rs
        │   └── select.rs
        ├── lib.rs
        ├── main.rs
        ├── parser.rs
        ├── storage.rs
        ├── test.rs
        └── types.rs
    

01-05 Insert语句的执行

.
├── Cargo.toml
└── src
    ├── array
    │   ├── data_chunk.rs
    │   ├── iter.rs
    │   ├── mod.rs
    │   ├── primitive_array.rs
    │   └── utf8_array.rs
    ├── binder
    │   ├── expression
    │   │   └── mod.rs
    │   ├── mod.rs
    │   └── statement
    │       ├── create_table.rs
    │       ├── insert.rs
    │       ├── mod.rs
    │       └── select.rs
    ├── catalog
    │   ├── column.rs
    │   ├── database.rs
    │   ├── mod.rs
    │   ├── schema.rs
    │   └── table.rs
    ├── db.rs
    ├── executor
    │   ├── create.rs
    │   ├── insert.rs
    │   ├── mod.rs
    │   ├── select.rs
    │   └── values.rs
    ├── lib.rs
    ├── main.rs
    ├── parser.rs
    ├── storage.rs
    ├── test.rs
    └── types.rs