hmn/src/twitch/twitch.go

1246 lines
35 KiB
Go
Raw Permalink Normal View History

2022-03-22 18:07:43 +00:00
package twitch
import (
"context"
"encoding/json"
"errors"
2022-08-28 22:18:55 +00:00
"fmt"
2022-03-22 18:07:43 +00:00
"time"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/discord"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/jobs"
2022-03-22 18:07:43 +00:00
"git.handmade.network/hmn/hmn/src/logging"
2022-03-27 16:13:47 +00:00
"git.handmade.network/hmn/hmn/src/models"
2022-03-22 18:07:43 +00:00
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/perf"
"git.handmade.network/hmn/hmn/src/utils"
2023-01-02 21:52:41 +00:00
"github.com/jackc/pgx/v5/pgxpool"
2022-03-22 18:07:43 +00:00
)
2022-10-12 18:42:04 +00:00
// NOTE(asaf): The twitch api madness:
//
// | stream.online | stream.offline | channel.update[3] | REST[1][2]
// id[4] | YES | NO | NO | YES
// twitch_id | YES | YES | YES | YES
// twitch_login | YES | YES | YES | YES
// is_live | YES | IMPLICIT | NO | YES
// started_at | YES | NO | NO | YES
// title | NO | NO | YES | YES
// cat_id | NO | NO | YES | YES
// tags | NO | NO | NO | YES
//
// [1] REST returns nothing when user is not live
// [2] Information received from REST is ~3 minutes old.
// [3] channel.update also fires when the user changes their twitch channel settings when they're not live (i.e. as soon as they update it in twitch settings)
// [4] ID of the current livestream
type streamStatus struct {
StreamID string
TwitchID string
TwitchLogin string
Live bool
Title string
StartedAt time.Time
CategoryID string
Tags []string
}
type twitchNotificationType int
const (
notificationTypeNone twitchNotificationType = 0
notificationTypeOnline = 1
notificationTypeOffline = 2
notificationTypeChannelUpdate = 3
notificationTypeRevocation = 4
)
2022-03-22 18:07:43 +00:00
type twitchNotification struct {
2022-05-30 15:49:30 +00:00
Status streamStatus
Type twitchNotificationType
2022-03-22 18:07:43 +00:00
}
var twitchNotificationChannel chan twitchNotification
var linksChangedChannel chan struct{}
func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
2022-03-22 18:07:43 +00:00
log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger()
ctx = logging.AttachLoggerToContext(&log, ctx)
if config.Config.Twitch.ClientID == "" {
log.Warn().Msg("No twitch config provided.")
return jobs.Noop()
2022-03-22 18:07:43 +00:00
}
twitchNotificationChannel = make(chan twitchNotification, 100)
linksChangedChannel = make(chan struct{}, 10)
job := jobs.New()
2022-03-22 18:07:43 +00:00
go func() {
defer func() {
log.Info().Msg("Shutting down twitch monitor")
job.Done()
2022-03-22 18:07:43 +00:00
}()
log.Info().Msg("Running twitch monitor...")
monitorTicker := time.NewTicker(2 * time.Hour)
firstRunChannel := make(chan struct{}, 1)
firstRunChannel <- struct{}{}
2022-03-27 17:30:24 +00:00
timers := make([]*time.Timer, 0)
expiredTimers := make(chan *time.Timer, 10)
2022-03-22 18:07:43 +00:00
for {
done, err := func() (done bool, retErr error) {
defer utils.RecoverPanicAsError(&retErr)
select {
case <-ctx.Done():
for _, timer := range timers {
timer.Stop()
2022-03-27 17:30:24 +00:00
}
return true, nil
case expired := <-expiredTimers:
for idx, timer := range timers {
if timer == expired {
timers = append(timers[:idx], timers[idx+1:]...)
break
}
}
case <-firstRunChannel:
err := refreshAccessToken(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
return true, nil
}
2022-10-12 18:42:04 +00:00
syncWithTwitch(ctx, dbConn, true, true)
case <-monitorTicker.C:
2022-08-28 22:18:55 +00:00
twitchLogClear(ctx, dbConn)
2022-10-12 18:42:04 +00:00
syncWithTwitch(ctx, dbConn, true, true)
case <-linksChangedChannel:
// NOTE(asaf): Since we update links inside transactions for users/projects
// we won't see the updated list of links until the transaction is committed.
// Waiting 5 seconds is just a quick workaround for that. It's not
// convenient to only trigger this after the transaction is committed.
2022-05-30 15:49:30 +00:00
var timer *time.Timer
t := time.AfterFunc(5*time.Second, func() {
2022-05-30 15:49:30 +00:00
expiredTimers <- timer
2022-10-12 18:42:04 +00:00
syncWithTwitch(ctx, dbConn, false, false)
2022-05-30 15:49:30 +00:00
})
timer = t
timers = append(timers, t)
case notification := <-twitchNotificationChannel:
if notification.Type == notificationTypeRevocation {
2022-10-12 18:42:04 +00:00
syncWithTwitch(ctx, dbConn, false, false)
} else {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we process
// the notification to the extent we can, and later do a full update. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
})
timer = t
timers = append(timers, t)
processEventSubNotification(ctx, dbConn, &notification)
}
2022-03-22 18:07:43 +00:00
}
return false, nil
}()
if err != nil {
log.Error().Err(err).Msg("Panicked in MonitorTwitchSubscriptions")
} else if done {
return
2022-03-22 18:07:43 +00:00
}
}
}()
return job
2022-03-22 18:07:43 +00:00
}
2022-08-28 22:18:55 +00:00
func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error {
2022-03-22 18:07:43 +00:00
var notification twitchNotification
if messageType == "notification" {
type notificationJson struct {
Subscription struct {
Type string `json:"type"`
} `json:"subscription"`
Event struct {
2022-10-12 18:42:04 +00:00
StreamID string `json:"id"`
2022-03-22 18:07:43 +00:00
BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"`
2022-05-30 15:49:30 +00:00
Title string `json:"title"`
CategoryID string `json:"category_id"`
2022-03-22 18:07:43 +00:00
} `json:"event"`
}
var incoming notificationJson
err := json.Unmarshal(body, &incoming)
if err != nil {
return oops.New(err, "failed to parse notification body")
}
2022-08-28 22:18:55 +00:00
twitchLog(ctx, conn, models.TwitchLogTypeHook, incoming.Event.BroadcasterUserLogin, "Got hook: "+incoming.Subscription.Type, string(body))
2022-05-30 15:49:30 +00:00
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
notification.Status.Title = incoming.Event.Title
2022-10-12 18:42:04 +00:00
notification.Status.CategoryID = incoming.Event.CategoryID
2022-05-30 15:49:30 +00:00
notification.Status.StartedAt = time.Now()
2022-03-22 18:07:43 +00:00
switch incoming.Subscription.Type {
case "stream.online":
notification.Type = notificationTypeOnline
2022-05-30 15:49:30 +00:00
notification.Status.Live = true
2022-10-12 18:42:04 +00:00
notification.Status.StreamID = incoming.Event.StreamID
2022-03-22 18:07:43 +00:00
case "stream.offline":
notification.Type = notificationTypeOffline
case "channel.update":
notification.Type = notificationTypeChannelUpdate
2022-05-30 18:19:54 +00:00
// NOTE(asaf): Can't tell if the user is live here.
2022-03-22 18:07:43 +00:00
default:
return oops.New(nil, "unknown subscription type received")
}
} else if messageType == "revocation" {
2022-08-28 22:18:55 +00:00
twitchLog(ctx, conn, models.TwitchLogTypeHook, "", "Got hook: Revocation", string(body))
2022-03-22 18:07:43 +00:00
notification.Type = notificationTypeRevocation
}
if twitchNotificationChannel != nil && notification.Type != notificationTypeNone {
select {
case twitchNotificationChannel <- notification:
default:
return oops.New(nil, "twitch notification channel is full")
}
}
return nil
}
func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []string) {
if linksChangedChannel != nil {
twitchChanged := (len(twitchLoginsPreChange) != len(twitchLoginsPostChange))
if !twitchChanged {
for idx, _ := range twitchLoginsPreChange {
if twitchLoginsPreChange[idx] != twitchLoginsPostChange[idx] {
twitchChanged = true
break
}
}
}
2022-03-27 17:30:24 +00:00
select {
case linksChangedChannel <- struct{}{}:
default:
2022-03-22 18:07:43 +00:00
}
}
}
2022-10-12 18:42:04 +00:00
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, updateVODs bool) {
2022-03-22 18:07:43 +00:00
log := logging.ExtractLogger(ctx)
log.Info().Msg("Running twitch sync")
p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
defer func() {
p.EndRequest()
perf.LogPerf(p, log.Info())
}()
type twitchSyncStats struct {
NumSubbed int
NumUnsubbed int
NumStreamsChecked int
}
var stats twitchSyncStats
p.StartBlock("SQL", "Fetch list of streamers")
streamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
needID := make([]string, 0)
streamerMap := make(map[string]*hmndata.TwitchStreamer)
for idx, streamer := range streamers {
needID = append(needID, streamer.TwitchLogin)
streamerMap[streamer.TwitchLogin] = &streamers[idx]
}
twitchUsers := []twitchUser{}
if len(needID) > 0 {
p.StartBlock("TwitchAPI", "Fetch twitch user info")
log.Debug().Interface("needID", needID).Msg("IDs")
twitchUsers, err = getTwitchUsersByLogin(ctx, needID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
2022-03-22 18:07:43 +00:00
}
for _, tu := range twitchUsers {
streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID
}
validStreamers := make([]hmndata.TwitchStreamer, 0, len(streamers))
for _, streamer := range streamers {
if len(streamer.TwitchID) > 0 {
validStreamers = append(validStreamers, streamer)
}
}
p.StartBlock("TwitchAPI", "Fetch event subscriptions")
subscriptions, err := getEventSubscriptions(ctx)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
type isSubbedByType map[string]bool
streamerEventSubs := make(map[string]isSubbedByType)
for _, streamer := range validStreamers {
streamerEventSubs[streamer.TwitchID] = make(isSubbedByType)
streamerEventSubs[streamer.TwitchID]["channel.update"] = false
streamerEventSubs[streamer.TwitchID]["stream.online"] = false
streamerEventSubs[streamer.TwitchID]["stream.offline"] = false
}
type unsubEvent struct {
TwitchID string
EventID string
}
toUnsub := make([]unsubEvent, 0)
for _, sub := range subscriptions {
handled := false
if eventSubs, ok := streamerEventSubs[sub.TwitchID]; ok {
if _, ok := eventSubs[sub.Type]; ok { // Make sure it's a known type
if !sub.GoodStatus {
log.Debug().Str("TwitchID", sub.TwitchID).Str("Event Type", sub.Type).Msg("Twitch doesn't like our sub")
toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
} else {
2022-03-22 18:52:05 +00:00
streamerEventSubs[sub.TwitchID][sub.Type] = true
2022-03-22 18:07:43 +00:00
}
handled = true
}
}
if !handled {
// NOTE(asaf): Found an unknown type or an event subscription that we don't have a matching user for.
// Make sure we unsubscribe.
toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
}
}
if config.Config.Env != config.Dev { // NOTE(asaf): Can't subscribe to events from dev. We need a non-localhost callback url.
p.StartBlock("TwitchAPI", "Sync subscriptions with twitch")
for _, ev := range toUnsub {
err = unsubscribeFromEvent(ctx, ev.EventID)
if err != nil {
log.Error().Err(err).Msg("Error while unsubscribing events")
// NOTE(asaf): Soft error. Don't care if it fails.
}
stats.NumUnsubbed += 1
}
for twitchID, evStatuses := range streamerEventSubs {
for evType, isSubbed := range evStatuses {
if !isSubbed {
err = subscribeToEvent(ctx, evType, twitchID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
stats.NumSubbed += 1
}
}
}
p.EndBlock()
}
tx, err := dbConn.Begin(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to start transaction")
}
defer tx.Rollback(ctx)
allIDs := make([]string, 0, len(validStreamers))
for _, streamer := range validStreamers {
allIDs = append(allIDs, streamer.TwitchID)
}
p.StartBlock("SQL", "Remove untracked streamers")
_, err = tx.Exec(ctx,
2022-10-14 04:36:31 +00:00
`DELETE FROM twitch_latest_status WHERE NOT (twitch_id = ANY($1))`,
2022-03-22 18:07:43 +00:00
allIDs,
)
if err != nil {
log.Error().Err(err).Msg("Failed to remove untracked twitch ids from streamer list in db")
return
}
p.EndBlock()
usersToUpdate := make([]string, 0)
if updateAll {
usersToUpdate = allIDs
} else {
// NOTE(asaf): Twitch can revoke our subscriptions, so we need to
// update users whose subs were revoked or missing since last time we checked.
for twitchID, evStatuses := range streamerEventSubs {
for _, isSubbed := range evStatuses {
if !isSubbed {
usersToUpdate = append(usersToUpdate, twitchID)
break
}
}
}
}
if len(usersToUpdate) > 0 {
p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
statuses, err := getStreamStatus(ctx, usersToUpdate)
if err != nil {
log.Error().Err(err).Msg("failed to fetch stream statuses")
return
2022-10-12 18:42:04 +00:00
}
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
p.EndBlock()
p.StartBlock("SQL", "Update stream statuses in db")
for _, twitchId := range usersToUpdate {
var status *streamStatus
for idx, st := range statuses {
if st.TwitchID == twitchId {
status = &statuses[idx]
2022-10-12 18:42:04 +00:00
break
}
}
if status == nil {
twitchLogin := ""
for _, streamer := range validStreamers {
if streamer.TwitchID == twitchId {
twitchLogin = streamer.TwitchLogin
break
}
}
status = &streamStatus{
TwitchID: twitchId,
TwitchLogin: twitchLogin,
}
}
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
err = gotRESTUpdate(ctx, tx, status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
2022-10-12 18:42:04 +00:00
}
}
p.EndBlock()
2022-03-22 18:07:43 +00:00
}
err = tx.Commit(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to commit transaction")
}
stats.NumStreamsChecked += len(usersToUpdate)
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
2022-10-12 18:42:04 +00:00
if updateVODs {
err = findMissingVODs(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to find missing twitch vods")
}
err = verifyHistoryVODs(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to verify twitch vods")
}
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
2022-03-22 18:07:43 +00:00
}
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
2022-10-12 18:42:04 +00:00
history, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE discord_needs_update = TRUE
ORDER BY started_at ASC
`,
)
if err != nil {
return oops.New(err, "failed to fetch twitch history")
}
updatedHistories := make([]*models.TwitchStreamHistory, 0)
for _, h := range history {
relevant := isStreamRelevant(h.CategoryID, h.Tags)
2022-10-19 08:57:34 +00:00
if relevant && h.StreamEnded {
2022-10-12 18:42:04 +00:00
msgId, err := discord.PostStreamHistory(ctx, h)
if err != nil {
return oops.New(err, "failed to post twitch history to discord")
}
h.DiscordMessageID = msgId
}
2022-10-19 08:57:34 +00:00
h.DiscordNeedsUpdate = false
updatedHistories = append(updatedHistories, h)
2022-10-12 18:42:04 +00:00
}
for _, h := range updatedHistories {
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
discord_needs_update = $2,
discord_message_id = $3
2022-10-12 18:42:04 +00:00
WHERE stream_id = $1
`,
h.StreamID,
h.DiscordNeedsUpdate,
h.DiscordMessageID,
)
if err != nil {
return oops.New(err, "failed to update twitch history after posting to discord")
}
}
streams, err := db.Query[models.TwitchLatestStatus](ctx, dbConn,
`
SELECT $columns
FROM
2022-10-12 18:42:04 +00:00
twitch_latest_status
WHERE live = TRUE
ORDER BY started_at ASC
`,
)
if err != nil {
return oops.New(err, "failed to fetch livestreams from db")
}
var streamDetails []hmndata.StreamDetails
for _, s := range streams {
2022-10-12 18:42:04 +00:00
if isStreamRelevant(s.CategoryID, s.Tags) {
streamDetails = append(streamDetails, hmndata.StreamDetails{
Username: s.TwitchLogin,
StartTime: s.StartedAt,
Title: s.Title,
})
}
2022-03-22 18:07:43 +00:00
}
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
if err != nil {
return oops.New(err, "failed to update discord with livestream info")
}
return nil
2022-03-22 18:07:43 +00:00
}
2022-05-30 18:19:54 +00:00
func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
2022-03-27 16:41:21 +00:00
log := logging.ExtractLogger(ctx)
2022-05-30 18:19:54 +00:00
log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")
var err error
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
foundStreamer := false
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == twitchLogin {
foundStreamer = true
break
}
}
if !foundStreamer {
2022-03-22 18:07:43 +00:00
return
}
status := streamStatus{
2022-10-12 18:42:04 +00:00
TwitchID: twitchID,
TwitchLogin: twitchLogin,
Live: false,
2022-03-22 18:07:43 +00:00
}
2022-05-30 18:19:54 +00:00
result, err := getStreamStatus(ctx, []string{twitchID})
if err != nil {
log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
return
}
2022-08-28 22:18:55 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeREST, twitchLogin, "Fetched status", fmt.Sprintf("%#v", result))
2022-05-30 18:19:54 +00:00
if len(result) > 0 {
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
status = result[0]
}
2022-10-12 18:42:04 +00:00
err = gotRESTUpdate(ctx, dbConn, &status)
2022-05-30 18:19:54 +00:00
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
}
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
log := logging.ExtractLogger(ctx)
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
if notification.Type == notificationTypeNone {
return
}
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
foundStreamer := false
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == notification.Status.TwitchLogin {
foundStreamer = true
break
2022-03-22 18:07:43 +00:00
}
2022-05-30 18:19:54 +00:00
}
if !foundStreamer {
return
}
2022-08-28 22:18:55 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification))
2022-10-12 18:42:04 +00:00
switch notification.Type {
case notificationTypeOnline:
err := gotStreamOnline(ctx, dbConn, &notification.Status)
2022-03-22 18:07:43 +00:00
if err != nil {
2022-05-30 18:19:54 +00:00
log.Error().Err(err).Msg("failed to update twitch stream status")
2022-03-22 18:07:43 +00:00
}
2022-10-12 18:42:04 +00:00
case notificationTypeOffline:
err := gotStreamOffline(ctx, dbConn, &notification.Status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
case notificationTypeChannelUpdate:
err := gotChannelUpdate(ctx, dbConn, &notification.Status)
2022-05-30 18:19:54 +00:00
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
2022-03-22 18:07:43 +00:00
}
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
2022-03-22 18:07:43 +00:00
}
}
2022-10-12 18:42:04 +00:00
func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
2022-10-19 08:57:34 +00:00
if latest.Live && latest.StreamID != status.StreamID {
// NOTE(asaf): Update history for previous stream
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("Wrapping up previous stream"))
latest.Live = false
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
}
2022-10-12 18:42:04 +00:00
latest.Live = true
latest.StreamID = status.StreamID
latest.StartedAt = status.StartedAt
latest.LastHookLiveUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOffline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
2022-10-12 18:42:04 +00:00
latest.Live = false
latest.LastHookLiveUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotChannelUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
2022-10-12 18:42:04 +00:00
if !latest.Live {
// NOTE(asaf): If the stream is live, this channel update applies
// to the current livestream. Otherwise, this will
// only apply to the next stream, so we clear out
// the stream info.
latest.StreamID = ""
latest.StartedAt = time.Time{}
}
latest.Title = status.Title
if latest.CategoryID != status.CategoryID {
latest.CategoryID = status.CategoryID
latest.Tags = []string{} // NOTE(asaf): We don't get tags here, but we can't assume they didn't change because some tags are automatic based on category
}
latest.LastHookChannelUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
2022-10-12 18:42:04 +00:00
if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) {
if status.Live {
2022-10-19 08:57:34 +00:00
if latest.Live && status.StreamID != latest.StreamID {
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("Wrapping up previous stream"))
latest.Live = false
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
}
2022-10-12 18:42:04 +00:00
// NOTE(asaf): We don't get this information if the user isn't live
latest.StartedAt = status.StartedAt
latest.StreamID = status.StreamID
}
2022-10-19 08:57:34 +00:00
latest.Live = status.Live
2022-10-12 18:42:04 +00:00
}
if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) {
if status.Live {
// NOTE(asaf): We don't get this information if the user isn't live
latest.Title = status.Title
latest.CategoryID = status.CategoryID
latest.Tags = status.Tags
}
}
latest.LastRESTUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID string, twitchLogin string) (*models.TwitchLatestStatus, error) {
tx, err := conn.Begin(ctx)
if err != nil {
return nil, oops.New(err, "failed to begin transaction for stream status fetch")
}
defer tx.Rollback(ctx)
2022-10-14 04:36:31 +00:00
result, err := db.QueryOne[models.TwitchLatestStatus](ctx, tx,
2022-10-12 18:42:04 +00:00
`
SELECT $columns
FROM twitch_latest_status
WHERE twitch_id = $1
`,
twitchID,
)
if err == db.NotFound {
2022-10-14 04:36:31 +00:00
twitchLog(ctx, tx, models.TwitchLogTypeOther, twitchLogin, "Creating new streamer", fmt.Sprintf("twitchID: %s", twitchID))
_, err = tx.Exec(ctx,
2022-03-22 18:07:43 +00:00
`
2022-10-12 18:42:04 +00:00
INSERT INTO twitch_latest_status (twitch_id, twitch_login)
VALUES ($1, $2)
2022-03-22 18:07:43 +00:00
`,
2022-10-12 18:42:04 +00:00
twitchID,
twitchLogin,
2022-03-22 18:07:43 +00:00
)
if err != nil {
2022-10-12 18:42:04 +00:00
return nil, err
}
result = &models.TwitchLatestStatus{
TwitchID: twitchID,
TwitchLogin: twitchLogin,
}
} else if err != nil {
return nil, oops.New(err, "failed to fetch existing twitch status")
}
if result.TwitchLogin != twitchLogin {
2022-10-19 08:57:34 +00:00
// NOTE(asaf): If someone changed their twitch login we should
// still reuse their db record by twitch_id.
2022-10-14 04:36:31 +00:00
_, err = tx.Exec(ctx,
2022-10-12 18:42:04 +00:00
`
UPDATE twitch_latest_status
SET twitch_login = $2
WHERE twitch_id = $1
`,
twitchID,
twitchLogin,
)
if err != nil {
return nil, oops.New(err, "failed to update twitch login")
}
result.TwitchLogin = twitchLogin
}
err = tx.Commit(ctx)
if err != nil {
return nil, oops.New(err, "failed to commit transaction")
}
return result, nil
}
func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *models.TwitchLatestStatus) error {
tx, err := conn.Begin(ctx)
if err != nil {
return oops.New(err, "failed to start transaction for stream status update")
}
defer tx.Rollback(ctx)
// NOTE(asaf): Ensure that we have a record for it in the db
2022-10-14 04:36:31 +00:00
_, err = fetchLatestStreamStatus(ctx, tx, latest.TwitchID, latest.TwitchLogin)
2022-10-12 18:42:04 +00:00
if err != nil {
return err
}
2022-10-14 03:37:50 +00:00
if latest.Tags == nil {
latest.Tags = make([]string, 0)
}
2022-10-14 04:36:31 +00:00
_, err = tx.Exec(ctx,
2022-10-12 18:42:04 +00:00
`
UPDATE twitch_latest_status
SET
live = $2,
stream_id = $3,
started_at = $4,
title = $5,
category_id = $6,
tags = $7,
last_hook_live_update = $8,
last_hook_channel_update = $9,
last_rest_update = $10
2022-10-12 18:42:04 +00:00
WHERE
twitch_id = $1
`,
latest.TwitchID,
latest.Live,
latest.StreamID,
2022-10-12 18:42:04 +00:00
latest.StartedAt,
latest.Title,
latest.CategoryID,
latest.Tags,
latest.LastHookLiveUpdate,
latest.LastHookChannelUpdate,
latest.LastRESTUpdate,
)
if err != nil {
return oops.New(err, "failed to update twitch latest status")
}
err = tx.Commit(ctx)
if err != nil {
return oops.New(err, "failed to commit transaction")
}
return nil
}
func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error {
if status.StreamID == "" {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("No StreamID - Skipping\nstatus: %#v", status))
2022-10-12 18:42:04 +00:00
return nil
}
tx, err := dbConn.Begin(ctx)
if err != nil {
return oops.New(err, "failed to begin transaction for stream history update")
}
defer tx.Rollback(ctx)
history, err := db.QueryOne[models.TwitchStreamHistory](ctx, tx,
`
SELECT $columns
FROM twitch_stream_history
WHERE stream_id = $1
`,
status.StreamID,
)
if err == db.NotFound {
2022-10-15 21:30:08 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Creating new history\nstatus: %#v", status))
2022-10-12 18:42:04 +00:00
history = &models.TwitchStreamHistory{}
history.StreamID = status.StreamID
history.TwitchID = status.TwitchID
history.TwitchLogin = status.TwitchLogin
history.StartedAt = status.StartedAt
history.DiscordNeedsUpdate = true
} else if err != nil {
return oops.New(err, "failed to fetch existing stream history")
}
2022-10-19 08:57:34 +00:00
if !status.Live && !history.StreamEnded {
2022-10-15 21:30:08 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Setting end time\nstatus: %#v", status))
2022-10-12 18:42:04 +00:00
history.EndedAt = time.Now()
2022-10-19 08:57:34 +00:00
history.StreamEnded = true
2022-10-12 18:42:04 +00:00
history.EndApproximated = true
history.DiscordNeedsUpdate = true
}
history.Title = status.Title
history.CategoryID = status.CategoryID
history.Tags = status.Tags
2022-10-14 03:37:50 +00:00
if history.Tags == nil {
history.Tags = make([]string, 0)
}
2022-10-12 18:42:04 +00:00
_, err = tx.Exec(ctx,
`
INSERT INTO
2022-10-19 08:57:34 +00:00
twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, stream_ended, ended_at, end_approximated, title, category_id, tags, discord_needs_update)
Add Discord login (#106) This leverages our existing Discord OAuth implementation. Any users with a linked Discord account will be able to log in immediately. When logging in, we request the `email` scope in addition to `identity`, so existing users will be prompted one time to accept the new permissions. On subsequent logins, Discord will skip the prompt. When linking your Discord account to an existing HMN account, we continue to only request the `identity` scope, so we do not receive the user's Discord email. Both login and linking go through the same Discord OAuth callback. All flows through the callback try to achieve the same end goal: a logged-in HMN user with a linked Discord account. Linking works the same as it ever has. Login, however, is different because we do not have a session ID to use as the OAuth state. To account for this, I have added a `pending_login` table that stores a secure unique ID and the eventual destination URL. These pending logins expire after 10 minutes. When we receive the OAuth callback, we look up the pending login by the OAuth `state` and immediately delete it. The destination URL will be used to redirect the user to the right place. If we have a `discord_user` entry for the OAuth'd Discord user, we immediately log the user into the associated HMN account. This is the typical login case. If we do not have a `discord_user`, but there is exactly one HMN user with the same email address as the Discord user, we will link the two accounts and log into the HMN account. (It is possible for multiple HMN accounts to have the same email, because we don't have a uniqueness constraint there. We fail the login in this case rather than link to the wrong account.) Finally, if no associated HMN user exists, a new one will be created. It will use the Discord user's username, email, and avatar. This user will have no password, but they can set or reset a password through the usual flows. Co-authored-by: Ben Visness <bvisness@gmail.com> Reviewed-on: https://git.handmade.network/hmn/hmn/pulls/106
2023-05-06 19:38:50 +00:00
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
2022-10-12 18:42:04 +00:00
ON CONFLICT (stream_id) DO UPDATE SET
2022-10-19 08:57:34 +00:00
stream_ended = EXCLUDED.stream_ended,
2022-10-12 18:42:04 +00:00
ended_at = EXCLUDED.ended_at,
2022-10-15 21:30:08 +00:00
end_approximated = EXCLUDED.end_approximated,
2022-10-12 18:42:04 +00:00
title = EXCLUDED.title,
category_id = EXCLUDED.category_id,
2022-10-15 21:30:08 +00:00
tags = EXCLUDED.tags,
discord_needs_update = EXCLUDED.discord_needs_update
2022-10-12 18:42:04 +00:00
`,
history.StreamID,
history.TwitchID,
history.TwitchLogin,
history.StartedAt,
2022-10-19 08:57:34 +00:00
history.StreamEnded,
2022-10-12 18:42:04 +00:00
history.EndedAt,
2022-10-15 21:30:08 +00:00
history.EndApproximated,
2022-10-12 18:42:04 +00:00
history.Title,
history.CategoryID,
history.Tags,
2022-10-15 21:30:08 +00:00
history.DiscordNeedsUpdate,
2022-10-12 18:42:04 +00:00
)
if err != nil {
return oops.New(err, "failed to insert/update twitch history")
}
err = tx.Commit(ctx)
if err != nil {
return oops.New(err, "failed to commit transaction")
}
2022-10-19 08:57:34 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Checking VOD\nhistory: %#v", history))
err = findHistoryVOD(ctx, dbConn, history)
if err != nil {
return oops.New(err, "failed to look up twitch vod")
2022-10-12 18:42:04 +00:00
}
return nil
}
func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error {
2022-10-19 08:57:34 +00:00
if history.StreamID == "" || history.VODGone {
2022-10-15 21:30:08 +00:00
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("Skipping VOD check\nhistory: %#v", history))
2022-10-12 18:42:04 +00:00
return nil
}
vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10)
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history))
2022-10-12 18:42:04 +00:00
if err != nil {
return oops.New(err, "failed to fetch vods for streamer")
}
var vod *archivedVideo
for idx, v := range vods {
if v.StreamID == history.StreamID {
vod = &vods[idx]
}
}
if vod != nil {
history.VODID = vod.ID
history.VODUrl = vod.VODUrl
history.VODThumbnail = vod.VODThumbnail
history.LastVerifiedVOD = time.Now()
history.VODGone = false
2022-10-19 08:57:34 +00:00
history.DiscordNeedsUpdate = true
2022-10-12 18:42:04 +00:00
2022-10-19 08:57:34 +00:00
if history.StreamEnded && vod.Duration.Minutes() > 0 {
2022-10-12 18:42:04 +00:00
history.EndedAt = history.StartedAt.Add(vod.Duration)
history.EndApproximated = false
}
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
vod_id = $2,
vod_url = $3,
vod_thumbnail = $4,
last_verified_vod = $5,
vod_gone = $6,
ended_at = $7,
end_approximated = $8,
discord_needs_update = $9
2022-10-12 18:42:04 +00:00
WHERE stream_id = $1
`,
history.StreamID,
history.VODID,
history.VODUrl,
history.VODThumbnail,
history.LastVerifiedVOD,
history.VODGone,
history.EndedAt,
history.EndApproximated,
history.DiscordNeedsUpdate,
2022-10-12 18:42:04 +00:00
)
if err != nil {
return oops.New(err, "failed to update stream history with VOD")
2022-03-22 18:07:43 +00:00
}
} else {
2022-10-12 18:42:04 +00:00
if history.StartedAt.Add(14 * 24 * time.Hour).Before(time.Now()) {
history.VODGone = true
_, err = dbConn.Exec(ctx, `
UPDATE twitch_stream_history
SET
vod_gone = $2,
2022-10-13 16:55:51 +00:00
last_verified_vod = $3
2022-10-12 18:42:04 +00:00
WHERE stream_id = $1
`,
history.StreamID,
history.VODGone,
2022-10-13 16:55:51 +00:00
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update stream history")
}
} else {
_, err = dbConn.Exec(ctx, `
UPDATE twitch_stream_history
SET
last_verified_vod = $2
WHERE stream_id = $1
`,
history.StreamID,
time.Now(),
2022-10-12 18:42:04 +00:00
)
if err != nil {
return oops.New(err, "failed to update stream history")
}
}
}
return nil
}
func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error {
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE
vod_gone = FALSE AND
2022-10-19 08:57:34 +00:00
stream_ended = TRUE AND
(end_approximated = TRUE OR vod_id = '')
2022-10-13 16:55:51 +00:00
ORDER BY last_verified_vod ASC
LIMIT 100
2022-10-12 18:42:04 +00:00
`,
)
if err != nil {
return oops.New(err, "failed to fetch stream history for vod updates")
}
for _, history := range histories {
2022-10-13 16:55:51 +00:00
err = findHistoryVOD(ctx, dbConn, history)
if err != nil {
return err
2022-10-12 18:42:04 +00:00
}
}
return nil
}
func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE
vod_gone = FALSE AND
vod_id != '' AND
last_verified_vod < $1
ORDER BY last_verified_vod ASC
LIMIT 100
`,
time.Now().Add(-24*time.Hour),
)
if err != nil {
return oops.New(err, "failed to fetch stream history for vod verification")
}
videoIDs := make([]string, 0, len(histories))
for _, h := range histories {
videoIDs = append(videoIDs, h.VODID)
}
if len(videoIDs) > 0 {
VODs, err := getArchivedVideos(ctx, videoIDs)
if err != nil && !errors.Is(err, twitchNotFound) {
return oops.New(err, "failed to fetch vods from twitch")
}
2022-10-12 18:42:04 +00:00
vodGone := make([]string, 0, len(histories))
vodFound := make([]string, 0, len(histories))
for _, h := range histories {
found := false
for _, vod := range VODs {
if h.VODID == vod.ID {
found = true
break
}
}
if !found {
vodGone = append(vodGone, h.VODID)
} else {
vodFound = append(vodFound, h.VODID)
2022-10-12 18:42:04 +00:00
}
}
if len(vodGone) > 0 {
_, err = dbConn.Exec(ctx,
`
2022-10-12 18:42:04 +00:00
UPDATE twitch_stream_history
SET
discord_needs_update = TRUE,
vod_id = '',
vod_url = '',
vod_thumbnail = '',
last_verified_vod = $2,
vod_gone = TRUE
WHERE
vod_id = ANY($1)
2022-03-22 18:07:43 +00:00
`,
vodGone,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update twitch history")
}
2022-03-22 18:07:43 +00:00
}
2022-10-12 18:42:04 +00:00
if len(vodFound) > 0 {
_, err = dbConn.Exec(ctx,
`
2022-10-12 18:42:04 +00:00
UPDATE twitch_stream_history
SET
2022-10-14 04:36:31 +00:00
last_verified_vod = $2
2022-10-12 18:42:04 +00:00
WHERE
vod_id = ANY($1)
`,
vodFound,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update twitch history")
}
2022-10-12 18:42:04 +00:00
}
}
return nil
2022-03-22 18:07:43 +00:00
}
var RelevantCategories = []string{
"1469308723", // Software and Game Development
}
/*
NOTE(asaf): Looks like twitch returns `tags` properly, but only one in `tag_ids`
var RelevantTags = []string{
"a59f1e4e-257b-4bd0-90c7-189c3efbf917", // Programming
"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
}
*/
2022-03-22 18:07:43 +00:00
var RelevantTags = []string{
"Programming", // Programming
"Software Development", // Software Development
2022-03-22 18:07:43 +00:00
}
2022-10-12 18:42:04 +00:00
func isStreamRelevant(catID string, tags []string) bool {
for _, cat := range RelevantCategories {
if cat == catID {
return true
2022-03-22 18:07:43 +00:00
}
2022-10-12 18:42:04 +00:00
}
2022-03-22 18:07:43 +00:00
2022-10-12 18:42:04 +00:00
for _, tag := range RelevantTags {
for _, streamTag := range tags {
if tag == streamTag {
return true
2022-03-22 18:07:43 +00:00
}
}
}
return false
}
2022-08-28 22:18:55 +00:00
func twitchLog(ctx context.Context, conn db.ConnOrTx, logType models.TwitchLogType, login string, message string, payload string) {
_, err := conn.Exec(ctx,
`
INSERT INTO twitch_log (logged_at, twitch_login, type, message, payload)
VALUES ($1, $2, $3, $4, $5)
`,
time.Now(),
login,
logType,
message,
payload,
)
if err != nil {
log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger()
log.Error().Err(err).Msg("Failed to log twitch event")
}
}
func twitchLogClear(ctx context.Context, conn db.ConnOrTx) {
_, err := conn.Exec(ctx,
`
DELETE FROM twitch_log
2022-10-14 04:36:31 +00:00
WHERE logged_at <= $1
2022-08-28 22:18:55 +00:00
`,
time.Now().Add(-(time.Hour * 24 * 4)),
)
if err != nil {
log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger()
log.Error().Err(err).Msg("Failed to clear old twitch logs")
}
}