【一】引入
- 现在的服务基本是分布式,微服务形式的,而且大数据量也导致分库分表的产生,对于水平分表就需要保证表中 id 的全局唯一性。
- 对于 MySQL 而言,一个表中的主键 id 一般使用自增的方式,但是如果进行水平分表之后,多个表中会生成重复的 id 值。
- 那么如何保证水平分表后的多张表中的 id 是全局唯一性的呢?
- 如果还是借助数据库主键自增的形式,那么可以让不同表初始化一个不同的初始值,然后按指定的步长进行自增。
- 例如有3张拆分表,初始主键值为1,2,3,自增步长为3。
- 当然也有人使用 UUID 来作为主键,但是 UUID 生成的是一个无序的字符串,对于 MySQL 推荐使用增长的数值类型值作为主键来说不适合。
- 也可以使用 Redis 的自增原子性来生成唯一 id,但是这种方式业内比较少用。
- 当然还有其他解决方案,不同互联网公司也有自己内部的实现方案。
- 雪花算法是其中一个用于解决分布式 id 的高效方案,也是许多互联网公司在推荐使用的。
【二】SnowFlake 雪花算法
【1】雪花算法
- 雪花算法是一种分布式全局唯一ID
- 一般不需要过多的深入了解,一般个人项目用不到分布式之类的大型架构
- 另一方面,则是因为,就算用到市面上很多 ID 生成器帮我们完成了这项工作。
【2】介绍
- Twitter 于 2010 年开源了内部团队在用的一款全局唯一 ID 生成算法 Snowflake,翻译过来叫做雪花算法。
- Snowflake 不借助数据库,可直接由编程语言生成,它通过巧妙的位设计使得 ID 能够满足递增属性,且生成的 ID 并不是依次连续的。
【3】原理及介绍
- Snowflake 是 Twitter 提出的一个算法,其目的是生成一个64位的整数;
- 64位的分布图如下图所示:
- 1 bit
- 一般是符号位,不做处理。
- 41bit
- 用来记录时间戳
- 这里可以记录69年,如果设置好起始时间
- 比如今年是 2023 ,那么可以用到 2092 年,到时候怎么办,这个系统要是能够使用 69 年,估计系统早已经优化过很多次了。
- 10bit
- 用来记录机器ID
- 总共可以记录1024台机器
- 一般用前5位代表数据中心,后面5位是某个数据中心的机器ID。
- (注:位数可以根据情况进行设定。)
- 12bit
- 循环用来对同一个毫秒内产生的不同的 ID
- 12位可以最多记录4095(212-1)次,多余的需要在下一毫秒进行处理。
-
上面是一个将64bit划分标准
- 当然也不一定这么做,可以根据不同的业务的具体场景进行行划分
- 例如:
-
服务目前QPS10万,预计几年之内会发展到百万。
-
当前机器三地部署,上海,北京,深圳都有。
-
当前机器10台左右,预计未来会增加至百台。
-
-
这个时候我们根据上面的场景可以再次合理的划分62 bit
- QPS 几年之内会发展到百万,那么每毫秒就是千级的请求
- 目前 10 台机器那么每台机器承担百级的请求
- 为了保证扩展,后面的循环位可以限制到 1024,也就是2^10,那么循环位10位就足够了
-
机器三地部署我们可以用3bit总共8来表示机房位置
- 当前的机器10台,为了保证扩展到百台那么可以用7bit 128来表示,时间位依然是41bit,那么还剩下64-10-3-7-41-1 = 2bit,还剩下2bit可以用来进行扩展。
- 时钟回拨
- 因为机器的原因会发生时间回拨,雪花算法是强依赖时间的
- 如果发生时间回拨,有可能会发生重复ID
- 在我们上面的
nextId
中我们用当前时间和上一次时间进行判断- 如果当前时间小于上一次的时间,那么肯定是发生了回拨,算法会直接抛出异常。
【4】雪花算法优点
- 高并发分布式环境下生成不重复 id,每秒可生成百万个不重复 id。
- 基于时间戳,以及同一时间戳下序列号自增,基本保证 id 有序递增。
- 不依赖第三方库或者中间件。
- 算法简单,在内存中进行,效率高。
【5】雪花算法缺点
- 依赖服务器时间,服务器时钟回拨时可能会生成重复 id。
- 算法中可通过记录最后一个生成 id 时的时间戳来解决,每次生成 id 之前比较当前服务器时钟是否被回拨,避免生成重复 id。
- 需要配置机器ID和服务器ID
【三】Python中实现雪花算法
【1】前置知识补充
(1)Python中的位运算
运算符 | 描述 | 实例 |
---|---|---|
<< | 左移运算符:运算数的各二进位全部左移若干位, 由<< 右边的数字指定了移动的位数,高位丢弃(前面无效的0),低位补0. |
60 << 2 = 240 |
>> | 右移运算符:把>> 左边的的运算数的各二进位全部 右移若干位,运算符右边的数字指定了右移的位数。 低位丢弃(无效的0),高位补0. |
60>>2 = 15 |
^ | 按位异或运算符:当两两对应的二进位相异时,结果取1. | 01^11 = 10 |
a = 60 # 60 的二进制位数是: 0011 1100 (111100)
print(a << 2) # 0011 1100 左移两位 1111 0000 = 240
print(a >> 2) # 0011 1100 右移两位 0000 1111 = 15
# ^ 二进制之间的异或运算,当两值不同的时候为 1.
8 ^ 16 = 24
# 0000 1000 8
# 0001 0000 16
# 0001 1000 24 结果
(2)bin 函数
bin()
函数的返回值要从Ob
之后开始阅读。
print(bin(2)) # 0b10
【2】算法实现(封包)
- 结构树
SnowFlake # 包
- __init__.py # 初始化文件
- snowflake_main.py # 算法主函数
- snowflake_exceptions.py # 算法异常捕获
- snowflake_settings.py # 算法配置信息
__init__.py
# -*-coding: Utf-8 -*-
# @File : __init__.py .py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
from .snowflake_main import IdWorker
snowflake_main.py
# -*-coding: Utf-8 -*-
# @File : snowflake_main .py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
import time
import logging
import snowflake_settings
# 继承的Excpetion即可。
from snowflake_exceptions import InvalidSystemClock
# 可选择记录日志
logger = logging.getLogger('雪花算法')
class IdWorker(object):
'''
生成唯一ID的类
'''
def __init__(self, datacenter_id, worker_id, sequence=0):
'''
初始化方法
:param datacenter_id: 数据中心id
:param worker_id: 机器id
:param sequence: 序列号
'''
if worker_id > snowflake_settings.MAX_WORKER_ID or worker_id < 0:
raise ValueError('worker_id 值越界')
if datacenter_id > snowflake_settings.MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError('datacenter_id 值越界')
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = sequence
# 上次计算的时间戳
self.last_timestamp = -1
def _gen_timestamp(self):
'''
生成整数时间戳
:return: 整数时间戳
'''
return int(time.time() * 1000)
def get_id(self):
'''
获取新的ID
:return: 新的ID
'''
# 获取当前时间戳
timestamp = self._gen_timestamp()
# 处理时钟回拨的情况
if timestamp < self.last_timestamp:
logging.error('clock is moving backwards. Rejecting requests util {}'.format(self.last_timestamp))
raise InvalidSystemClock
if timestamp == self.last_timestamp:
# 同一毫秒内生成多个ID的情况
self.sequence = (self.sequence + 1) & snowflake_settings.SEQUENCE_MASK
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
# 通过位运算生成最终的ID
new_id = (((timestamp - snowflake_settings.TWEPOCH) << snowflake_settings.TIMESTAMP_LEFT_SHIFT) | (
self.datacenter_id << snowflake_settings.DATACENTER_ID_SHIFT) | (
self.worker_id << snowflake_settings.WORKER_ID_SHIFT)) | self.sequence
return new_id
def _til_next_millis(self, last_timestamp):
'''
等待下一毫秒
:param last_timestamp: 上一次的时间戳
:return: 下一毫秒的时间戳
'''
timestamp = self._gen_timestamp()
while timestamp <= last_timestamp:
timestamp = self._gen_timestamp()
return timestamp
if __name__ == '__main__':
worker = IdWorker(1, 2, 0)
print(worker.get_id())
snowflake_exceptions.py
# -*-coding: Utf-8 -*-
# @File : snowflake_exceptions.py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
class InvalidSystemClock(Exception):
"""Exception raised when the system clock is invalid."""
def __init__(self, message="时钟回拨异常."):
self.message = message
super().__init__(self.message)
snowflake_settings.py
# -*-coding: Utf-8 -*-
# @File : snowflake_settings.py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
# 64 位 id 的划分,通常机器位和数据位各为 5 位
# 机器位
WORKER_ID_BITS = 5
# 数据位
DATACENTER_ID_BITS = 5
# 循环位
SEQUENCE_BITS = 12
# 最大取值计算,计算机中负数表示为他的补码
# 机器id的最大取值
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5 -1 =31
# 数据中心id的最大取值
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
# 移位偏移计算
# 机器id的位移量
WORKER_ID_SHIFT = SEQUENCE_BITS
# 数据中心id的位移量
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
# 时间戳的位移量
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
# X序号循环掩码
# 序列号的掩码,用于限定序列号的取值范围
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# Twitter 元年时间戳
# 这里的时间戳可以看作是一个起始时间,这个值可以是任意在当前系统上线之前的时间戳
TWEPOCH = 1288834974657
- 参数详解
- 机器位(WORKER_ID_BITS):表示用于标识机器的位数,这里设置为5位。也就是说,最多可以有32台不同的机器。
- 数据位(DATACENTER_ID_BITS):表示用于标识数据中心的位数,这里设置为5位。也就是说,最多可以有32个不同的数据中心。
- 循环位(SEQUENCE_BITS):表示用于标识序列号的位数,这里设置为12位。也就是说,序列号的取值范围是0到4095。
- 最大机器ID(MAX_WORKER_ID):通过将所有机器位都设为1,然后进行异或操作,得到的结果是31。因此,最多可以有31个机器ID。
- 最大数据中心ID(MAX_DATACENTER_ID):通过将所有数据位都设为1,然后进行异或操作,得到的结果是31。因此,最多可以有31个数据中心ID。
- 机器ID的位移量(WORKER_ID_SHIFT):表示机器ID需要左移的位数,这里是12位。
- 数据中心ID的位移量(DATACENTER_ID_SHIFT):表示数据中心ID需要左移的位数,这里是17位。
- 时间戳的位移量(TIMESTAMP_LEFT_SHIFT):表示时间戳需要左移的位数,这里是22位。
- 序列号的掩码(SEQUENCE_MASK):通过将所有循环位都设为1,然后进行异或操作,得到的结果是4095。这个掩码用于限定序列号的取值范围。
- Twitter元年时间戳(TWEPOCH):表示一个起始时间戳,用于生成ID。这个值可以是系统上线之前的任何时间戳。例如,这个值可以是Twitter的创建时间戳。
- 测试
- 循环十次生成ID如下
1701156367164907520
1701156367164907521
1701156367164907522
1701156367164907523
1701156367164907524
1701156367164907525
1701156367164907526
1701156367164907527
1701156367164907528
1701156367164907529
【3】算法实现(第三方模块)
(1)安装
- 使用pip命令安装pysnowflake模块。
- 请确保您的Python环境已正确配置,并执行以下命令来安装:
pip install pysnowflake
(2)启动服务
- 在使用pysnowflake之前,您需要启动一个Snowflake服务。
- Snowflake服务负责分配唯一的ID。
- 可以通过以下命令启动服务:
snowflake_start_server [--dc=DC_ID] [--worker=WORKER_ID] [--host=ADDRESS] [--port=PORT]
- DC(整数,2 位):首先在环境中搜索PSF_DC,如果未找到,则获取 0 值。
- worker (int, 8 bit):首先在环境中搜索PSF_WORKER,如果未找到,则获取 0 值。
- 地址(域,INET):默认值为本地主机。
- 端口 (INT):默认值为 8910。
- 上述命令将启动一个worker数量为1的Snowflake服务,在本地运行。
(3)编写程序,获取ID
- 接下来,您可以编写程序来获取Snowflake算法生成的唯一ID。
- 首先,导入必要的模块和类:
import snowflake.client
- 然后,创建一个SnowflakeClient对象,并连接到Snowflake服务:
host = '127.0.0.1'
port = 8910
snowflake.client.setup(host=host, port=port)
- 通过调用
get_guid()
方法,您可以获取一个Snowflake算法生成的全局唯一ID:
unique_id = snowflake.client.get_guid()
print(unique_id)
- 在以上示例代码中,
unique_id
变量将保存获取到的唯一ID,并将其打印到控制台。
- 注意:在实际使用中,您可能需要对snowflake.client.setup进行配置
- 例如指定snowflake.client.setup服务的地址、端口和其他属性。
- 您可以参考pysnowflake模块的文档来了解更多配置选项。