From 0f58cfc2da371f32d852b80bb7be3763f4049195 Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Thu, 13 Oct 2022 00:40:37 +0300 Subject: [PATCH] Added support for db arrays and some twitch fixes. --- src/db/db.go | 6 + src/discord/streams.go | 5 +- .../2022-09-14T114645Z_NewTwitchTracking.go | 3 +- src/twitch/rest.go | 4 +- src/twitch/twitch.go | 206 ++++++++++-------- 5 files changed, 132 insertions(+), 92 deletions(-) diff --git a/src/db/db.go b/src/db/db.go index fb25b193..24941197 100644 --- a/src/db/db.go +++ b/src/db/db.go @@ -618,6 +618,12 @@ func setValueFromDB(dest reflect.Value, value reflect.Value) { dest.SetInt(value.Int()) case reflect.String: dest.SetString(value.String()) + case reflect.Slice: + switch v := value.Interface().(type) { + case pgtype.Value: + v.AssignTo(dest.Interface()) + default: + } // TODO(ben): More kinds? All the kinds? It kind of feels like we should be able to assign to any destination whose underlying type is a primitive. default: dest.Set(value) diff --git a/src/discord/streams.go b/src/discord/streams.go index 68cbbd51..9de52568 100644 --- a/src/discord/streams.go +++ b/src/discord/streams.go @@ -167,7 +167,7 @@ func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory) } duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Minute).String() messageContent := fmt.Sprintf( - "**%s** was live: https://twitch.tv/%s\n> _%s_\n At For %s%s", + "**%s** was live: https://twitch.tv/%s\n> _%s_\nAt For %s%s", history.TwitchLogin, history.TwitchLogin, history.Title, @@ -175,6 +175,9 @@ func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory) approximated, duration, ) + if history.VODUrl != "" { + messageContent += fmt.Sprintf("\nVOD: %s", history.VODUrl) + } msgJson, err := json.Marshal(CreateMessageRequest{ Content: messageContent, Flags: FlagSuppressEmbeds, diff --git a/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go b/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go index f20afd49..a0227410 100644 --- a/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go +++ b/src/migration/migrations/2022-09-14T114645Z_NewTwitchTracking.go @@ -35,6 +35,7 @@ func (m NewTwitchTracking) Up(ctx context.Context, tx pgx.Tx) error { twitch_login VARCHAR(255) NOT NULL, started_at TIMESTAMP WITH TIME ZONE NOT NULL, ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', + end_approximated BOOLEAN NOT NULL DEFAULT FALSE, title VARCHAR(255) NOT NULL DEFAULT '', category_id VARCHAR(255) NOT NULL DEFAULT '', tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}', @@ -72,7 +73,7 @@ func (m NewTwitchTracking) Down(ctx context.Context, tx pgx.Tx) error { DROP TABLE twitch_stream_history; DROP TABLE twitch_latest_status; - CREATE TABLE twitch_streams ( + CREATE TABLE twitch_stream ( twitch_id VARCHAR(255) NOT NULL, twitch_login VARCHAR(255) NOT NULL, title VARCHAR(255) NOT NULL, diff --git a/src/twitch/rest.go b/src/twitch/rest.go index a5ef5520..fa83ce4c 100644 --- a/src/twitch/rest.go +++ b/src/twitch/rest.go @@ -174,7 +174,7 @@ type archivedVideo struct { func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) { query := url.Values{} query.Add("user_id", twitchID) - query.Add("type", "archived") + query.Add("type", "archive") query.Add("first", strconv.Itoa(numVODs)) return getArchivedVideosByQuery(ctx, query) @@ -471,7 +471,7 @@ func doRequest(ctx context.Context, waitOnRateLimit bool, req *http.Request) (*h if err != nil { return nil, oops.New(err, "failed to read response body") } - logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch") + logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch") res.Body.Close() return res, oops.New(nil, "got an unexpected status code from twitch") } diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 12c0797e..09bd2068 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -267,13 +267,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, u streamerMap[streamer.TwitchLogin] = &streamers[idx] } - p.StartBlock("TwitchAPI", "Fetch twitch user info") - twitchUsers, err := getTwitchUsersByLogin(ctx, needID) - if err != nil { - log.Error().Err(err).Msg("Error while monitoring twitch") - return + twitchUsers := []twitchUser{} + if len(needID) > 0 { + p.StartBlock("TwitchAPI", "Fetch twitch user info") + log.Debug().Interface("needID", needID).Msg("IDs") + twitchUsers, err = getTwitchUsersByLogin(ctx, needID) + if err != nil { + log.Error().Err(err).Msg("Error while monitoring twitch") + return + } + p.EndBlock() } - p.EndBlock() for _, tu := range twitchUsers { streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID @@ -394,43 +398,45 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, u } } - p.StartBlock("TwitchAPI", "Fetch twitch stream statuses") - statuses, err := getStreamStatus(ctx, usersToUpdate) - if err != nil { - log.Error().Err(err).Msg("failed to fetch stream statuses") - return - } - twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses)) - p.EndBlock() - p.StartBlock("SQL", "Update stream statuses in db") - for _, twitchId := range usersToUpdate { - var status *streamStatus - for idx, st := range statuses { - if st.TwitchID == twitchId { - status = &statuses[idx] - break - } + if len(usersToUpdate) > 0 { + p.StartBlock("TwitchAPI", "Fetch twitch stream statuses") + statuses, err := getStreamStatus(ctx, usersToUpdate) + if err != nil { + log.Error().Err(err).Msg("failed to fetch stream statuses") + return } - if status == nil { - twitchLogin := "" - for _, streamer := range validStreamers { - if streamer.TwitchID == twitchId { - twitchLogin = streamer.TwitchLogin + twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses)) + p.EndBlock() + p.StartBlock("SQL", "Update stream statuses in db") + for _, twitchId := range usersToUpdate { + var status *streamStatus + for idx, st := range statuses { + if st.TwitchID == twitchId { + status = &statuses[idx] break } } - status = &streamStatus{ - TwitchID: twitchId, - TwitchLogin: twitchLogin, + if status == nil { + twitchLogin := "" + for _, streamer := range validStreamers { + if streamer.TwitchID == twitchId { + twitchLogin = streamer.TwitchLogin + break + } + } + status = &streamStatus{ + TwitchID: twitchId, + TwitchLogin: twitchLogin, + } + } + twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status)) + err = gotRESTUpdate(ctx, tx, status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") } } - twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status)) - err = gotRESTUpdate(ctx, tx, status) - if err != nil { - log.Error().Err(err).Msg("failed to update twitch stream status") - } + p.EndBlock() } - p.EndBlock() err = tx.Commit(ctx) if err != nil { log.Error().Err(err).Msg("failed to commit transaction") @@ -489,7 +495,7 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { UPDATE twitch_stream_history SET discord_needs_update = $2, - discord_message_id = $3, + discord_message_id = $3 WHERE stream_id = $1 `, h.StreamID, @@ -638,6 +644,7 @@ func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus if err != nil { return err } + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) latest.Live = true latest.StreamID = status.StreamID latest.StartedAt = status.StartedAt @@ -658,6 +665,7 @@ func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatu if err != nil { return err } + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOffline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) latest.Live = false latest.LastHookLiveUpdate = time.Now() err = saveLatestStreamStatus(ctx, conn, latest) @@ -676,6 +684,7 @@ func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatu if err != nil { return err } + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotChannelUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) if !latest.Live { // NOTE(asaf): If the stream is live, this channel update applies // to the current livestream. Otherwise, this will @@ -706,6 +715,7 @@ func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) if err != nil { return err } + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) { latest.Live = status.Live if status.Live { @@ -764,6 +774,7 @@ func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID str result = &models.TwitchLatestStatus{ TwitchID: twitchID, TwitchLogin: twitchLogin, + Tags: []string{}, } } else if err != nil { return nil, oops.New(err, "failed to fetch existing twitch status") @@ -809,18 +820,20 @@ func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *model UPDATE twitch_latest_status SET live = $2, - started_at = $3, - title = $4, - category_id = $5, - tags = $6, - last_hook_live_update = $7, - last_hook_channel_update = $8, - last_rest_update = $9 + stream_id = $3, + started_at = $4, + title = $5, + category_id = $6, + tags = $7, + last_hook_live_update = $8, + last_hook_channel_update = $9, + last_rest_update = $10 WHERE twitch_id = $1 `, latest.TwitchID, latest.Live, + latest.StreamID, latest.StartedAt, latest.Title, latest.CategoryID, @@ -842,6 +855,7 @@ func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *model func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error { if status.StreamID == "" { + twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("No StreamID - Skipping\nstatus: %#v", status)) return nil } tx, err := dbConn.Begin(ctx) @@ -917,11 +931,22 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models } func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error { - if history.StreamID == "" || history.VODID != "" || history.VODGone { + if history.StreamID == "" || (history.VODID != "" && !history.EndedAt.IsZero()) || history.VODGone { return nil } + latest, err := fetchLatestStreamStatus(ctx, dbConn, history.TwitchID, history.TwitchLogin) + if err != nil { + return oops.New(err, "failed to fetch latest status") + } + + stillLive := false + if latest.StreamID == history.StreamID && latest.Live { + stillLive = true + } + vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10) + twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history)) if err != nil { return oops.New(err, "failed to fetch vods for streamer") } @@ -939,9 +964,10 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi history.LastVerifiedVOD = time.Now() history.VODGone = false - if vod.Duration.Minutes() > 0 { + if !stillLive && vod.Duration.Minutes() > 0 { history.EndedAt = history.StartedAt.Add(vod.Duration) history.EndApproximated = false + history.DiscordNeedsUpdate = true } _, err = dbConn.Exec(ctx, @@ -954,7 +980,8 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi last_verified_vod = $5, vod_gone = $6, ended_at = $7, - end_approximated = $8 + end_approximated = $8, + discord_needs_update = $9 WHERE stream_id = $1 `, history.StreamID, @@ -965,6 +992,7 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi history.VODGone, history.EndedAt, history.EndApproximated, + history.DiscordNeedsUpdate, ) if err != nil { return oops.New(err, "failed to update stream history with VOD") @@ -996,19 +1024,19 @@ func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error { FROM twitch_stream_history WHERE vod_gone = FALSE AND - vod_url = '' AND - ended_at != $1 + vod_id = '' `, - time.Time{}, ) if err != nil { return oops.New(err, "failed to fetch stream history for vod updates") } for _, history := range histories { - err = findHistoryVOD(ctx, dbConn, history) - if err != nil { - return err + if history.EndedAt.IsZero() { + err = findHistoryVOD(ctx, dbConn, history) + if err != nil { + return err + } } } return nil @@ -1038,31 +1066,32 @@ func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error { videoIDs = append(videoIDs, h.VODID) } - VODs, err := getArchivedVideos(ctx, videoIDs) - if err != nil { - return oops.New(err, "failed to fetch vods from twitch") - } + if len(videoIDs) > 0 { + VODs, err := getArchivedVideos(ctx, videoIDs) + if err != nil { + return oops.New(err, "failed to fetch vods from twitch") + } - vodGone := make([]string, 0, len(histories)) - vodFound := make([]string, 0, len(histories)) - for _, h := range histories { - found := false - for _, vod := range VODs { - if h.VODID == vod.ID { - found = true - break + vodGone := make([]string, 0, len(histories)) + vodFound := make([]string, 0, len(histories)) + for _, h := range histories { + found := false + for _, vod := range VODs { + if h.VODID == vod.ID { + found = true + break + } + } + if !found { + vodGone = append(vodGone, h.VODID) + } else { + vodFound = append(vodFound, h.VODID) } } - if !found { - vodGone = append(vodGone, h.VODID) - } else { - vodFound = append(vodFound, h.VODID) - } - } - if len(vodGone) > 0 { - _, err = dbConn.Exec(ctx, - ` + if len(vodGone) > 0 { + _, err = dbConn.Exec(ctx, + ` UPDATE twitch_stream_history SET discord_needs_update = TRUE, @@ -1074,28 +1103,29 @@ func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error { WHERE vod_id = ANY($1) `, - vodGone, - time.Now(), - ) - if err != nil { - return oops.New(err, "failed to update twitch history") + vodGone, + time.Now(), + ) + if err != nil { + return oops.New(err, "failed to update twitch history") + } } - } - if len(vodFound) > 0 { - _, err = dbConn.Exec(ctx, - ` + if len(vodFound) > 0 { + _, err = dbConn.Exec(ctx, + ` UPDATE twitch_stream_history SET last_verified_vod = $2, WHERE vod_id = ANY($1) `, - vodFound, - time.Now(), - ) - if err != nil { - return oops.New(err, "failed to update twitch history") + vodFound, + time.Now(), + ) + if err != nil { + return oops.New(err, "failed to update twitch history") + } } }