package twitch

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"git.handmade.network/hmn/hmn/src/config"
	"git.handmade.network/hmn/hmn/src/db"
	"git.handmade.network/hmn/hmn/src/discord"
	"git.handmade.network/hmn/hmn/src/hmndata"
	"git.handmade.network/hmn/hmn/src/jobs"
	"git.handmade.network/hmn/hmn/src/logging"
	"git.handmade.network/hmn/hmn/src/models"
	"git.handmade.network/hmn/hmn/src/oops"
	"git.handmade.network/hmn/hmn/src/perf"
	"git.handmade.network/hmn/hmn/src/utils"
	"github.com/jackc/pgx/v5/pgxpool"
)

// NOTE(asaf): The twitch api madness:
//
//	             | stream.online | stream.offline | channel.update[3] | REST[1][2]
//	id[4]        |      YES      |       NO       |        NO         |    YES
//	twitch_id    |      YES      |      YES       |        YES        |    YES
//	twitch_login |      YES      |      YES       |        YES        |    YES
//	is_live      |      YES      |    IMPLICIT    |        NO         |    YES
//	started_at   |      YES      |       NO       |        NO         |    YES
//	title        |      NO       |       NO       |        YES        |    YES
//	cat_id       |      NO       |       NO       |        YES        |    YES
//	tags         |      NO       |       NO       |        NO         |    YES
//
// [1] REST returns nothing when user is not live
// [2] Information received from REST is ~3 minutes old.
// [3] channel.update also fires when the user changes their twitch channel settings when they're not live (i.e. as soon as they update it in twitch settings)
// [4] ID of the current livestream

// streamStatus is the stream information carried by a webhook notification or
// a REST response. Which fields are populated depends on the source; see the
// table above.
type streamStatus struct {
	StreamID    string
	TwitchID    string
	TwitchLogin string
	Live        bool
	Title       string
	StartedAt   time.Time
	CategoryID  string
	Tags        []string
}

// twitchNotificationType identifies the kind of webhook message queued for
// the monitor goroutine.
type twitchNotificationType int

const (
	notificationTypeNone twitchNotificationType = iota
	notificationTypeOnline
	notificationTypeOffline
	notificationTypeChannelUpdate
	notificationTypeRevocation
)

// twitchNotification is a parsed webhook message waiting to be processed by
// the monitor goroutine.
type twitchNotification struct {
	Status streamStatus
	Type   twitchNotificationType
}

// Both channels are nil until MonitorTwitchSubscriptions creates them, which
// only happens when a twitch client ID is configured.
var twitchNotificationChannel chan twitchNotification
var linksChangedChannel chan struct{}

// MonitorTwitchSubscriptions starts the background goroutine that keeps the
// local twitch tables in sync with Twitch, and returns the job tracking it.
//
// The goroutine reacts to: the first run and a 2-hour ticker (full sync,
// including VODs), link changes signalled by UserOrProjectLinksUpdated
// (partial sync after 5 seconds), and webhook notifications queued by
// QueueTwitchNotification. It exits when ctx is done, or when the initial
// access-token refresh fails.
//
// If no twitch client ID is configured, nothing is started and a no-op job is
// returned.
func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
	log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger()
	ctx = logging.AttachLoggerToContext(&log, ctx)

	if config.Config.Twitch.ClientID == "" {
		log.Warn().Msg("No twitch config provided.")
		return jobs.Noop()
	}

	twitchNotificationChannel = make(chan twitchNotification, 100)
	linksChangedChannel = make(chan struct{}, 10)
	job := jobs.New()

	go func() {
		defer func() {
			log.Info().Msg("Shutting down twitch monitor")
			job.Done()
		}()
		log.Info().Msg("Running twitch monitor...")

		monitorTicker := time.NewTicker(2 * time.Hour)
		defer monitorTicker.Stop()

		// A pre-filled one-slot channel makes the first sync go through the
		// same select loop as everything else.
		firstRunChannel := make(chan struct{}, 1)
		firstRunChannel <- struct{}{}

		timers := make([]*time.Timer, 0)
		expiredTimers := make(chan *time.Timer, 10)

		// runLater schedules work to run after delay and tracks the timer so
		// it can be stopped on shutdown. When the timer fires it first reports
		// itself on expiredTimers so the loop can drop it from timers. If the
		// monitor is already shutting down the work is skipped, which also
		// keeps the callback from blocking forever on a full channel that
		// nobody reads anymore.
		runLater := func(delay time.Duration, work func()) {
			var timer *time.Timer
			timer = time.AfterFunc(delay, func() {
				select {
				case expiredTimers <- timer:
				case <-ctx.Done():
					return
				}
				work()
			})
			timers = append(timers, timer)
		}

		for {
			// Each iteration runs in its own function so that a panic while
			// handling one event is recovered and logged without killing the
			// monitor.
			done, err := func() (done bool, retErr error) {
				defer utils.RecoverPanicAsError(&retErr)
				select {
				case <-ctx.Done():
					for _, timer := range timers {
						timer.Stop()
					}
					return true, nil
				case expired := <-expiredTimers:
					for idx, timer := range timers {
						if timer == expired {
							timers = append(timers[:idx], timers[idx+1:]...)
							break
						}
					}
				case <-firstRunChannel:
					err := refreshAccessToken(ctx)
					if err != nil {
						log.Error().Err(err).Msg("Failed to fetch refresh token on start")
						return true, nil
					}
					syncWithTwitch(ctx, dbConn, true, true)
				case <-monitorTicker.C:
					twitchLogClear(ctx, dbConn)
					syncWithTwitch(ctx, dbConn, true, true)
				case <-linksChangedChannel:
					// NOTE(asaf): Since we update links inside transactions for users/projects
					//             we won't see the updated list of links until the transaction is committed.
					//             Waiting 5 seconds is just a quick workaround for that. It's not
					//             convenient to only trigger this after the transaction is committed.
					runLater(5*time.Second, func() {
						syncWithTwitch(ctx, dbConn, false, false)
					})
				case notification := <-twitchNotificationChannel:
					if notification.Type == notificationTypeRevocation {
						syncWithTwitch(ctx, dbConn, false, false)
					} else {
						// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
						//             would return old data if we called it immediately, so we process
						//             the notification to the extent we can, and later do a full update. We can get the
						//             category from the notification, but not the tags (or the up-to-date title),
						//             so we can't really skip this.
						runLater(3*time.Minute, func() {
							updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
						})
						processEventSubNotification(ctx, dbConn, &notification)
					}
				}
				return false, nil
			}()
			if err != nil {
				log.Error().Err(err).Msg("Panicked in MonitorTwitchSubscriptions")
			} else if done {
				return
			}
		}
	}()

	return job
}

