From 42e1ed95fbf051e1475278803ae04e043660b097 Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Mon, 29 Aug 2022 01:18:55 +0300 Subject: [PATCH] Added logs for twitch --- .../2022-08-28T203935Z_AddTwitchLog.go | 52 ++++++ src/models/twitch.go | 17 ++ src/templates/src/twitch_debug.html | 163 ++++++++++++++++++ src/twitch/twitch.go | 45 ++++- src/website/twitch.go | 86 ++++++++- 5 files changed, 353 insertions(+), 10 deletions(-) create mode 100644 src/migration/migrations/2022-08-28T203935Z_AddTwitchLog.go create mode 100644 src/templates/src/twitch_debug.html diff --git a/src/migration/migrations/2022-08-28T203935Z_AddTwitchLog.go b/src/migration/migrations/2022-08-28T203935Z_AddTwitchLog.go new file mode 100644 index 0000000..7d2f8c8 --- /dev/null +++ b/src/migration/migrations/2022-08-28T203935Z_AddTwitchLog.go @@ -0,0 +1,52 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(AddTwitchLog{}) +} + +type AddTwitchLog struct{} + +func (m AddTwitchLog) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 8, 28, 20, 39, 35, 0, time.UTC)) +} + +func (m AddTwitchLog) Name() string { + return "AddTwitchLog" +} + +func (m AddTwitchLog) Description() string { + return "Add twitch logging" +} + +func (m AddTwitchLog) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + CREATE TABLE twitch_log ( + id SERIAL NOT NULL PRIMARY KEY, + logged_at TIMESTAMP WITH TIME ZONE NOT NULL, + twitch_login VARCHAR(256) NOT NULL DEFAULT '', + type INT NOT NULL DEFAULT 0, + message TEXT NOT NULL DEFAULT '', + payload TEXT NOT NULL DEFAULT '' + ); + `, + ) + return err +} + +func (m AddTwitchLog) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + DROP TABLE twitch_log; + `, + ) + return err +} diff --git a/src/models/twitch.go b/src/models/twitch.go index 9b6caac..7b0bed2 100644 --- a/src/models/twitch.go +++ b/src/models/twitch.go @@ -13,3 +13,20 @@ type TwitchStream struct { Title string `db:"title"` StartedAt time.Time `db:"started_at"` } + +type TwitchLogType int + +const ( + TwitchLogTypeOther TwitchLogType = iota + 1 + TwitchLogTypeHook + TwitchLogTypeREST +) + +type TwitchLog struct { + ID int `db:"id"` + LoggedAt time.Time `db:"logged_at"` + Login string `db:"twitch_login"` + Type TwitchLogType `db:"type"` + Message string `db:"message"` + Payload string `db:"payload"` +} diff --git a/src/templates/src/twitch_debug.html b/src/templates/src/twitch_debug.html new file mode 100644 index 0000000..2300bcc --- /dev/null +++ b/src/templates/src/twitch_debug.html @@ -0,0 +1,163 @@ +{{ template "base.html" . }} + +{{ define "extrahead" }} + +{{ end }} + +{{ define "content" }} +
+
+ All + No login +
+ + + + + + + + + +
TimeTypeLoginMessage
+
+
+
+ + +{{ end }} diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 2a9df41..2f34771 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -3,6 +3,7 @@ package twitch import ( "context" "encoding/json" + "fmt" "time" "git.handmade.network/hmn/hmn/src/config" @@ -76,6 +77,7 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs. } syncWithTwitch(ctx, dbConn, true) case <-monitorTicker.C: + twitchLogClear(ctx, dbConn) syncWithTwitch(ctx, dbConn, true) case <-linksChangedChannel: // NOTE(asaf): Since we update links inside transactions for users/projects @@ -132,7 +134,7 @@ const ( notificationTypeRevocation = 4 ) -func QueueTwitchNotification(messageType string, body []byte) error { +func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error { var notification twitchNotification if messageType == "notification" { type notificationJson struct { @@ -152,6 +154,8 @@ func QueueTwitchNotification(messageType string, body []byte) error { return oops.New(err, "failed to parse notification body") } + twitchLog(ctx, conn, models.TwitchLogTypeHook, incoming.Event.BroadcasterUserLogin, "Got hook: "+incoming.Subscription.Type, string(body)) + notification.Status.TwitchID = incoming.Event.BroadcasterUserID notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin notification.Status.Title = incoming.Event.Title @@ -170,6 +174,7 @@ func QueueTwitchNotification(messageType string, body []byte) error { return oops.New(nil, "unknown subscription type received") } } else if messageType == "revocation" { + twitchLog(ctx, conn, models.TwitchLogTypeHook, "", "Got hook: Revocation", string(body)) notification.Type = notificationTypeRevocation } @@ -371,10 +376,12 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { log.Error().Err(err).Msg("failed to fetch stream statuses") return } + twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses)) p.EndBlock() p.StartBlock("SQL", "Update stream statuses in db") for _, status := range statuses { log.Debug().Interface("Status", status).Msg("Got streamer") + twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status)) err = updateStreamStatusInDB(ctx, tx, &status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") @@ -456,6 +463,7 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status") return } + twitchLog(ctx, dbConn, models.TwitchLogTypeREST, twitchLogin, "Fetched status", fmt.Sprintf("%#v", result)) if len(result) > 0 { log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch") status = result[0] @@ -497,6 +505,7 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi return } + twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification)) if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline { log.Debug().Interface("Status", notification.Status).Msg("Updating status") err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status) @@ -533,6 +542,7 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { log := logging.ExtractLogger(ctx) if isStatusRelevant(status) { + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking online", fmt.Sprintf("%#v", status)) log.Debug().Msg("Status relevant") _, err := conn.Exec(ctx, ` @@ -552,6 +562,7 @@ func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *strea } } else { log.Debug().Msg("Stream not relevant") + twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking offline", fmt.Sprintf("%#v", status)) _, err := conn.Exec(ctx, ` DELETE FROM twitch_stream WHERE twitch_id = $1 @@ -592,3 +603,35 @@ func isStatusRelevant(status *streamStatus) bool { } return false } + +func twitchLog(ctx context.Context, conn db.ConnOrTx, logType models.TwitchLogType, login string, message string, payload string) { + _, err := conn.Exec(ctx, + ` + INSERT INTO twitch_log (logged_at, twitch_login, type, message, payload) + VALUES ($1, $2, $3, $4, $5) + `, + time.Now(), + login, + logType, + message, + payload, + ) + if err != nil { + log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger() + log.Error().Err(err).Msg("Failed to log twitch event") + } +} + +func twitchLogClear(ctx context.Context, conn db.ConnOrTx) { + _, err := conn.Exec(ctx, + ` + DELETE FROM twitch_log + WHERE timestamp <= $1 + `, + time.Now().Add(-(time.Hour * 24 * 4)), + ) + if err != nil { + log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "twitch logger").Logger() + log.Error().Err(err).Msg("Failed to clear old twitch logs") + } +} diff --git a/src/website/twitch.go b/src/website/twitch.go index 0af15c2..d2c7809 100644 --- a/src/website/twitch.go +++ b/src/website/twitch.go @@ -11,8 +11,10 @@ import ( "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/hmndata" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/templates" "git.handmade.network/hmn/hmn/src/twitch" ) @@ -58,7 +60,7 @@ func TwitchEventSubCallback(c *RequestContext) ResponseData { res.Write([]byte(data.Challenge)) return res } else { - err := twitch.QueueTwitchNotification(messageType, body) + err := twitch.QueueTwitchNotification(c, c.Conn, messageType, body) if err != nil { c.Logger.Error().Err(err).Msg("Failed to process twitch callback") // NOTE(asaf): Returning 200 either way here @@ -69,25 +71,91 @@ func TwitchEventSubCallback(c *RequestContext) ResponseData { } } +type TwitchDebugData struct { + templates.BaseData + DataJson string +} + func TwitchDebugPage(c *RequestContext) ResponseData { - streams, err := db.Query[models.TwitchStream](c, c.Conn, + type dataUser struct { + Login string `json:"login"` + Live bool `json:"live"` + } + type dataLog struct { + ID int `json:"id"` + LoggedAt int64 `json:"loggedAt"` + Type string `json:"type"` + Login string `json:"login"` + Message string `json:"message"` + Payload string `json:"payload"` + } + type dataJson struct { + Users []dataUser `json:"users"` + Logs []dataLog `json:"logs"` + } + streamers, err := hmndata.FetchTwitchStreamers(c, c.Conn) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streamers")) + } + live, err := db.Query[models.TwitchStream](c, c.Conn, ` SELECT $columns FROM twitch_stream - ORDER BY started_at DESC `, ) if err != nil { - return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streams")) + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch live twitch streamers")) + } + logs, err := db.Query[models.TwitchLog](c, c.Conn, + ` + SELECT $columns + FROM twitch_log + ORDER BY logged_at DESC, id DESC + `, + ) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch logs")) } - html := "" - for _, s := range streams { - html += fmt.Sprintf(`%s%s
`, s.Login, s.Login, s.Title) + var data dataJson + for _, u := range streamers { + var user dataUser + user.Login = u.TwitchLogin + user.Live = false + for _, l := range live { + if l.Login == u.TwitchLogin { + user.Live = true + break + } + } + data.Users = append(data.Users, user) + } + messageTypes := []string{ + "", + "Other", + "Hook", + "REST", + } + data.Logs = make([]dataLog, 0, 0) + for _, l := range logs { + var log dataLog + log.ID = l.ID + log.LoggedAt = l.LoggedAt.UnixMilli() + log.Login = l.Login + log.Type = messageTypes[l.Type] + log.Message = l.Message + log.Payload = l.Payload + data.Logs = append(data.Logs, log) + } + jsonStr, err := json.Marshal(data) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to stringify twitch logs")) } var res ResponseData - res.StatusCode = 200 - res.Write([]byte(html)) + res.MustWriteTemplate("twitch_debug.html", TwitchDebugData{ + BaseData: getBaseDataAutocrumb(c, "Twitch Debug"), + DataJson: string(jsonStr), + }, c.Perf) return res }