From a6ad01143ae04b56839d34e39724bebc9cb59211 Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Wed, 19 Oct 2022 11:57:34 +0300 Subject: [PATCH] Twitch should work now hopefully. --- .../2022-10-18T062831Z_AddTwitchEnded.go | 57 +++++++++++++++ src/models/twitch.go | 1 + src/twitch/rest.go | 4 ++ src/twitch/twitch.go | 69 +++++++++++-------- 4 files changed, 103 insertions(+), 28 deletions(-) create mode 100644 src/migration/migrations/2022-10-18T062831Z_AddTwitchEnded.go diff --git a/src/migration/migrations/2022-10-18T062831Z_AddTwitchEnded.go b/src/migration/migrations/2022-10-18T062831Z_AddTwitchEnded.go new file mode 100644 index 0000000..c1f1f6a --- /dev/null +++ b/src/migration/migrations/2022-10-18T062831Z_AddTwitchEnded.go @@ -0,0 +1,57 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(AddTwitchEnded{}) +} + +type AddTwitchEnded struct{} + +func (m AddTwitchEnded) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 10, 18, 6, 28, 31, 0, time.UTC)) +} + +func (m AddTwitchEnded) Name() string { + return "AddTwitchEnded" +} + +func (m AddTwitchEnded) Description() string { + return "Add stream_ended to twitch history" +} + +func (m AddTwitchEnded) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + ALTER TABLE twitch_stream_history + ADD COLUMN stream_ended BOOLEAN NOT NULL DEFAULT FALSE; + `, + ) + if err != nil { + return err + } + _, err = tx.Exec(ctx, + ` + UPDATE twitch_stream_history + SET stream_ended = TRUE + WHERE ended_at > TIMESTAMP '2000-01-01 00:00:00'; + `, + ) + return err +} + +func (m AddTwitchEnded) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + ALTER TABLE twitch_stream_history + DROP COLUMN stream_ended BOOLEAN NOT NULL DEFAULT FALSE; + `, + ) + return err +} diff --git a/src/models/twitch.go b/src/models/twitch.go index 75e0c00..648ca6a 100644 --- a/src/models/twitch.go +++ b/src/models/twitch.go @@ -22,6 +22,7 @@ type TwitchStreamHistory struct { TwitchLogin string `db:"twitch_login"` StartedAt time.Time `db:"started_at"` EndedAt time.Time `db:"ended_at"` + StreamEnded bool `db:"stream_ended"` EndApproximated bool `db:"end_approximated"` Title string `db:"title"` CategoryID string `db:"category_id"` diff --git a/src/twitch/rest.go b/src/twitch/rest.go index fa83ce4..5f83790 100644 --- a/src/twitch/rest.go +++ b/src/twitch/rest.go @@ -169,6 +169,8 @@ type archivedVideo struct { Duration time.Duration VODUrl string VODThumbnail string + RawDuration string + RawCreatedAt string } func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) { @@ -257,6 +259,8 @@ func getArchivedVideosByQuery(ctx context.Context, query url.Values) ([]archived Duration: duration, VODUrl: v.Url, VODThumbnail: v.ThumbnailUrl, + RawDuration: v.Duration, + RawCreatedAt: v.CreatedAt, } result = append(result, archived) } diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 8255479..ec787ab 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -478,15 +478,15 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { updatedHistories := make([]*models.TwitchStreamHistory, 0) for _, h := range history { relevant := isStreamRelevant(h.CategoryID, h.Tags) - if relevant && !h.EndedAt.IsZero() { + if relevant && h.StreamEnded { msgId, err := discord.PostStreamHistory(ctx, h) if err != nil { return oops.New(err, "failed to post twitch history to discord") } - h.DiscordNeedsUpdate = false h.DiscordMessageID = msgId - updatedHistories = append(updatedHistories, h) } + h.DiscordNeedsUpdate = false + updatedHistories = append(updatedHistories, h) } for _, h := range updatedHistories { @@ -645,6 +645,17 @@ func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus return err } twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) + + if latest.Live && latest.StreamID != status.StreamID { + // NOTE(asaf): Update history for previous stream + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("Wrapping up previous stream")) + latest.Live = false + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + } + latest.Live = true latest.StreamID = status.StreamID latest.StartedAt = status.StartedAt @@ -717,12 +728,21 @@ func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) } twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status)) if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) { - latest.Live = status.Live if status.Live { + if latest.Live && status.StreamID != latest.StreamID { + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("Wrapping up previous stream")) + latest.Live = false + err = updateStreamHistory(ctx, conn, latest) + if err != nil { + return err + } + } + // NOTE(asaf): We don't get this information if the user isn't live latest.StartedAt = status.StartedAt latest.StreamID = status.StreamID } + latest.Live = status.Live } if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) { if status.Live { @@ -781,6 +801,8 @@ func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID str } if result.TwitchLogin != twitchLogin { + // NOTE(asaf): If someone changed their twitch login we should + // still reuse their db record by twitch_id. _, err = tx.Exec(ctx, ` UPDATE twitch_latest_status @@ -888,9 +910,10 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models return oops.New(err, "failed to fetch existing stream history") } - if !status.Live && history.EndedAt.IsZero() { + if !status.Live && !history.StreamEnded { twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Setting end time\nstatus: %#v", status)) history.EndedAt = time.Now() + history.StreamEnded = true history.EndApproximated = true history.DiscordNeedsUpdate = true } @@ -906,9 +929,10 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models _, err = tx.Exec(ctx, ` INSERT INTO - twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, ended_at, end_approximated, title, category_id, tags, discord_needs_update) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, stream_ended, ended_at, end_approximated, title, category_id, tags, discord_needs_update) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (stream_id) DO UPDATE SET + stream_ended = EXCLUDED.stream_ended, ended_at = EXCLUDED.ended_at, end_approximated = EXCLUDED.end_approximated, title = EXCLUDED.title, @@ -920,6 +944,7 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models history.TwitchID, history.TwitchLogin, history.StartedAt, + history.StreamEnded, history.EndedAt, history.EndApproximated, history.Title, @@ -935,32 +960,20 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models return oops.New(err, "failed to commit transaction") } - if !history.EndedAt.IsZero() { - twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Checking VOD\nhistory: %#v", history)) - err = findHistoryVOD(ctx, dbConn, history) - if err != nil { - return oops.New(err, "failed to look up twitch vod") - } + twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Checking VOD\nhistory: %#v", history)) + err = findHistoryVOD(ctx, dbConn, history) + if err != nil { + return oops.New(err, "failed to look up twitch vod") } return nil } func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error { - if history.StreamID == "" || (history.VODID != "" && !history.EndedAt.IsZero()) || history.VODGone { + if history.StreamID == "" || history.VODGone { twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("Skipping VOD check\nhistory: %#v", history)) return nil } - latest, err := fetchLatestStreamStatus(ctx, dbConn, history.TwitchID, history.TwitchLogin) - if err != nil { - return oops.New(err, "failed to fetch latest status") - } - - stillLive := false - if latest.StreamID == history.StreamID && latest.Live { - stillLive = true - } - vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10) twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history)) if err != nil { @@ -979,11 +992,11 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi history.VODThumbnail = vod.VODThumbnail history.LastVerifiedVOD = time.Now() history.VODGone = false + history.DiscordNeedsUpdate = true - if !stillLive && vod.Duration.Minutes() > 0 { + if history.StreamEnded && vod.Duration.Minutes() > 0 { history.EndedAt = history.StartedAt.Add(vod.Duration) history.EndApproximated = false - history.DiscordNeedsUpdate = true } _, err = dbConn.Exec(ctx, @@ -1055,11 +1068,11 @@ func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error { FROM twitch_stream_history WHERE vod_gone = FALSE AND - ended_at = $1 + stream_ended = TRUE AND + (end_approximated = TRUE OR vod_id = '') ORDER BY last_verified_vod ASC LIMIT 100 `, - time.Time{}, ) if err != nil { return oops.New(err, "failed to fetch stream history for vod updates")