// QueueTwitchNotification parses a webhook message from Twitch, records it in
// the twitch log, and queues it for the monitor goroutine.
//
// messageType "notification" is parsed as a stream.online, stream.offline or
// channel.update event; "revocation" queues a revocation. Any other
// messageType is ignored. An error is returned when the body cannot be
// parsed, the subscription type is unknown, or the queue is full. Nothing is
// queued if the monitor has not been started.
func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error {
	var notification twitchNotification

	switch messageType {
	case "notification":
		type notificationJson struct {
			Subscription struct {
				Type string `json:"type"`
			} `json:"subscription"`
			Event struct {
				StreamID             string `json:"id"`
				BroadcasterUserID    string `json:"broadcaster_user_id"`
				BroadcasterUserLogin string `json:"broadcaster_user_login"`
				Title                string `json:"title"`
				CategoryID           string `json:"category_id"`
			} `json:"event"`
		}
		var incoming notificationJson
		err := json.Unmarshal(body, &incoming)
		if err != nil {
			return oops.New(err, "failed to parse notification body")
		}

		twitchLog(ctx, conn, models.TwitchLogTypeHook, incoming.Event.BroadcasterUserLogin, "Got hook: "+incoming.Subscription.Type, string(body))

		notification.Status.TwitchID = incoming.Event.BroadcasterUserID
		notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
		notification.Status.Title = incoming.Event.Title
		notification.Status.CategoryID = incoming.Event.CategoryID
		// The start time is taken from the moment the hook arrives.
		notification.Status.StartedAt = time.Now()

		switch incoming.Subscription.Type {
		case "stream.online":
			notification.Type = notificationTypeOnline
			notification.Status.Live = true
			notification.Status.StreamID = incoming.Event.StreamID
		case "stream.offline":
			notification.Type = notificationTypeOffline
		case "channel.update":
			notification.Type = notificationTypeChannelUpdate
			// NOTE(asaf): Can't tell if the user is live here.
		default:
			return oops.New(nil, "unknown subscription type received")
		}
	case "revocation":
		twitchLog(ctx, conn, models.TwitchLogTypeHook, "", "Got hook: Revocation", string(body))
		notification.Type = notificationTypeRevocation
	}

	if twitchNotificationChannel != nil && notification.Type != notificationTypeNone {
		// Non-blocking send: the webhook handler must not stall when the
		// monitor falls behind.
		select {
		case twitchNotificationChannel <- notification:
		default:
			return oops.New(nil, "twitch notification channel is full")
		}
	}

	return nil
}

// UserOrProjectLinksUpdated tells the monitor that the set of twitch logins
// linked from a user or project may have changed. A resync is requested only
// when the two lists differ in length or at any position. The request is
// dropped if the monitor is not running or a backlog of requests is already
// pending.
func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []string) {
	if linksChangedChannel == nil {
		return
	}

	twitchChanged := len(twitchLoginsPreChange) != len(twitchLoginsPostChange)
	if !twitchChanged {
		for idx := range twitchLoginsPreChange {
			if twitchLoginsPreChange[idx] != twitchLoginsPostChange[idx] {
				twitchChanged = true
				break
			}
		}
	}
	if !twitchChanged {
		return
	}

	// Non-blocking send: a full channel means a resync is already pending.
	select {
	case linksChangedChannel <- struct{}{}:
	default:
	}
}

