Timers timers timers

This commit is contained in:
Asaf Gartner 2022-03-27 20:30:24 +03:00
parent 70cd2ec72b
commit 8951bf1aa5
1 changed files with 43 additions and 10 deletions

View File

@ -56,21 +56,59 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-cha
monitorTicker := time.NewTicker(2 * time.Hour) monitorTicker := time.NewTicker(2 * time.Hour)
firstRunChannel := make(chan struct{}, 1) firstRunChannel := make(chan struct{}, 1)
firstRunChannel <- struct{}{} firstRunChannel <- struct{}{}
timers := make([]*time.Timer, 0)
expiredTimers := make(chan *time.Timer, 10)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
for _, timer := range timers {
timer.Stop()
}
return return
case expired := <-expiredTimers:
for idx, timer := range timers {
if timer == expired {
timers = append(timers[:idx], timers[idx+1:]...)
break
}
}
case <-firstRunChannel: case <-firstRunChannel:
syncWithTwitch(ctx, dbConn, true) syncWithTwitch(ctx, dbConn, true)
case <-monitorTicker.C: case <-monitorTicker.C:
syncWithTwitch(ctx, dbConn, true) syncWithTwitch(ctx, dbConn, true)
case <-linksChangedChannel: case <-linksChangedChannel:
syncWithTwitch(ctx, dbConn, false) // NOTE(asaf): Since we update links inside transactions for users/projects
// we won't see the updated list of links until the transaction is committed.
// Waiting 5 seconds is just a quick workaround for that. It's not
// convenient to only trigger this after the transaction is committed.
var timer *time.Timer
t := time.AfterFunc(5*time.Second, func() {
expiredTimers <- timer
syncWithTwitch(ctx, dbConn, false)
})
timer = t
timers = append(timers, t)
case notification := <-twitchNotificationChannel: case notification := <-twitchNotificationChannel:
if notification.Type == notificationTypeRevocation { if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false) syncWithTwitch(ctx, dbConn, false)
} else { } else {
processEventSubNotification(ctx, dbConn, &notification) if notification.Type == notificationTypeChannelUpdate {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we have to
// wait a bit before we process the notification. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer
processEventSubNotification(ctx, dbConn, &notification)
})
timer = t
timers = append(timers, t)
} else {
processEventSubNotification(ctx, dbConn, &notification)
}
} }
} }
} }
@ -144,14 +182,9 @@ func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []s
} }
} }
} }
if twitchChanged { select {
// NOTE(asaf): Since we update links inside transactions for users/projects case linksChangedChannel <- struct{}{}:
// we won't see the updated list of links until the transaction is committed. default:
// Waiting 10 seconds is just a quick workaround for that. It's not
// convenient to only trigger this after the transaction is committed.
time.AfterFunc(10*time.Second, func() {
linksChangedChannel <- struct{}{}
})
} }
} }
} }