From 9fcc2321cabad889cbf278f5e22576c35830227b Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Mon, 30 May 2022 18:49:30 +0300 Subject: [PATCH] Hopefully fixed desync with twitch --- src/twitch/twitch.go | 57 ++++++++++++++++++++++++++----------------- src/website/twitch.go | 2 +- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 87131173..a65c8aca 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -18,8 +18,8 @@ import ( ) type twitchNotification struct { - TwitchID string - Type twitchNotificationType + Status streamStatus + Type twitchNotificationType } var twitchNotificationChannel chan twitchNotification @@ -91,22 +91,21 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. if notification.Type == notificationTypeRevocation { syncWithTwitch(ctx, dbConn, false) } else { - if notification.Type == notificationTypeChannelUpdate { - // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and - // would return old data if we called it immediately, so we have to - // wait a bit before we process the notification. We can get the - // category from the notification, but not the tags (or the up-to-date title), - // so we can't really skip this. - var timer *time.Timer - t := time.AfterFunc(3*time.Minute, func() { - expiredTimers <- timer - processEventSubNotification(ctx, dbConn, ¬ification) - }) - timer = t - timers = append(timers, t) - } else { + // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and + // would return old data if we called it immediately, so we have to + // wait a bit before we process the notification. We can get the + // category from the notification, but not the tags (or the up-to-date title), + // so we can't really skip this. + var timer *time.Timer + t := time.AfterFunc(3*time.Minute, func() { + expiredTimers <- timer processEventSubNotification(ctx, dbConn, ¬ification) - } + }) + timer = t + timers = append(timers, t) + // NOTE(asaf): We also run this immediately. Since we get the category in the notification + // we could show the stream as online if it's the right category. + processEventSubNotification(ctx, dbConn, ¬ification) } } } @@ -136,6 +135,8 @@ func QueueTwitchNotification(messageType string, body []byte) error { Event struct { BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` + Title string `json:"title"` + CategoryID string `json:"category_id"` } `json:"event"` } var incoming notificationJson @@ -144,14 +145,20 @@ func QueueTwitchNotification(messageType string, body []byte) error { return oops.New(err, "failed to parse notification body") } - notification.TwitchID = incoming.Event.BroadcasterUserID + notification.Status.TwitchID = incoming.Event.BroadcasterUserID + notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin + notification.Status.Title = incoming.Event.Title + notification.Status.Category = incoming.Event.CategoryID + notification.Status.StartedAt = time.Now() switch incoming.Subscription.Type { case "stream.online": notification.Type = notificationTypeOnline + notification.Status.Live = true case "stream.offline": notification.Type = notificationTypeOffline case "channel.update": notification.Type = notificationTypeChannelUpdate + notification.Status.Live = true default: return oops.New(nil, "unknown subscription type received") } @@ -418,16 +425,18 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi } status := streamStatus{ - TwitchID: notification.TwitchID, + TwitchID: notification.Status.TwitchID, Live: false, } var err error if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline { - result, err := getStreamStatus(ctx, []string{notification.TwitchID}) + result, err := getStreamStatus(ctx, []string{notification.Status.TwitchID}) if err != nil || len(result) == 0 { - log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status") + log.Error().Str("TwitchID", notification.Status.TwitchID).Err(err).Msg("failed to fetch stream status") return } + log.Debug().Interface("Got status", result[0]).Msg("Got streamer status for notification") + // NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime. allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) if err != nil { log.Error().Err(err).Msg("failed to fetch hmn streamers") @@ -435,7 +444,11 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi } for _, streamer := range allStreamers { if streamer.TwitchLogin == result[0].TwitchLogin { - status = result[0] + if result[0].Live == notification.Status.Live { + status = result[0] + } else { + status = notification.Status + } break } } diff --git a/src/website/twitch.go b/src/website/twitch.go index eda30723..cd8ca7da 100644 --- a/src/website/twitch.go +++ b/src/website/twitch.go @@ -74,7 +74,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData { ` SELECT $columns FROM - twitch_streams + twitch_stream ORDER BY started_at DESC `, )