Go语言中时间轮的实现


介绍
简单时间轮



层级时间轮


代码实现
时间轮的时间格中每个链表会有一个 root 节点用于简化边界条件。它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的; 除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的 currentTime。每一层的 currentTime 都必须是 tickMs 的整数倍,如果不满足则会将 currentTime 修剪为 tickMs 的整数倍。修剪方法为:currentTime = startMs - (startMs % tickMs); Kafka 中的定时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用; Kafka 中的定时器使用了 DelayQueue 来协助推进时间轮。在操作中会将每个使用到的时间格中每个链表都加入 DelayQueue,DelayQueue 会根据时间轮对应的过期时间 expiration 来排序,最短 expiration 的任务会被排在 DelayQueue 的队头,通过单独线程来获取 DelayQueue 中到期的任务;
结构体
type TimingWheel struct {// 时间跨度,单位是毫秒tick int64 // in milliseconds// 时间轮个数wheelSize int64// 总跨度interval int64 // in milliseconds// 当前指针指向时间currentTime int64 // in milliseconds// 时间格列表buckets []*bucket// 延迟队列queue *delayqueue.DelayQueue// 上级的时间轮引用overflowWheel unsafe.Pointer // type: *TimingWheelexitC chan struct{}waitGroup waitGroupWrapper}
type bucket struct {// 任务的过期时间expiration int64mu sync.Mutex// 相同过期时间的任务队列timers *list.List}
type Timer struct {// 到期时间expiration int64 // in milliseconds// 要被执行的具体任务task func()// Timer所在bucket的指针b unsafe.Pointer // type: *bucket// bucket列表中对应的元素element *list.Element}

初始化时间轮
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() }func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { // 将传入的tick转化成毫秒 tickMs := int64(tick / time.Millisecond) // 如果小于零,那么panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } // 设置开始时间 startMs := timeToMs(time.Now().UTC()) // 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), )}func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { // 初始化buckets的大小 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket() } // 实例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, // currentTime必须是tickMs的倍数,所以这里使用truncate进行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), }}
启动时间轮
func (tw *TimingWheel) Start() { // Poll会执行一个无限循环,将到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) }) }) // 开启无限循环获取queue中C的数据 tw.waitGroup.Wrap(func() { for { select { // 从队列里面出来的数据都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) // 时间轮会将当前时间 currentTime 往前移动到 bucket的到期时间 tw.advanceClock(b.Expiration()) // 取出bucket队列的数据,并调用addOrRun方法执行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } })}
func (tw *TimingWheel) advanceClock(expiration int64) {currentTime := atomic.LoadInt64(&tw.currentTime)// 过期时间大于等于(当前时间+tick)if expiration >= currentTime+tw.tick {// 将currentTime设置为expiration,从而推进currentTimecurrentTime = truncate(expiration, tw.tick)atomic.StoreInt64(&tw.currentTime, currentTime)// Try to advance the clock of the overflow wheel if present// 如果有上层时间轮,那么递归调用上层时间轮的引用overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel != nil {(*TimingWheel)(overflowWheel).advanceClock(currentTime)}}}
func (b *bucket) Flush(reinsert func(*Timer)) {var ts []*Timerb.mu.Lock()// 循环获取bucket队列节点for e := b.timers.Front(); e != nil; {next := e.Next()t := e.Value.(*Timer)// 将头节点移除bucket队列b.remove(t)ts = append(ts, t)e = next}b.mu.Unlock()b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()for _, t := range ts {reinsert(t)}}
func (tw *TimingWheel) addOrRun(t *Timer) {// 如果已经过期,那么直接执行if !tw.add(t) {// 异步执行定时任务go t.task()}}

start 方法会启动一个 goroutines 调用 poll 来处理 DelayQueue 中到期的数据,并将数据放入到管道 C 中; start 方法启动第二个 goroutines 方法会循环获取 DelayQueue 中管道C的数据,管道 C 中实际上存放的是一个 bucket,然后遍历bucket的timers列表,如果任务已经到期,那么异步执行,没有到期则重新放入到 DelayQueue 中。
add task
func main() {tw := timingwheel.NewTimingWheel(time.Second, 10)tw.Start()// 添加任务tw.AfterFunc(time.Second*15, func() {fmt.Println("The timer fires")exitC <- time.Now().UTC()})}
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {t := &Timer{expiration: timeToMs(time.Now().UTC().Add(d)),task: f,}tw.addOrRun(t)return t}
func (tw *TimingWheel) add(t *Timer) bool {currentTime := atomic.LoadInt64(&tw.currentTime)// 已经过期if t.expiration < currentTime+tw.tick {// Already expiredreturn false// 到期时间在第一层环内} else if t.expiration < currentTime+tw.interval {// Put it into its own bucket// 获取时间轮的位置virtualID := t.expiration / tw.tickb := tw.buckets[virtualID%tw.wheelSize]// 将任务放入到bucket队列中b.Add(t)// 如果是相同的时间,那么返回false,防止被多次插入到队列中if b.SetExpiration(virtualID * tw.tick) {// 将该bucket加入到延迟队列中tw.queue.Offer(b, b.Expiration())}return true} else {// Out of the interval. Put it into the overflow wheel// 如果放入的到期时间超过第一层时间轮,那么放到上一层中去overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel == nil {atomic.CompareAndSwapPointer(&tw.overflowWheel,nil,// 需要注意的是,这里tick变成了intervalunsafe.Pointer(newTimingWheel(tw.interval,tw.wheelSize,currentTime,tw.queue,)),)overflowWheel = atomic.LoadPointer(&tw.overflowWheel)}// 往上递归return (*TimingWheel)(overflowWheel).add(t)}}

Reference
https://github.com/RussellLuo/timingwheel https://zhuanlan.zhihu.com/p/121483218 https://github.com/apache/kafka/tree/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/utils/timer


☞股价狂涨 500 亿,小米手机业务与造车可否兼得?
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
关注网络尖刀微信公众号随时掌握互联网精彩
赞助链接
排名
热点
搜索指数
- 1 习近平:因地制宜发展新质生产力 7904009
- 2 中方已做好对日实质反制准备 7807956
- 3 宋佳获金鸡奖最佳女主角奖 7711885
- 4 全运绿能 “黑科技”都有啥 7616192
- 5 王曼昱4-0胜陈梦 晋级女单决赛 7519718
- 6 易烊千玺获金鸡奖最佳男主角奖 7427591
- 7 恢复“大佐” 日本意欲何为 7331670
- 8 金鸡奖完整获奖名单公布 7233138
- 9 孙颖莎4-1胜朱雨玲 将与王曼昱争冠 7135743
- 10 驴友野游被困获救被追偿7.4万救援费 7042778









CSDN