// syncWithTwitch reconciles local state with Twitch:
//
//  1. resolves the twitch ID of every tracked streamer,
//  2. (outside of dev) drops bad or orphaned event subscriptions and creates
//     missing ones,
//  3. deletes latest-status rows for streamers that are no longer tracked,
//  4. refreshes stream status over REST, either for everyone (updateAll) or
//     only for streamers that were missing a subscription,
//  5. optionally looks for missing VODs and re-verifies known ones (updateVODs),
//  6. pushes the result to discord.
//
// Errors are logged rather than returned; most of them abort the sync.
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, updateVODs bool) {
	log := logging.ExtractLogger(ctx)
	log.Info().Msg("Running twitch sync")
	p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
	defer func() {
		p.EndRequest()
		perf.LogPerf(p, log.Info())
	}()

	type twitchSyncStats struct {
		NumSubbed         int
		NumUnsubbed       int
		NumStreamsChecked int
	}
	var stats twitchSyncStats

	p.StartBlock("SQL", "Fetch list of streamers")
	streamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
	if err != nil {
		log.Error().Err(err).Msg("Error while monitoring twitch")
		return
	}
	p.EndBlock()

	needID := make([]string, 0, len(streamers))
	streamerMap := make(map[string]*hmndata.TwitchStreamer, len(streamers))
	for idx, streamer := range streamers {
		needID = append(needID, streamer.TwitchLogin)
		// Point into the slice so that filling in the ID below updates streamers.
		streamerMap[streamer.TwitchLogin] = &streamers[idx]
	}

	var twitchUsers []twitchUser
	if len(needID) > 0 {
		p.StartBlock("TwitchAPI", "Fetch twitch user info")
		log.Debug().Interface("needID", needID).Msg("IDs")
		twitchUsers, err = getTwitchUsersByLogin(ctx, needID)
		if err != nil {
			log.Error().Err(err).Msg("Error while monitoring twitch")
			return
		}
		p.EndBlock()
	}

	for _, tu := range twitchUsers {
		streamer, ok := streamerMap[tu.TwitchLogin]
		if !ok {
			// Don't dereference a missing entry if twitch hands back a login
			// that doesn't match what we asked for.
			log.Warn().Str("TwitchLogin", tu.TwitchLogin).Msg("Twitch returned a user we have no streamer for")
			continue
		}
		streamer.TwitchID = tu.TwitchID
	}

	// Streamers whose login twitch did not resolve have no ID and are left out.
	validStreamers := make([]hmndata.TwitchStreamer, 0, len(streamers))
	for _, streamer := range streamers {
		if len(streamer.TwitchID) > 0 {
			validStreamers = append(validStreamers, streamer)
		}
	}

	p.StartBlock("TwitchAPI", "Fetch event subscriptions")
	subscriptions, err := getEventSubscriptions(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Error while monitoring twitch")
		return
	}
	p.EndBlock()

	// For every valid streamer, track which of the three event types we
	// currently hold a good subscription for.
	type isSubbedByType map[string]bool
	streamerEventSubs := make(map[string]isSubbedByType, len(validStreamers))
	for _, streamer := range validStreamers {
		streamerEventSubs[streamer.TwitchID] = isSubbedByType{
			"channel.update": false,
			"stream.online":  false,
			"stream.offline": false,
		}
	}

	type unsubEvent struct {
		TwitchID string
		EventID  string
	}
	toUnsub := make([]unsubEvent, 0)

	for _, sub := range subscriptions {
		handled := false
		if eventSubs, ok := streamerEventSubs[sub.TwitchID]; ok {
			if _, ok := eventSubs[sub.Type]; ok { // Make sure it's a known type
				if !sub.GoodStatus {
					log.Debug().Str("TwitchID", sub.TwitchID).Str("Event Type", sub.Type).Msg("Twitch doesn't like our sub")
					toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
				} else {
					eventSubs[sub.Type] = true
				}
				handled = true
			}
		}
		if !handled {
			// NOTE(asaf): Found an unknown type or an event subscription that we don't have a matching user for.
			//             Make sure we unsubscribe.
			toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
		}
	}

	if config.Config.Env != config.Dev { // NOTE(asaf): Can't subscribe to events from dev. We need a non-localhost callback url.
		p.StartBlock("TwitchAPI", "Sync subscriptions with twitch")
		for _, ev := range toUnsub {
			err = unsubscribeFromEvent(ctx, ev.EventID)
			if err != nil {
				log.Error().Err(err).Msg("Error while unsubscribing events")
				// NOTE(asaf): Soft error. Don't care if it fails.
			}
			stats.NumUnsubbed += 1
		}

		for twitchID, evStatuses := range streamerEventSubs {
			for evType, isSubbed := range evStatuses {
				if !isSubbed {
					err = subscribeToEvent(ctx, evType, twitchID)
					if err != nil {
						log.Error().Err(err).Msg("Error while monitoring twitch")
						return
					}
					stats.NumSubbed += 1
				}
			}
		}
		p.EndBlock()
	}

	tx, err := dbConn.Begin(ctx)
	if err != nil {
		log.Error().Err(err).Msg("failed to start transaction")
		// Without a transaction there is nothing to roll back or write to.
		return
	}
	defer tx.Rollback(ctx)

	allIDs := make([]string, 0, len(validStreamers))
	for _, streamer := range validStreamers {
		allIDs = append(allIDs, streamer.TwitchID)
	}

	p.StartBlock("SQL", "Remove untracked streamers")
	_, err = tx.Exec(ctx,
		`DELETE FROM twitch_latest_status WHERE NOT (twitch_id = ANY($1))`,
		allIDs,
	)
	if err != nil {
		log.Error().Err(err).Msg("Failed to remove untracked twitch ids from streamer list in db")
		return
	}
	p.EndBlock()

	usersToUpdate := make([]string, 0)
	if updateAll {
		usersToUpdate = allIDs
	} else {
		// NOTE(asaf): Twitch can revoke our subscriptions, so we need to
		//             update users whose subs were revoked or missing since last time we checked.
		for twitchID, evStatuses := range streamerEventSubs {
			for _, isSubbed := range evStatuses {
				if !isSubbed {
					usersToUpdate = append(usersToUpdate, twitchID)
					break
				}
			}
		}
	}

	if len(usersToUpdate) > 0 {
		p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
		statuses, err := getStreamStatus(ctx, usersToUpdate)
		if err != nil {
			log.Error().Err(err).Msg("failed to fetch stream statuses")
			return
		}
		twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
		p.EndBlock()

		// Index statuses and logins by twitch ID. The first entry for an ID
		// wins, matching a front-to-back linear search.
		statusByID := make(map[string]*streamStatus, len(statuses))
		for idx := range statuses {
			if _, seen := statusByID[statuses[idx].TwitchID]; !seen {
				statusByID[statuses[idx].TwitchID] = &statuses[idx]
			}
		}
		loginByID := make(map[string]string, len(validStreamers))
		for _, streamer := range validStreamers {
			if _, seen := loginByID[streamer.TwitchID]; !seen {
				loginByID[streamer.TwitchID] = streamer.TwitchLogin
			}
		}

		p.StartBlock("SQL", "Update stream statuses in db")
		for _, twitchID := range usersToUpdate {
			status := statusByID[twitchID]
			if status == nil {
				// No status came back for this streamer, so record them as
				// not live (REST returns nothing for offline users).
				status = &streamStatus{
					TwitchID:    twitchID,
					TwitchLogin: loginByID[twitchID],
				}
			}

			twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
			err = gotRESTUpdate(ctx, tx, status)
			if err != nil {
				log.Error().Err(err).Msg("failed to update twitch stream status")
			}
		}
		p.EndBlock()
	}

	err = tx.Commit(ctx)
	if err != nil {
		log.Error().Err(err).Msg("failed to commit transaction")
	}

	stats.NumStreamsChecked += len(usersToUpdate)
	log.Info().Interface("Stats", stats).Msg("Twitch sync done")

	if updateVODs {
		err = findMissingVODs(ctx, dbConn)
		if err != nil {
			log.Error().Err(err).Msg("failed to find missing twitch vods")
		}

		err = verifyHistoryVODs(ctx, dbConn)
		if err != nil {
			log.Error().Err(err).Msg("failed to verify twitch vods")
		}
	}

	log.Debug().Msg("Notifying discord")
	err = notifyDiscordOfLiveStream(ctx, dbConn)
	if err != nil {
		log.Error().Err(err).Msg("failed to notify discord")
	}
}

