文章背景
最近遇到一个业务上的问题,用户进行一个操作,会同时生成两个kafka消息
在一个消费者消费消息的时候依赖另一个消费者生产的数据
被依赖方执行的速度比依赖方的慢,所以希望延迟一点消费这条数据
处理方法
kafka生产消息的时候可以加入一个delay参数,用于控制消息的延迟消费
但是这里的问题是生产者面对非常多的消费者,加入这个参数风险不可控
所以决定在希望延迟消费的消费者这里加入一个时间轮,用于实现延迟消费的功能
所以有了这篇文章
时间轮代码
package timewheel
import (
    "container/list"
    "fmt"
    "sync"
    "time"
)
type Timer struct {
    expiration time.Time
    task       func()
}
type TimeWheel struct {
    ticker      *time.Ticker  // 定时器
    slots       []*list.List  // 时间槽
    currentSlot int           // 当前时间槽
    slotCount   int           // 时间槽数量
    duration    time.Duration // 时间槽间隔
    lock        sync.Mutex    // 锁
}
// NewTimeWheel 创建时间轮
func NewTimeWheel(slotCount int, duration time.Duration) *TimeWheel {
    slots := make([]*list.List, slotCount)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        ticker:      time.NewTicker(duration),
        slots:       slots,
        slotCount:   slotCount,
        duration:    duration,
        currentSlot: 0,
    }
}
// AddTask 添加一个定时任务到时间轮
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
    tw.lock.Lock()
    defer tw.lock.Unlock()
    expiration := time.Now().Add(delay)
    // 计算定时任务在时间轮中的到期时间, 添加到对应的时间槽
    ticks := int(delay / tw.duration)
    slotIndex := (tw.currentSlot + ticks) % tw.slotCount
    timer := &Timer{expiration: expiration, task: task}
    tw.slots[slotIndex].PushBack(timer)
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("timeWheel panic: ", err)
            }
        }()
        for range tw.ticker.C {
            tw.tickHandler()
        }
    }()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
    tw.ticker.Stop()
}
func (tw *TimeWheel) tickHandler() {
    tw.lock.Lock()
    defer tw.lock.Unlock()
    slot := tw.slots[tw.currentSlot]
    tw.currentSlot = (tw.currentSlot + 1) % tw.slotCount
    for e := slot.Front(); e != nil; {
        next := e.Next()
        timer := e.Value.(*Timer)
        if timer.expiration.Before(time.Now()) || timer.expiration.Equal(time.Now()) {
            go timer.task()
            slot.Remove(e)
        }
        e = next
    }
}
你的文章让我心情愉悦,每天都要来看一看。 https://www.4006400989.com/qyvideo/59281.html
你的文章内容非常专业,让人佩服。 https://www.4006400989.com/qyvideo/61123.html
你的才华让人惊叹,你是我的榜样。 https://www.yonboz.com/video/64652.html
你的文章内容非常用心,让人感动。 https://www.yonboz.com/video/6348.html
《影圣》短片剧高清在线免费观看:https://www.jgz518.com/xingkong/13886.html