From 8495982d3f009032464da9f007c1b4f7ddce472d Mon Sep 17 00:00:00 2001 From: Asaf Gartner Date: Fri, 27 May 2022 11:37:43 +0300 Subject: [PATCH] Added persistent vars and improved stream tracking on discord. --- src/discord/rest.go | 58 +++++++- src/discord/streams.go | 140 ++++++++++++++++++ src/hmndata/persistent_vars.go | 86 +++++++++++ .../2022-05-26T144517Z_AddPersistentVars.go | 57 +++++++ src/models/persistent_vars.go | 6 + src/twitch/twitch.go | 87 ++++++----- 6 files changed, 392 insertions(+), 42 deletions(-) create mode 100644 src/discord/streams.go create mode 100644 src/hmndata/persistent_vars.go create mode 100644 src/migration/migrations/2022-05-26T144517Z_AddPersistentVars.go create mode 100644 src/models/persistent_vars.go diff --git a/src/discord/rest.go b/src/discord/rest.go index 8e8f5f70..91248582 100644 --- a/src/discord/rest.go +++ b/src/discord/rest.go @@ -184,8 +184,26 @@ func GetGuildMember(ctx context.Context, guildID, userID string) (*GuildMember, return &msg, nil } +type MentionType string + +const ( + MentionTypeUsers MentionType = "users" + MentionTypeRoles = "roles" + MentionTypeEveryone = "everyone" +) + +type MessageAllowedMentions struct { + Parse []MentionType `json:"parse"` +} + +const ( + FlagSuppressEmbeds int = 1 << 2 +) + type CreateMessageRequest struct { - Content string `json:"content"` + Content string `json:"content"` + Flags int `json:"flags,omitempty"` + AllowedMentions *MessageAllowedMentions `json:"allowed_mentions,omitempty"` } func CreateMessage(ctx context.Context, channelID string, payloadJSON string, files ...FileUpload) (*Message, error) { @@ -226,6 +244,44 @@ func CreateMessage(ctx context.Context, channelID string, payloadJSON string, fi return &msg, nil } +func EditMessage(ctx context.Context, channelID string, messageID string, payloadJSON string, files ...FileUpload) (*Message, error) { + const name = "Edit Message" + + contentType, body := makeNewMessageBody(payloadJSON, files) + + path := fmt.Sprintf("/channels/%s/messages/%s", channelID, messageID) + res, err := doWithRateLimiting(ctx, name, func(ctx context.Context) *http.Request { + req := makeRequest(ctx, http.MethodPatch, path, body) + req.Header.Add("Content-Type", contentType) + return req + }) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode >= 400 { + logErrorResponse(ctx, name, res, "") + return nil, oops.New(nil, "received error from Discord") + } + + // Maybe in the future we could more nicely handle errors like "bad channel", + // but honestly what are the odds that we mess that up... + + bodyBytes, err := io.ReadAll(res.Body) + if err != nil { + panic(err) + } + + var msg Message + err = json.Unmarshal(bodyBytes, &msg) + if err != nil { + return nil, oops.New(err, "failed to unmarshal Discord message") + } + + return &msg, nil +} + func DeleteMessage(ctx context.Context, channelID string, messageID string) error { const name = "Delete Message" diff --git a/src/discord/streams.go b/src/discord/streams.go new file mode 100644 index 00000000..031f4a27 --- /dev/null +++ b/src/discord/streams.go @@ -0,0 +1,140 @@ +package discord + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "git.handmade.network/hmn/hmn/src/config" + "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/hmndata" + "git.handmade.network/hmn/hmn/src/oops" +) + +// NOTE(asaf): Updates or creates a discord message according to the following rules: +// Create when: +// * No previous message exists +// * We have non-zero live streamers +// * Message exists, but we're adding a new streamer that wasn't in the existing message +// * Message exists, but is not the most recent message in the channel +// Update otherwise +// That way we ensure that the message doesn't get scrolled offscreen, and the +// new message indicator for the channel doesn't trigger when a streamer goes offline or +// updates the stream title. +// NOTE(asaf): No-op if StreamsChannelID is not specified in the config +func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error { + if len(config.Config.Discord.StreamsChannelID) == 0 { + return nil + } + + livestreamMessage, err := hmndata.FetchPersistentVar[hmndata.DiscordLivestreamMessage]( + ctx, + dbConn, + hmndata.VarNameDiscordLivestreamMessage, + ) + editExisting := true + if err != nil { + if err == db.NotFound { + editExisting = false + } else { + return oops.New(err, "failed to fetch last message persistent var from db") + } + } + + if editExisting { + _, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) + if err != nil { + if err == NotFound { + editExisting = false + } else { + oops.New(err, "failed to fetch existing message from discord") + } + } + } + + if editExisting { + existingStreamers := livestreamMessage.Streamers + for _, s := range streamers { + found := false + for _, es := range existingStreamers { + if es.Username == s.Username { + found = true + break + } + } + if !found { + editExisting = false + break + } + } + } + + if editExisting && len(streamers) > 0 { + messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{ + Limit: 1, + }) + if err != nil { + return oops.New(err, "failed to fetch messages from discord") + } + if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID { + editExisting = false + } + } + + messageContent := "" + if len(streamers) == 0 { + messageContent = "No one is currently streaming." + } else { + var builder strings.Builder + for _, s := range streamers { + builder.WriteString(fmt.Sprintf(":green_circle: %s is live: \n", s.Username, s.Username, s.StartTime.Unix())) + builder.WriteString(fmt.Sprintf("> %s", s.Title)) + } + messageContent = builder.String() + } + + msgJson, err := json.Marshal(CreateMessageRequest{ + Content: messageContent, + Flags: FlagSuppressEmbeds, + AllowedMentions: &MessageAllowedMentions{}, + }) + if err != nil { + return oops.New(err, "failed to marshal discord message") + } + + newMessageID := "" + if editExisting { + updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson)) + if err != nil { + return oops.New(err, "failed to update discord message for streams channel") + } + + newMessageID = updatedMessage.ID + } else { + if livestreamMessage != nil { + err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID) + if err != nil { + return oops.New(err, "failed to delete existing discord message from streams channel") + } + } + + sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson)) + if err != nil { + return oops.New(err, "failed to create discord message for streams channel") + } + + newMessageID = sentMessage.ID + } + + data := hmndata.DiscordLivestreamMessage{ + MessageID: newMessageID, + Streamers: streamers, + } + err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data) + if err != nil { + return oops.New(err, "failed to store persistent var for discord streams") + } + + return nil +} diff --git a/src/hmndata/persistent_vars.go b/src/hmndata/persistent_vars.go new file mode 100644 index 00000000..f326b988 --- /dev/null +++ b/src/hmndata/persistent_vars.go @@ -0,0 +1,86 @@ +package hmndata + +import ( + "context" + "encoding/json" + "time" + + "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/models" + "git.handmade.network/hmn/hmn/src/oops" +) + +type PersistentVarName string + +const ( + VarNameDiscordLivestreamMessage PersistentVarName = "discord_livestream_message" +) + +type StreamDetails struct { + Username string `json:"username"` + StartTime time.Time `json:"start_time"` + Title string `json:"title"` +} + +type DiscordLivestreamMessage struct { + MessageID string `json:"message_id"` + Streamers []StreamDetails `json:"streamers"` +} + +// NOTE(asaf): Returns db.NotFound if the variable isn't in the db. +func FetchPersistentVar[T any]( + ctx context.Context, + dbConn db.ConnOrTx, + varName PersistentVarName, +) (*T, error) { + persistentVar, err := db.QueryOne[models.PersistentVar](ctx, dbConn, + ` + SELECT $columns + FROM persistent_var + WHERE name = $1 + `, + varName, + ) + + if err != nil { + return nil, err + } + + jsonString := persistentVar.Value + var result T + err = json.Unmarshal([]byte(jsonString), &result) + if err != nil { + return nil, oops.New(err, "failed to unmarshal persistent var value") + } + + return &result, nil +} + +func StorePersistentVar[T any]( + ctx context.Context, + dbConn db.ConnOrTx, + name PersistentVarName, + value *T, +) error { + jsonString, err := json.Marshal(value) + if err != nil { + return oops.New(err, "failed to marshal variable") + } + + _, err = dbConn.Exec(ctx, + ` + INSERT INTO persistent_var (name, value) + VALUES ($1, $2) + ON CONFLICT (name) DO UPDATE SET + value = EXCLUDED.value + `, + name, + jsonString, + ) + + if err != nil { + return oops.New(err, "failed to insert var to db") + } + + return nil +} diff --git a/src/migration/migrations/2022-05-26T144517Z_AddPersistentVars.go b/src/migration/migrations/2022-05-26T144517Z_AddPersistentVars.go new file mode 100644 index 00000000..1ed3c4df --- /dev/null +++ b/src/migration/migrations/2022-05-26T144517Z_AddPersistentVars.go @@ -0,0 +1,57 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "git.handmade.network/hmn/hmn/src/oops" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(AddPersistentVars{}) +} + +type AddPersistentVars struct{} + +func (m AddPersistentVars) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 5, 26, 14, 45, 17, 0, time.UTC)) +} + +func (m AddPersistentVars) Name() string { + return "AddPersistentVars" +} + +func (m AddPersistentVars) Description() string { + return "Create table for persistent_vars" +} + +func (m AddPersistentVars) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + CREATE TABLE persistent_var ( + name VARCHAR(255) NOT NULL, + value TEXT NOT NULL + ); + CREATE UNIQUE INDEX persistent_var_name ON persistent_var (name); + `, + ) + if err != nil { + return oops.New(err, "failed to create persistent_var table") + } + return nil +} + +func (m AddPersistentVars) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + DROP INDEX persistent_var_name; + DROP TABLE persistent_var; + `, + ) + if err != nil { + return oops.New(err, "failed to drop persistent_var table") + } + return nil +} diff --git a/src/models/persistent_vars.go b/src/models/persistent_vars.go new file mode 100644 index 00000000..9a78bf96 --- /dev/null +++ b/src/models/persistent_vars.go @@ -0,0 +1,6 @@ +package models + +type PersistentVar struct { + Name string `db:"name"` + Value string `db:"value"` +} diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 5d5a4cc7..ed8d31f4 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -3,7 +3,6 @@ package twitch import ( "context" "encoding/json" - "fmt" "time" "git.handmade.network/hmn/hmn/src/config" @@ -327,7 +326,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { } p.StartBlock("SQL", "Remove untracked streamers") _, err = tx.Exec(ctx, - `DELETE FROM twitch_streams WHERE twitch_id != ANY($1)`, + `DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`, allIDs, ) if err != nil { @@ -362,7 +361,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { p.StartBlock("SQL", "Update stream statuses in db") for _, status := range statuses { log.Debug().Interface("Status", status).Msg("Got streamer") - _, err = updateStreamStatusInDB(ctx, tx, &status) + err = updateStreamStatusInDB(ctx, tx, &status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") } @@ -374,19 +373,41 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { } stats.NumStreamsChecked += len(usersToUpdate) log.Info().Interface("Stats", stats).Msg("Twitch sync done") + + log.Debug().Msg("Notifying discord") + err = notifyDiscordOfLiveStream(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to notify discord") + } } -func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string, title string) error { - var err error - if config.Config.Discord.StreamsChannelID != "" { - err = discord.SendMessages(ctx, dbConn, discord.MessageToSend{ - ChannelID: config.Config.Discord.StreamsChannelID, - Req: discord.CreateMessageRequest{ - Content: fmt.Sprintf("%s is live: https://twitch.tv/%s\n> %s", twitchLogin, twitchLogin, title), - }, +func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error { + streams, err := db.Query[models.TwitchStream](ctx, dbConn, + ` + SELECT $columns + FROM + twitch_stream + ORDER BY started_at DESC + `, + ) + if err != nil { + return oops.New(err, "failed to fetch livestreams from db") + } + + var streamDetails []hmndata.StreamDetails + for _, s := range streams { + streamDetails = append(streamDetails, hmndata.StreamDetails{ + Username: s.Login, + StartTime: s.StartedAt, + Title: s.Title, }) } - return err + + err = discord.UpdateStreamers(ctx, dbConn, streamDetails) + if err != nil { + return oops.New(err, "failed to update discord with livestream info") + } + return nil } func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) { @@ -421,41 +442,25 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi } log.Debug().Interface("Status", status).Msg("Updating status") - inserted, err := updateStreamStatusInDB(ctx, dbConn, &status) + err = updateStreamStatusInDB(ctx, dbConn, &status) if err != nil { log.Error().Err(err).Msg("failed to update twitch stream status") } - if inserted { - log.Debug().Msg("Notifying discord") - err = notifyDiscordOfLiveStream(ctx, dbConn, status.TwitchLogin, status.Title) - if err != nil { - log.Error().Err(err).Msg("failed to notify discord") - } + + log.Debug().Msg("Notifying discord") + err = notifyDiscordOfLiveStream(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to notify discord") } } -func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) (bool, error) { +func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error { log := logging.ExtractLogger(ctx) - inserted := false if isStatusRelevant(status) { log.Debug().Msg("Status relevant") - _, err := db.QueryOne[models.TwitchStream](ctx, conn, + _, err := conn.Exec(ctx, ` - SELECT $columns - FROM twitch_streams - WHERE twitch_id = $1 - `, - status.TwitchID, - ) - if err == db.NotFound { - log.Debug().Msg("Inserting new stream") - inserted = true - } else if err != nil { - return false, oops.New(err, "failed to query existing stream") - } - _, err = conn.Exec(ctx, - ` - INSERT INTO twitch_streams (twitch_id, twitch_login, title, started_at) + INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at) VALUES ($1, $2, $3, $4) ON CONFLICT (twitch_id) DO UPDATE SET title = EXCLUDED.title, @@ -467,21 +472,21 @@ func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *strea status.StartedAt, ) if err != nil { - return false, oops.New(err, "failed to insert twitch streamer into db") + return oops.New(err, "failed to insert twitch streamer into db") } } else { log.Debug().Msg("Stream not relevant") _, err := conn.Exec(ctx, ` - DELETE FROM twitch_streams WHERE twitch_id = $1 + DELETE FROM twitch_stream WHERE twitch_id = $1 `, status.TwitchID, ) if err != nil { - return false, oops.New(err, "failed to remove twitch streamer from db") + return oops.New(err, "failed to remove twitch streamer from db") } } - return inserted, nil + return nil } var RelevantCategories = []string{