// notifyDiscordOfLiveStream pushes pending twitch changes to discord.
//
// Every history row flagged discord_needs_update has its flag cleared; rows
// that are relevant (see isStreamRelevant) and ended are additionally posted
// through discord.PostStreamHistory and get the resulting message ID stored.
// Afterwards the list of relevant streams that are currently live is handed
// to discord.UpdateStreamers.
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
	history, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
		`
		SELECT $columns
		FROM twitch_stream_history
		WHERE discord_needs_update = TRUE
		ORDER BY started_at ASC
		`,
	)
	if err != nil {
		return oops.New(err, "failed to fetch twitch history")
	}

	updatedHistories := make([]*models.TwitchStreamHistory, 0, len(history))
	for _, h := range history {
		relevant := isStreamRelevant(h.CategoryID, h.Tags)
		if relevant && h.StreamEnded {
			msgID, err := discord.PostStreamHistory(ctx, h)
			if err != nil {
				return oops.New(err, "failed to post twitch history to discord")
			}
			h.DiscordMessageID = msgID
		}
		h.DiscordNeedsUpdate = false
		updatedHistories = append(updatedHistories, h)
	}

	for _, h := range updatedHistories {
		_, err = dbConn.Exec(ctx,
			`
			UPDATE twitch_stream_history
			SET
				discord_needs_update = $2,
				discord_message_id = $3
			WHERE stream_id = $1
			`,
			h.StreamID,
			h.DiscordNeedsUpdate,
			h.DiscordMessageID,
		)
		if err != nil {
			return oops.New(err, "failed to update twitch history after posting to discord")
		}
	}

	streams, err := db.Query[models.TwitchLatestStatus](ctx, dbConn,
		`
		SELECT $columns
		FROM twitch_latest_status
		WHERE live = TRUE
		ORDER BY started_at ASC
		`,
	)
	if err != nil {
		return oops.New(err, "failed to fetch livestreams from db")
	}

	var streamDetails []hmndata.StreamDetails
	for _, s := range streams {
		if isStreamRelevant(s.CategoryID, s.Tags) {
			streamDetails = append(streamDetails, hmndata.StreamDetails{
				Username:  s.TwitchLogin,
				StartTime: s.StartedAt,
				Title:     s.Title,
			})
		}
	}

	err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
	if err != nil {
		return oops.New(err, "failed to update discord with livestream info")
	}
	return nil
}

// isTrackedTwitchLogin reports whether twitchLogin is among the streamers
// returned by hmndata.FetchTwitchStreamers.
func isTrackedTwitchLogin(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string) (bool, error) {
	allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
	if err != nil {
		return false, err
	}
	for _, streamer := range allStreamers {
		if streamer.TwitchLogin == twitchLogin {
			return true, nil
		}
	}
	return false, nil
}

