slackhq_nebula/timeout.go

226 lines
5.6 KiB
Go

package nebula
import (
"sync"
"time"
)
// How many timer objects should be cached
const timerCacheMax = 50000
type TimerWheel[T any] struct {
// Current tick
current int
// Cheat on finding the length of the wheel
wheelLen int
// Last time we ticked, since we are lazy ticking
lastTick *time.Time
// Durations of a tick and the entire wheel
tickDuration time.Duration
wheelDuration time.Duration
// The actual wheel which is just a set of singly linked lists, head/tail pointers
wheel []*TimeoutList[T]
// Singly linked list of items that have timed out of the wheel
expired *TimeoutList[T]
// Item cache to avoid garbage collect
itemCache *TimeoutItem[T]
itemsCached int
}
type LockingTimerWheel[T any] struct {
m sync.Mutex
t *TimerWheel[T]
}
// TimeoutList Represents a tick in the wheel
type TimeoutList[T any] struct {
Head *TimeoutItem[T]
Tail *TimeoutItem[T]
}
// TimeoutItem Represents an item within a tick
type TimeoutItem[T any] struct {
Item T
Next *TimeoutItem[T]
}
// NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
// Purge must be called once per entry to actually remove anything
// The TimerWheel does not handle concurrency on its own.
// Locks around access to it must be used if multiple routines are manipulating it.
func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
//TODO provide an error
//if min >= max {
// return nil
//}
// Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
// max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
// timeout
wLen := int((max / min) + 2)
tw := TimerWheel[T]{
wheelLen: wLen,
wheel: make([]*TimeoutList[T], wLen),
tickDuration: min,
wheelDuration: max,
expired: &TimeoutList[T]{},
}
for i := range tw.wheel {
tw.wheel[i] = &TimeoutList[T]{}
}
return &tw
}
// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
return &LockingTimerWheel[T]{
t: NewTimerWheel[T](min, max),
}
}
// Add will add an item to the wheel in its proper timeout.
// Caller should Advance the wheel prior to ensure the proper slot is used.
func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
i := tw.findWheel(timeout)
// Try to fetch off the cache
ti := tw.itemCache
if ti != nil {
tw.itemCache = ti.Next
tw.itemsCached--
ti.Next = nil
} else {
ti = &TimeoutItem[T]{}
}
// Relink and return
ti.Item = v
if tw.wheel[i].Tail == nil {
tw.wheel[i].Head = ti
tw.wheel[i].Tail = ti
} else {
tw.wheel[i].Tail.Next = ti
tw.wheel[i].Tail = ti
}
return ti
}
// Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
// If no item is available then an empty T is returned and the 2nd argument is false.
func (tw *TimerWheel[T]) Purge() (T, bool) {
if tw.expired.Head == nil {
var na T
return na, false
}
ti := tw.expired.Head
tw.expired.Head = ti.Next
if tw.expired.Head == nil {
tw.expired.Tail = nil
}
// Clear out the items references
ti.Next = nil
// Maybe cache it for later
if tw.itemsCached < timerCacheMax {
ti.Next = tw.itemCache
tw.itemCache = ti
tw.itemsCached++
}
return ti.Item, true
}
// findWheel find the next position in the wheel for the provided timeout given the current tick
func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
if timeout < tw.tickDuration {
// Can't track anything below the set resolution
timeout = tw.tickDuration
} else if timeout > tw.wheelDuration {
// We aren't handling timeouts greater than the wheels duration
timeout = tw.wheelDuration
}
// Find the next highest, rounding up
tick := int(((timeout - 1) / tw.tickDuration) + 1)
// Add another tick since the current tick may almost be over then map it to the wheel from our
// current position
tick += tw.current + 1
if tick >= tw.wheelLen {
tick -= tw.wheelLen
}
return tick
}
// Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
// passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
func (tw *TimerWheel[T]) Advance(now time.Time) {
if tw.lastTick == nil {
tw.lastTick = &now
}
// We want to round down
ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
adv := ticks
if ticks > tw.wheelLen {
ticks = tw.wheelLen
}
for i := 0; i < ticks; i++ {
tw.current++
if tw.current >= tw.wheelLen {
tw.current = 0
}
if tw.wheel[tw.current].Head != nil {
// We need to append the expired items as to not starve evicting the oldest ones
if tw.expired.Tail == nil {
tw.expired.Head = tw.wheel[tw.current].Head
tw.expired.Tail = tw.wheel[tw.current].Tail
} else {
tw.expired.Tail.Next = tw.wheel[tw.current].Head
tw.expired.Tail = tw.wheel[tw.current].Tail
}
tw.wheel[tw.current].Head = nil
tw.wheel[tw.current].Tail = nil
}
}
// Advance the tick based on duration to avoid losing some accuracy
newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
tw.lastTick = &newTick
}
func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Add(v, timeout)
}
func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Purge()
}
func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
lw.m.Lock()
defer lw.m.Unlock()
lw.t.Advance(now)
}