Compare commits

...

10 Commits

Author SHA1 Message Date
Asaf Gartner 59b2573cbe Twitch should work now hopefully. 2022-10-19 11:57:34 +03:00
Asaf Gartner 7d2a906f19 More twitch logs 2022-10-16 00:30:08 +03:00
Asaf Gartner 3bd3d309dd SQL fix 2022-10-14 07:36:31 +03:00
Asaf Gartner 408e7cb701 Trying some debugging 2022-10-14 07:04:00 +03:00
Asaf Gartner 0b62490a71 Trying another fix 2022-10-14 06:37:50 +03:00
Asaf Gartner f88861a50d Better time truncation for stream notifications. 2022-10-13 19:58:28 +03:00
Asaf Gartner de9de2b9ac Fixed twitch history query 2022-10-13 19:55:51 +03:00
Asaf Gartner 420ec3e55d Twitch fixes 2022-10-13 19:38:46 +03:00
Asaf Gartner 66f8c10ca9 Added support for db arrays and some twitch fixes. 2022-10-13 00:40:37 +03:00
Asaf Gartner 6cfc5d25a5 Better twitch tracking 2022-10-12 21:43:19 +03:00
9 changed files with 1159 additions and 206 deletions

View File

@ -618,6 +618,12 @@ func setValueFromDB(dest reflect.Value, value reflect.Value) {
dest.SetInt(value.Int())
case reflect.String:
dest.SetString(value.String())
case reflect.Slice:
switch v := value.Interface().(type) {
case pgtype.Value:
v.AssignTo(dest.Interface())
default:
}
// TODO(ben): More kinds? All the kinds? It kind of feels like we should be able to assign to any destination whose underlying type is a primitive.
default:
dest.Set(value)

View File

@ -5,24 +5,28 @@ import (
"encoding/json"
"fmt"
"strings"
"time"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
)
// NOTE(asaf): Updates or creates a discord message according to the following rules:
// Create when:
// * No previous message exists
// * We have non-zero live streamers
// * Message exists, but we're adding a new streamer that wasn't in the existing message
// * Message exists, but is not the most recent message in the channel
// Update otherwise
// That way we ensure that the message doesn't get scrolled offscreen, and the
// new message indicator for the channel doesn't trigger when a streamer goes offline or
// updates the stream title.
//
// Create when:
// * No previous message exists
// * We have non-zero live streamers
// * Message exists, but we're adding a new streamer that wasn't in the existing message
// * Message exists, but is not the most recent message in the channel
// Update otherwise
// That way we ensure that the message doesn't get scrolled offscreen, and the
// new message indicator for the channel doesn't trigger when a streamer goes offline or
// updates the stream title.
//
// NOTE(asaf): No-op if StreamsChannelID is not specified in the config
func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error {
if len(config.Config.Discord.StreamsChannelID) == 0 {
@ -44,6 +48,7 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
}
if editExisting {
// Make sure we have a message to edit
_, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
if err == NotFound {
@ -54,88 +59,146 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
}
}
if editExisting {
existingStreamers := livestreamMessage.Streamers
for _, s := range streamers {
found := false
for _, es := range existingStreamers {
if es.Username == s.Username {
found = true
if len(streamers) == 0 {
if livestreamMessage != nil {
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
return oops.New(err, "failed to delete livestream message from discord")
}
err = hmndata.RemovePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage)
if err != nil {
return oops.New(err, "failed to clear discord persistent var")
}
}
} else {
if editExisting {
// Check if we have new streamers to add
existingStreamers := livestreamMessage.Streamers
for _, s := range streamers {
found := false
for _, es := range existingStreamers {
if es.Username == s.Username {
found = true
break
}
}
if !found {
editExisting = false
break
}
}
if !found {
}
if editExisting {
// Check that our editable message is the latest in the channel
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
Limit: 1,
})
if err != nil {
return oops.New(err, "failed to fetch messages from discord")
}
if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID {
editExisting = false
break
}
}
}
if editExisting && len(streamers) > 0 {
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
Limit: 1,
})
if err != nil {
return oops.New(err, "failed to fetch messages from discord")
}
if len(messages) == 0 || messages[0].ID != livestreamMessage.MessageID {
editExisting = false
}
}
messageContent := ""
if len(streamers) == 0 {
messageContent = "No one is currently streaming."
} else {
messageContent := ""
var builder strings.Builder
for _, s := range streamers {
builder.WriteString(fmt.Sprintf(":red_circle: **%s** is live: <https://twitch.tv/%s>\n> _%s_\nStarted <t:%d:R>\n\n", s.Username, s.Username, s.Title, s.StartTime.Unix()))
}
messageContent = builder.String()
msgJson, err := json.Marshal(CreateMessageRequest{
Content: messageContent,
Flags: FlagSuppressEmbeds,
AllowedMentions: &MessageAllowedMentions{},
})
if err != nil {
return oops.New(err, "failed to marshal discord message")
}
newMessageID := ""
if editExisting {
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson))
if err != nil {
return oops.New(err, "failed to update discord message for streams channel")
}
newMessageID = updatedMessage.ID
} else {
if livestreamMessage != nil {
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
log := logging.ExtractLogger(ctx)
log.Error().Err(err).Msg("failed to delete existing discord message from streams channel")
}
}
sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
if err != nil {
return oops.New(err, "failed to create discord message for streams channel")
}
newMessageID = sentMessage.ID
}
data := hmndata.DiscordLivestreamMessage{
MessageID: newMessageID,
Streamers: streamers,
}
err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data)
if err != nil {
return oops.New(err, "failed to store persistent var for discord streams")
}
}
return nil
}
func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory) (string, error) {
if len(config.Config.Discord.StreamsChannelID) == 0 {
return "", nil
}
approximated := ""
if history.EndApproximated {
approximated = "about "
}
duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Second).String()
messageContent := fmt.Sprintf(
":o: **%s** was live: https://twitch.tv/%s\n> _%s_\nOn <t:%d:F> for %s%s",
history.TwitchLogin,
history.TwitchLogin,
history.Title,
history.StartedAt.Unix(),
approximated,
duration,
)
if history.VODUrl != "" {
messageContent += fmt.Sprintf("\nVOD: %s", history.VODUrl)
}
msgJson, err := json.Marshal(CreateMessageRequest{
Content: messageContent,
Flags: FlagSuppressEmbeds,
AllowedMentions: &MessageAllowedMentions{},
})
if err != nil {
return oops.New(err, "failed to marshal discord message")
return "", oops.New(err, "failed to marshal discord message")
}
newMessageID := ""
if editExisting {
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID, string(msgJson))
messageID := ""
if history.DiscordMessageID != "" {
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, history.DiscordMessageID, string(msgJson))
if err != nil {
return oops.New(err, "failed to update discord message for streams channel")
return "", oops.New(err, "failed to update discord message for stream history")
}
newMessageID = updatedMessage.ID
messageID = updatedMessage.ID
} else {
if livestreamMessage != nil {
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
if err != nil {
log := logging.ExtractLogger(ctx)
log.Error().Err(err).Msg("failed to delete existing discord message from streams channel")
}
}
sentMessage, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
msg, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
if err != nil {
return oops.New(err, "failed to create discord message for streams channel")
return "", oops.New(err, "failed to create discord message for stream history")
}
newMessageID = sentMessage.ID
messageID = msg.ID
}
data := hmndata.DiscordLivestreamMessage{
MessageID: newMessageID,
Streamers: streamers,
}
err = hmndata.StorePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage, &data)
if err != nil {
return oops.New(err, "failed to store persistent var for discord streams")
}
return nil
return messageID, nil
}