// updateStreamStatus fetches the current status of one streamer over REST,
// applies it through gotRESTUpdate, and then updates discord. If twitch
// returns no status, the streamer is treated as not live. Streamers that are
// no longer tracked are ignored. Errors are logged, not returned.
func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
	log := logging.ExtractLogger(ctx)
	log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")

	// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
	foundStreamer, err := isTrackedTwitchLogin(ctx, dbConn, twitchLogin)
	if err != nil {
		log.Error().Err(err).Msg("failed to fetch hmn streamers")
		return
	}
	if !foundStreamer {
		return
	}

	status := streamStatus{
		TwitchID:    twitchID,
		TwitchLogin: twitchLogin,
		Live:        false,
	}

	result, err := getStreamStatus(ctx, []string{twitchID})
	if err != nil {
		log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
		return
	}
	twitchLog(ctx, dbConn, models.TwitchLogTypeREST, twitchLogin, "Fetched status", fmt.Sprintf("%#v", result))
	if len(result) > 0 {
		log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
		status = result[0]
	}

	err = gotRESTUpdate(ctx, dbConn, &status)
	if err != nil {
		log.Error().Err(err).Msg("failed to update twitch stream status")
		return
	}

	log.Debug().Msg("Notifying discord")
	err = notifyDiscordOfLiveStream(ctx, dbConn)
	if err != nil {
		log.Error().Err(err).Msg("failed to notify discord")
	}
}

// processEventSubNotification applies a queued webhook notification to the
// database and then updates discord. Notifications for streamers that are no
// longer tracked are ignored. Errors are logged, not returned.
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
	log := logging.ExtractLogger(ctx)
	log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
	if notification.Type == notificationTypeNone {
		return
	}

	// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
	foundStreamer, err := isTrackedTwitchLogin(ctx, dbConn, notification.Status.TwitchLogin)
	if err != nil {
		log.Error().Err(err).Msg("failed to fetch hmn streamers")
		return
	}
	if !foundStreamer {
		return
	}

	twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification))
	switch notification.Type {
	case notificationTypeOnline:
		err := gotStreamOnline(ctx, dbConn, &notification.Status)
		if err != nil {
			log.Error().Err(err).Msg("failed to update twitch stream status")
		}
	case notificationTypeOffline:
		err := gotStreamOffline(ctx, dbConn, &notification.Status)
		if err != nil {
			log.Error().Err(err).Msg("failed to update twitch stream status")
		}
	case notificationTypeChannelUpdate:
		err := gotChannelUpdate(ctx, dbConn, &notification.Status)
		if err != nil {
			log.Error().Err(err).Msg("failed to update twitch stream status")
		}
	}

	log.Debug().Msg("Notifying discord")
	err = notifyDiscordOfLiveStream(ctx, dbConn)
	if err != nil {
		log.Error().Err(err).Msg("failed to notify discord")
	}
}

// gotStreamOnline handles a stream.online hook: it closes out the history of
// a previous stream that was still marked live under a different stream ID,
// then marks the streamer live with the new stream ID and start time.
func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
	latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
	if err != nil {
		return err
	}
	twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
	if latest.Live && latest.StreamID != status.StreamID {
		// NOTE(asaf): Update history for previous stream
		twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", "Wrapping up previous stream")
		latest.Live = false
		err = updateStreamHistory(ctx, conn, latest)
		if err != nil {
			return err
		}
	}
	latest.Live = true
	latest.StreamID = status.StreamID
	latest.StartedAt = status.StartedAt
	latest.LastHookLiveUpdate = time.Now()
	err = saveLatestStreamStatus(ctx, conn, latest)
	if err != nil {
		return err
	}
	err = updateStreamHistory(ctx, conn, latest)
	if err != nil {
		return err
	}
	return nil
}

// gotStreamOffline handles a stream.offline hook: it marks the streamer as
// not live and updates the history of the stream that just ended.
func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
	latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
	if err != nil {
		return err
	}
	twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOffline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
	latest.Live = false
	latest.LastHookLiveUpdate = time.Now()
	err = saveLatestStreamStatus(ctx, conn, latest)
	if err != nil {
		return err
	}
	err = updateStreamHistory(ctx, conn, latest)
	if err != nil {
		return err
	}
	return nil
}

// gotChannelUpdate handles a channel.update hook: it stores the new title and
// category. Tags are cleared when the category changes, since the hook does
// not carry them.
func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
	latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
	if err != nil {
		return err
	}
	twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotChannelUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
	if !latest.Live {
		// NOTE(asaf): If the stream is live, this channel update applies
		//             to the current livestream. Otherwise, this will
		//             only apply to the next stream, so we clear out
		//             the stream info.
		latest.StreamID = ""
		latest.StartedAt = time.Time{}
	}
	latest.Title = status.Title
	if latest.CategoryID != status.CategoryID {
		latest.CategoryID = status.CategoryID
		latest.Tags = []string{} // NOTE(asaf): We don't get tags here, but we can't assume they didn't change because some tags are automatic based on category
	}
	latest.LastHookChannelUpdate = time.Now()
	err = saveLatestStreamStatus(ctx, conn, latest)
	if err != nil {
		return err
	}
	err = updateStreamHistory(ctx, conn, latest)
	if err != nil {
		return err
	}
	return nil
}

