【雪花算法】

发布时间 2023-09-11 17:51:03作者: Chimengmeng

【一】引入

  • 现在的服务基本是分布式,微服务形式的,而且大数据量也导致分库分表的产生,对于水平分表就需要保证表中 id 的全局唯一性。
  • 对于 MySQL 而言,一个表中的主键 id 一般使用自增的方式,但是如果进行水平分表之后,多个表中会生成重复的 id 值。
    • 那么如何保证水平分表后的多张表中的 id 是全局唯一性的呢?
  • 如果还是借助数据库主键自增的形式,那么可以让不同表初始化一个不同的初始值,然后按指定的步长进行自增。
    • 例如有3张拆分表,初始主键值为1,2,3,自增步长为3。
  • 当然也有人使用 UUID 来作为主键,但是 UUID 生成的是一个无序的字符串,对于 MySQL 推荐使用增长的数值类型值作为主键来说不适合。
  • 也可以使用 Redis 的自增原子性来生成唯一 id,但是这种方式业内比较少用。
  • 当然还有其他解决方案,不同互联网公司也有自己内部的实现方案。
    • 雪花算法是其中一个用于解决分布式 id 的高效方案,也是许多互联网公司在推荐使用的。

【二】SnowFlake 雪花算法

【1】雪花算法

  • 雪花算法是一种分布式全局唯一ID
  • 一般不需要过多的深入了解,一般个人项目用不到分布式之类的大型架构
  • 另一方面,则是因为,就算用到市面上很多 ID 生成器帮我们完成了这项工作。

【2】介绍

  • Twitter 于 2010 年开源了内部团队在用的一款全局唯一 ID 生成算法 Snowflake,翻译过来叫做雪花算法。
  • Snowflake 不借助数据库,可直接由编程语言生成,它通过巧妙的位设计使得 ID 能够满足递增属性,且生成的 ID 并不是依次连续的。

【3】原理及介绍

  • Snowflake 是 Twitter 提出的一个算法,其目的是生成一个64位的整数;
  • 64位的分布图如下图所示:

雪花算法

  • 1 bit
    • 一般是符号位,不做处理。
  • 41bit
    • 用来记录时间戳
    • 这里可以记录69年,如果设置好起始时间
    • 比如今年是 2023 ,那么可以用到 2092 年,到时候怎么办,这个系统要是能够使用 69 年,估计系统早已经优化过很多次了。
  • 10bit
    • 用来记录机器ID
    • 总共可以记录1024台机器
    • 一般用前5位代表数据中心,后面5位是某个数据中心的机器ID。
    • (注:位数可以根据情况进行设定。)
  • 12bit
    • 循环用来对同一个毫秒内产生的不同的 ID
    • 12位可以最多记录4095(212-1)次,多余的需要在下一毫秒进行处理。
  • 上面是一个将64bit划分标准

    • 当然也不一定这么做,可以根据不同的业务的具体场景进行行划分
    • 例如:
      • 服务目前QPS10万,预计几年之内会发展到百万。

      • 当前机器三地部署,上海,北京,深圳都有。

      • 当前机器10台左右,预计未来会增加至百台。

  • 这个时候我们根据上面的场景可以再次合理的划分62 bit

    • QPS 几年之内会发展到百万,那么每毫秒就是千级的请求
    • 目前 10 台机器那么每台机器承担百级的请求
    • 为了保证扩展,后面的循环位可以限制到 1024,也就是2^10,那么循环位10位就足够了
  • 机器三地部署我们可以用3bit总共8来表示机房位置

    • 当前的机器10台,为了保证扩展到百台那么可以用7bit 128来表示,时间位依然是41bit,那么还剩下64-10-3-7-41-1 = 2bit,还剩下2bit可以用来进行扩展。

复制

  • 时钟回拨
    • 因为机器的原因会发生时间回拨,雪花算法是强依赖时间的
    • 如果发生时间回拨,有可能会发生重复ID
    • 在我们上面的nextId中我们用当前时间和上一次时间进行判断
      • 如果当前时间小于上一次的时间,那么肯定是发生了回拨,算法会直接抛出异常。

【4】雪花算法优点

  • 高并发分布式环境下生成不重复 id,每秒可生成百万个不重复 id。
  • 基于时间戳,以及同一时间戳下序列号自增,基本保证 id 有序递增。
  • 不依赖第三方库或者中间件。
  • 算法简单,在内存中进行,效率高。

【5】雪花算法缺点

  • 依赖服务器时间,服务器时钟回拨时可能会生成重复 id。
    • 算法中可通过记录最后一个生成 id 时的时间戳来解决,每次生成 id 之前比较当前服务器时钟是否被回拨,避免生成重复 id。
  • 需要配置机器ID和服务器ID