View File

@ -84,3 +84,17 @@ func StorePersistentVar[T any](
return nil
}
func RemovePersistentVar(ctx context.Context, dbConn db.ConnOrTx, name PersistentVarName) error {
_, err := dbConn.Exec(ctx,
`
DELETE FROM persistent_var
WHERE name = $1
`,
name,
)
if err != nil {
return oops.New(err, "failed to delete var")
}
return nil
}

View File

@ -0,0 +1,85 @@
package migrations
import (
"context"
"time"
"git.handmade.network/hmn/hmn/src/migration/types"
"github.com/jackc/pgx/v4"
)
func init() {
registerMigration(NewTwitchTracking{})
}
type NewTwitchTracking struct{}
func (m NewTwitchTracking) Version() types.MigrationVersion {
return types.MigrationVersion(time.Date(2022, 9, 14, 11, 46, 45, 0, time.UTC))
}
func (m NewTwitchTracking) Name() string {
return "NewTwitchTracking"
}
func (m NewTwitchTracking) Description() string {
return "New table for twitch tracking"
}
func (m NewTwitchTracking) Up(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
CREATE TABLE twitch_stream_history (
stream_id VARCHAR(255) NOT NULL PRIMARY KEY,
twitch_id VARCHAR(255) NOT NULL,
twitch_login VARCHAR(255) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
end_approximated BOOLEAN NOT NULL DEFAULT FALSE,
title VARCHAR(255) NOT NULL DEFAULT '',
category_id VARCHAR(255) NOT NULL DEFAULT '',
tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}',
discord_message_id VARCHAR(255) NOT NULL DEFAULT '',
discord_needs_update BOOLEAN NOT NULL DEFAULT FALSE,
vod_id VARCHAR(255) NOT NULL DEFAULT '',
vod_url VARCHAR(512) NOT NULL DEFAULT '',
vod_thumbnail VARCHAR(512) NOT NULL DEFAULT '',
last_verified_vod TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
vod_gone BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE TABLE twitch_latest_status (
twitch_id VARCHAR(255) NOT NULL PRIMARY KEY,
twitch_login VARCHAR(255) NOT NULL,
stream_id VARCHAR(255) NOT NULL DEFAULT '',
live BOOLEAN NOT NULL DEFAULT FALSE,
started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
title VARCHAR(255) NOT NULL DEFAULT '',
category_id VARCHAR(255) NOT NULL DEFAULT '',
tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}',
last_hook_live_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
last_hook_channel_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
last_rest_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch'
);
DROP TABLE twitch_stream;
`,
)
return err
}
func (m NewTwitchTracking) Down(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
DROP TABLE twitch_stream_history;
DROP TABLE twitch_latest_status;
CREATE TABLE twitch_stream (
twitch_id VARCHAR(255) NOT NULL,
twitch_login VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE
);
`,
)
return err
}

View File