// gotRESTUpdate applies a status fetched over REST.
//
// REST data lags behind the hooks (see the table at the top of the file), so
// each group of fields is only taken from REST when the matching hook has not
// updated it within the last 3 minutes: live state, stream ID and start time
// are guarded by the last live hook; title, category and tags by the last
// channel.update hook.
func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
	latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
	if err != nil {
		return err
	}
	twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
	if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) {
		if status.Live {
			if latest.Live && status.StreamID != latest.StreamID {
				twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", "Wrapping up previous stream")
				latest.Live = false
				err = updateStreamHistory(ctx, conn, latest)
				if err != nil {
					return err
				}
			}
			// NOTE(asaf): We don't get this information if the user isn't live
			latest.StartedAt = status.StartedAt
			latest.StreamID = status.StreamID
		}
		latest.Live = status.Live
	}
	if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) {
		if status.Live {
			// NOTE(asaf): We don't get this information if the user isn't live
			latest.Title = status.Title
			latest.CategoryID = status.CategoryID
			latest.Tags = status.Tags
		}
	}
	latest.LastRESTUpdate = time.Now()
	err = saveLatestStreamStatus(ctx, conn, latest)
	if err != nil {
		return err
	}
	err = updateStreamHistory(ctx, conn, latest)
	if err != nil {
		return err
	}
	return nil
}

// fetchLatestStreamStatus returns the twitch_latest_status row for twitchID,
// inserting a fresh one if none exists. If the stored login differs from
// twitchLogin, the row is updated to the new login. Everything runs inside a
// transaction started on conn.
func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID string, twitchLogin string) (*models.TwitchLatestStatus, error) {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return nil, oops.New(err, "failed to begin transaction for stream status fetch")
	}
	defer tx.Rollback(ctx)

	result, err := db.QueryOne[models.TwitchLatestStatus](ctx, tx,
		`
		SELECT $columns
		FROM twitch_latest_status
		WHERE twitch_id = $1
		`,
		twitchID,
	)
	if errors.Is(err, db.NotFound) {
		twitchLog(ctx, tx, models.TwitchLogTypeOther, twitchLogin, "Creating new streamer", fmt.Sprintf("twitchID: %s", twitchID))
		_, err = tx.Exec(ctx,
			`
			INSERT INTO twitch_latest_status (twitch_id, twitch_login)
			VALUES ($1, $2)
			`,
			twitchID,
			twitchLogin,
		)
		if err != nil {
			return nil, oops.New(err, "failed to insert new twitch status")
		}
		result = &models.TwitchLatestStatus{
			TwitchID:    twitchID,
			TwitchLogin: twitchLogin,
		}
	} else if err != nil {
		return nil, oops.New(err, "failed to fetch existing twitch status")
	}

	if result.TwitchLogin != twitchLogin {
		// NOTE(asaf): If someone changed their twitch login we should
		//             still reuse their db record by twitch_id.
		_, err = tx.Exec(ctx,
			`
			UPDATE twitch_latest_status
			SET twitch_login = $2
			WHERE twitch_id = $1
			`,
			twitchID,
			twitchLogin,
		)
		if err != nil {
			return nil, oops.New(err, "failed to update twitch login")
		}
		result.TwitchLogin = twitchLogin
	}
	err = tx.Commit(ctx)
	if err != nil {
		return nil, oops.New(err, "failed to commit transaction")
	}
	return result, nil
}

// saveLatestStreamStatus writes latest to its twitch_latest_status row,
// creating the row first if needed. A nil Tags slice is replaced with an
// empty one before writing.
func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *models.TwitchLatestStatus) error {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return oops.New(err, "failed to start transaction for stream status update")
	}
	defer tx.Rollback(ctx)

	// NOTE(asaf): Ensure that we have a record for it in the db
	_, err = fetchLatestStreamStatus(ctx, tx, latest.TwitchID, latest.TwitchLogin)
	if err != nil {
		return err
	}

	if latest.Tags == nil {
		latest.Tags = make([]string, 0)
	}

	_, err = tx.Exec(ctx,
		`
		UPDATE twitch_latest_status
		SET
			live = $2,
			stream_id = $3,
			started_at = $4,
			title = $5,
			category_id = $6,
			tags = $7,
			last_hook_live_update = $8,
			last_hook_channel_update = $9,
			last_rest_update = $10
		WHERE
			twitch_id = $1
		`,
		latest.TwitchID,
		latest.Live,
		latest.StreamID,
		latest.StartedAt,
		latest.Title,
		latest.CategoryID,
		latest.Tags,
		latest.LastHookLiveUpdate,
		latest.LastHookChannelUpdate,
		latest.LastRESTUpdate,
	)
	if err != nil {
		return oops.New(err, "failed to update twitch latest status")
	}

	err = tx.Commit(ctx)
	if err != nil {
		return oops.New(err, "failed to commit transaction")
	}

	return nil
}

