From b165bf7c23196f9fc424c9c92d40a2a3709f7eb9 Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Thu, 16 Jun 2022 00:33:57 +0300 Subject: [PATCH] Added panic recovery to all of our background jobs Fixes issue #32 --- src/auth/auth.go | 34 ++++++++------ src/auth/session.go | 20 +++++--- src/discord/gateway.go | 7 ++- src/discord/history.go | 28 ++++++++---- src/twitch/twitch.go | 101 ++++++++++++++++++++++------------------- 5 files changed, 115 insertions(+), 75 deletions(-) diff --git a/src/auth/auth.go b/src/auth/auth.go index 5000041e..65689666 100644 --- a/src/auth/auth.go +++ b/src/auth/auth.go @@ -18,6 +18,7 @@ import ( "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/utils" "github.com/jackc/pgx/v4/pgxpool" "golang.org/x/crypto/argon2" @@ -244,22 +245,29 @@ func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) jo for { select { case <-t.C: - n, err := DeleteInactiveUsers(ctx, conn) - if err == nil { - if n > 0 { - logging.Info().Int64("num deleted users", n).Msg("Deleted inactive users") + err := func() (err error) { + defer utils.RecoverPanicAsError(&err) + n, err := DeleteInactiveUsers(ctx, conn) + if err == nil { + if n > 0 { + logging.Info().Int64("num deleted users", n).Msg("Deleted inactive users") + } + } else { + logging.Error().Err(err).Msg("Failed to delete inactive users") } - } else { - logging.Error().Err(err).Msg("Failed to delete inactive users") - } - n, err = DeleteExpiredPasswordResets(ctx, conn) - if err == nil { - if n > 0 { - logging.Info().Int64("num deleted password resets", n).Msg("Deleted expired password resets") + n, err = DeleteExpiredPasswordResets(ctx, conn) + if err == nil { + if n > 0 { + logging.Info().Int64("num deleted password resets", n).Msg("Deleted expired password resets") + } + } else { + logging.Error().Err(err).Msg("Failed to delete expired password resets") } - } else { - 
logging.Error().Err(err).Msg("Failed to delete expired password resets") + return nil + }() + if err != nil { + logging.Error().Err(err).Msg("Panicked in PeriodicallyDeleteInactiveUsers") } case <-ctx.Done(): return diff --git a/src/auth/session.go b/src/auth/session.go index a0d7d461..7bb45c88 100644 --- a/src/auth/session.go +++ b/src/auth/session.go @@ -15,6 +15,7 @@ import ( "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/utils" "github.com/jackc/pgx/v4/pgxpool" ) @@ -142,13 +143,20 @@ func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) for { select { case <-t.C: - n, err := DeleteExpiredSessions(ctx, conn) - if err == nil { - if n > 0 { - logging.Info().Int64("num deleted sessions", n).Msg("Deleted expired sessions") + err := func() (err error) { + defer utils.RecoverPanicAsError(&err) + n, err := DeleteExpiredSessions(ctx, conn) + if err == nil { + if n > 0 { + logging.Info().Int64("num deleted sessions", n).Msg("Deleted expired sessions") + } + } else { + logging.Error().Err(err).Msg("Failed to delete expired sessions") } - } else { - logging.Error().Err(err).Msg("Failed to delete expired sessions") + return nil + }() + if err != nil { + logging.Error().Err(err).Msg("Panicked in PeriodicallyDeleteExpiredSessions") } case <-ctx.Done(): return diff --git a/src/discord/gateway.go b/src/discord/gateway.go index b22beb98..acd2237a 100644 --- a/src/discord/gateway.go +++ b/src/discord/gateway.go @@ -51,7 +51,8 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { default: } - func() { + err := func() (retErr error) { + defer utils.RecoverPanicAsError(&retErr) log.Info().Msg("Connecting to the Discord gateway") bot := newBotInstance(dbConn) err := bot.Run(ctx) @@ -84,7 +85,11 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { time.Sleep(delay) boff.Reset() + return nil }() + if 
err != nil { + log.Error().Err(err).Msg("Panicked in RunDiscordBot") + } } }() return job diff --git a/src/discord/history.go b/src/discord/history.go index 8a8b32c0..e7295b67 100644 --- a/src/discord/history.go +++ b/src/discord/history.go @@ -10,6 +10,7 @@ import ( "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" + "git.handmade.network/hmn/hmn/src/utils" "github.com/jackc/pgx/v4/pgxpool" ) @@ -52,16 +53,25 @@ func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { } for { - select { - case <-ctx.Done(): + done, err := func() (done bool, err error) { + defer utils.RecoverPanicAsError(&err) + select { + case <-ctx.Done(): + return true, nil + case <-newUserTicker.C: + // Get content for messages when a user links their account (but do not create snippets) + fetchMissingContent(ctx, dbConn) + case <-backfillFirstRun: + runBackfill() + case <-backfillTicker.C: + runBackfill() + } + return false, nil + }() + if err != nil { + log.Error().Err(err).Msg("Panicked in RunHistoryWatcher") + } else if done { return - case <-newUserTicker.C: - // Get content for messages when a user links their account (but do not create snippets) - fetchMissingContent(ctx, dbConn) - case <-backfillFirstRun: - runBackfill() - case <-backfillTicker.C: - runBackfill() } } }() diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index ea0338a6..2a9df41f 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -14,6 +14,7 @@ import ( "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/perf" + "git.handmade.network/hmn/hmn/src/utils" "github.com/jackc/pgx/v4/pgxpool" ) @@ -45,12 +46,6 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. 
}() log.Info().Msg("Running twitch monitor...") - err := refreshAccessToken(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to fetch refresh token on start") - return - } - monitorTicker := time.NewTicker(2 * time.Hour) firstRunChannel := make(chan struct{}, 1) firstRunChannel <- struct{}{} @@ -58,53 +53,67 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. timers := make([]*time.Timer, 0) expiredTimers := make(chan *time.Timer, 10) for { - select { - case <-ctx.Done(): - for _, timer := range timers { - timer.Stop() - } - return - case expired := <-expiredTimers: - for idx, timer := range timers { - if timer == expired { - timers = append(timers[:idx], timers[idx+1:]...) - break + done, err := func() (done bool, retErr error) { + defer utils.RecoverPanicAsError(&retErr) + select { + case <-ctx.Done(): + for _, timer := range timers { + timer.Stop() } - } - case <-firstRunChannel: - syncWithTwitch(ctx, dbConn, true) - case <-monitorTicker.C: - syncWithTwitch(ctx, dbConn, true) - case <-linksChangedChannel: - // NOTE(asaf): Since we update links inside transactions for users/projects - // we won't see the updated list of links until the transaction is committed. - // Waiting 5 seconds is just a quick workaround for that. It's not - // convenient to only trigger this after the transaction is committed. - var timer *time.Timer - t := time.AfterFunc(5*time.Second, func() { - expiredTimers <- timer - syncWithTwitch(ctx, dbConn, false) - }) - timer = t - timers = append(timers, t) - case notification := <-twitchNotificationChannel: - if notification.Type == notificationTypeRevocation { - syncWithTwitch(ctx, dbConn, false) - } else { - // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and - // would return old data if we called it immediately, so we process - // the notification to the extent we can, and later do a full update. 
We can get the - // category from the notification, but not the tags (or the up-to-date title), - // so we can't really skip this. + return true, nil + case expired := <-expiredTimers: + for idx, timer := range timers { + if timer == expired { + timers = append(timers[:idx], timers[idx+1:]...) + break + } + } + case <-firstRunChannel: + err := refreshAccessToken(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch refresh token on start") + return true, nil + } + syncWithTwitch(ctx, dbConn, true) + case <-monitorTicker.C: + syncWithTwitch(ctx, dbConn, true) + case <-linksChangedChannel: + // NOTE(asaf): Since we update links inside transactions for users/projects + // we won't see the updated list of links until the transaction is committed. + // Waiting 5 seconds is just a quick workaround for that. It's not + // convenient to only trigger this after the transaction is committed. var timer *time.Timer - t := time.AfterFunc(3*time.Minute, func() { + t := time.AfterFunc(5*time.Second, func() { expiredTimers <- timer - updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin) + syncWithTwitch(ctx, dbConn, false) }) timer = t timers = append(timers, t) - processEventSubNotification(ctx, dbConn, &notification) + case notification := <-twitchNotificationChannel: + if notification.Type == notificationTypeRevocation { + syncWithTwitch(ctx, dbConn, false) + } else { + // NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and + // would return old data if we called it immediately, so we process + // the notification to the extent we can, and later do a full update. We can get the + // category from the notification, but not the tags (or the up-to-date title), + // so we can't really skip this.
+ var timer *time.Timer + t := time.AfterFunc(3*time.Minute, func() { + expiredTimers <- timer + updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin) + }) + timer = t + timers = append(timers, t) + processEventSubNotification(ctx, dbConn, &notification) + } } + return false, nil + }() + if err != nil { + log.Error().Err(err).Msg("Panicked in MonitorTwitchSubscriptions") + } else if done { + return } } }()