【三】Python中实现雪花算法

【1】前置知识补充

(1)Python中的位运算

运算符 描述 实例
<< 左移运算符:运算数的各二进位全部左移若干位, 由<<右边的数字指定了移动的位数,高位丢弃(前面无效的0),低位补0. 60 << 2 = 240
>> 右移运算符:把>>左边的的运算数的各二进位全部 右移若干位,运算符右边的数字指定了右移的位数。 低位丢弃(无效的0),高位补0. 60>>2 = 15
^ 按位异或运算符:当两两对应的二进位相异时,结果取1. 01^11 = 10
a = 60  # 60 的二进制位数是: 0011 1100 (111100)

print(a << 2)  # 0011 1100 左移两位 1111 0000 = 240

print(a >> 2)  # 0011 1100 右移两位 0000 1111 = 15

image-20220819153440572

# ^ 二进制之间的异或运算,当两值不同的时候为 1.
8 ^ 16 = 24

# 0000 1000   8 
# 0001 0000	  16
# 0001 1000   24   结果

image-20220819154645947

(2)bin 函数

  • bin()函数的返回值要从Ob之后开始阅读。
print(bin(2))  # 0b10

image-20220819151336024

【2】算法实现(封包)

  • 结构树
SnowFlake						# 包
	- __init__.py    	 		# 初始化文件
    - snowflake_main.py			# 算法主函数
    - snowflake_exceptions.py	# 算法异常捕获
    - snowflake_settings.py		# 算法配置信息
  • __init__.py
# -*-coding: Utf-8 -*-
# @File : __init__.py .py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
from .snowflake_main import IdWorker
  • snowflake_main.py
# -*-coding: Utf-8 -*-
# @File : snowflake_main .py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11
import time
import logging
import snowflake_settings
# 继承的Excpetion即可。
from snowflake_exceptions import InvalidSystemClock

# 可选择记录日志
logger = logging.getLogger('雪花算法')


class IdWorker(object):
    '''
    生成唯一ID的类
    '''

    def __init__(self, datacenter_id, worker_id, sequence=0):
        '''
        初始化方法
        :param datacenter_id: 数据中心id
        :param worker_id: 机器id
        :param sequence: 序列号
        '''
        if worker_id > snowflake_settings.MAX_WORKER_ID or worker_id < 0:
            raise ValueError('worker_id 值越界')
        if datacenter_id > snowflake_settings.MAX_DATACENTER_ID or datacenter_id < 0:
            raise ValueError('datacenter_id 值越界')

        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = sequence

        # 上次计算的时间戳
        self.last_timestamp = -1

    def _gen_timestamp(self):
        '''
        生成整数时间戳
        :return: 整数时间戳
        '''
        return int(time.time() * 1000)

    def get_id(self):
        '''
        获取新的ID
        :return: 新的ID
        '''
        # 获取当前时间戳
        timestamp = self._gen_timestamp()

        # 处理时钟回拨的情况
        if timestamp < self.last_timestamp:
            logging.error('clock is moving backwards. Rejecting requests util {}'.format(self.last_timestamp))
            raise InvalidSystemClock

        if timestamp == self.last_timestamp:
            # 同一毫秒内生成多个ID的情况
            self.sequence = (self.sequence + 1) & snowflake_settings.SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._til_next_millis(self.last_timestamp)
        else:
            self.sequence = 0

        self.last_timestamp = timestamp

        # 通过位运算生成最终的ID
        new_id = (((timestamp - snowflake_settings.TWEPOCH) << snowflake_settings.TIMESTAMP_LEFT_SHIFT) | (
                self.datacenter_id << snowflake_settings.DATACENTER_ID_SHIFT) | (
                          self.worker_id << snowflake_settings.WORKER_ID_SHIFT)) | self.sequence
        return new_id

    def _til_next_millis(self, last_timestamp):
        '''
        等待下一毫秒
        :param last_timestamp: 上一次的时间戳
        :return: 下一毫秒的时间戳
        '''
        timestamp = self._gen_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._gen_timestamp()
        return timestamp


if __name__ == '__main__':
    worker = IdWorker(1, 2, 0)
    print(worker.get_id())
  • snowflake_exceptions.py
# -*-coding: Utf-8 -*-
# @File : snowflake_exceptions.py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11

class InvalidSystemClock(Exception):
    """Exception raised when the system clock is invalid."""

    def __init__(self, message="时钟回拨异常."):
        self.message = message
        super().__init__(self.message)

  • snowflake_settings.py
# -*-coding: Utf-8 -*-
# @File : snowflake_settings.py
# author: Chimengmeng
# blog_url : https://www.cnblogs.com/dream-ze/
# Time:2023/9/11

