Merge branch 'streams_update'

This commit is contained in:
Asaf Gartner 2022-05-30 17:33:52 +03:00
commit 02d51a8bfe
6 changed files with 391 additions and 42 deletions

View File

@ -184,8 +184,26 @@ func GetGuildMember(ctx context.Context, guildID, userID string) (*GuildMember,
return &msg, nil return &msg, nil
} }
type MentionType string
const (
MentionTypeUsers MentionType = "users"
MentionTypeRoles = "roles"
MentionTypeEveryone = "everyone"
)
type MessageAllowedMentions struct {
Parse []MentionType `json:"parse"`
}
const (
FlagSuppressEmbeds int = 1 << 2
)
type CreateMessageRequest struct { type CreateMessageRequest struct {
Content string `json:"content"` Content string `json:"content"`
Flags int `json:"flags,omitempty"`
AllowedMentions *MessageAllowedMentions `json:"allowed_mentions,omitempty"`
} }
func CreateMessage(ctx context.Context, channelID string, payloadJSON string, files ...FileUpload) (*Message, error) { func CreateMessage(ctx context.Context, channelID string, payloadJSON string, files ...FileUpload) (*Message, error) {
@ -226,6 +244,44 @@ func CreateMessage(ctx context.Context, channelID string, payloadJSON string, fi
return &msg, nil return &msg, nil
} }
func EditMessage(ctx context.Context, channelID string, messageID string, payloadJSON string, files ...FileUpload) (*Message, error) {
const name = "Edit Message"
contentType, body := makeNewMessageBody(payloadJSON, files)
path := fmt.Sprintf("/channels/%s/messages/%s", channelID, messageID)
res, err := doWithRateLimiting(ctx, name, func(ctx context.Context) *http.Request {
req := makeRequest(ctx, http.MethodPatch, path, body)
req.Header.Add("Content-Type", contentType)
return req
})
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode >= 400 {
logErrorResponse(ctx, name, res, "")
return nil, oops.New(nil, "received error from Discord")
}
// Maybe in the future we could more nicely handle errors like "bad channel",
// but honestly what are the odds that we mess that up...
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
panic(err)
}
var msg Message
err = json.Unmarshal(bodyBytes, &msg)
if err != nil {
return nil, oops.New(err, "failed to unmarshal Discord message")
}
return &msg, nil
}
func DeleteMessage(ctx context.Context, channelID string, messageID string) error { func DeleteMessage(ctx context.Context, channelID string, messageID string) error {
const name = "Delete Message" const name = "Delete Message"

139
src/discord/streams.go Normal file
View File

@ -0,0 +1,139 @@
package discord
import (
"context"
"encoding/json"
"fmt"
"strings"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/oops"
)
// NOTE(asaf): Updates or creates a discord message according to the following rules:
// Create when:
// * No previous message exists
// * We have non-zero live streamers
// * Message exists, but we're adding a new streamer that wasn't in the existing message
// * Message exists, but is not the most recent message in the channel
// Update otherwise
// That way we ensure that the message doesn't get scrolled offscreen, and the
// new message indicator for the channel doesn't trigger when a streamer goes offline or
// updates the stream title.
// NOTE(asaf): No-op if StreamsChannelID is not specified in the config
func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error {
if len(config.Config.Discord.StreamsChannelID) == 0 {
return nil
}
livestreamMessage, err := hmndata.FetchPersistentVar[hmndata.DiscordLivestreamMessage](
ctx,
dbConn,
hmndata.VarNameDiscordLivestreamMessage,
)
editExisting := true
if err != nil {
if err == db.NotFound {
editExisting = false
} else {
return oops.New(err, "failed to fetch last message persistent var from db")
}
}
if editExisting {
_, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
if err == NotFound {
editExisting = false
} else {
oops.New(err, "failed to fetch existing message from discord")
}
}
}
if editExisting {
existingStreamers := livestreamMessage.Streamers
for _, s := range streamers {
found := false
for _, es := range existingStreamers {
if es.Username == s.Username {
found = true
break
}
}
if !found {
editExisting = false
break
}
}
}
if editExisting && len(streamers) > 0 {
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
Limit: 1,
})
if err != nil {
return oops.New(err, "failed to fetch messages from discord")
}
if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID {
editExisting = false
}
}
messageContent := ""
if len(streamers) == 0 {
messageContent = "No one is currently streaming."
} else {
var builder strings.Builder
for _, s := range streamers {
builder.WriteString(fmt.Sprintf(":red_circle: **%s** is live: <https://twitch.tv/%s>\n> _%s_\nStarted <t:%d:R>\n\n", s.Username, s.Username, s.Title, s.StartTime.Unix()))
}
messageContent = builder.String()
}
msgJson, err := json.Marshal(CreateMessageRequest{
Content: messageContent,
Flags: FlagSuppressEmbeds,
AllowedMentions: &MessageAllowedMentions{},
})
if err != nil {
return oops.New(err, "failed to marshal discord message")
}
newMessageID := ""
if editExisting {
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson))
if err != nil {
return oops.New(err, "failed to update discord message for streams channel")
}
newMessageID = updatedMessage.ID
} else {
if livestreamMessage != nil {
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
return oops.New(err, "failed to delete existing discord message from streams channel")
}
}
sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
if err != nil {
return oops.New(err, "failed to create discord message for streams channel")
}
newMessageID = sentMessage.ID
}
data := hmndata.DiscordLivestreamMessage{
MessageID: newMessageID,
Streamers: streamers,
}
err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data)
if err != nil {
return oops.New(err, "failed to store persistent var for discord streams")
}
return nil
}

