diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index e641a63..99279cb 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -56,21 +56,59 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-cha monitorTicker := time.NewTicker(2 * time.Hour) firstRunChannel := make(chan struct{}, 1) firstRunChannel <- struct{}{} + + timers := make([]*time.Timer, 0) + expiredTimers := make(chan *time.Timer, 10) for { select { case <-ctx.Done(): + for _, timer := range timers { + timer.Stop() + } return + case expired := <-expiredTimers: + for idx, timer := range timers { + if timer == expired { + timers = append(timers[:idx], timers[idx+1:]...) + break + } + } case <-firstRunChannel: syncWithTwitch(ctx, dbConn, true) case <-monitorTicker.C: syncWithTwitch(ctx, dbConn, true) case <-linksChangedChannel: - syncWithTwitch(ctx, dbConn, false) + // NOTE(asaf): Since we update links inside transactions for users/projects + // we won't see the updated list of links until the transaction is committed. + // Waiting 5 seconds is just a quick workaround for that. It's not + // convenient to only trigger this after the transaction is committed. + var timer *time.Timer + t := time.AfterFunc(5*time.Second, func() { + expiredTimers <- timer + syncWithTwitch(ctx, dbConn, false) + }) + timer = t + timers = append(timers, t) case notification := <-twitchNotificationChannel: if notification.Type == notificationTypeRevocation { syncWithTwitch(ctx, dbConn, false) } else { - processEventSubNotification(ctx, dbConn, ¬ification) + if notification.Type == notificationTypeChannelUpdate { + // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and + // would return old data if we called it immediately, so we have to + // wait a bit before we process the notification. We can get the + // category from the notification, but not the tags (or the up-to-date title), + // so we can't really skip this. + var timer *time.Timer + t := time.AfterFunc(3*time.Minute, func() { + expiredTimers <- timer + processEventSubNotification(ctx, dbConn, ¬ification) + }) + timer = t + timers = append(timers, t) + } else { + processEventSubNotification(ctx, dbConn, ¬ification) + } } } } @@ -144,14 +182,9 @@ func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []s } } } - if twitchChanged { - // NOTE(asaf): Since we update links inside transactions for users/projects - // we won't see the updated list of links until the transaction is committed. - // Waiting 10 seconds is just a quick workaround for that. It's not - // convenient to only trigger this after the transaction is committed. - time.AfterFunc(10*time.Second, func() { - linksChangedChannel <- struct{}{} - }) + select { + case linksChangedChannel <- struct{}{}: + default: } } }