Compare commits

...

5 Commits

Author SHA1 Message Date
Asaf Gartner fe545ff0f3 Debugging twitch 2022-05-30 21:19:54 +03:00
Asaf Gartner 8c47590b99 Debugging twitch 2022-05-30 19:08:09 +03:00
Asaf Gartner 4c296c9ddd Soft failure for failing to delete discord message. 2022-05-30 18:55:10 +03:00
Asaf Gartner 9fcc2321ca Hopefully fixed desync with twitch 2022-05-30 18:49:30 +03:00
Asaf Gartner ca614a781b Updated Go version in serversetup.sh 2022-05-30 18:48:45 +03:00
5 changed files with 118 additions and 47 deletions

View File

@ -70,8 +70,8 @@ fi
# Install Go
if [ $checkpoint -lt 40 ]; then
wget https://golang.org/dl/go1.17.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.17.linux-amd64.tar.gz
wget https://go.dev/dl/go1.18.2.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.18.2.linux-amd64.tar.gz
export PATH=$PATH:/usr/local/go/bin:/root/go/bin
echo 'export PATH=$PATH:/usr/local/go/bin:/root/go/bin' >> ~/.bashrc

View File

@ -9,6 +9,7 @@ import (
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/oops"
)
@ -114,7 +115,8 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
if livestreamMessage != nil {
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
return oops.New(err, "failed to delete existing discord message from streams channel")
log := logging.ExtractLogger(ctx)
log.Error().Err(err).Msg("failed to delete existing discord message from streams channel")
}
}

View File

@ -134,6 +134,8 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
if err != nil {
return nil, oops.New(err, "failed to read response body while processing stream statuses")
}
log := logging.ExtractLogger(ctx)
log.Debug().Str("getStreamStatus response", string(body)).Msg("Got getStreamStatus response")
var streamResponse twitchResponse
err = json.Unmarshal(body, &streamResponse)

View File

@ -18,8 +18,8 @@ import (
)
type twitchNotification struct {
TwitchID string
Type twitchNotificationType
Status streamStatus
Type twitchNotificationType
}
var twitchNotificationChannel chan twitchNotification
@ -91,22 +91,19 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false)
} else {
if notification.Type == notificationTypeChannelUpdate {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we have to
// wait a bit before we process the notification. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer
processEventSubNotification(ctx, dbConn, &notification)
})
timer = t
timers = append(timers, t)
} else {
processEventSubNotification(ctx, dbConn, &notification)
}
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we process
// the notification to the extent we can, and later do a full update. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
})
timer = t
timers = append(timers, t)
processEventSubNotification(ctx, dbConn, &notification)
}
}
}
@ -136,6 +133,8 @@ func QueueTwitchNotification(messageType string, body []byte) error {
Event struct {
BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"`
Title string `json:"title"`
CategoryID string `json:"category_id"`
} `json:"event"`
}
var incoming notificationJson
@ -144,14 +143,20 @@ func QueueTwitchNotification(messageType string, body []byte) error {
return oops.New(err, "failed to parse notification body")
}
notification.TwitchID = incoming.Event.BroadcasterUserID
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
notification.Status.Title = incoming.Event.Title
notification.Status.Category = incoming.Event.CategoryID
notification.Status.StartedAt = time.Now()
switch incoming.Subscription.Type {
case "stream.online":
notification.Type = notificationTypeOnline
notification.Status.Live = true
case "stream.offline":
notification.Type = notificationTypeOffline
case "channel.update":
notification.Type = notificationTypeChannelUpdate
// NOTE(asaf): Can't tell if the user is live here.
default:
return oops.New(nil, "unknown subscription type received")
}
@ -410,6 +415,55 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
return nil
}
func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
log := logging.ExtractLogger(ctx)
log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")
var err error
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
foundStreamer := false
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == twitchLogin {
foundStreamer = true
break
}
}
if !foundStreamer {
return
}
status := streamStatus{
TwitchID: twitchID,
Live: false,
}
result, err := getStreamStatus(ctx, []string{twitchID})
if err != nil {
log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
return
}
if len(result) > 0 {
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
status = result[0]
}
err = updateStreamStatusInDB(ctx, dbConn, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
}
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
log := logging.ExtractLogger(ctx)
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
@ -417,34 +471,47 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
return
}
status := streamStatus{
TwitchID: notification.TwitchID,
Live: false,
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
foundStreamer := false
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
var err error
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline {
result, err := getStreamStatus(ctx, []string{notification.TwitchID})
if err != nil || len(result) == 0 {
log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status")
return
}
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == result[0].TwitchLogin {
status = result[0]
break
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == notification.Status.TwitchLogin {
foundStreamer = true
break
}
}
if !foundStreamer {
return
}
log.Debug().Interface("Status", status).Msg("Updating status")
err = updateStreamStatusInDB(ctx, dbConn, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
log.Debug().Interface("Status", notification.Status).Msg("Updating status")
err = updateStreamStatusInDB(ctx, dbConn, &notification.Status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
} else if notification.Type == notificationTypeChannelUpdate {
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
// So we just update the title if the user is live in our db, and we rely on the
// 3-minute delayed status update to verify live status and category/tag requirements.
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream
SET title = $1
WHERE twitch_id = $2
`,
notification.Status.Title,
notification.Status.TwitchID,
)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
}
log.Debug().Msg("Notifying discord")

View File

@ -74,7 +74,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
`
SELECT $columns
FROM
twitch_streams
twitch_stream
ORDER BY started_at DESC
`,
)