Debugging twitch

This commit is contained in:
Asaf Gartner 2022-05-30 21:19:54 +03:00
parent 8c47590b99
commit fe545ff0f3
1 changed files with 92 additions and 38 deletions

View File

@ -92,20 +92,18 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
syncWithTwitch(ctx, dbConn, false) syncWithTwitch(ctx, dbConn, false)
} else { } else {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we have to // would return old data if we called it immediately, so we process
// wait a bit before we process the notification. We can get the // the notification to the extent we can, and later do a full update. We can get the
// category from the notification, but not the tags (or the up-to-date title), // category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this. // so we can't really skip this.
var timer *time.Timer var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() { t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer expiredTimers <- timer
processEventSubNotification(ctx, dbConn, &notification, true) updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
}) })
timer = t timer = t
timers = append(timers, t) timers = append(timers, t)
// NOTE(asaf): We also run this immediately. Since we get the category in the notification processEventSubNotification(ctx, dbConn, &notification)
// we could show the stream as online if it's the right category.
processEventSubNotification(ctx, dbConn, &notification, false)
} }
} }
} }
@ -158,7 +156,7 @@ func QueueTwitchNotification(messageType string, body []byte) error {
notification.Type = notificationTypeOffline notification.Type = notificationTypeOffline
case "channel.update": case "channel.update":
notification.Type = notificationTypeChannelUpdate notification.Type = notificationTypeChannelUpdate
notification.Status.Live = true // NOTE(asaf): Can't tell if the user is live here.
default: default:
return oops.New(nil, "unknown subscription type received") return oops.New(nil, "unknown subscription type received")
} }
@ -417,47 +415,103 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
return nil return nil
} }
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification, delayed bool) { func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
log := logging.ExtractLogger(ctx)
log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")
var err error
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
foundStreamer := false
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == twitchLogin {
foundStreamer = true
break
}
}
if !foundStreamer {
return
}
status := streamStatus{
TwitchID: twitchID,
Live: false,
}
result, err := getStreamStatus(ctx, []string{twitchID})
if err != nil {
log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
return
}
if len(result) > 0 {
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
status = result[0]
}
err = updateStreamStatusInDB(ctx, dbConn, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
}
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
log := logging.ExtractLogger(ctx) log := logging.ExtractLogger(ctx)
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification") log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
if notification.Type == notificationTypeNone { if notification.Type == notificationTypeNone {
return return
} }
status := streamStatus{ // NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
TwitchID: notification.Status.TwitchID, foundStreamer := false
Live: false, allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
} }
var err error for _, streamer := range allStreamers {
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline { if streamer.TwitchLogin == notification.Status.TwitchLogin {
result, err := getStreamStatus(ctx, []string{notification.Status.TwitchID}) foundStreamer = true
if err != nil || len(result) == 0 { break
log.Error().Str("TwitchID", notification.Status.TwitchID).Err(err).Msg("failed to fetch stream status")
return
}
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status for notification")
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == result[0].TwitchLogin {
if delayed || (result[0].Live == notification.Status.Live) {
status = result[0]
} else {
status = notification.Status
}
break
}
} }
} }
if !foundStreamer {
return
}
log.Debug().Interface("Status", status).Msg("Updating status") if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
err = updateStreamStatusInDB(ctx, dbConn, &status) log.Debug().Interface("Status", notification.Status).Msg("Updating status")
if err != nil { err = updateStreamStatusInDB(ctx, dbConn, &notification.Status)
log.Error().Err(err).Msg("failed to update twitch stream status") if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
} else if notification.Type == notificationTypeChannelUpdate {
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
// So we just update the title if the user is live in our db, and we rely on the
// 3-minute delayed status update to verify live status and category/tag requirements.
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream
SET title = $1
WHERE twitch_id = $2
`,
notification.Status.Title,
notification.Status.TwitchID,
)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
} }
log.Debug().Msg("Notifying discord") log.Debug().Msg("Notifying discord")