// updateStreamHistory upserts the twitch_stream_history row for
// status.StreamID and then looks for the stream's VOD. A status without a
// stream ID is skipped. When the status is no longer live and the history has
// not been ended yet, the end time is set to now and flagged as approximated.
func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error {
	if status.StreamID == "" {
		twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("No StreamID - Skipping\nstatus: %#v", status))
		return nil
	}
	tx, err := dbConn.Begin(ctx)
	if err != nil {
		return oops.New(err, "failed to begin transaction for stream history update")
	}
	defer tx.Rollback(ctx)

	history, err := db.QueryOne[models.TwitchStreamHistory](ctx, tx,
		`
		SELECT $columns
		FROM twitch_stream_history
		WHERE stream_id = $1
		`,
		status.StreamID,
	)

	if errors.Is(err, db.NotFound) {
		twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Creating new history\nstatus: %#v", status))
		history = &models.TwitchStreamHistory{}
		history.StreamID = status.StreamID
		history.TwitchID = status.TwitchID
		history.TwitchLogin = status.TwitchLogin
		history.StartedAt = status.StartedAt
		history.DiscordNeedsUpdate = true
	} else if err != nil {
		return oops.New(err, "failed to fetch existing stream history")
	}

	if !status.Live && !history.StreamEnded {
		twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Setting end time\nstatus: %#v", status))
		history.EndedAt = time.Now()
		history.StreamEnded = true
		history.EndApproximated = true
		history.DiscordNeedsUpdate = true
	}

	history.Title = status.Title
	history.CategoryID = status.CategoryID
	history.Tags = status.Tags
	if history.Tags == nil {
		history.Tags = make([]string, 0)
	}

	_, err = tx.Exec(ctx,
		`
		INSERT INTO
			twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, stream_ended, ended_at, end_approximated, title, category_id, tags, discord_needs_update)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
		ON CONFLICT (stream_id) DO UPDATE SET
			stream_ended = EXCLUDED.stream_ended,
			ended_at = EXCLUDED.ended_at,
			end_approximated = EXCLUDED.end_approximated,
			title = EXCLUDED.title,
			category_id = EXCLUDED.category_id,
			tags = EXCLUDED.tags,
			discord_needs_update = EXCLUDED.discord_needs_update
		`,
		history.StreamID,
		history.TwitchID,
		history.TwitchLogin,
		history.StartedAt,
		history.StreamEnded,
		history.EndedAt,
		history.EndApproximated,
		history.Title,
		history.CategoryID,
		history.Tags,
		history.DiscordNeedsUpdate,
	)
	if err != nil {
		return oops.New(err, "failed to insert/update twitch history")
	}
	err = tx.Commit(ctx)
	if err != nil {
		return oops.New(err, "failed to commit transaction")
	}

	twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Checking VOD\nhistory: %#v", history))
	err = findHistoryVOD(ctx, dbConn, history)
	if err != nil {
		return oops.New(err, "failed to look up twitch vod")
	}

	return nil
}

// findHistoryVOD looks for the archived video of a stream among the
// streamer's archived videos (fetched with a limit of 10) and records the
// outcome on the history row.
//
// When a VOD is found its details are stored, and for an ended stream with a
// non-zero VOD duration the end time is recomputed from that duration. When
// none is found, the row is marked vod_gone once the stream started more than
// 14 days ago; otherwise only last_verified_vod is bumped.
func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error {
	if history.StreamID == "" || history.VODGone {
		twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("Skipping VOD check\nhistory: %#v", history))
		return nil
	}

	vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10)
	// Log before checking the error so failed lookups show up in the twitch log too.
	twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history))
	if err != nil {
		return oops.New(err, "failed to fetch vods for streamer")
	}

	var vod *archivedVideo
	for idx, v := range vods {
		if v.StreamID == history.StreamID {
			vod = &vods[idx]
		}
	}
	if vod != nil {
		history.VODID = vod.ID
		history.VODUrl = vod.VODUrl
		history.VODThumbnail = vod.VODThumbnail
		history.LastVerifiedVOD = time.Now()
		history.VODGone = false
		history.DiscordNeedsUpdate = true

		if history.StreamEnded && vod.Duration.Minutes() > 0 {
			history.EndedAt = history.StartedAt.Add(vod.Duration)
			history.EndApproximated = false
		}

		_, err = dbConn.Exec(ctx,
			`
			UPDATE twitch_stream_history
			SET
				vod_id = $2,
				vod_url = $3,
				vod_thumbnail = $4,
				last_verified_vod = $5,
				vod_gone = $6,
				ended_at = $7,
				end_approximated = $8,
				discord_needs_update = $9
			WHERE stream_id = $1
			`,
			history.StreamID,
			history.VODID,
			history.VODUrl,
			history.VODThumbnail,
			history.LastVerifiedVOD,
			history.VODGone,
			history.EndedAt,
			history.EndApproximated,
			history.DiscordNeedsUpdate,
		)
		if err != nil {
			return oops.New(err, "failed to update stream history with VOD")
		}
	} else {
		if history.StartedAt.Add(14 * 24 * time.Hour).Before(time.Now()) {
			history.VODGone = true
			_, err = dbConn.Exec(ctx,
				`
				UPDATE twitch_stream_history
				SET
					vod_gone = $2,
					last_verified_vod = $3
				WHERE stream_id = $1
				`,
				history.StreamID,
				history.VODGone,
				time.Now(),
			)
			if err != nil {
				return oops.New(err, "failed to update stream history")
			}
		} else {
			_, err = dbConn.Exec(ctx,
				`
				UPDATE twitch_stream_history
				SET
					last_verified_vod = $2
				WHERE stream_id = $1
				`,
				history.StreamID,
				time.Now(),
			)
			if err != nil {
				return oops.New(err, "failed to update stream history")
			}
		}
	}

	return nil
}

