diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 205c2ee0..ea0338a6 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -92,20 +92,18 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. syncWithTwitch(ctx, dbConn, false) } else { // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and - // would return old data if we called it immediately, so we have to - // wait a bit before we process the notification. We can get the + // would return old data if we called it immediately, so we process + // the notification to the extent we can, and later do a full update. We can get the // category from the notification, but not the tags (or the up-to-date title), // so we can't really skip this. var timer *time.Timer t := time.AfterFunc(3*time.Minute, func() { expiredTimers <- timer - processEventSubNotification(ctx, dbConn, ¬ification, true) + updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin) }) timer = t timers = append(timers, t) - // NOTE(asaf): We also run this immediately. Since we get the category in the notification - // we could show the stream as online if it's the right category. - processEventSubNotification(ctx, dbConn, ¬ification, false) + processEventSubNotification(ctx, dbConn, ¬ification) } } } @@ -158,7 +156,7 @@ func QueueTwitchNotification(messageType string, body []byte) error { notification.Type = notificationTypeOffline case "channel.update": notification.Type = notificationTypeChannelUpdate - notification.Status.Live = true + // NOTE(asaf): Can't tell if the user is live here. default: return oops.New(nil, "unknown subscription type received") } @@ -417,47 +415,103 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { return nil } -func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification, delayed bool) { +func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) { + log := logging.ExtractLogger(ctx) + log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status") + var err error + + // NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime. + foundStreamer := false + allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to fetch hmn streamers") + return + } + for _, streamer := range allStreamers { + if streamer.TwitchLogin == twitchLogin { + foundStreamer = true + break + } + } + if !foundStreamer { + return + } + + status := streamStatus{ + TwitchID: twitchID, + Live: false, + } + + result, err := getStreamStatus(ctx, []string{twitchID}) + if err != nil { + log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status") + return + } + if len(result) > 0 { + log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch") + status = result[0] + } + err = updateStreamStatusInDB(ctx, dbConn, &status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + return + } + + log.Debug().Msg("Notifying discord") + err = notifyDiscordOfLiveStream(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to notify discord") + } +} + +func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) { log := logging.ExtractLogger(ctx) log.Debug().Interface("Notification", notification).Msg("Processing twitch notification") if notification.Type == notificationTypeNone { return } - status := streamStatus{ - TwitchID: notification.Status.TwitchID, - Live: false, + // NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime. + foundStreamer := false + allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to fetch hmn streamers") + return } - var err error - if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline { - result, err := getStreamStatus(ctx, []string{notification.Status.TwitchID}) - if err != nil || len(result) == 0 { - log.Error().Str("TwitchID", notification.Status.TwitchID).Err(err).Msg("failed to fetch stream status") - return - } - log.Debug().Interface("Got status", result[0]).Msg("Got streamer status for notification") - // NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime. - allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) - if err != nil { - log.Error().Err(err).Msg("failed to fetch hmn streamers") - return - } - for _, streamer := range allStreamers { - if streamer.TwitchLogin == result[0].TwitchLogin { - if delayed || (result[0].Live == notification.Status.Live) { - status = result[0] - } else { - status = notification.Status - } - break - } + for _, streamer := range allStreamers { + if streamer.TwitchLogin == notification.Status.TwitchLogin { + foundStreamer = true + break } } + if !foundStreamer { + return + } - log.Debug().Interface("Status", status).Msg("Updating status") - err = updateStreamStatusInDB(ctx, dbConn, &status) - if err != nil { - log.Error().Err(err).Msg("failed to update twitch stream status") + if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline { + log.Debug().Interface("Status", notification.Status).Msg("Updating status") + err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + return + } + } else if notification.Type == notificationTypeChannelUpdate { + // NOTE(asaf): Channel updates can happen wether or not the streamer is live. + // So we just update the title if the user is live in our db, and we rely on the + // 3-minute delayed status update to verify live status and category/tag requirements. + _, err = dbConn.Exec(ctx, + ` + UPDATE twitch_stream + SET title = $1 + WHERE twitch_id = $2 + `, + notification.Status.Title, + notification.Status.TwitchID, + ) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + return + } } log.Debug().Msg("Notifying discord")