Added panic recovery to all of our background jobs

Fixes issue #32
This commit is contained in:
Asaf Gartner 2022-06-16 00:33:57 +03:00
parent 870a073e22
commit b165bf7c23
5 changed files with 115 additions and 75 deletions

View File

@ -18,6 +18,7 @@ import (
"git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
"golang.org/x/crypto/argon2" "golang.org/x/crypto/argon2"
@ -244,22 +245,29 @@ func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) jo
for { for {
select { select {
case <-t.C: case <-t.C:
n, err := DeleteInactiveUsers(ctx, conn) err := func() (err error) {
if err == nil { defer utils.RecoverPanicAsError(&err)
if n > 0 { n, err := DeleteInactiveUsers(ctx, conn)
logging.Info().Int64("num deleted users", n).Msg("Deleted inactive users") if err == nil {
if n > 0 {
logging.Info().Int64("num deleted users", n).Msg("Deleted inactive users")
}
} else {
logging.Error().Err(err).Msg("Failed to delete inactive users")
} }
} else {
logging.Error().Err(err).Msg("Failed to delete inactive users")
}
n, err = DeleteExpiredPasswordResets(ctx, conn) n, err = DeleteExpiredPasswordResets(ctx, conn)
if err == nil { if err == nil {
if n > 0 { if n > 0 {
logging.Info().Int64("num deleted password resets", n).Msg("Deleted expired password resets") logging.Info().Int64("num deleted password resets", n).Msg("Deleted expired password resets")
}
} else {
logging.Error().Err(err).Msg("Failed to delete expired password resets")
} }
} else { return nil
logging.Error().Err(err).Msg("Failed to delete expired password resets") }()
if err != nil {
logging.Error().Err(err).Msg("Panicked in PeriodicallyDeleteInactiveUsers")
} }
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@ -15,6 +15,7 @@ import (
"git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
) )
@ -142,13 +143,20 @@ func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool)
for { for {
select { select {
case <-t.C: case <-t.C:
n, err := DeleteExpiredSessions(ctx, conn) err := func() (err error) {
if err == nil { defer utils.RecoverPanicAsError(&err)
if n > 0 { n, err := DeleteExpiredSessions(ctx, conn)
logging.Info().Int64("num deleted sessions", n).Msg("Deleted expired sessions") if err == nil {
if n > 0 {
logging.Info().Int64("num deleted sessions", n).Msg("Deleted expired sessions")
}
} else {
logging.Error().Err(err).Msg("Failed to delete expired sessions")
} }
} else { return nil
logging.Error().Err(err).Msg("Failed to delete expired sessions") }()
if err != nil {
logging.Error().Err(err).Msg("Panicked in PeriodicallyDeleteExpiredSessions")
} }
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@ -51,7 +51,8 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
default: default:
} }
func() { err := func() (retErr error) {
defer utils.RecoverPanicAsError(&retErr)
log.Info().Msg("Connecting to the Discord gateway") log.Info().Msg("Connecting to the Discord gateway")
bot := newBotInstance(dbConn) bot := newBotInstance(dbConn)
err := bot.Run(ctx) err := bot.Run(ctx)
@ -84,7 +85,11 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
time.Sleep(delay) time.Sleep(delay)
boff.Reset() boff.Reset()
return nil
}() }()
if err != nil {
log.Error().Err(err).Msg("Panicked in RunDiscordBot")
}
} }
}() }()
return job return job

View File

@ -10,6 +10,7 @@ import (
"git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
) )
@ -52,16 +53,25 @@ func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
} }
for { for {
select { done, err := func() (done bool, err error) {
case <-ctx.Done(): defer utils.RecoverPanicAsError(&err)
select {
case <-ctx.Done():
return true, nil
case <-newUserTicker.C:
// Get content for messages when a user links their account (but do not create snippets)
fetchMissingContent(ctx, dbConn)
case <-backfillFirstRun:
runBackfill()
case <-backfillTicker.C:
runBackfill()
}
return false, nil
}()
if err != nil {
log.Error().Err(err).Msg("Panicked in RunHistoryWatcher")
} else if done {
return return
case <-newUserTicker.C:
// Get content for messages when a user links their account (but do not create snippets)
fetchMissingContent(ctx, dbConn)
case <-backfillFirstRun:
runBackfill()
case <-backfillTicker.C:
runBackfill()
} }
} }
}() }()