@ -0,0 +1,57 @@
package migrations
import (
"context"
"time"
"git.handmade.network/hmn/hmn/src/migration/types"
"github.com/jackc/pgx/v4"
)
func init() {
registerMigration(AddTwitchEnded{})
}
type AddTwitchEnded struct{}
func (m AddTwitchEnded) Version() types.MigrationVersion {
return types.MigrationVersion(time.Date(2022, 10, 18, 6, 28, 31, 0, time.UTC))
}
func (m AddTwitchEnded) Name() string {
return "AddTwitchEnded"
}
func (m AddTwitchEnded) Description() string {
return "Add stream_ended to twitch history"
}
func (m AddTwitchEnded) Up(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
ALTER TABLE twitch_stream_history
ADD COLUMN stream_ended BOOLEAN NOT NULL DEFAULT FALSE;
`,
)
if err != nil {
return err
}
_, err = tx.Exec(ctx,
`
UPDATE twitch_stream_history
SET stream_ended = TRUE
WHERE ended_at > TIMESTAMP '2000-01-01 00:00:00';
`,
)
return err
}
func (m AddTwitchEnded) Down(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
ALTER TABLE twitch_stream_history
DROP COLUMN stream_ended BOOLEAN NOT NULL DEFAULT FALSE;
`,
)
return err
}

View File

