From 5cc920dc2f6302b078ec9816aee140486c68fbe4 Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Wed, 12 Oct 2022 21:42:04 +0300 Subject: [PATCH] Better twitch tracking --- src/discord/streams.go | 192 +++-- src/hmndata/persistent_vars.go | 14 + .../2022-09-14T114645Z_NewTwitchTracking.go | 84 ++ src/models/twitch.go | 40 +- src/twitch/rest.go | 122 ++- src/twitch/twitch.go | 720 +++++++++++++++--- src/website/twitch.go | 8 +- 7 files changed, 993 insertions(+), 187 deletions(-) create mode 100644 src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go diff --git a/src/discord/streams.go b/src/discord/streams.go index 1979fe8..68cbbd5 100644 --- a/src/discord/streams.go +++ b/src/discord/streams.go @@ -5,24 +5,28 @@ import ( "encoding/json" "fmt" "strings" + "time" "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" "git.handmade.network/hmn/hmn/src/hmndata" "git.handmade.network/hmn/hmn/src/logging" + "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" ) // NOTE(asaf): Updates or creates a discord message according to the following rules: -// Create when: -// * No previous message exists -// * We have non-zero live streamers -// * Message exists, but we're adding a new streamer that wasn't in the existing message -// * Message exists, but is not the most recent message in the channel -// Update otherwise -// That way we ensure that the message doesn't get scrolled offscreen, and the -// new message indicator for the channel doesn't trigger when a streamer goes offline or -// updates the stream title. +// +// Create when: +// * No previous message exists +// * We have non-zero live streamers +// * Message exists, but we're adding a new streamer that wasn't in the existing message +// * Message exists, but is not the most recent message in the channel +// Update otherwise +// That way we ensure that the message doesn't get scrolled offscreen, and the +// new message indicator for the channel doesn't trigger when a streamer goes offline or +// updates the stream title. +// // NOTE(asaf): No-op if StreamsChannelID is not specified in the config func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error { if len(config.Config.Discord.StreamsChannelID) == 0 { @@ -44,6 +48,7 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat } if editExisting { + // Make sure we have a message to edit _, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) if err != nil { if err == NotFound { @@ -54,88 +59,143 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat } } - if editExisting { - existingStreamers := livestreamMessage.Streamers - for _, s := range streamers { - found := false - for _, es := range existingStreamers { - if es.Username == s.Username { - found = true + if len(streamers) == 0 { + if livestreamMessage != nil { + err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) + if err != nil { + return oops.New(err, "failed to delete livestream message from discord") + } + err = hmndata.RemovePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage) + if err != nil { + return oops.New(err, "failed to clear discord persistent var") + } + } + } else { + if editExisting { + // Check if we have new streamers to add + existingStreamers := livestreamMessage.Streamers + for _, s := range streamers { + found := false + for _, es := range existingStreamers { + if es.Username == s.Username { + found = true + break + } + } + if !found { + editExisting = false break } } - if !found { + } + + if editExisting { + // Check that our editable message is the latest in the channel + messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{ + Limit: 1, + }) + if err != nil { + return oops.New(err, "failed to fetch messages from discord") + } + if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID { editExisting = false - break } } - } - if editExisting && len(streamers) > 0 { - messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{ - Limit: 1, - }) - if err != nil { - return oops.New(err, "failed to fetch messages from discord") - } - if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID { - editExisting = false - } - } - - messageContent := "" - if len(streamers) == 0 { - messageContent = "No one is currently streaming." - } else { + messageContent := "" var builder strings.Builder for _, s := range streamers { builder.WriteString(fmt.Sprintf(":red_circle: **%s** is live: \n> _%s_\nStarted \n\n", s.Username, s.Username, s.Title, s.StartTime.Unix())) } messageContent = builder.String() + + msgJson, err := json.Marshal(CreateMessageRequest{ + Content: messageContent, + Flags: FlagSuppressEmbeds, + AllowedMentions: &MessageAllowedMentions{}, + }) + if err != nil { + return oops.New(err, "failed to marshal discord message") + } + + newMessageID := "" + if editExisting { + updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson)) + if err != nil { + return oops.New(err, "failed to update discord message for streams channel") + } + + newMessageID = updatedMessage.ID + } else { + if livestreamMessage != nil { + err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) + if err != nil { + log := logging.ExtractLogger(ctx) + log.Error().Err(err).Msg("failed to delete existing discord message from streams channel") + } + } + + sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson)) + if err != nil { + return oops.New(err, "failed to create discord message for streams channel") + } + + newMessageID = sentMessage.ID + } + + data := hmndata.DiscordLivestreamMessage{ + MessageID: newMessageID, + Streamers: streamers, + } + err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data) + if err != nil { + return oops.New(err, "failed to store persistent var for discord streams") + } + + } + return nil +} + +func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory) (string, error) { + if len(config.Config.Discord.StreamsChannelID) == 0 { + return "", nil } + approximated := "" + if history.EndApproximated { + approximated = "about " + } + duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Minute).String() + messageContent := fmt.Sprintf( + "**%s** was live: https://twitch.tv/%s\n> _%s_\n At For %s%s", + history.TwitchLogin, + history.TwitchLogin, + history.Title, + history.StartedAt.Unix(), + approximated, + duration, + ) msgJson, err := json.Marshal(CreateMessageRequest{ Content: messageContent, Flags: FlagSuppressEmbeds, AllowedMentions: &MessageAllowedMentions{}, }) if err != nil { - return oops.New(err, "failed to marshal discord message") + return "", oops.New(err, "failed to marshal discord message") } - - newMessageID := "" - if editExisting { - updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson)) + messageID := "" + if history.DiscordMessageID != "" { + updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, history.DiscordMessageID, string(msgJson)) if err != nil { - return oops.New(err, "failed to update discord message for streams channel") + return "", oops.New(err, "failed to update discord message for stream history") } - - newMessageID = updatedMessage.ID + messageID = updatedMessage.ID } else { - if livestreamMessage != nil { - err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) - if err != nil { - log := logging.ExtractLogger(ctx) - log.Error().Err(err).Msg("failed to delete existing discord message from streams channel") - } - } - - sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson)) + msg, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson)) if err != nil { - return oops.New(err, "failed to create discord message for streams channel") + return "", oops.New(err, "failed to create discord message for stream history") } - - newMessageID = sentMessage.ID + messageID = msg.ID } - - data := hmndata.DiscordLivestreamMessage{ - MessageID: newMessageID, - Streamers: streamers, - } - err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data) - if err != nil { - return oops.New(err, "failed to store persistent var for discord streams") - } - - return nil + return messageID, nil } diff --git a/src/hmndata/persistent_vars.go b/src/hmndata/persistent_vars.go index f326b98..d55675c 100644 --- a/src/hmndata/persistent_vars.go +++ b/src/hmndata/persistent_vars.go @@ -84,3 +84,17 @@ func StorePersistentVar[T any]( return nil } + +func RemovePersistentVar(ctx context.Context, dbConn db.ConnOrTx, name PersistentVarName) error { + _, err := dbConn.Exec(ctx, + ` + DELETE FROM persistent_var + WHERE name = $1 + `, + name, + ) + if err != nil { + return oops.New(err, "failed to delete var") + } + return nil +} diff --git a/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go b/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go new file mode 100644 index 0000000..f20afd4 --- /dev/null +++ b/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go @@ -0,0 +1,84 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(NewTwitchTracking{}) +} + +type NewTwitchTracking struct{} + +func (m NewTwitchTracking) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 9, 14, 11, 46, 45, 0, time.UTC)) +} + +func (m NewTwitchTracking) Name() string { + return "NewTwitchTracking" +} + +func (m NewTwitchTracking) Description() string { + return "New table for twitch tracking" +} + +func (m NewTwitchTracking) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + CREATE TABLE twitch_stream_history ( + stream_id VARCHAR(255) NOT NULL PRIMARY KEY, + twitch_id VARCHAR(255) NOT NULL, + twitch_login VARCHAR(255) NOT NULL, + started_at TIMESTAMP WITH TIME ZONE NOT NULL, + ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + title VARCHAR(255) NOT NULL DEFAULT '', + category_id VARCHAR(255) NOT NULL DEFAULT '', + tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}', + discord_message_id VARCHAR(255) NOT NULL DEFAULT '', + discord_needs_update BOOLEAN NOT NULL DEFAULT FALSE, + vod_id VARCHAR(255) NOT NULL DEFAULT '', + vod_url VARCHAR(512) NOT NULL DEFAULT '', + vod_thumbnail VARCHAR(512) NOT NULL DEFAULT '', + last_verified_vod TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + vod_gone BOOLEAN NOT NULL DEFAULT FALSE + ); + CREATE TABLE twitch_latest_status ( + twitch_id VARCHAR(255) NOT NULL PRIMARY KEY, + twitch_login VARCHAR(255) NOT NULL, + stream_id VARCHAR(255) NOT NULL DEFAULT '', + live BOOLEAN NOT NULL DEFAULT FALSE, + started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + title VARCHAR(255) NOT NULL DEFAULT '', + category_id VARCHAR(255) NOT NULL DEFAULT '', + tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}', + last_hook_live_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + last_hook_channel_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + last_rest_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch' + ); + + DROP TABLE twitch_stream; + `, + ) + return err +} + +func (m NewTwitchTracking) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + DROP TABLE twitch_stream_history; + DROP TABLE twitch_latest_status; + + CREATE TABLE twitch_streams ( + twitch_id VARCHAR(255) NOT NULL, + twitch_login VARCHAR(255) NOT NULL, + title VARCHAR(255) NOT NULL, + started_at TIMESTAMP WITH TIME ZONE + ); + `, + ) + return err +} diff --git a/src/models/twitch.go b/src/models/twitch.go index 7b0bed2..75e0c00 100644 --- a/src/models/twitch.go +++ b/src/models/twitch.go @@ -2,16 +2,40 @@ package models import "time" -type TwitchID struct { - ID string `db:"id"` - Login string `db:"login"` +type TwitchLatestStatus struct { + TwitchID string `db:"twitch_id"` + TwitchLogin string `db:"twitch_login"` + StreamID string `db:"stream_id"` + Live bool `db:"live"` + StartedAt time.Time `db:"started_at"` + Title string `db:"title"` + CategoryID string `db:"category_id"` + Tags []string `db:"tags"` + LastHookLiveUpdate time.Time `db:"last_hook_live_update"` + LastHookChannelUpdate time.Time `db:"last_hook_channel_update"` + LastRESTUpdate time.Time `db:"last_rest_update"` } -type TwitchStream struct { - ID string `db:"twitch_id"` - Login string `db:"twitch_login"` - Title string `db:"title"` - StartedAt time.Time `db:"started_at"` +type TwitchStreamHistory struct { + StreamID string `db:"stream_id"` + TwitchID string `db:"twitch_id"` + TwitchLogin string `db:"twitch_login"` + StartedAt time.Time `db:"started_at"` + EndedAt time.Time `db:"ended_at"` + EndApproximated bool `db:"end_approximated"` + Title string `db:"title"` + CategoryID string `db:"category_id"` + Tags []string `db:"tags"` + DiscordMessageID string `db:"discord_message_id"` + DiscordNeedsUpdate bool `db:"discord_needs_update"` + VODID string `db:"vod_id"` + VODUrl string `db:"vod_url"` + VODThumbnail string `db:"vod_thumbnail"` + LastVerifiedVOD time.Time `db:"last_verified_vod"` + // NOTE(asaf): If we had a VOD for a while, and then it disappeared, + // assume it was removed from twitch and don't bother + // checking for it again. + VODGone bool `db:"vod_gone"` } type TwitchLogType int diff --git a/src/twitch/rest.go b/src/twitch/rest.go index 1c1722d..a5ef552 100644 --- a/src/twitch/rest.go +++ b/src/twitch/rest.go @@ -28,7 +28,8 @@ var MaxRetries = errors.New("hit max retries") var httpClient = &http.Client{} // NOTE(asaf): Access token is not thread-safe right now. -// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions. +// +// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions. var activeAccessToken string var rateLimitReset time.Time @@ -87,16 +88,6 @@ func getTwitchUsersByLogin(ctx context.Context, logins []string) ([]twitchUser, return result, nil } -type streamStatus struct { - TwitchID string - TwitchLogin string - Live bool - Title string - StartedAt time.Time - Category string - Tags []string -} - func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) { result := make([]streamStatus, 0, len(twitchIDs)) numChunks := len(twitchIDs)/100 + 1 @@ -116,6 +107,7 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e } type twitchStatus struct { + StreamID string `json:"id"` TwitchID string `json:"user_id"` TwitchLogin string `json:"user_login"` GameID string `json:"game_id"` @@ -150,12 +142,13 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e started = time.Now() } status := streamStatus{ + StreamID: d.StreamID, TwitchID: d.TwitchID, TwitchLogin: d.TwitchLogin, Live: d.Type == "live", Title: d.Title, StartedAt: started, - Category: d.GameID, + CategoryID: d.GameID, Tags: d.Tags, } result = append(result, status) @@ -165,6 +158,111 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e return result, nil } +type archivedVideo struct { + ID string + StreamID string + TwitchID string + TwitchLogin string + Title string + Description string + CreatedAt time.Time + Duration time.Duration + VODUrl string + VODThumbnail string +} + +func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) { + query := url.Values{} + query.Add("user_id", twitchID) + query.Add("type", "archived") + query.Add("first", strconv.Itoa(numVODs)) + + return getArchivedVideosByQuery(ctx, query) +} + +func getArchivedVideos(ctx context.Context, videoIDs []string) ([]archivedVideo, error) { + query := url.Values{} + for _, vid := range videoIDs { + query.Add("id", vid) + } + return getArchivedVideosByQuery(ctx, query) +} + +func getArchivedVideosByQuery(ctx context.Context, query url.Values) ([]archivedVideo, error) { + req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/videos", query.Encode()), nil) + if err != nil { + return nil, oops.New(err, "failed to create request") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return nil, oops.New(err, "failed to fetch archived videos for user") + } + + type twitchVideo struct { + ID string `json:"id"` + StreamID string `json:"stream_id"` + UserID string `json:"user_id"` + UserLogin string `json:"user_login"` + UserName string `json:"user_name"` + Title string `json:"title"` + Description string `json:"description"` + CreatedAt string `json:"created_at"` + PublishedAt string `json:"published_at"` + Url string `json:"url"` + ThumbnailUrl string `json:"thumbnail_url"` + Viewable string `json:"viewable"` + ViewCount int `json:"view_count"` + Language string `json:"language"` + Type string `json:"type"` + Duration string `json:"duration"` + } + + type twitchResponse struct { + Data []twitchVideo `json:"data"` + } + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return nil, oops.New(err, "failed to read response body while processing archived videos") + } + log := logging.ExtractLogger(ctx) + log.Debug().Str("getArchivedVideosForUser response", string(body)).Msg("Got getArchivedVideosForUser response") + + var resp twitchResponse + err = json.Unmarshal(body, &resp) + if err != nil { + return nil, oops.New(err, "failed to parse twitch response while processing archived videos") + } + + var result []archivedVideo + + for _, v := range resp.Data { + createdAt, err := time.Parse(time.RFC3339, v.CreatedAt) + if err != nil { + logging.ExtractLogger(ctx).Warn().Str("Time string", v.CreatedAt).Msg("Failed to parse twitch timestamp") + createdAt = time.Time{} + } + duration, err := time.ParseDuration(v.Duration) + if err != nil { + duration = 0 + } + archived := archivedVideo{ + ID: v.ID, + StreamID: v.StreamID, + TwitchID: v.UserID, + TwitchLogin: v.UserLogin, + Title: v.Title, + Description: v.Description, + CreatedAt: createdAt, + Duration: duration, + VODUrl: v.Url, + VODThumbnail: v.ThumbnailUrl, + } + result = append(result, archived) + } + return result, nil +} + type twitchEventSub struct { EventID string TwitchID string diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 2f34771..12c0797 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -19,6 +19,45 @@ import ( "github.com/jackc/pgx/v4/pgxpool" ) +// NOTE(asaf): The twitch api madness: +// +// | stream.online | stream.offline | channel.update[3] | REST[1][2] +// id[4] | YES | NO | NO | YES +// twitch_id | YES | YES | YES | YES +// twitch_login | YES | YES | YES | YES +// is_live | YES | IMPLICIT | NO | YES +// started_at | YES | NO | NO | YES +// title | NO | NO | YES | YES +// cat_id | NO | NO | YES | YES +// tags | NO | NO | NO | YES +// +// [1] REST returns nothing when user is not live +// [2] Information received from REST is ~3 minutes old. +// [3] channel.update also fires when the user changes their twitch channel settings when they're not live (i.e. as soon as they update it in twitch settings) +// [4] ID of the current livestream + +type streamStatus struct { + StreamID string + TwitchID string + TwitchLogin string + Live bool + Title string + StartedAt time.Time + CategoryID string + Tags []string +} + +type twitchNotificationType int + +const ( + notificationTypeNone twitchNotificationType = 0 + notificationTypeOnline = 1 + notificationTypeOffline = 2 + notificationTypeChannelUpdate = 3 + + notificationTypeRevocation = 4 +) + type twitchNotification struct { Status streamStatus Type twitchNotificationType @@ -75,10 +114,10 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. log.Error().Err(err).Msg("Failed to fetch refresh token on start") return true, nil } - syncWithTwitch(ctx, dbConn, true) + syncWithTwitch(ctx, dbConn, true, true) case <-monitorTicker.C: twitchLogClear(ctx, dbConn) - syncWithTwitch(ctx, dbConn, true) + syncWithTwitch(ctx, dbConn, true, true) case <-linksChangedChannel: // NOTE(asaf): Since we update links inside transactions for users/projects // we won't see the updated list of links until the transaction is committed. @@ -87,13 +126,13 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. var timer *time.Timer t := time.AfterFunc(5*time.Second, func() { expiredTimers <- timer - syncWithTwitch(ctx, dbConn, false) + syncWithTwitch(ctx, dbConn, false, false) }) timer = t timers = append(timers, t) case notification := <-twitchNotificationChannel: if notification.Type == notificationTypeRevocation { - syncWithTwitch(ctx, dbConn, false) + syncWithTwitch(ctx, dbConn, false, false) } else { // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and // would return old data if we called it immediately, so we process @@ -123,17 +162,6 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. return job } -type twitchNotificationType int - -const ( - notificationTypeNone twitchNotificationType = 0 - notificationTypeOnline = 1 - notificationTypeOffline = 2 - notificationTypeChannelUpdate = 3 - - notificationTypeRevocation = 4 -) - func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error { var notification twitchNotification if messageType == "notification" { @@ -142,6 +170,7 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType Type string `json:"type"` } `json:"subscription"` Event struct { + StreamID string `json:"id"` BroadcasterUserID string `json:"broadcaster_user_id"` BroadcasterUserLogin string `json:"broadcaster_user_login"` Title string `json:"title"` @@ -159,12 +188,13 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType notification.Status.TwitchID = incoming.Event.BroadcasterUserID notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin notification.Status.Title = incoming.Event.Title - notification.Status.Category = incoming.Event.CategoryID + notification.Status.CategoryID = incoming.Event.CategoryID notification.Status.StartedAt = time.Now() switch incoming.Subscription.Type { case "stream.online": notification.Type = notificationTypeOnline notification.Status.Live = true + notification.Status.StreamID = incoming.Event.StreamID case "stream.offline": notification.Type = notificationTypeOffline case "channel.update": @@ -206,7 +236,7 @@ func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []s } } -func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { +func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, updateVODs bool) { log := logging.ExtractLogger(ctx) log.Info().Msg("Running twitch sync") p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch") @@ -264,12 +294,6 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { } p.EndBlock() - const ( - EventSubNone = 0 // No event of this type found - EventSubRefresh = 1 // Event found, but bad status. Need to unsubscribe and resubscribe. - EventSubGood = 2 // All is well. - ) - type isSubbedByType map[string]bool streamerEventSubs := make(map[string]isSubbedByType) @@ -345,7 +369,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { } p.StartBlock("SQL", "Remove untracked streamers") _, err = tx.Exec(ctx, - `DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`, + `DELETE FROM twitch_latest_status WHERE twitch_id != ANY($1)`, allIDs, ) if err != nil { @@ -379,10 +403,29 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses)) p.EndBlock() p.StartBlock("SQL", "Update stream statuses in db") - for _, status := range statuses { - log.Debug().Interface("Status", status).Msg("Got streamer") + for _, twitchId := range usersToUpdate { + var status *streamStatus + for idx, st := range statuses { + if st.TwitchID == twitchId { + status = &statuses[idx] + break + } + } + if status == nil { + twitchLogin := "" + for _, streamer := range validStreamers { + if streamer.TwitchID == twitchId { + twitchLogin = streamer.TwitchLogin + break + } + } + status = &streamStatus{ + TwitchID: twitchId, + TwitchLogin: twitchLogin, + } + } twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status)) - err = updateStreamStatusInDB(ctx, tx, &status) + err = gotRESTUpdate(ctx, tx, status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") } @@ -395,6 +438,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { stats.NumStreamsChecked += len(usersToUpdate) log.Info().Interface("Stats", stats).Msg("Twitch sync done") + if updateVODs { + err = findMissingVODs(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to find missing twitch vods") + } + err = verifyHistoryVODs(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to verify twitch vods") + } + } + log.Debug().Msg("Notifying discord") err = notifyDiscordOfLiveStream(ctx, dbConn) if err != nil { @@ -403,11 +457,56 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { } func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { - streams, err := db.Query[models.TwitchStream](ctx, dbConn, + history, err := db.Query[models.TwitchStreamHistory](ctx, dbConn, + ` + SELECT $columns + FROM twitch_stream_history + WHERE discord_needs_update = TRUE + ORDER BY started_at ASC + `, + ) + if err != nil { + return oops.New(err, "failed to fetch twitch history") + } + + updatedHistories := make([]*models.TwitchStreamHistory, 0) + for _, h := range history { + relevant := isStreamRelevant(h.CategoryID, h.Tags) + if relevant && !h.EndedAt.IsZero() { + msgId, err := discord.PostStreamHistory(ctx, h) + if err != nil { + return oops.New(err, "failed to post twitch history to discord") + } + h.DiscordNeedsUpdate = false + h.DiscordMessageID = msgId + updatedHistories = append(updatedHistories, h) + } + } + + for _, h := range updatedHistories { + _, err = dbConn.Exec(ctx, + ` + UPDATE twitch_stream_history + SET + discord_needs_update = $2, + discord_message_id = $3, + WHERE stream_id = $1 + `, + h.StreamID, + h.DiscordNeedsUpdate, + h.DiscordMessageID, + ) + if err != nil { + return oops.New(err, "failed to update twitch history after posting to discord") + } + } + + streams, err := db.Query[models.TwitchLatestStatus](ctx, dbConn, ` SELECT $columns FROM - twitch_stream + twitch_latest_status + WHERE live = TRUE ORDER BY started_at ASC `, ) @@ -417,11 +516,13 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { var streamDetails []hmndata.StreamDetails for _, s := range streams { - streamDetails = append(streamDetails, hmndata.StreamDetails{ - Username: s.Login, - StartTime: s.StartedAt, - Title: s.Title, - }) + if isStreamRelevant(s.CategoryID, s.Tags) { + streamDetails = append(streamDetails, hmndata.StreamDetails{ + Username: s.TwitchLogin, + StartTime: s.StartedAt, + Title: s.Title, + }) + } } err = discord.UpdateStreamers(ctx, dbConn, streamDetails) @@ -454,8 +555,9 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string } status := streamStatus{ - TwitchID: twitchID, - Live: false, + TwitchID: twitchID, + TwitchLogin: twitchLogin, + Live: false, } result, err := getStreamStatus(ctx, []string{twitchID}) @@ -468,7 +570,7 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch") status = result[0] } - err = updateStreamStatusInDB(ctx, dbConn, &status) + err = gotRESTUpdate(ctx, dbConn, &status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") return @@ -506,29 +608,21 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi } twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification)) - if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline { - log.Debug().Interface("Status", notification.Status).Msg("Updating status") - err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status) + switch notification.Type { + case notificationTypeOnline: + err := gotStreamOnline(ctx, dbConn, ¬ification.Status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") - return } - } else if notification.Type == notificationTypeChannelUpdate { - // NOTE(asaf): Channel updates can happen wether or not the streamer is live. - // So we just update the title if the user is live in our db, and we rely on the - // 3-minute delayed status update to verify live status and category/tag requirements. - _, err = dbConn.Exec(ctx, - ` - UPDATE twitch_stream - SET title = $1 - WHERE twitch_id = $2 - `, - notification.Status.Title, - notification.Status.TwitchID, - ) + case notificationTypeOffline: + err := gotStreamOffline(ctx, dbConn, ¬ification.Status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + } + case notificationTypeChannelUpdate: + err := gotChannelUpdate(ctx, dbConn, ¬ification.Status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") - return } } @@ -539,40 +633,472 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi } } -func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { - log := logging.ExtractLogger(ctx) - if isStatusRelevant(status) { - twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking online", fmt.Sprintf("%#v", status)) - log.Debug().Msg("Status relevant") - _, err := conn.Exec(ctx, - ` - INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at) - VALUES ($1, $2, $3, $4) - ON CONFLICT (twitch_id) DO UPDATE SET - title = EXCLUDED.title, - started_at = EXCLUDED.started_at - `, - status.TwitchID, - status.TwitchLogin, - status.Title, - status.StartedAt, - ) - if err != nil { - return oops.New(err, "failed to insert twitch streamer into db") - } - } else { - log.Debug().Msg("Stream not relevant") - twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking offline", fmt.Sprintf("%#v", status)) - _, err := conn.Exec(ctx, - ` - DELETE FROM twitch_stream WHERE twitch_id = $1 - `, - status.TwitchID, - ) - if err != nil { - return oops.New(err, "failed to remove twitch streamer from db") +func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { + latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin) + if err != nil { + return err + } + latest.Live = true + latest.StreamID = status.StreamID + latest.StartedAt = status.StartedAt + latest.LastHookLiveUpdate = time.Now() + err = saveLatestStreamStatus(ctx, conn, latest) + if err != nil { + return err + } + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + return nil +} + +func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { + latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin) + if err != nil { + return err + } + latest.Live = false + latest.LastHookLiveUpdate = time.Now() + err = saveLatestStreamStatus(ctx, conn, latest) + if err != nil { + return err + } + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + return nil +} + +func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { + latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin) + if err != nil { + return err + } + if !latest.Live { + // NOTE(asaf): If the stream is live, this channel update applies + // to the current livestream. Otherwise, this will + // only apply to the next stream, so we clear out + // the stream info. + latest.StreamID = "" + latest.StartedAt = time.Time{} + } + latest.Title = status.Title + if latest.CategoryID != status.CategoryID { + latest.CategoryID = status.CategoryID + latest.Tags = []string{} // NOTE(asaf): We don't get tags here, but we can't assume they didn't change because some tags are automatic based on category + } + latest.LastHookChannelUpdate = time.Now() + err = saveLatestStreamStatus(ctx, conn, latest) + if err != nil { + return err + } + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + return nil +} + +func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { + latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin) + if err != nil { + return err + } + if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) { + latest.Live = status.Live + if status.Live { + // NOTE(asaf): We don't get this information if the user isn't live + latest.StartedAt = status.StartedAt + latest.StreamID = status.StreamID } } + if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) { + if status.Live { + // NOTE(asaf): We don't get this information if the user isn't live + latest.Title = status.Title + latest.CategoryID = status.CategoryID + latest.Tags = status.Tags + } + } + latest.LastRESTUpdate = time.Now() + err = saveLatestStreamStatus(ctx, conn, latest) + if err != nil { + return err + } + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + return nil +} + +func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID string, twitchLogin string) (*models.TwitchLatestStatus, error) { + tx, err := conn.Begin(ctx) + if err != nil { + return nil, oops.New(err, "failed to begin transaction for stream status fetch") + } + defer tx.Rollback(ctx) + + result, err := db.QueryOne[models.TwitchLatestStatus](ctx, conn, + ` + SELECT $columns + FROM twitch_latest_status + WHERE twitch_id = $1 + `, + twitchID, + ) + if err == db.NotFound { + _, err = conn.Exec(ctx, + ` + INSERT INTO twitch_latest_status (twitch_id, twitch_login) + VALUES ($1, $2) + `, + twitchID, + twitchLogin, + ) + if err != nil { + return nil, err + } + result = &models.TwitchLatestStatus{ + TwitchID: twitchID, + TwitchLogin: twitchLogin, + } + } else if err != nil { + return nil, oops.New(err, "failed to fetch existing twitch status") + } + + if result.TwitchLogin != twitchLogin { + _, err = conn.Exec(ctx, + ` + UPDATE twitch_latest_status + SET twitch_login = $2 + WHERE twitch_id = $1 + `, + twitchID, + twitchLogin, + ) + if err != nil { + return nil, oops.New(err, "failed to update twitch login") + } + result.TwitchLogin = twitchLogin + } + err = tx.Commit(ctx) + if err != nil { + return nil, oops.New(err, "failed to commit transaction") + } + return result, nil +} + +func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *models.TwitchLatestStatus) error { + tx, err := conn.Begin(ctx) + if err != nil { + return oops.New(err, "failed to start transaction for stream status update") + } + defer tx.Rollback(ctx) + + // NOTE(asaf): Ensure that we have a record for it in the db + _, err = fetchLatestStreamStatus(ctx, conn, latest.TwitchID, latest.TwitchLogin) + if err != nil { + return err + } + + _, err = conn.Exec(ctx, + ` + UPDATE twitch_latest_status + SET + live = $2, + started_at = $3, + title = $4, + category_id = $5, + tags = $6, + last_hook_live_update = $7, + last_hook_channel_update = $8, + last_rest_update = $9 + WHERE + twitch_id = $1 + `, + latest.TwitchID, + latest.Live, + latest.StartedAt, + latest.Title, + latest.CategoryID, + latest.Tags, + latest.LastHookLiveUpdate, + latest.LastHookChannelUpdate, + latest.LastRESTUpdate, + ) + if err != nil { + return oops.New(err, "failed to update twitch latest status") + } + + err = tx.Commit(ctx) + if err != nil { + return oops.New(err, "failed to commit transaction") + } + return nil +} + +func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error { + if status.StreamID == "" { + return nil + } + tx, err := dbConn.Begin(ctx) + if err != nil { + return oops.New(err, "failed to begin transaction for stream history update") + } + defer tx.Rollback(ctx) + history, err := db.QueryOne[models.TwitchStreamHistory](ctx, tx, + ` + SELECT $columns + FROM twitch_stream_history + WHERE stream_id = $1 + `, + status.StreamID, + ) + + if err == db.NotFound { + history = &models.TwitchStreamHistory{} + history.StreamID = status.StreamID + history.TwitchID = status.TwitchID + history.TwitchLogin = status.TwitchLogin + history.StartedAt = status.StartedAt + history.DiscordNeedsUpdate = true + } else if err != nil { + return oops.New(err, "failed to fetch existing stream history") + } + + if !status.Live && history.EndedAt.IsZero() { + history.EndedAt = time.Now() + history.EndApproximated = true + history.DiscordNeedsUpdate = true + } + + history.Title = status.Title + history.CategoryID = status.CategoryID + history.Tags = status.Tags + + _, err = tx.Exec(ctx, + ` + INSERT INTO + twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, ended_at, title, category_id, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (stream_id) DO UPDATE SET + ended_at = EXCLUDED.ended_at, + title = EXCLUDED.title, + category_id = EXCLUDED.category_id, + tags = EXCLUDED.tags + `, + history.StreamID, + history.TwitchID, + history.TwitchLogin, + history.StartedAt, + history.EndedAt, + history.Title, + history.CategoryID, + history.Tags, + ) + if err != nil { + return oops.New(err, "failed to insert/update twitch history") + } + err = tx.Commit(ctx) + if err != nil { + return oops.New(err, "failed to commit transaction") + } + + if !history.EndedAt.IsZero() { + err = findHistoryVOD(ctx, dbConn, history) + if err != nil { + return oops.New(err, "failed to look up twitch vod") + } + } + return nil +} + +func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error { + if history.StreamID == "" || history.VODID != "" || history.VODGone { + return nil + } + + vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10) + if err != nil { + return oops.New(err, "failed to fetch vods for streamer") + } + + var vod *archivedVideo + for idx, v := range vods { + if v.StreamID == history.StreamID { + vod = &vods[idx] + } + } + if vod != nil { + history.VODID = vod.ID + history.VODUrl = vod.VODUrl + history.VODThumbnail = vod.VODThumbnail + history.LastVerifiedVOD = time.Now() + history.VODGone = false + + if vod.Duration.Minutes() > 0 { + history.EndedAt = history.StartedAt.Add(vod.Duration) + history.EndApproximated = false + } + + _, err = dbConn.Exec(ctx, + ` + UPDATE twitch_stream_history + SET + vod_id = $2, + vod_url = $3, + vod_thumbnail = $4, + last_verified_vod = $5, + vod_gone = $6, + ended_at = $7, + end_approximated = $8 + WHERE stream_id = $1 + `, + history.StreamID, + history.VODID, + history.VODUrl, + history.VODThumbnail, + history.LastVerifiedVOD, + history.VODGone, + history.EndedAt, + history.EndApproximated, + ) + if err != nil { + return oops.New(err, "failed to update stream history with VOD") + } + } else { + if history.StartedAt.Add(14 * 24 * time.Hour).Before(time.Now()) { + history.VODGone = true + _, err = dbConn.Exec(ctx, ` + UPDATE twitch_stream_history + SET + vod_gone = $2, + WHERE stream_id = $1 + `, + history.StreamID, + history.VODGone, + ) + if err != nil { + return oops.New(err, "failed to update stream history") + } + } + } + return nil +} + +func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error { + histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn, + ` + SELECT $columns + FROM twitch_stream_history + WHERE + vod_gone = FALSE AND + vod_url = '' AND + ended_at != $1 + `, + time.Time{}, + ) + if err != nil { + return oops.New(err, "failed to fetch stream history for vod updates") + } + + for _, history := range histories { + err = findHistoryVOD(ctx, dbConn, history) + if err != nil { + return err + } + } + return nil +} + +func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error { + histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn, + ` + SELECT $columns + FROM twitch_stream_history + WHERE + vod_gone = FALSE AND + vod_id != '' AND + last_verified_vod < $1 + ORDER BY last_verified_vod ASC + LIMIT 100 + `, + time.Now().Add(-24*time.Hour), + ) + + if err != nil { + return oops.New(err, "failed to fetch stream history for vod verification") + } + + videoIDs := make([]string, 0, len(histories)) + for _, h := range histories { + videoIDs = append(videoIDs, h.VODID) + } + + VODs, err := getArchivedVideos(ctx, videoIDs) + if err != nil { + return oops.New(err, "failed to fetch vods from twitch") + } + + vodGone := make([]string, 0, len(histories)) + vodFound := make([]string, 0, len(histories)) + for _, h := range histories { + found := false + for _, vod := range VODs { + if h.VODID == vod.ID { + found = true + break + } + } + if !found { + vodGone = append(vodGone, h.VODID) + } else { + vodFound = append(vodFound, h.VODID) + } + } + + if len(vodGone) > 0 { + _, err = dbConn.Exec(ctx, + ` + UPDATE twitch_stream_history + SET + discord_needs_update = TRUE, + vod_id = '', + vod_url = '', + vod_thumbnail = '', + last_verified_vod = $2, + vod_gone = TRUE + WHERE + vod_id = ANY($1) + `, + vodGone, + time.Now(), + ) + if err != nil { + return oops.New(err, "failed to update twitch history") + } + } + + if len(vodFound) > 0 { + _, err = dbConn.Exec(ctx, + ` + UPDATE twitch_stream_history + SET + last_verified_vod = $2, + WHERE + vod_id = ANY($1) + `, + vodFound, + time.Now(), + ) + if err != nil { + return oops.New(err, "failed to update twitch history") + } + } + return nil } @@ -585,19 +1111,17 @@ var RelevantTags = []string{ "6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development } -func isStatusRelevant(status *streamStatus) bool { - if status.Live { - for _, cat := range RelevantCategories { - if status.Category == cat { - return true - } +func isStreamRelevant(catID string, tags []string) bool { + for _, cat := range RelevantCategories { + if cat == catID { + return true } + } - for _, tag := range RelevantTags { - for _, streamTag := range status.Tags { - if tag == streamTag { - return true - } + for _, tag := range RelevantTags { + for _, streamTag := range tags { + if tag == streamTag { + return true } } } diff --git a/src/website/twitch.go b/src/website/twitch.go index d2c7809..0be7555 100644 --- a/src/website/twitch.go +++ b/src/website/twitch.go @@ -97,11 +97,13 @@ func TwitchDebugPage(c *RequestContext) ResponseData { if err != nil { return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streamers")) } - live, err := db.Query[models.TwitchStream](c, c.Conn, + live, err := db.Query[models.TwitchLatestStatus](c, c.Conn, ` SELECT $columns FROM - twitch_stream + twitch_latest_status + WHERE + live = TRUE `, ) if err != nil { @@ -124,7 +126,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData { user.Login = u.TwitchLogin user.Live = false for _, l := range live { - if l.Login == u.TwitchLogin { + if l.TwitchLogin == u.TwitchLogin { user.Live = true break }