# 64 位 id 的划分,通常机器位和数据位各为 5 位

# 机器位
WORKER_ID_BITS = 5
# 数据位
DATACENTER_ID_BITS = 5
# 循环位
SEQUENCE_BITS = 12

# 最大取值计算,计算机中负数表示为他的补码

# 机器id的最大取值
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)  # 2**5 -1 =31
# 数据中心id的最大取值
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)

# 移位偏移计算
# 机器id的位移量
WORKER_ID_SHIFT = SEQUENCE_BITS
# 数据中心id的位移量
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
# 时间戳的位移量
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS

# X序号循环掩码
# 序列号的掩码,用于限定序列号的取值范围
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)

# Twitter 元年时间戳
# 这里的时间戳可以看作是一个起始时间,这个值可以是任意在当前系统上线之前的时间戳
TWEPOCH = 1288834974657
  • 参数详解
    • 机器位(WORKER_ID_BITS):表示用于标识机器的位数,这里设置为5位。也就是说,最多可以有32台不同的机器。
    • 数据位(DATACENTER_ID_BITS):表示用于标识数据中心的位数,这里设置为5位。也就是说,最多可以有32个不同的数据中心。
    • 循环位(SEQUENCE_BITS):表示用于标识序列号的位数,这里设置为12位。也就是说,序列号的取值范围是0到4095。
    • 最大机器ID(MAX_WORKER_ID):通过将所有机器位都设为1,然后进行异或操作,得到的结果是31。因此,最多可以有31个机器ID。
    • 最大数据中心ID(MAX_DATACENTER_ID):通过将所有数据位都设为1,然后进行异或操作,得到的结果是31。因此,最多可以有31个数据中心ID。
    • 机器ID的位移量(WORKER_ID_SHIFT):表示机器ID需要左移的位数,这里是12位。
    • 数据中心ID的位移量(DATACENTER_ID_SHIFT):表示数据中心ID需要左移的位数,这里是17位。
    • 时间戳的位移量(TIMESTAMP_LEFT_SHIFT):表示时间戳需要左移的位数,这里是22位。
    • 序列号的掩码(SEQUENCE_MASK):通过将所有循环位都设为1,然后进行异或操作,得到的结果是4095。这个掩码用于限定序列号的取值范围。
    • Twitter元年时间戳(TWEPOCH):表示一个起始时间戳,用于生成ID。这个值可以是系统上线之前的任何时间戳。例如,这个值可以是Twitter的创建时间戳。
  • 测试
    • 循环十次生成ID如下
1701156367164907520
1701156367164907521
1701156367164907522
1701156367164907523
1701156367164907524
1701156367164907525
1701156367164907526
1701156367164907527
1701156367164907528
1701156367164907529

【3】算法实现(第三方模块)

(1)安装

  • 使用pip命令安装pysnowflake模块。
  • 请确保您的Python环境已正确配置,并执行以下命令来安装:
pip install pysnowflake

(2)启动服务

  • 在使用pysnowflake之前,您需要启动一个Snowflake服务。
  • Snowflake服务负责分配唯一的ID。
  • 可以通过以下命令启动服务:
snowflake_start_server [--dc=DC_ID] [--worker=WORKER_ID] [--host=ADDRESS] [--port=PORT]
  • DC(整数,2 位):首先在环境中搜索PSF_DC,如果未找到,则获取 0 值。
  • worker (int, 8 bit):首先在环境中搜索PSF_WORKER,如果未找到,则获取 0 值。
  • 地址(域,INET):默认值为本地主机。
  • 端口 (INT):默认值为 8910。
  • 上述命令将启动一个worker数量为1的Snowflake服务,在本地运行。

image-20230911173857207

(3)编写程序,获取ID

  • 接下来,您可以编写程序来获取Snowflake算法生成的唯一ID。
  • 首先,导入必要的模块和类:
import snowflake.client
  • 然后,创建一个SnowflakeClient对象,并连接到Snowflake服务:
host = '127.0.0.1'
port = 8910
snowflake.client.setup(host=host, port=port)
  • 通过调用get_guid()方法,您可以获取一个Snowflake算法生成的全局唯一ID:
unique_id = snowflake.client.get_guid()
print(unique_id)
  • 在以上示例代码中,unique_id变量将保存获取到的唯一ID,并将其打印到控制台。

image-20230911173922730

  • 注意:在实际使用中,您可能需要对snowflake.client.setup进行配置
  • 例如指定snowflake.client.setup服务的地址、端口和其他属性。
  • 您可以参考pysnowflake模块的文档来了解更多配置选项。