您的当前位置:首页>全部文章>文章详情
python 实现 mybatis ID 雪花算法
发表于:2021-12-05浏览:37次TAG: #唯一ID #雪花算法 #python
import copy
import os
import random
import time
class Sequence:
    # 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动)
    twepoch = 1288834974657
    # 机器标识位数
    worker_id_bits = 5
    datacenter_id_bits = 5
    max_worker_id = -1 ^ (-1 << worker_id_bits)
    maxDatacenterId = -1 ^ (-1 << datacenter_id_bits)
    # 毫秒内自增位
    sequenceBits = 12
    workerIdShift = sequenceBits
    datacenterIdShift = sequenceBits + worker_id_bits
    timestampLeftShift = sequenceBits + worker_id_bits + datacenter_id_bits
    sequenceMask = -1 ^ (-1 << sequenceBits)
    workerId = 1
    # 数据标识 ID  部分
    datacenterId = 0
    # 并发控制
    sequence = 0
    # 上次生产ID时间戳
    lastTimestamp = -1
    def __int__(self):
        self.datacenterId = self._get_datacenter_id(self.maxDatacenterId)
        self.workerId = self._get_max_workerId(self.datacenterId, self.max_worker_id)
    # 数据标识id部分
    def _get_datacenter_id(self, maxDatacenterId):
        mac = self.__get_mac_address()
        id = ((0x000000FF & mac[len(mac) - 1]) | (0x0000FF00 & ((mac[len(mac) - 2]) << 8))) >> 6
        id = id % (maxDatacenterId + 1)
        return id
    # 获取maxworkerid
    def _get_max_workerId(self, datacenterId, maxWorkerId):
        mpid = str(datacenterId) + str(os.getpid())
        return (hash(mpid) & 0xffff) % (maxWorkerId + 1)
    def __get_mac_address(self):
        import uuid
        node = uuid.getnode()
        mac = uuid.UUID(int=node).hex[-12:]
        decimalismMac = [
            int(mac[0:2], 16), int(mac[2:4], 16), int(mac[4:6], 16), int(mac[6:8], 16), int(mac[8:10], 16),
            int(mac[10:12], 16)
        ]
        return decimalismMac
    # 获取下一个id
    @classmethod
    def nextId(cls):
        timestamp = int(time.time() * 1000)
        # 闰秒
        if (timestamp < cls.lastTimestamp):
            offset = cls.lastTimestamp - timestamp
            if offset <= 5:
                time.sleep(offset << 1)
                timestamp = int(time.time() * 1000)
                if timestamp < cls.lastTimestamp:
                    return None
            else:
                return None
        if cls.lastTimestamp == timestamp:
            #  相同毫秒内,序列号自增
            cls.sequence = (cls.sequence + 1) & cls.sequenceMask
            if cls.sequence == 0:
                # 同一毫秒的序列数已经达到最大
                timestamp = cls.__til_next_millis(cls.lastTimestamp)
        else:
            # 不同毫秒内,序列号置为 1 - 3 随机数
            cls.sequence = random.randrange(1, 4)
        cls.lastTimestamp = copy.copy(timestamp)
        # 时间戳部分 | 数据中心部分 | 机器标识部分 | 序列号部分
        return ((timestamp - cls.twepoch) << cls.timestampLeftShift) | (
                cls.datacenterId << cls.datacenterIdShift) | (cls.workerId << cls.workerIdShift) | cls.sequence
    def __til_next_millis(self, lastTimestamp):
        timestamp = int(time.time() * 1000)
        while timestamp <= lastTimestamp:
            timestamp = int(time.time() * 1000)
        return timestamp

调用

Sequence.nextId


标签云