MENU

Golang时间轮精简实现

August 3, 2024 • Read: 7798 • 默认分类

文章背景

最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息

在一个消费者消费消息的时候依赖另一个消费者生产的数据

被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据

处理方法

kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费

但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控

所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能

所以有了这篇文章

时间轮代码

package timewheel

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

type Timer struct {
    expiration time.Time
    task       func()
}

type TimeWheel struct {
    ticker      *time.Ticker  // 定时器
    slots       []*list.List  // 时间槽
    currentSlot int           // 当前时间槽
    slotCount   int           // 时间槽数量
    duration    time.Duration // 时间槽间隔
    lock        sync.Mutex    // 锁
}

// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
    slots := make([]*list.List, slotCount)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        ticker:      time.NewTicker(duration),
        slots:       slots,
        slotCount:   slotCount,
        duration:    duration,
        currentSlot: 0,
    }
}

// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    expiration := time.Now().Add(delay)

    // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
    ticks := int(delay / tw.duration)
    slotIndex := (tw.currentSlot + ticks) % tw.slotCount

    timer := &Timer{expiration: expiration, task: task}
    tw.slots[slotIndex].PushBack(timer)
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("timeWheel panic: ", err)
            }
        }()
        for range tw.ticker.C {
            tw.tickHandler()
        }
    }()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
    tw.ticker.Stop()
}

func (tw *TimeWheel) tickHandler() {
    tw.lock.Lock()
    defer tw.lock.Unlock()

    slot := tw.slots[tw.currentSlot]
    tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount

    for e := slot.Front(); e != nil; {
        next := e.Next()
        timer := e.Value.(*Timer)
        if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
            go timer.task()
            slot.Remove(e)
        }
        e = next
    }
}


Last Modified: August 5, 2024
Leave a Comment

37 Comments
  1. 2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
    新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
    新车首发,新的一年,只带想赚米的人coinsrore.com
    新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
    做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
    新车上路,只带前10个人coinsrore.com
    新盘首开 新盘首开 征召客户!!!coinsrore.com
    新项目准备上线,寻找志同道合的合作伙伴coinsrore.com
    新车即将上线 真正的项目,期待你的参与coinsrore.com
    新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
    新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com

  2. 2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
    新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
    新车首发,新的一年,只带想赚米的人coinsrore.com
    新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
    做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
    新车上路,只带前10个人coinsrore.com
    新盘首开 新盘首开 征召客户!!!coinsrore.com
    新项目准备上线,寻找志同道合的合作伙伴coinsrore.com
    新车即将上线 真正的项目,期待你的参与coinsrore.com
    新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
    新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com

  3. 新车首发,新的一年,只带想赚米的人coinsrore.com

  4. 感谢大佬的贡献

  5. 《暗红》剧情片高清在线免费观看:https://www.jgz518.com/xingkong/131068.html