Added persistent vars and improved stream tracking on discord.
This commit is contained in:
parent
2f19e6e1b8
commit
8495982d3f
|
@ -184,8 +184,26 @@ func GetGuildMember(ctx context.Context, guildID, userID string) (*GuildMember,
|
||||||
return &msg, nil
|
return &msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MentionType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
MentionTypeUsers MentionType = "users"
|
||||||
|
MentionTypeRoles = "roles"
|
||||||
|
MentionTypeEveryone = "everyone"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MessageAllowedMentions struct {
|
||||||
|
Parse []MentionType `json:"parse"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
FlagSuppressEmbeds int = 1 << 2
|
||||||
|
)
|
||||||
|
|
||||||
type CreateMessageRequest struct {
|
type CreateMessageRequest struct {
|
||||||
Content string `json:"content"`
|
Content string `json:"content"`
|
||||||
|
Flags int `json:"flags,omitempty"`
|
||||||
|
AllowedMentions *MessageAllowedMentions `json:"allowed_mentions,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateMessage(ctx context.Context, channelID string, payloadJSON string, files ...FileUpload) (*Message, error) {
|
func CreateMessage(ctx context.Context, channelID string, payloadJSON string, files ...FileUpload) (*Message, error) {
|
||||||
|
@ -226,6 +244,44 @@ func CreateMessage(ctx context.Context, channelID string, payloadJSON string, fi
|
||||||
return &msg, nil
|
return &msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func EditMessage(ctx context.Context, channelID string, messageID string, payloadJSON string, files ...FileUpload) (*Message, error) {
|
||||||
|
const name = "Edit Message"
|
||||||
|
|
||||||
|
contentType, body := makeNewMessageBody(payloadJSON, files)
|
||||||
|
|
||||||
|
path := fmt.Sprintf("/channels/%s/messages/%s", channelID, messageID)
|
||||||
|
res, err := doWithRateLimiting(ctx, name, func(ctx context.Context) *http.Request {
|
||||||
|
req := makeRequest(ctx, http.MethodPatch, path, body)
|
||||||
|
req.Header.Add("Content-Type", contentType)
|
||||||
|
return req
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if res.StatusCode >= 400 {
|
||||||
|
logErrorResponse(ctx, name, res, "")
|
||||||
|
return nil, oops.New(nil, "received error from Discord")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Maybe in the future we could more nicely handle errors like "bad channel",
|
||||||
|
// but honestly what are the odds that we mess that up...
|
||||||
|
|
||||||
|
bodyBytes, err := io.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg Message
|
||||||
|
err = json.Unmarshal(bodyBytes, &msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to unmarshal Discord message")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &msg, nil
|
||||||
|
}
|
||||||
|
|
||||||
func DeleteMessage(ctx context.Context, channelID string, messageID string) error {
|
func DeleteMessage(ctx context.Context, channelID string, messageID string) error {
|
||||||
const name = "Delete Message"
|
const name = "Delete Message"
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
package discord
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.handmade.network/hmn/hmn/src/config"
|
||||||
|
"git.handmade.network/hmn/hmn/src/db"
|
||||||
|
"git.handmade.network/hmn/hmn/src/hmndata"
|
||||||
|
"git.handmade.network/hmn/hmn/src/oops"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NOTE(asaf): Updates or creates a discord message according to the following rules:
|
||||||
|
// Create when:
|
||||||
|
// * No previous message exists
|
||||||
|
// * We have non-zero live streamers
|
||||||
|
// * Message exists, but we're adding a new streamer that wasn't in the existing message
|
||||||
|
// * Message exists, but is not the most recent message in the channel
|
||||||
|
// Update otherwise
|
||||||
|
// That way we ensure that the message doesn't get scrolled offscreen, and the
|
||||||
|
// new message indicator for the channel doesn't trigger when a streamer goes offline or
|
||||||
|
// updates the stream title.
|
||||||
|
// NOTE(asaf): No-op if StreamsChannelID is not specified in the config
|
||||||
|
func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error {
|
||||||
|
if len(config.Config.Discord.StreamsChannelID) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
livestreamMessage, err := hmndata.FetchPersistentVar[hmndata.DiscordLivestreamMessage](
|
||||||
|
ctx,
|
||||||
|
dbConn,
|
||||||
|
hmndata.VarNameDiscordLivestreamMessage,
|
||||||
|
)
|
||||||
|
editExisting := true
|
||||||
|
if err != nil {
|
||||||
|
if err == db.NotFound {
|
||||||
|
editExisting = false
|
||||||
|
} else {
|
||||||
|
return oops.New(err, "failed to fetch last message persistent var from db")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if editExisting {
|
||||||
|
_, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
||||||
|
if err != nil {
|
||||||
|
if err == NotFound {
|
||||||
|
editExisting = false
|
||||||
|
} else {
|
||||||
|
oops.New(err, "failed to fetch existing message from discord")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if editExisting {
|
||||||
|
existingStreamers := livestreamMessage.Streamers
|
||||||
|
for _, s := range streamers {
|
||||||
|
found := false
|
||||||
|
for _, es := range existingStreamers {
|
||||||
|
if es.Username == s.Username {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
editExisting = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if editExisting && len(streamers) > 0 {
|
||||||
|
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
|
||||||
|
Limit: 1,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch messages from discord")
|
||||||
|
}
|
||||||
|
if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID {
|
||||||
|
editExisting = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
messageContent := ""
|
||||||
|
if len(streamers) == 0 {
|
||||||
|
messageContent = "No one is currently streaming."
|
||||||
|
} else {
|
||||||
|
var builder strings.Builder
|
||||||
|
for _, s := range streamers {
|
||||||
|
builder.WriteString(fmt.Sprintf(":green_circle: %s is live: <https://twitch.tv/%s> <t:%d:R>\n", s.Username, s.Username, s.StartTime.Unix()))
|
||||||
|
builder.WriteString(fmt.Sprintf("> %s", s.Title))
|
||||||
|
}
|
||||||
|
messageContent = builder.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
msgJson, err := json.Marshal(CreateMessageRequest{
|
||||||
|
Content: messageContent,
|
||||||
|
Flags: FlagSuppressEmbeds,
|
||||||
|
AllowedMentions: &MessageAllowedMentions{},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to marshal discord message")
|
||||||
|
}
|
||||||
|
|
||||||
|
newMessageID := ""
|
||||||
|
if editExisting {
|
||||||
|
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson))
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update discord message for streams channel")
|
||||||
|
}
|
||||||
|
|
||||||
|
newMessageID = updatedMessage.ID
|
||||||
|
} else {
|
||||||
|
if livestreamMessage != nil {
|
||||||
|
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to delete existing discord message from streams channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to create discord message for streams channel")
|
||||||
|
}
|
||||||
|
|
||||||
|
newMessageID = sentMessage.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
data := hmndata.DiscordLivestreamMessage{
|
||||||
|
MessageID: newMessageID,
|
||||||
|
Streamers: streamers,
|
||||||
|
}
|
||||||
|
err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to store persistent var for discord streams")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package hmndata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.handmade.network/hmn/hmn/src/db"
|
||||||
|
"git.handmade.network/hmn/hmn/src/models"
|
||||||
|
"git.handmade.network/hmn/hmn/src/oops"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PersistentVarName string
|
||||||
|
|
||||||
|
const (
|
||||||
|
VarNameDiscordLivestreamMessage PersistentVarName = "discord_livestream_message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StreamDetails struct {
|
||||||
|
Username string `json:"username"`
|
||||||
|
StartTime time.Time `json:"start_time"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DiscordLivestreamMessage struct {
|
||||||
|
MessageID string `json:"message_id"`
|
||||||
|
Streamers []StreamDetails `json:"streamers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE(asaf): Returns db.NotFound if the variable isn't in the db.
|
||||||
|
func FetchPersistentVar[T any](
|
||||||
|
ctx context.Context,
|
||||||
|
dbConn db.ConnOrTx,
|
||||||
|
varName PersistentVarName,
|
||||||
|
) (*T, error) {
|
||||||
|
persistentVar, err := db.QueryOne[models.PersistentVar](ctx, dbConn,
|
||||||
|
`
|
||||||
|
SELECT $columns
|
||||||
|
FROM persistent_var
|
||||||
|
WHERE name = $1
|
||||||
|
`,
|
||||||
|
varName,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonString := persistentVar.Value
|
||||||
|
var result T
|
||||||
|
err = json.Unmarshal([]byte(jsonString), &result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to unmarshal persistent var value")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func StorePersistentVar[T any](
|
||||||
|
ctx context.Context,
|
||||||
|
dbConn db.ConnOrTx,
|
||||||
|
name PersistentVarName,
|
||||||
|
value *T,
|
||||||
|
) error {
|
||||||
|
jsonString, err := json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to marshal variable")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
INSERT INTO persistent_var (name, value)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT (name) DO UPDATE SET
|
||||||
|
value = EXCLUDED.value
|
||||||
|
`,
|
||||||
|
name,
|
||||||
|
jsonString,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to insert var to db")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
package migrations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.handmade.network/hmn/hmn/src/migration/types"
|
||||||
|
"git.handmade.network/hmn/hmn/src/oops"
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerMigration(AddPersistentVars{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddPersistentVars struct{}
|
||||||
|
|
||||||
|
func (m AddPersistentVars) Version() types.MigrationVersion {
|
||||||
|
return types.MigrationVersion(time.Date(2022, 5, 26, 14, 45, 17, 0, time.UTC))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m AddPersistentVars) Name() string {
|
||||||
|
return "AddPersistentVars"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m AddPersistentVars) Description() string {
|
||||||
|
return "Create table for persistent_vars"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m AddPersistentVars) Up(ctx context.Context, tx pgx.Tx) error {
|
||||||
|
_, err := tx.Exec(ctx,
|
||||||
|
`
|
||||||
|
CREATE TABLE persistent_var (
|
||||||
|
name VARCHAR(255) NOT NULL,
|
||||||
|
value TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE UNIQUE INDEX persistent_var_name ON persistent_var (name);
|
||||||
|
`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to create persistent_var table")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m AddPersistentVars) Down(ctx context.Context, tx pgx.Tx) error {
|
||||||
|
_, err := tx.Exec(ctx,
|
||||||
|
`
|
||||||
|
DROP INDEX persistent_var_name;
|
||||||
|
DROP TABLE persistent_var;
|
||||||
|
`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to drop persistent_var table")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
type PersistentVar struct {
|
||||||
|
Name string `db:"name"`
|
||||||
|
Value string `db:"value"`
|
||||||
|
}
|
|
@ -3,7 +3,6 @@ package twitch
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.handmade.network/hmn/hmn/src/config"
|
"git.handmade.network/hmn/hmn/src/config"
|
||||||
|
@ -327,7 +326,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
}
|
}
|
||||||
p.StartBlock("SQL", "Remove untracked streamers")
|
p.StartBlock("SQL", "Remove untracked streamers")
|
||||||
_, err = tx.Exec(ctx,
|
_, err = tx.Exec(ctx,
|
||||||
`DELETE FROM twitch_streams WHERE twitch_id != ANY($1)`,
|
`DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`,
|
||||||
allIDs,
|
allIDs,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -362,7 +361,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
p.StartBlock("SQL", "Update stream statuses in db")
|
p.StartBlock("SQL", "Update stream statuses in db")
|
||||||
for _, status := range statuses {
|
for _, status := range statuses {
|
||||||
log.Debug().Interface("Status", status).Msg("Got streamer")
|
log.Debug().Interface("Status", status).Msg("Got streamer")
|
||||||
_, err = updateStreamStatusInDB(ctx, tx, &status)
|
err = updateStreamStatusInDB(ctx, tx, &status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
}
|
}
|
||||||
|
@ -374,19 +373,41 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
}
|
}
|
||||||
stats.NumStreamsChecked += len(usersToUpdate)
|
stats.NumStreamsChecked += len(usersToUpdate)
|
||||||
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
|
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
|
||||||
|
|
||||||
|
log.Debug().Msg("Notifying discord")
|
||||||
|
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to notify discord")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string, title string) error {
|
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
var err error
|
streams, err := db.Query[models.TwitchStream](ctx, dbConn,
|
||||||
if config.Config.Discord.StreamsChannelID != "" {
|
`
|
||||||
err = discord.SendMessages(ctx, dbConn, discord.MessageToSend{
|
SELECT $columns
|
||||||
ChannelID: config.Config.Discord.StreamsChannelID,
|
FROM
|
||||||
Req: discord.CreateMessageRequest{
|
twitch_stream
|
||||||
Content: fmt.Sprintf("%s is live: https://twitch.tv/%s\n> %s", twitchLogin, twitchLogin, title),
|
ORDER BY started_at DESC
|
||||||
},
|
`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch livestreams from db")
|
||||||
|
}
|
||||||
|
|
||||||
|
var streamDetails []hmndata.StreamDetails
|
||||||
|
for _, s := range streams {
|
||||||
|
streamDetails = append(streamDetails, hmndata.StreamDetails{
|
||||||
|
Username: s.Login,
|
||||||
|
StartTime: s.StartedAt,
|
||||||
|
Title: s.Title,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update discord with livestream info")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
|
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
|
||||||
|
@ -421,41 +442,25 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Interface("Status", status).Msg("Updating status")
|
log.Debug().Interface("Status", status).Msg("Updating status")
|
||||||
inserted, err := updateStreamStatusInDB(ctx, dbConn, &status)
|
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
}
|
}
|
||||||
if inserted {
|
|
||||||
log.Debug().Msg("Notifying discord")
|
log.Debug().Msg("Notifying discord")
|
||||||
err = notifyDiscordOfLiveStream(ctx, dbConn, status.TwitchLogin, status.Title)
|
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to notify discord")
|
log.Error().Err(err).Msg("failed to notify discord")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) (bool, error) {
|
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
||||||
log := logging.ExtractLogger(ctx)
|
log := logging.ExtractLogger(ctx)
|
||||||
inserted := false
|
|
||||||
if isStatusRelevant(status) {
|
if isStatusRelevant(status) {
|
||||||
log.Debug().Msg("Status relevant")
|
log.Debug().Msg("Status relevant")
|
||||||
_, err := db.QueryOne[models.TwitchStream](ctx, conn,
|
_, err := conn.Exec(ctx,
|
||||||
`
|
`
|
||||||
SELECT $columns
|
INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at)
|
||||||
FROM twitch_streams
|
|
||||||
WHERE twitch_id = $1
|
|
||||||
`,
|
|
||||||
status.TwitchID,
|
|
||||||
)
|
|
||||||
if err == db.NotFound {
|
|
||||||
log.Debug().Msg("Inserting new stream")
|
|
||||||
inserted = true
|
|
||||||
} else if err != nil {
|
|
||||||
return false, oops.New(err, "failed to query existing stream")
|
|
||||||
}
|
|
||||||
_, err = conn.Exec(ctx,
|
|
||||||
`
|
|
||||||
INSERT INTO twitch_streams (twitch_id, twitch_login, title, started_at)
|
|
||||||
VALUES ($1, $2, $3, $4)
|
VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT (twitch_id) DO UPDATE SET
|
ON CONFLICT (twitch_id) DO UPDATE SET
|
||||||
title = EXCLUDED.title,
|
title = EXCLUDED.title,
|
||||||
|
@ -467,21 +472,21 @@ func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *strea
|
||||||
status.StartedAt,
|
status.StartedAt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, oops.New(err, "failed to insert twitch streamer into db")
|
return oops.New(err, "failed to insert twitch streamer into db")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debug().Msg("Stream not relevant")
|
log.Debug().Msg("Stream not relevant")
|
||||||
_, err := conn.Exec(ctx,
|
_, err := conn.Exec(ctx,
|
||||||
`
|
`
|
||||||
DELETE FROM twitch_streams WHERE twitch_id = $1
|
DELETE FROM twitch_stream WHERE twitch_id = $1
|
||||||
`,
|
`,
|
||||||
status.TwitchID,
|
status.TwitchID,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, oops.New(err, "failed to remove twitch streamer from db")
|
return oops.New(err, "failed to remove twitch streamer from db")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return inserted, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var RelevantCategories = []string{
|
var RelevantCategories = []string{
|
||||||
|
|
Loading…
Reference in New Issue