Hopefully fixed desync with twitch

This commit is contained in:
Asaf Gartner 2022-05-30 18:49:30 +03:00
parent ca614a781b
commit 9fcc2321ca
2 changed files with 36 additions and 23 deletions

View File

@ -18,8 +18,8 @@ import (
) )
type twitchNotification struct { type twitchNotification struct {
TwitchID string Status streamStatus
Type twitchNotificationType Type twitchNotificationType
} }
var twitchNotificationChannel chan twitchNotification var twitchNotificationChannel chan twitchNotification
@ -91,22 +91,21 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
if notification.Type == notificationTypeRevocation { if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false) syncWithTwitch(ctx, dbConn, false)
} else { } else {
if notification.Type == notificationTypeChannelUpdate { // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and // would return old data if we called it immediately, so we have to
// would return old data if we called it immediately, so we have to // wait a bit before we process the notification. We can get the
// wait a bit before we process the notification. We can get the // category from the notification, but not the tags (or the up-to-date title),
// category from the notification, but not the tags (or the up-to-date title), // so we can't really skip this.
// so we can't really skip this. var timer *time.Timer
var timer *time.Timer t := time.AfterFunc(3*time.Minute, func() {
t := time.AfterFunc(3*time.Minute, func() { expiredTimers <- timer
expiredTimers <- timer
processEventSubNotification(ctx, dbConn, &notification)
})
timer = t
timers = append(timers, t)
} else {
processEventSubNotification(ctx, dbConn, &notification) processEventSubNotification(ctx, dbConn, &notification)
} })
timer = t
timers = append(timers, t)
// NOTE(asaf): We also run this immediately. Since we get the category in the notification
// we could show the stream as online if it's the right category.
processEventSubNotification(ctx, dbConn, &notification)
} }
} }
} }
@ -136,6 +135,8 @@ func QueueTwitchNotification(messageType string, body []byte) error {
Event struct { Event struct {
BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"` BroadcasterUserLogin string `json:"broadcaster_user_login"`
Title string `json:"title"`
CategoryID string `json:"category_id"`
} `json:"event"` } `json:"event"`
} }
var incoming notificationJson var incoming notificationJson
@ -144,14 +145,20 @@ func QueueTwitchNotification(messageType string, body []byte) error {
return oops.New(err, "failed to parse notification body") return oops.New(err, "failed to parse notification body")
} }
notification.TwitchID = incoming.Event.BroadcasterUserID notification.Status.TwitchID = incoming.Event.BroadcasterUserID
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
notification.Status.Title = incoming.Event.Title
notification.Status.Category = incoming.Event.CategoryID
notification.Status.StartedAt = time.Now()
switch incoming.Subscription.Type { switch incoming.Subscription.Type {
case "stream.online": case "stream.online":
notification.Type = notificationTypeOnline notification.Type = notificationTypeOnline
notification.Status.Live = true
case "stream.offline": case "stream.offline":
notification.Type = notificationTypeOffline notification.Type = notificationTypeOffline
case "channel.update": case "channel.update":
notification.Type = notificationTypeChannelUpdate notification.Type = notificationTypeChannelUpdate
notification.Status.Live = true
default: default:
return oops.New(nil, "unknown subscription type received") return oops.New(nil, "unknown subscription type received")
} }
@ -418,16 +425,18 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
} }
status := streamStatus{ status := streamStatus{
TwitchID: notification.TwitchID, TwitchID: notification.Status.TwitchID,
Live: false, Live: false,
} }
var err error var err error
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline { if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline {
result, err := getStreamStatus(ctx, []string{notification.TwitchID}) result, err := getStreamStatus(ctx, []string{notification.Status.TwitchID})
if err != nil || len(result) == 0 { if err != nil || len(result) == 0 {
log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status") log.Error().Str("TwitchID", notification.Status.TwitchID).Err(err).Msg("failed to fetch stream status")
return return
} }
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status for notification")
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers") log.Error().Err(err).Msg("failed to fetch hmn streamers")
@ -435,7 +444,11 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
} }
for _, streamer := range allStreamers { for _, streamer := range allStreamers {
if streamer.TwitchLogin == result[0].TwitchLogin { if streamer.TwitchLogin == result[0].TwitchLogin {
status = result[0] if result[0].Live == notification.Status.Live {
status = result[0]
} else {
status = notification.Status
}
break break
} }
} }

View File

@ -74,7 +74,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
` `
SELECT $columns SELECT $columns
FROM FROM
twitch_streams twitch_stream
ORDER BY started_at DESC ORDER BY started_at DESC
`, `,
) )