View File

@ -0,0 +1,86 @@
package hmndata
import (
"context"
"encoding/json"
"time"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
)
type PersistentVarName string
const (
VarNameDiscordLivestreamMessage PersistentVarName = "discord_livestream_message"
)
type StreamDetails struct {
Username string `json:"username"`
StartTime time.Time `json:"start_time"`
Title string `json:"title"`
}
type DiscordLivestreamMessage struct {
MessageID string `json:"message_id"`
Streamers []StreamDetails `json:"streamers"`
}
// NOTE(asaf): Returns db.NotFound if the variable isn't in the db.
func FetchPersistentVar[T any](
ctx context.Context,
dbConn db.ConnOrTx,
varName PersistentVarName,
) (*T, error) {
persistentVar, err := db.QueryOne[models.PersistentVar](ctx, dbConn,
`
SELECT $columns
FROM persistent_var
WHERE name = $1
`,
varName,
)
if err != nil {
return nil, err
}
jsonString := persistentVar.Value
var result T
err = json.Unmarshal([]byte(jsonString), &result)
if err != nil {
return nil, oops.New(err, "failed to unmarshal persistent var value")
}
return &result, nil
}
func StorePersistentVar[T any](
ctx context.Context,
dbConn db.ConnOrTx,
name PersistentVarName,
value *T,
) error {
jsonString, err := json.Marshal(value)
if err != nil {
return oops.New(err, "failed to marshal variable")
}
_, err = dbConn.Exec(ctx,
`
INSERT INTO persistent_var (name, value)
VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE SET
value = EXCLUDED.value
`,
name,
jsonString,
)
if err != nil {
return oops.New(err, "failed to insert var to db")
}
return nil
}

View File

@ -0,0 +1,57 @@
package migrations
import (
"context"
"time"
"git.handmade.network/hmn/hmn/src/migration/types"
"git.handmade.network/hmn/hmn/src/oops"
"github.com/jackc/pgx/v4"
)
func init() {
registerMigration(AddPersistentVars{})
}
type AddPersistentVars struct{}
func (m AddPersistentVars) Version() types.MigrationVersion {
return types.MigrationVersion(time.Date(2022, 5, 26, 14, 45, 17, 0, time.UTC))
}
func (m AddPersistentVars) Name() string {
return "AddPersistentVars"
}
func (m AddPersistentVars) Description() string {
return "Create table for persistent_vars"
}
func (m AddPersistentVars) Up(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
CREATE TABLE persistent_var (
name VARCHAR(255) NOT NULL,
value TEXT NOT NULL
);
CREATE UNIQUE INDEX persistent_var_name ON persistent_var (name);
`,
)
if err != nil {
return oops.New(err, "failed to create persistent_var table")
}
return nil
}
func (m AddPersistentVars) Down(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
DROP INDEX persistent_var_name;
DROP TABLE persistent_var;
`,
)
if err != nil {
return oops.New(err, "failed to drop persistent_var table")
}
return nil
}

View File

@ -0,0 +1,6 @@
package models
type PersistentVar struct {
Name string `db:"name"`
Value string `db:"value"`
}

View File