// findMissingVODs runs findHistoryVOD for up to 100 ended streams that either
// have no VOD recorded or still have an approximated end time, least recently
// verified first. It stops at the first error.
func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error {
	histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
		`
		SELECT $columns
		FROM twitch_stream_history
		WHERE
			vod_gone = FALSE AND
			stream_ended = TRUE AND
			(end_approximated = TRUE OR vod_id = '')
		ORDER BY last_verified_vod ASC
		LIMIT 100
		`,
	)
	if err != nil {
		return oops.New(err, "failed to fetch stream history for vod updates")
	}

	for _, history := range histories {
		err = findHistoryVOD(ctx, dbConn, history)
		if err != nil {
			return err
		}
	}
	return nil
}

// verifyHistoryVODs re-checks up to 100 recorded VODs that have not been
// verified in the last 24 hours. VODs that twitch no longer returns have
// their details cleared and are marked gone (and flagged for a discord
// update); the rest only get last_verified_vod bumped.
func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
	histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
		`
		SELECT $columns
		FROM twitch_stream_history
		WHERE
			vod_gone = FALSE AND
			vod_id != '' AND
			last_verified_vod < $1
		ORDER BY last_verified_vod ASC
		LIMIT 100
		`,
		time.Now().Add(-24*time.Hour),
	)
	if err != nil {
		return oops.New(err, "failed to fetch stream history for vod verification")
	}

	videoIDs := make([]string, 0, len(histories))
	for _, h := range histories {
		videoIDs = append(videoIDs, h.VODID)
	}

	if len(videoIDs) > 0 {
		// A twitchNotFound error is not a failure here: the loop below then
		// finds no matches and treats every requested VOD as gone.
		vods, err := getArchivedVideos(ctx, videoIDs)
		if err != nil && !errors.Is(err, twitchNotFound) {
			return oops.New(err, "failed to fetch vods from twitch")
		}

		vodGone := make([]string, 0, len(histories))
		vodFound := make([]string, 0, len(histories))
		for _, h := range histories {
			found := false
			for _, vod := range vods {
				if h.VODID == vod.ID {
					found = true
					break
				}
			}
			if !found {
				vodGone = append(vodGone, h.VODID)
			} else {
				vodFound = append(vodFound, h.VODID)
			}
		}

		if len(vodGone) > 0 {
			_, err = dbConn.Exec(ctx,
				`
				UPDATE twitch_stream_history
				SET
					discord_needs_update = TRUE,
					vod_id = '',
					vod_url = '',
					vod_thumbnail = '',
					last_verified_vod = $2,
					vod_gone = TRUE
				WHERE
					vod_id = ANY($1)
				`,
				vodGone,
				time.Now(),
			)
			if err != nil {
				return oops.New(err, "failed to update twitch history")
			}
		}

		if len(vodFound) > 0 {
			_, err = dbConn.Exec(ctx,
				`
				UPDATE twitch_stream_history
				SET
					last_verified_vod = $2
				WHERE
					vod_id = ANY($1)
				`,
				vodFound,
				time.Now(),
			)
			if err != nil {
				return oops.New(err, "failed to update twitch history")
			}
		}
	}

	return nil
}

// RelevantCategories lists the twitch category IDs that make a stream
// relevant regardless of its tags.
var RelevantCategories = []string{
	"1469308723", // Software and Game Development
}

/*
NOTE(asaf): Looks like twitch returns `tags` properly, but only one in `tag_ids`
var RelevantTags = []string{
	"a59f1e4e-257b-4bd0-90c7-189c3efbf917", // Programming
	"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
}
*/

// RelevantTags lists the twitch tag names that make a stream relevant
// regardless of its category.
var RelevantTags = []string{
	"Programming",          // Programming
	"Software Development", // Software Development
}

// isStreamRelevant reports whether a stream is in one of RelevantCategories
// or carries at least one of RelevantTags. Tag comparison is exact.
func isStreamRelevant(catID string, tags []string) bool {
	for _, cat := range RelevantCategories {
		if cat == catID {
			return true
		}
	}

	for _, tag := range RelevantTags {
		for _, streamTag := range tags {
			if tag == streamTag {
				return true
			}
		}
	}
	return false
}

// twitchLog appends an entry to the twitch_log table. It is best-effort: a
// failed insert is reported to the application log and otherwise ignored.
func twitchLog(ctx context.Context, conn db.ConnOrTx, logType models.TwitchLogType, login string, message string, payload string) {
	_, err := conn.Exec(ctx,
		`
		INSERT INTO twitch_log (logged_at, twitch_login, type, message, payload)
		VALUES ($1, $2, $3, $4, $5)
		`,
		time.Now(),
		login,
		logType,
		message,
		payload,
	)
	if err != nil {
		log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger()
		log.Error().Err(err).Msg("Failed to log twitch event")
	}
}

// twitchLogClear deletes twitch_log entries that are at least 4 days old. A
// failed delete is reported to the application log and otherwise ignored.
func twitchLogClear(ctx context.Context, conn db.ConnOrTx) {
	_, err := conn.Exec(ctx,
		`
		DELETE FROM twitch_log
		WHERE logged_at <= $1
		`,
		time.Now().Add(-(time.Hour * 24 * 4)),
	)
	if err != nil {
		log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger()
		log.Error().Err(err).Msg("Failed to clear old twitch logs")
	}
}