@ -2,16 +2,41 @@ package models
import "time"
type TwitchID struct {
ID string `db:"id"`
Login string `db:"login"`
type TwitchLatestStatus struct {
TwitchID string `db:"twitch_id"`
TwitchLogin string `db:"twitch_login"`
StreamID string `db:"stream_id"`
Live bool `db:"live"`
StartedAt time.Time `db:"started_at"`
Title string `db:"title"`
CategoryID string `db:"category_id"`
Tags []string `db:"tags"`
LastHookLiveUpdate time.Time `db:"last_hook_live_update"`
LastHookChannelUpdate time.Time `db:"last_hook_channel_update"`
LastRESTUpdate time.Time `db:"last_rest_update"`
}
type TwitchStream struct {
ID string `db:"twitch_id"`
Login string `db:"twitch_login"`
Title string `db:"title"`
StartedAt time.Time `db:"started_at"`
type TwitchStreamHistory struct {
StreamID string `db:"stream_id"`
TwitchID string `db:"twitch_id"`
TwitchLogin string `db:"twitch_login"`
StartedAt time.Time `db:"started_at"`
EndedAt time.Time `db:"ended_at"`
StreamEnded bool `db:"stream_ended"`
EndApproximated bool `db:"end_approximated"`
Title string `db:"title"`
CategoryID string `db:"category_id"`
Tags []string `db:"tags"`
DiscordMessageID string `db:"discord_message_id"`
DiscordNeedsUpdate bool `db:"discord_needs_update"`
VODID string `db:"vod_id"`
VODUrl string `db:"vod_url"`
VODThumbnail string `db:"vod_thumbnail"`
LastVerifiedVOD time.Time `db:"last_verified_vod"`
// NOTE(asaf): If we had a VOD for a while, and then it disappeared,
// assume it was removed from twitch and don't bother
// checking for it again.
VODGone bool `db:"vod_gone"`
}
type TwitchLogType int

View File

@ -28,7 +28,8 @@ var MaxRetries = errors.New("hit max retries")
var httpClient = &http.Client{}
// NOTE(asaf): Access token is not thread-safe right now.
// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions.
//
// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions.
var activeAccessToken string
var rateLimitReset time.Time
@ -87,16 +88,6 @@ func getTwitchUsersByLogin(ctx context.Context, logins []string) ([]twitchUser,
return result, nil
}
type streamStatus struct {
TwitchID string
TwitchLogin string
Live bool
Title string
StartedAt time.Time
Category string
Tags []string
}
func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) {
result := make([]streamStatus, 0, len(twitchIDs))
numChunks := len(twitchIDs)/100 + 1
@ -116,6 +107,7 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
}
type twitchStatus struct {
StreamID string `json:"id"`
TwitchID string `json:"user_id"`
TwitchLogin string `json:"user_login"`
GameID string `json:"game_id"`
@ -150,12 +142,13 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
started = time.Now()
}
status := streamStatus{
StreamID: d.StreamID,
TwitchID: d.TwitchID,
TwitchLogin: d.TwitchLogin,
Live: d.Type == "live",
Title: d.Title,
StartedAt: started,
Category: d.GameID,
CategoryID: d.GameID,
Tags: d.Tags,
}
result = append(result, status)
@ -165,6 +158,115 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
return result, nil
}
type archivedVideo struct {
ID string
StreamID string
TwitchID string
TwitchLogin string
Title string
Description string
CreatedAt time.Time
Duration time.Duration
VODUrl string
VODThumbnail string
RawDuration string
RawCreatedAt string
}
func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) {
query := url.Values{}
query.Add("user_id", twitchID)
query.Add("type", "archive")
query.Add("first", strconv.Itoa(numVODs))
return getArchivedVideosByQuery(ctx, query)
}
func getArchivedVideos(ctx context.Context, videoIDs []string) ([]archivedVideo, error) {
query := url.Values{}
for _, vid := range videoIDs {
query.Add("id", vid)
}
return getArchivedVideosByQuery(ctx, query)
}
func getArchivedVideosByQuery(ctx context.Context, query url.Values) ([]archivedVideo, error) {
req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/videos", query.Encode()), nil)
if err != nil {
return nil, oops.New(err, "failed to create request")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return nil, oops.New(err, "failed to fetch archived videos for user")
}
type twitchVideo struct {
ID string `json:"id"`
StreamID string `json:"stream_id"`
UserID string `json:"user_id"`
UserLogin string `json:"user_login"`
UserName string `json:"user_name"`
Title string `json:"title"`
Description string `json:"description"`
CreatedAt string `json:"created_at"`
PublishedAt string `json:"published_at"`
Url string `json:"url"`
ThumbnailUrl string `json:"thumbnail_url"`
Viewable string `json:"viewable"`
ViewCount int `json:"view_count"`
Language string `json:"language"`
Type string `json:"type"`
Duration string `json:"duration"`
}
type twitchResponse struct {
Data []twitchVideo `json:"data"`
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, oops.New(err, "failed to read response body while processing archived videos")
}
log := logging.ExtractLogger(ctx)
log.Debug().Str("getArchivedVideosForUser response", string(body)).Msg("Got getArchivedVideosForUser response")
var resp twitchResponse
err = json.Unmarshal(body, &resp)
if err != nil {
return nil, oops.New(err, "failed to parse twitch response while processing archived videos")
}
var result []archivedVideo
for _, v := range resp.Data {
createdAt, err := time.Parse(time.RFC3339, v.CreatedAt)
if err != nil {
logging.ExtractLogger(ctx).Warn().Str("Time string", v.CreatedAt).Msg("Failed to parse twitch timestamp")
createdAt = time.Time{}
}
duration, err := time.ParseDuration(v.Duration)
if err != nil {
duration = 0
}
archived := archivedVideo{
ID: v.ID,
StreamID: v.StreamID,
TwitchID: v.UserID,
TwitchLogin: v.UserLogin,
Title: v.Title,
Description: v.Description,
CreatedAt: createdAt,
Duration: duration,
VODUrl: v.Url,
VODThumbnail: v.ThumbnailUrl,
RawDuration: v.Duration,
RawCreatedAt: v.CreatedAt,
}
result = append(result, archived)
}
return result, nil
}
type twitchEventSub struct {
EventID string
TwitchID string
@ -373,7 +475,7 @@ func doRequest(ctx context.Context, waitOnRateLimit bool, req *http.Request) (*h
if err != nil {
return nil, oops.New(err, "failed to read response body")
}
logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch")
logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch")
res.Body.Close()
return res, oops.New(nil, "got an unexpected status code from twitch")
}

View File

@ -19,6 +19,45 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
)
// NOTE(asaf): The twitch api madness:
//
// | stream.online | stream.offline | channel.update[3] | REST[1][2]
// id[4] | YES | NO | NO | YES
// twitch_id | YES | YES | YES | YES
// twitch_login | YES | YES | YES | YES
// is_live | YES | IMPLICIT | NO | YES
// started_at | YES | NO | NO | YES
// title | NO | NO | YES | YES
// cat_id | NO | NO | YES | YES
// tags | NO | NO | NO | YES
//
// [1] REST returns nothing when user is not live
// [2] Information received from REST is ~3 minutes old.
// [3] channel.update also fires when the user changes their twitch channel settings when they're not live (i.e. as soon as they update it in twitch settings)
// [4] ID of the current livestream
type streamStatus struct {
StreamID string
TwitchID string
TwitchLogin string
Live bool
Title string
StartedAt time.Time
CategoryID string
Tags []string
}
type twitchNotificationType int
const (
notificationTypeNone twitchNotificationType = 0
notificationTypeOnline = 1
notificationTypeOffline = 2
notificationTypeChannelUpdate = 3
notificationTypeRevocation = 4
)
type twitchNotification struct {
Status streamStatus
Type twitchNotificationType
@ -75,10 +114,10 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
return true, nil
}
syncWithTwitch(ctx, dbConn, true)
syncWithTwitch(ctx, dbConn, true, true)
case <-monitorTicker.C:
twitchLogClear(ctx, dbConn)
syncWithTwitch(ctx, dbConn, true)
syncWithTwitch(ctx, dbConn, true, true)
case <-linksChangedChannel:
// NOTE(asaf): Since we update links inside transactions for users/projects
// we won't see the updated list of links until the transaction is committed.
@ -87,13 +126,13 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
var timer *time.Timer
t := time.AfterFunc(5*time.Second, func() {
expiredTimers <- timer
syncWithTwitch(ctx, dbConn, false)
syncWithTwitch(ctx, dbConn, false, false)
})
timer = t
timers = append(timers, t)
case notification := <-twitchNotificationChannel:
if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false)
syncWithTwitch(ctx, dbConn, false, false)
} else {
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
// would return old data if we called it immediately, so we process
@ -123,17 +162,6 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
return job
}
type twitchNotificationType int
const (
notificationTypeNone twitchNotificationType = 0
notificationTypeOnline = 1
notificationTypeOffline = 2
notificationTypeChannelUpdate = 3
notificationTypeRevocation = 4
)
func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error {
var notification twitchNotification
if messageType == "notification" {
@ -142,6 +170,7 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType
Type string `json:"type"`
} `json:"subscription"`
Event struct {
StreamID string `json:"id"`
BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"`
Title string `json:"title"`
@ -159,12 +188,13 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
notification.Status.Title = incoming.Event.Title
notification.Status.Category = incoming.Event.CategoryID
notification.Status.CategoryID = incoming.Event.CategoryID
notification.Status.StartedAt = time.Now()
switch incoming.Subscription.Type {
case "stream.online":
notification.Type = notificationTypeOnline
notification.Status.Live = true
notification.Status.StreamID = incoming.Event.StreamID
case "stream.offline":
notification.Type = notificationTypeOffline
case "channel.update":
@ -206,7 +236,7 @@ func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []s
}
}
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, updateVODs bool) {
log := logging.ExtractLogger(ctx)
log.Info().Msg("Running twitch sync")
p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
@ -237,13 +267,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
streamerMap[streamer.TwitchLogin] = &streamers[idx]
}
p.StartBlock("TwitchAPI", "Fetch twitch user info")
twitchUsers, err := getTwitchUsersByLogin(ctx, needID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
twitchUsers := []twitchUser{}
if len(needID) > 0 {
p.StartBlock("TwitchAPI", "Fetch twitch user info")
log.Debug().Interface("needID", needID).Msg("IDs")
twitchUsers, err = getTwitchUsersByLogin(ctx, needID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
}
p.EndBlock()
for _, tu := range twitchUsers {
streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID
@ -264,12 +298,6 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
}
p.EndBlock()
const (
EventSubNone = 0 // No event of this type found
EventSubRefresh = 1 // Event found, but bad status. Need to unsubscribe and resubscribe.
EventSubGood = 2 // All is well.
)
type isSubbedByType map[string]bool
streamerEventSubs := make(map[string]isSubbedByType)
@ -345,7 +373,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
}
p.StartBlock("SQL", "Remove untracked streamers")
_, err = tx.Exec(ctx,
`DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`,
`DELETE FROM twitch_latest_status WHERE NOT (twitch_id = ANY($1))`,
allIDs,
)
if err != nil {
@ -370,24 +398,45 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
}
}
p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
statuses, err := getStreamStatus(ctx, usersToUpdate)
if err != nil {
log.Error().Err(err).Msg("failed to fetch stream statuses")
return
}
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
p.EndBlock()
p.StartBlock("SQL", "Update stream statuses in db")
for _, status := range statuses {
log.Debug().Interface("Status", status).Msg("Got streamer")
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
err = updateStreamStatusInDB(ctx, tx, &status)
if len(usersToUpdate) > 0 {
p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
statuses, err := getStreamStatus(ctx, usersToUpdate)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
log.Error().Err(err).Msg("failed to fetch stream statuses")
return
}
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
p.EndBlock()
p.StartBlock("SQL", "Update stream statuses in db")
for _, twitchId := range usersToUpdate {
var status *streamStatus
for idx, st := range statuses {
if st.TwitchID == twitchId {
status = &statuses[idx]
break
}
}
if status == nil {
twitchLogin := ""
for _, streamer := range validStreamers {
if streamer.TwitchID == twitchId {
twitchLogin = streamer.TwitchLogin
break
}
}
status = &streamStatus{
TwitchID: twitchId,
TwitchLogin: twitchLogin,
}
}
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
err = gotRESTUpdate(ctx, tx, status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
}
p.EndBlock()
}
p.EndBlock()
err = tx.Commit(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to commit transaction")
@ -395,6 +444,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
stats.NumStreamsChecked += len(usersToUpdate)
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
if updateVODs {
err = findMissingVODs(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to find missing twitch vods")
}
err = verifyHistoryVODs(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to verify twitch vods")
}
}
log.Debug().Msg("Notifying discord")
err = notifyDiscordOfLiveStream(ctx, dbConn)
if err != nil {
@ -403,11 +463,56 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
}
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
streams, err := db.Query[models.TwitchStream](ctx, dbConn,
history, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE discord_needs_update = TRUE
ORDER BY started_at ASC
`,
)
if err != nil {
return oops.New(err, "failed to fetch twitch history")
}
updatedHistories := make([]*models.TwitchStreamHistory, 0)
for _, h := range history {
relevant := isStreamRelevant(h.CategoryID, h.Tags)
if relevant && h.StreamEnded {
msgId, err := discord.PostStreamHistory(ctx, h)
if err != nil {
return oops.New(err, "failed to post twitch history to discord")
}
h.DiscordMessageID = msgId
}
h.DiscordNeedsUpdate = false
updatedHistories = append(updatedHistories, h)
}
for _, h := range updatedHistories {
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
discord_needs_update = $2,
discord_message_id = $3
WHERE stream_id = $1
`,
h.StreamID,
h.DiscordNeedsUpdate,
h.DiscordMessageID,
)
if err != nil {
return oops.New(err, "failed to update twitch history after posting to discord")
}
}
streams, err := db.Query[models.TwitchLatestStatus](ctx, dbConn,
`
SELECT $columns
FROM
twitch_stream
twitch_latest_status
WHERE live = TRUE
ORDER BY started_at ASC
`,
)
@ -417,11 +522,13 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
var streamDetails []hmndata.StreamDetails
for _, s := range streams {
streamDetails = append(streamDetails, hmndata.StreamDetails{
Username: s.Login,
StartTime: s.StartedAt,
Title: s.Title,
})
if isStreamRelevant(s.CategoryID, s.Tags) {
streamDetails = append(streamDetails, hmndata.StreamDetails{
Username: s.TwitchLogin,
StartTime: s.StartedAt,
Title: s.Title,
})
}
}
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
@ -454,8 +561,9 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string
}
status := streamStatus{
TwitchID: twitchID,
Live: false,
TwitchID: twitchID,
TwitchLogin: twitchLogin,
Live: false,
}
result, err := getStreamStatus(ctx, []string{twitchID})
@ -468,7 +576,7 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
status = result[0]
}
err = updateStreamStatusInDB(ctx, dbConn, &status)
err = gotRESTUpdate(ctx, dbConn, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
@ -506,29 +614,21 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
}
twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification))
if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
log.Debug().Interface("Status", notification.Status).Msg("Updating status")
err = updateStreamStatusInDB(ctx, dbConn, &notification.Status)
switch notification.Type {
case notificationTypeOnline:
err := gotStreamOnline(ctx, dbConn, &notification.Status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
} else if notification.Type == notificationTypeChannelUpdate {
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
// So we just update the title if the user is live in our db, and we rely on the
// 3-minute delayed status update to verify live status and category/tag requirements.
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream
SET title = $1
WHERE twitch_id = $2
`,
notification.Status.Title,
notification.Status.TwitchID,
)
case notificationTypeOffline:
err := gotStreamOffline(ctx, dbConn, &notification.Status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
case notificationTypeChannelUpdate:
err := gotChannelUpdate(ctx, dbConn, &notification.Status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
return
}
}
@ -539,40 +639,541 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
}
}
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
log := logging.ExtractLogger(ctx)
if isStatusRelevant(status) {
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking online", fmt.Sprintf("%#v", status))
log.Debug().Msg("Status relevant")
_, err := conn.Exec(ctx,
`
INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (twitch_id) DO UPDATE SET
title = EXCLUDED.title,
started_at = EXCLUDED.started_at
`,
status.TwitchID,
status.TwitchLogin,
status.Title,
status.StartedAt,
)
func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
if latest.Live && latest.StreamID != status.StreamID {
// NOTE(asaf): Update history for previous stream
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOnline", fmt.Sprintf("Wrapping up previous stream"))
latest.Live = false
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return oops.New(err, "failed to insert twitch streamer into db")
}
} else {
log.Debug().Msg("Stream not relevant")
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking offline", fmt.Sprintf("%#v", status))
_, err := conn.Exec(ctx,
`
DELETE FROM twitch_stream WHERE twitch_id = $1
`,
status.TwitchID,
)
if err != nil {
return oops.New(err, "failed to remove twitch streamer from db")
return err
}
}
latest.Live = true
latest.StreamID = status.StreamID
latest.StartedAt = status.StartedAt
latest.LastHookLiveUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotStreamOffline", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
latest.Live = false
latest.LastHookLiveUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotChannelUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
if !latest.Live {
// NOTE(asaf): If the stream is live, this channel update applies
// to the current livestream. Otherwise, this will
// only apply to the next stream, so we clear out
// the stream info.
latest.StreamID = ""
latest.StartedAt = time.Time{}
}
latest.Title = status.Title
if latest.CategoryID != status.CategoryID {
latest.CategoryID = status.CategoryID
latest.Tags = []string{} // NOTE(asaf): We don't get tags here, but we can't assume they didn't change because some tags are automatic based on category
}
latest.LastHookChannelUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
if err != nil {
return err
}
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("latest: %#v\nstatus: %#v", latest, status))
if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) {
if status.Live {
if latest.Live && status.StreamID != latest.StreamID {
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "GotRestUpdate", fmt.Sprintf("Wrapping up previous stream"))
latest.Live = false
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
}
// NOTE(asaf): We don't get this information if the user isn't live
latest.StartedAt = status.StartedAt
latest.StreamID = status.StreamID
}
latest.Live = status.Live
}
if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) {
if status.Live {
// NOTE(asaf): We don't get this information if the user isn't live
latest.Title = status.Title
latest.CategoryID = status.CategoryID
latest.Tags = status.Tags
}
}
latest.LastRESTUpdate = time.Now()
err = saveLatestStreamStatus(ctx, conn, latest)
if err != nil {
return err
}
err = updateStreamHistory(ctx, conn, latest)
if err != nil {
return err
}
return nil
}
func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID string, twitchLogin string) (*models.TwitchLatestStatus, error) {
tx, err := conn.Begin(ctx)
if err != nil {
return nil, oops.New(err, "failed to begin transaction for stream status fetch")
}
defer tx.Rollback(ctx)
result, err := db.QueryOne[models.TwitchLatestStatus](ctx, tx,
`
SELECT $columns
FROM twitch_latest_status
WHERE twitch_id = $1
`,
twitchID,
)
if err == db.NotFound {
twitchLog(ctx, tx, models.TwitchLogTypeOther, twitchLogin, "Creating new streamer", fmt.Sprintf("twitchID: %s", twitchID))
_, err = tx.Exec(ctx,
`
INSERT INTO twitch_latest_status (twitch_id, twitch_login)
VALUES ($1, $2)
`,
twitchID,
twitchLogin,
)
if err != nil {
return nil, err
}
result = &models.TwitchLatestStatus{
TwitchID: twitchID,
TwitchLogin: twitchLogin,
}
} else if err != nil {
return nil, oops.New(err, "failed to fetch existing twitch status")
}
if result.TwitchLogin != twitchLogin {
// NOTE(asaf): If someone changed their twitch login we should
// still reuse their db record by twitch_id.
_, err = tx.Exec(ctx,
`
UPDATE twitch_latest_status
SET twitch_login = $2
WHERE twitch_id = $1
`,
twitchID,
twitchLogin,
)
if err != nil {
return nil, oops.New(err, "failed to update twitch login")
}
result.TwitchLogin = twitchLogin
}
err = tx.Commit(ctx)
if err != nil {
return nil, oops.New(err, "failed to commit transaction")
}
return result, nil
}
func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *models.TwitchLatestStatus) error {
tx, err := conn.Begin(ctx)
if err != nil {
return oops.New(err, "failed to start transaction for stream status update")
}
defer tx.Rollback(ctx)
// NOTE(asaf): Ensure that we have a record for it in the db
_, err = fetchLatestStreamStatus(ctx, tx, latest.TwitchID, latest.TwitchLogin)
if err != nil {
return err
}
if latest.Tags == nil {
latest.Tags = make([]string, 0)
}
_, err = tx.Exec(ctx,
`
UPDATE twitch_latest_status
SET
live = $2,
stream_id = $3,
started_at = $4,
title = $5,
category_id = $6,
tags = $7,
last_hook_live_update = $8,
last_hook_channel_update = $9,
last_rest_update = $10
WHERE
twitch_id = $1
`,
latest.TwitchID,
latest.Live,
latest.StreamID,
latest.StartedAt,
latest.Title,
latest.CategoryID,
latest.Tags,
latest.LastHookLiveUpdate,
latest.LastHookChannelUpdate,
latest.LastRESTUpdate,
)
if err != nil {
return oops.New(err, "failed to update twitch latest status")
}
err = tx.Commit(ctx)
if err != nil {
return oops.New(err, "failed to commit transaction")
}
return nil
}
func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error {
if status.StreamID == "" {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("No StreamID - Skipping\nstatus: %#v", status))
return nil
}
tx, err := dbConn.Begin(ctx)
if err != nil {
return oops.New(err, "failed to begin transaction for stream history update")
}
defer tx.Rollback(ctx)
history, err := db.QueryOne[models.TwitchStreamHistory](ctx, tx,
`
SELECT $columns
FROM twitch_stream_history
WHERE stream_id = $1
`,
status.StreamID,
)
if err == db.NotFound {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Creating new history\nstatus: %#v", status))
history = &models.TwitchStreamHistory{}
history.StreamID = status.StreamID
history.TwitchID = status.TwitchID
history.TwitchLogin = status.TwitchLogin
history.StartedAt = status.StartedAt
history.DiscordNeedsUpdate = true
} else if err != nil {
return oops.New(err, "failed to fetch existing stream history")
}
if !status.Live && !history.StreamEnded {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Setting end time\nstatus: %#v", status))
history.EndedAt = time.Now()
history.StreamEnded = true
history.EndApproximated = true
history.DiscordNeedsUpdate = true
}
history.Title = status.Title
history.CategoryID = status.CategoryID
history.Tags = status.Tags
if history.Tags == nil {
history.Tags = make([]string, 0)
}
_, err = tx.Exec(ctx,
`
INSERT INTO
twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, stream_ended, ended_at, end_approximated, title, category_id, tags, discord_needs_update)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (stream_id) DO UPDATE SET
stream_ended = EXCLUDED.stream_ended,
ended_at = EXCLUDED.ended_at,
end_approximated = EXCLUDED.end_approximated,
title = EXCLUDED.title,
category_id = EXCLUDED.category_id,
tags = EXCLUDED.tags,
discord_needs_update = EXCLUDED.discord_needs_update
`,
history.StreamID,
history.TwitchID,
history.TwitchLogin,
history.StartedAt,
history.StreamEnded,
history.EndedAt,
history.EndApproximated,
history.Title,
history.CategoryID,
history.Tags,
history.DiscordNeedsUpdate,
)
if err != nil {
return oops.New(err, "failed to insert/update twitch history")
}
err = tx.Commit(ctx)
if err != nil {
return oops.New(err, "failed to commit transaction")
}
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, status.TwitchLogin, "updateStreamHistory", fmt.Sprintf("Checking VOD\nhistory: %#v", history))
err = findHistoryVOD(ctx, dbConn, history)
if err != nil {
return oops.New(err, "failed to look up twitch vod")
}
return nil
}
func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error {
if history.StreamID == "" || history.VODGone {
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("Skipping VOD check\nhistory: %#v", history))
return nil
}
vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10)
twitchLog(ctx, dbConn, models.TwitchLogTypeOther, history.TwitchLogin, "findHistoryVOD", fmt.Sprintf("vods: %#v\nhistory: %#v", vods, history))
if err != nil {
return oops.New(err, "failed to fetch vods for streamer")
}
var vod *archivedVideo
for idx, v := range vods {
if v.StreamID == history.StreamID {
vod = &vods[idx]
}
}
if vod != nil {
history.VODID = vod.ID
history.VODUrl = vod.VODUrl
history.VODThumbnail = vod.VODThumbnail
history.LastVerifiedVOD = time.Now()
history.VODGone = false
history.DiscordNeedsUpdate = true
if history.StreamEnded && vod.Duration.Minutes() > 0 {
history.EndedAt = history.StartedAt.Add(vod.Duration)
history.EndApproximated = false
}
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
vod_id = $2,
vod_url = $3,
vod_thumbnail = $4,
last_verified_vod = $5,
vod_gone = $6,
ended_at = $7,
end_approximated = $8,
discord_needs_update = $9
WHERE stream_id = $1
`,
history.StreamID,
history.VODID,
history.VODUrl,
history.VODThumbnail,
history.LastVerifiedVOD,
history.VODGone,
history.EndedAt,
history.EndApproximated,
history.DiscordNeedsUpdate,
)
if err != nil {
return oops.New(err, "failed to update stream history with VOD")
}
} else {
if history.StartedAt.Add(14 * 24 * time.Hour).Before(time.Now()) {
history.VODGone = true
_, err = dbConn.Exec(ctx, `
UPDATE twitch_stream_history
SET
vod_gone = $2,
last_verified_vod = $3
WHERE stream_id = $1
`,
history.StreamID,
history.VODGone,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update stream history")
}
} else {
_, err = dbConn.Exec(ctx, `
UPDATE twitch_stream_history
SET
last_verified_vod = $2
WHERE stream_id = $1
`,
history.StreamID,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update stream history")
}
}
}
return nil
}
func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error {
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE
vod_gone = FALSE AND
stream_ended = TRUE AND
(end_approximated = TRUE OR vod_id = '')
ORDER BY last_verified_vod ASC
LIMIT 100
`,
)
if err != nil {
return oops.New(err, "failed to fetch stream history for vod updates")
}
for _, history := range histories {
err = findHistoryVOD(ctx, dbConn, history)
if err != nil {
return err
}
}
return nil
}
func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
`
SELECT $columns
FROM twitch_stream_history
WHERE
vod_gone = FALSE AND
vod_id != '' AND
last_verified_vod < $1
ORDER BY last_verified_vod ASC
LIMIT 100
`,
time.Now().Add(-24*time.Hour),
)
if err != nil {
return oops.New(err, "failed to fetch stream history for vod verification")
}
videoIDs := make([]string, 0, len(histories))
for _, h := range histories {
videoIDs = append(videoIDs, h.VODID)
}
if len(videoIDs) > 0 {
VODs, err := getArchivedVideos(ctx, videoIDs)
if err != nil {
return oops.New(err, "failed to fetch vods from twitch")
}
vodGone := make([]string, 0, len(histories))
vodFound := make([]string, 0, len(histories))
for _, h := range histories {
found := false
for _, vod := range VODs {
if h.VODID == vod.ID {
found = true
break
}
}
if !found {
vodGone = append(vodGone, h.VODID)
} else {
vodFound = append(vodFound, h.VODID)
}
}
if len(vodGone) > 0 {
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
discord_needs_update = TRUE,
vod_id = '',
vod_url = '',
vod_thumbnail = '',
last_verified_vod = $2,
vod_gone = TRUE
WHERE
vod_id = ANY($1)
`,
vodGone,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update twitch history")
}
}
if len(vodFound) > 0 {
_, err = dbConn.Exec(ctx,
`
UPDATE twitch_stream_history
SET
last_verified_vod = $2
WHERE
vod_id = ANY($1)
`,
vodFound,
time.Now(),
)
if err != nil {
return oops.New(err, "failed to update twitch history")
}
}
}
return nil
}
@ -585,19 +1186,17 @@ var RelevantTags = []string{
"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
}
func isStatusRelevant(status *streamStatus) bool {
if status.Live {
for _, cat := range RelevantCategories {
if status.Category == cat {
return true
}
func isStreamRelevant(catID string, tags []string) bool {
for _, cat := range RelevantCategories {
if cat == catID {
return true
}
}
for _, tag := range RelevantTags {
for _, streamTag := range status.Tags {
if tag == streamTag {
return true
}
for _, tag := range RelevantTags {
for _, streamTag := range tags {
if tag == streamTag {
return true
}
}
}
@ -626,7 +1225,7 @@ func twitchLogClear(ctx context.Context, conn db.ConnOrTx) {
_, err := conn.Exec(ctx,
`
DELETE FROM twitch_log
WHERE timestamp <= $1
WHERE logged_at <= $1
`,
time.Now().Add(-(time.Hour * 24 * 4)),
)

View File

@ -97,11 +97,13 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
if err != nil {
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streamers"))
}
live, err := db.Query[models.TwitchStream](c, c.Conn,
live, err := db.Query[models.TwitchLatestStatus](c, c.Conn,
`
SELECT $columns
FROM
twitch_stream
twitch_latest_status
WHERE
live = TRUE
`,
)
if err != nil {
@ -124,7 +126,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
user.Login = u.TwitchLogin
user.Live = false
for _, l := range live {
if l.Login == u.TwitchLogin {
if l.TwitchLogin == u.TwitchLogin {
user.Live = true
break
}