@ -3,7 +3,6 @@ package twitch
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"time" "time"
"git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/config"
@ -327,7 +326,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
} }
p.StartBlock("SQL", "Remove untracked streamers") p.StartBlock("SQL", "Remove untracked streamers")
_, err = tx.Exec(ctx, _, err = tx.Exec(ctx,
`DELETE FROM twitch_streams WHERE twitch_id != ANY($1)`, `DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`,
allIDs, allIDs,
) )
if err != nil { if err != nil {
@ -362,7 +361,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
p.StartBlock("SQL", "Update stream statuses in db") p.StartBlock("SQL", "Update stream statuses in db")
for _, status := range statuses { for _, status := range statuses {
log.Debug().Interface("Status", status).Msg("Got streamer") log.Debug().Interface("Status", status).Msg("Got streamer")
_, err = updateStreamStatusInDB(ctx, tx, &status) err = updateStreamStatusInDB(ctx, tx, &status)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status") log.Error().Err(err).Msg("failed to update twitch stream status")
} }
@ -374,19 +373,41 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
} }
stats.NumStreamsChecked += len(usersToUpdate) stats.NumStreamsChecked += len(usersToUpdate)
log.Info().Interface("Stats", stats).Msg("Twitch sync done") log.Info().Interface("Stats", stats).Msg("Twitch sync done")
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
} }
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string, title string) error { func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
var err error streams, err := db.Query[models.TwitchStream](ctx, dbConn,
if config.Config.Discord.StreamsChannelID != "" { `
err = discord.SendMessages(ctx, dbConn, discord.MessageToSend{ SELECT $columns
ChannelID: config.Config.Discord.StreamsChannelID, FROM
Req: discord.CreateMessageRequest{ twitch_stream
Content: fmt.Sprintf("%s is live: https://twitch.tv/%s\n> %s", twitchLogin, twitchLogin, title), ORDER BY started_at ASC
}, `,
)
if err != nil {
return oops.New(err, "failed to fetch livestreams from db")
}
var streamDetails []hmndata.StreamDetails
for _, s := range streams {
streamDetails = append(streamDetails, hmndata.StreamDetails{
Username: s.Login,
StartTime: s.StartedAt,
Title: s.Title,
}) })
} }
return err
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
if err != nil {
return oops.New(err, "failed to update discord with livestream info")
}
return nil
} }
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) { func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
@ -421,41 +442,25 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
} }
log.Debug().Interface("Status", status).Msg("Updating status") log.Debug().Interface("Status", status).Msg("Updating status")
inserted, err := updateStreamStatusInDB(ctx, dbConn, &status) err = updateStreamStatusInDB(ctx, dbConn, &status)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status") log.Error().Err(err).Msg("failed to update twitch stream status")
} }
if inserted {
log.Debug().Msg("Notifying discord") log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn, status.TwitchLogin, status.Title) err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil { if err != nil {
log.Error().Err(err).Msg("failed to notify discord") log.Error().Err(err).Msg("failed to notify discord")
}
} }
} }
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) (bool, error) { func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
log := logging.ExtractLogger(ctx) log := logging.ExtractLogger(ctx)
inserted := false
if isStatusRelevant(status) { if isStatusRelevant(status) {
log.Debug().Msg("Status relevant") log.Debug().Msg("Status relevant")
_, err := db.QueryOne[models.TwitchStream](ctx, conn, _, err := conn.Exec(ctx,
` `
SELECT $columns INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at)
FROM twitch_streams
WHERE twitch_id = $1
`,
status.TwitchID,
)
if err == db.NotFound {
log.Debug().Msg("Inserting new stream")
inserted = true
} else if err != nil {
return false, oops.New(err, "failed to query existing stream")
}
_, err = conn.Exec(ctx,
`
INSERT INTO twitch_streams (twitch_id, twitch_login, title, started_at)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
ON CONFLICT (twitch_id) DO UPDATE SET ON CONFLICT (twitch_id) DO UPDATE SET
title = EXCLUDED.title, title = EXCLUDED.title,
@ -467,21 +472,21 @@ func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *strea
status.StartedAt, status.StartedAt,
) )
if err != nil { if err != nil {
return false, oops.New(err, "failed to insert twitch streamer into db") return oops.New(err, "failed to insert twitch streamer into db")
} }
} else { } else {
log.Debug().Msg("Stream not relevant") log.Debug().Msg("Stream not relevant")
_, err := conn.Exec(ctx, _, err := conn.Exec(ctx,
` `
DELETE FROM twitch_streams WHERE twitch_id = $1 DELETE FROM twitch_stream WHERE twitch_id = $1
`, `,
status.TwitchID, status.TwitchID,
) )
if err != nil { if err != nil {
return false, oops.New(err, "failed to remove twitch streamer from db") return oops.New(err, "failed to remove twitch streamer from db")
} }
} }
return inserted, nil return nil
} }
var RelevantCategories = []string{ var RelevantCategories = []string{