Added support for db arrays and some twitch fixes.

This commit is contained in:
Asaf Gartner 2022-10-13 00:40:37 +03:00
parent 5cc920dc2f
commit 0f58cfc2da
5 changed files with 132 additions and 92 deletions

View File

@ -618,6 +618,12 @@ func setValueFromDB(dest reflect.Value, value reflect.Value) {
dest.SetInt(value.Int()) dest.SetInt(value.Int())
case reflect.String: case reflect.String:
dest.SetString(value.String()) dest.SetString(value.String())
case reflect.Slice:
switch v := value.Interface().(type) {
case pgtype.Value:
v.AssignTo(dest.Interface())
default:
}
// TODO(ben): More kinds? All the kinds? It kind of feels like we should be able to assign to any destination whose underlying type is a primitive. // TODO(ben): More kinds? All the kinds? It kind of feels like we should be able to assign to any destination whose underlying type is a primitive.
default: default:
dest.Set(value) dest.Set(value)

View File

@ -167,7 +167,7 @@ func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory)
} }
duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Minute).String() duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Minute).String()
messageContent := fmt.Sprintf( messageContent := fmt.Sprintf(
"**%s** was live: https://twitch.tv/%s\n> _%s_\n At <t:%d:F> For %s%s", "**%s** was live: https://twitch.tv/%s\n> _%s_\nAt <t:%d:F> For %s%s",
history.TwitchLogin, history.TwitchLogin,
history.TwitchLogin, history.TwitchLogin,
history.Title, history.Title,
@ -175,6 +175,9 @@ func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory)
approximated, approximated,
duration, duration,
) )
if history.VODUrl != "" {
messageContent += fmt.Sprintf("\nVOD: %s", history.VODUrl)
}
msgJson, err := json.Marshal(CreateMessageRequest{ msgJson, err := json.Marshal(CreateMessageRequest{
Content: messageContent, Content: messageContent,
Flags: FlagSuppressEmbeds, Flags: FlagSuppressEmbeds,

View File

@ -35,6 +35,7 @@ func (m NewTwitchTracking) Up(ctx context.Context, tx pgx.Tx) error {
twitch_login VARCHAR(255) NOT NULL, twitch_login VARCHAR(255) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE NOT NULL, started_at TIMESTAMP WITH TIME ZONE NOT NULL,
ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch', ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
end_approximated BOOLEAN NOT NULL DEFAULT FALSE,
title VARCHAR(255) NOT NULL DEFAULT '', title VARCHAR(255) NOT NULL DEFAULT '',
category_id VARCHAR(255) NOT NULL DEFAULT '', category_id VARCHAR(255) NOT NULL DEFAULT '',
tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}', tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}',
@ -72,7 +73,7 @@ func (m NewTwitchTracking) Down(ctx context.Context, tx pgx.Tx) error {
DROP TABLE twitch_stream_history; DROP TABLE twitch_stream_history;
DROP TABLE twitch_latest_status; DROP TABLE twitch_latest_status;
CREATE TABLE twitch_streams ( CREATE TABLE twitch_stream (
twitch_id VARCHAR(255) NOT NULL, twitch_id VARCHAR(255) NOT NULL,
twitch_login VARCHAR(255) NOT NULL, twitch_login VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL, title VARCHAR(255) NOT NULL,

View File

@ -174,7 +174,7 @@ type archivedVideo struct {
func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) { func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) {
query := url.Values{} query := url.Values{}
query.Add("user_id", twitchID) query.Add("user_id", twitchID)
query.Add("type", "archived") query.Add("type", "archive")
query.Add("first", strconv.Itoa(numVODs)) query.Add("first", strconv.Itoa(numVODs))
return getArchivedVideosByQuery(ctx, query) return getArchivedVideosByQuery(ctx, query)
@ -471,7 +471,7 @@ func doRequest(ctx context.Context, waitOnRateLimit bool, req *http.Request) (*h
if err != nil { if err != nil {
return nil, oops.New(err, "failed to read response body") return nil, oops.New(err, "failed to read response body")
} }
logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch") logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch")
res.Body.Close() res.Body.Close()
return res, oops.New(nil, "got an unexpected status code from twitch") return res, oops.New(nil, "got an unexpected status code from twitch")
} }

View File

@ -267,13 +267,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, u
streamerMap[streamer.TwitchLogin] = &streamers[idx] streamerMap[streamer.TwitchLogin] = &streamers[idx]
} }
p.StartBlock("TwitchAPI", "Fetch twitch user info") twitchUsers := []twitchUser{}
twitchUsers, err := getTwitchUsersByLogin(ctx, needID) if len(needID) > 0 {
if err != nil { p.StartBlock("TwitchAPI", "Fetch twitch user info")
log.Error().Err(err).Msg("Error while monitoring twitch") log.Debug().Interface("needID", needID).Msg("IDs")
return twitchUsers, err = getTwitchUsersByLogin(ctx, needID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
} }
p.EndBlock()
for _, tu := range twitchUsers { for _, tu := range twitchUsers {
streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID
@ -394,43 +398,45 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, u
} }
} }
p.StartBlock("TwitchAPI", "Fetch twitch stream statuses") if len(usersToUpdate) > 0 {
statuses, err := getStreamStatus(ctx, usersToUpdate) p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
if err != nil { statuses, err := getStreamStatus(ctx, usersToUpdate)
log.Error().Err(err).Msg("failed to fetch stream statuses") if err != nil {
return log.Error().Err(err).Msg("failed to fetch stream statuses")
} return
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
p.EndBlock()
p.StartBlock("SQL", "Update stream statuses in db")
for _, twitchId := range usersToUpdate {
var status *streamStatus
for idx, st := range statuses {
if st.TwitchID == twitchId {
status = &statuses[idx]
break
}
} }
if status == nil { twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
twitchLogin := "" p.EndBlock()
for _, streamer := range validStreamers { p.StartBlock("SQL", "Update stream statuses in db")
if streamer.TwitchID == twitchId { for _, twitchId := range usersToUpdate {
twitchLogin = streamer.TwitchLogin var status *streamStatus
for idx, st := range statuses {
if st.TwitchID == twitchId {
status = &statuses[idx]
break break
} }
} }
status = &streamStatus{ if status == nil {
TwitchID: twitchId, twitchLogin := ""
TwitchLogin: twitchLogin, for _, streamer := range validStreamers {
if streamer.TwitchID == twitchId {
twitchLogin = streamer.TwitchLogin
break
}
}
status = &streamStatus{
TwitchID: twitchId,
TwitchLogin: twitchLogin,
}
}
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
err = gotRESTUpdate(ctx, tx, status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
} }
} }
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status)) p.EndBlock()
err = gotRESTUpdate(ctx, tx, status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
} }
p.EndBlock()
err = tx.Commit(ctx) err = tx.Commit(ctx)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to commit transaction") log.Error().Err(err).Msg("failed to commit transaction")
@ -489,7 +495,7 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
UPDATE twitch_stream_history UPDATE twitch_stream_history
SET SET
discord_needs_update = $2, discord_needs_update = $2,
discord_message_id = $3, discord_message_id = $3
WHERE stream_id = $1 WHERE stream_id = $1
`, `,
h.StreamID, h.StreamID,
@ -638,6 +644,7 @@ func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus
if err != nil { if err != nil {
return err return err
} }
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
latest.Live = true latest.Live = true
latest.StreamID = status.StreamID latest.StreamID = status.StreamID
latest.StartedAt = status.StartedAt latest.StartedAt = status.StartedAt
@ -658,6 +665,7 @@ func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatu
if err != nil { if err != nil {
return err return err
} }
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOffline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
latest.Live = false latest.Live = false
latest.LastHookLiveUpdate = time.Now() latest.LastHookLiveUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest) err = saveLatestStreamStatus(ctx, conn, latest)
@ -676,6 +684,7 @@ func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatu
if err != nil { if err != nil {
return err return err
} }
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotChannelUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
if !latest.Live { if !latest.Live {
// NOTE(asaf): If the stream is live, this channel update applies // NOTE(asaf): If the stream is live, this channel update applies
// to the current livestream. Otherwise, this will // to the current livestream. Otherwise, this will
@ -706,6 +715,7 @@ func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus)
if err != nil { if err != nil {
return err return err
} }
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) { if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) {
latest.Live = status.Live latest.Live = status.Live
if status.Live { if status.Live {
@ -764,6 +774,7 @@ func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID str
result = &models.TwitchLatestStatus{ result = &models.TwitchLatestStatus{
TwitchID: twitchID, TwitchID: twitchID,
TwitchLogin: twitchLogin, TwitchLogin: twitchLogin,
Tags: []string{},
} }
} else if err != nil { } else if err != nil {
return nil, oops.New(err, "failed to fetch existing twitch status") return nil, oops.New(err, "failed to fetch existing twitch status")
@ -809,18 +820,20 @@ func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *model
UPDATE twitch_latest_status UPDATE twitch_latest_status
SET SET
live = $2, live = $2,
started_at = $3, stream_id = $3,
title = $4, started_at = $4,
category_id = $5, title = $5,
tags = $6, category_id = $6,
last_hook_live_update = $7, tags = $7,
last_hook_channel_update = $8, last_hook_live_update = $8,
last_rest_update = $9 last_hook_channel_update = $9,
last_rest_update = $10
WHERE WHERE
twitch_id = $1 twitch_id = $1
`, `,
latest.TwitchID, latest.TwitchID,
latest.Live, latest.Live,
latest.StreamID,
latest.StartedAt, latest.StartedAt,
latest.Title, latest.Title,
latest.CategoryID, latest.CategoryID,
@ -842,6 +855,7 @@ func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *model
func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error { func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error {
if status.StreamID == "" { if status.StreamID == "" {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("No StreamID - Skipping\nstatus: %#v", status))
return nil return nil
} }
tx, err := dbConn.Begin(ctx) tx, err := dbConn.Begin(ctx)
@ -917,11 +931,22 @@ func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models
} }
func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error { func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error {
if history.StreamID == "" || history.VODID != "" || history.VODGone { if history.StreamID == "" || (history.VODID != "" && !history.EndedAt.IsZero()) || history.VODGone {
return nil return nil
} }
latest, err := fetchLatestStreamStatus(ctx, dbConn, history.TwitchID, history.TwitchLogin)
if err != nil {
return oops.New(err, "failed to fetch latest status")
}
stillLive := false
if latest.StreamID == history.StreamID && latest.Live {
stillLive = true
}
vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10) vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10)
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history))
if err != nil { if err != nil {
return oops.New(err, "failed to fetch vods for streamer") return oops.New(err, "failed to fetch vods for streamer")
} }
@ -939,9 +964,10 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi
history.LastVerifiedVOD = time.Now() history.LastVerifiedVOD = time.Now()
history.VODGone = false history.VODGone = false
if vod.Duration.Minutes() > 0 { if !stillLive && vod.Duration.Minutes() > 0 {
history.EndedAt = history.StartedAt.Add(vod.Duration) history.EndedAt = history.StartedAt.Add(vod.Duration)
history.EndApproximated = false history.EndApproximated = false
history.DiscordNeedsUpdate = true
} }
_, err = dbConn.Exec(ctx, _, err = dbConn.Exec(ctx,
@ -954,7 +980,8 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi
last_verified_vod = $5, last_verified_vod = $5,
vod_gone = $6, vod_gone = $6,
ended_at = $7, ended_at = $7,
end_approximated = $8 end_approximated = $8,
discord_needs_update = $9
WHERE stream_id = $1 WHERE stream_id = $1
`, `,
history.StreamID, history.StreamID,
@ -965,6 +992,7 @@ func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.Twi
history.VODGone, history.VODGone,
history.EndedAt, history.EndedAt,
history.EndApproximated, history.EndApproximated,
history.DiscordNeedsUpdate,
) )
if err != nil { if err != nil {
return oops.New(err, "failed to update stream history with VOD") return oops.New(err, "failed to update stream history with VOD")
@ -996,19 +1024,19 @@ func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error {
FROM twitch_stream_history FROM twitch_stream_history
WHERE WHERE
vod_gone = FALSE AND vod_gone = FALSE AND
vod_url = '' AND vod_id = ''
ended_at != $1
`, `,
time.Time{},
) )
if err != nil { if err != nil {
return oops.New(err, "failed to fetch stream history for vod updates") return oops.New(err, "failed to fetch stream history for vod updates")
} }
for _, history := range histories { for _, history := range histories {
err = findHistoryVOD(ctx, dbConn, history) if history.EndedAt.IsZero() {
if err != nil { err = findHistoryVOD(ctx, dbConn, history)
return err if err != nil {
return err
}
} }
} }
return nil return nil
@ -1038,31 +1066,32 @@ func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
videoIDs = append(videoIDs, h.VODID) videoIDs = append(videoIDs, h.VODID)
} }
VODs, err := getArchivedVideos(ctx, videoIDs) if len(videoIDs) > 0 {
if err != nil { VODs, err := getArchivedVideos(ctx, videoIDs)
return oops.New(err, "failed to fetch vods from twitch") if err != nil {
} return oops.New(err, "failed to fetch vods from twitch")
}
vodGone := make([]string, 0, len(histories)) vodGone := make([]string, 0, len(histories))
vodFound := make([]string, 0, len(histories)) vodFound := make([]string, 0, len(histories))
for _, h := range histories { for _, h := range histories {
found := false found := false
for _, vod := range VODs { for _, vod := range VODs {
if h.VODID == vod.ID { if h.VODID == vod.ID {
found = true found = true
break break
}
}
if !found {
vodGone = append(vodGone, h.VODID)
} else {
vodFound = append(vodFound, h.VODID)
} }
} }
if !found {
vodGone = append(vodGone, h.VODID)
} else {
vodFound = append(vodFound, h.VODID)
}
}
if len(vodGone) > 0 { if len(vodGone) > 0 {
_, err = dbConn.Exec(ctx, _, err = dbConn.Exec(ctx,
` `
UPDATE twitch_stream_history UPDATE twitch_stream_history
SET SET
discord_needs_update = TRUE, discord_needs_update = TRUE,
@ -1074,28 +1103,29 @@ func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
WHERE WHERE
vod_id = ANY($1) vod_id = ANY($1)
`, `,
vodGone, vodGone,
time.Now(), time.Now(),
) )
if err != nil { if err != nil {
return oops.New(err, "failed to update twitch history") return oops.New(err, "failed to update twitch history")
}
} }
}
if len(vodFound) > 0 { if len(vodFound) > 0 {
_, err = dbConn.Exec(ctx, _, err = dbConn.Exec(ctx,
` `
UPDATE twitch_stream_history UPDATE twitch_stream_history
SET SET
last_verified_vod = $2, last_verified_vod = $2,
WHERE WHERE
vod_id = ANY($1) vod_id = ANY($1)
`, `,
vodFound, vodFound,
time.Now(), time.Now(),
) )
if err != nil { if err != nil {
return oops.New(err, "failed to update twitch history") return oops.New(err, "failed to update twitch history")
}
} }
} }