View File

@ -14,6 +14,7 @@ import (
"git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/perf" "git.handmade.network/hmn/hmn/src/perf"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
) )
@ -45,12 +46,6 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
}() }()
log.Info().Msg("Running twitch monitor...") log.Info().Msg("Running twitch monitor...")
err := refreshAccessToken(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
return
}
monitorTicker := time.NewTicker(2 * time.Hour) monitorTicker := time.NewTicker(2 * time.Hour)
firstRunChannel := make(chan struct{}, 1) firstRunChannel := make(chan struct{}, 1)
firstRunChannel <- struct{}{} firstRunChannel <- struct{}{}
@ -58,53 +53,67 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
timers := make([]*time.Timer, 0) timers := make([]*time.Timer, 0)
expiredTimers := make(chan *time.Timer, 10) expiredTimers := make(chan *time.Timer, 10)
for { for {
select { done, err := func() (done bool, retErr error) {
case <-ctx.Done(): defer utils.RecoverPanicAsError(&retErr)
for _, timer := range timers { select {
timer.Stop() case <-ctx.Done():
} for _, timer := range timers {
return timer.Stop()
case expired := <-expiredTimers:
for idx, timer := range timers {
if timer == expired {
timers = append(timers[:idx], timers[idx+1:]...)
break
} }
} return true, nil
case <-firstRunChannel: case expired := <-expiredTimers:
syncWithTwitch(ctx, dbConn, true) for idx, timer := range timers {
case <-monitorTicker.C: if timer == expired {
syncWithTwitch(ctx, dbConn, true) timers = append(timers[:idx], timers[idx+1:]...)
case <-linksChangedChannel: break
// NOTE(asaf): Since we update links inside transactions for users/projects }
// we won't see the updated list of links until the transaction is committed. }
// Waiting 5 seconds is just a quick workaround for that. It's not case <-firstRunChannel:
// convenient to only trigger this after the transaction is committed. err := refreshAccessToken(ctx)
var timer *time.Timer if err != nil {
t := time.AfterFunc(5*time.Second, func() { log.Error().Err(err).Msg("Failed to fetch refresh token on start")
expiredTimers <- timer return true, nil
syncWithTwitch(ctx, dbConn, false) }
}) syncWithTwitch(ctx, dbConn, true)
timer = t case <-monitorTicker.C:
timers = append(timers, t) syncWithTwitch(ctx, dbConn, true)
case notification := <-twitchNotificationChannel: case <-linksChangedChannel:
if notification.Type == notificationTypeRevocation { // NOTE(asaf): Since we update links inside transactions for users/projects
syncWithTwitch(ctx, dbConn, false) // we won't see the updated list of links until the transaction is committed.
} else { // Waiting 5 seconds is just a quick workaround for that. It's not
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and // convenient to only trigger this after the transaction is committed.
// would return old data if we called it immediately, so we process
// the notification to the extent we can, and later do a full update. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() { t := time.AfterFunc(5*time.Second, func() {
expiredTimers <- timer expiredTimers <- timer
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin) syncWithTwitch(ctx, dbConn, false)
}) })
timer = t timer = t
timers = append(timers, t) timers = append(timers, t)
processEventSubNotification(ctx, dbConn, &notification) case notification := <-twitchNotificationChannel:
if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false)
} else {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we process
// the notification to the extent we can, and later do a full update. We can get the
// category from the notification, but not the tags (or the up-to-date title),
// so we can't really skip this.
var timer *time.Timer
t := time.AfterFunc(3*time.Minute, func() {
expiredTimers <- timer
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
})
timer = t
timers = append(timers, t)
processEventSubNotification(ctx, dbConn, &notification)
}
} }
return false, nil
}()
if err != nil {
log.Error().Err(err).Msg("Panicked in MonitorTwitchSubscriptions")
} else if done {
return
} }
} }
}() }()