Better twitch tracking
This commit is contained in:
parent
304371a9a9
commit
5cc920dc2f
|
@ -5,15 +5,18 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.handmade.network/hmn/hmn/src/config"
|
"git.handmade.network/hmn/hmn/src/config"
|
||||||
"git.handmade.network/hmn/hmn/src/db"
|
"git.handmade.network/hmn/hmn/src/db"
|
||||||
"git.handmade.network/hmn/hmn/src/hmndata"
|
"git.handmade.network/hmn/hmn/src/hmndata"
|
||||||
"git.handmade.network/hmn/hmn/src/logging"
|
"git.handmade.network/hmn/hmn/src/logging"
|
||||||
|
"git.handmade.network/hmn/hmn/src/models"
|
||||||
"git.handmade.network/hmn/hmn/src/oops"
|
"git.handmade.network/hmn/hmn/src/oops"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NOTE(asaf): Updates or creates a discord message according to the following rules:
|
// NOTE(asaf): Updates or creates a discord message according to the following rules:
|
||||||
|
//
|
||||||
// Create when:
|
// Create when:
|
||||||
// * No previous message exists
|
// * No previous message exists
|
||||||
// * We have non-zero live streamers
|
// * We have non-zero live streamers
|
||||||
|
@ -23,6 +26,7 @@ import (
|
||||||
// That way we ensure that the message doesn't get scrolled offscreen, and the
|
// That way we ensure that the message doesn't get scrolled offscreen, and the
|
||||||
// new message indicator for the channel doesn't trigger when a streamer goes offline or
|
// new message indicator for the channel doesn't trigger when a streamer goes offline or
|
||||||
// updates the stream title.
|
// updates the stream title.
|
||||||
|
//
|
||||||
// NOTE(asaf): No-op if StreamsChannelID is not specified in the config
|
// NOTE(asaf): No-op if StreamsChannelID is not specified in the config
|
||||||
func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error {
|
func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndata.StreamDetails) error {
|
||||||
if len(config.Config.Discord.StreamsChannelID) == 0 {
|
if len(config.Config.Discord.StreamsChannelID) == 0 {
|
||||||
|
@ -44,6 +48,7 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
}
|
}
|
||||||
|
|
||||||
if editExisting {
|
if editExisting {
|
||||||
|
// Make sure we have a message to edit
|
||||||
_, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
_, err := GetChannelMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == NotFound {
|
if err == NotFound {
|
||||||
|
@ -54,7 +59,20 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(streamers) == 0 {
|
||||||
|
if livestreamMessage != nil {
|
||||||
|
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to delete livestream message from discord")
|
||||||
|
}
|
||||||
|
err = hmndata.RemovePersistentVar(ctx, dbConn, hmndata.VarNameDiscordLivestreamMessage)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to clear discord persistent var")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if editExisting {
|
if editExisting {
|
||||||
|
// Check if we have new streamers to add
|
||||||
existingStreamers := livestreamMessage.Streamers
|
existingStreamers := livestreamMessage.Streamers
|
||||||
for _, s := range streamers {
|
for _, s := range streamers {
|
||||||
found := false
|
found := false
|
||||||
|
@ -71,7 +89,8 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if editExisting && len(streamers) > 0 {
|
if editExisting {
|
||||||
|
// Check that our editable message is the latest in the channel
|
||||||
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
|
messages, err := GetChannelMessages(ctx, config.Config.Discord.StreamsChannelID, GetChannelMessagesInput{
|
||||||
Limit: 1,
|
Limit: 1,
|
||||||
})
|
})
|
||||||
|
@ -84,15 +103,11 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
}
|
}
|
||||||
|
|
||||||
messageContent := ""
|
messageContent := ""
|
||||||
if len(streamers) == 0 {
|
|
||||||
messageContent = "No one is currently streaming."
|
|
||||||
} else {
|
|
||||||
var builder strings.Builder
|
var builder strings.Builder
|
||||||
for _, s := range streamers {
|
for _, s := range streamers {
|
||||||
builder.WriteString(fmt.Sprintf(":red_circle: **%s** is live: <https://twitch.tv/%s>\n> _%s_\nStarted <t:%d:R>\n\n", s.Username, s.Username, s.Title, s.StartTime.Unix()))
|
builder.WriteString(fmt.Sprintf(":red_circle: **%s** is live: <https://twitch.tv/%s>\n> _%s_\nStarted <t:%d:R>\n\n", s.Username, s.Username, s.Title, s.StartTime.Unix()))
|
||||||
}
|
}
|
||||||
messageContent = builder.String()
|
messageContent = builder.String()
|
||||||
}
|
|
||||||
|
|
||||||
msgJson, err := json.Marshal(CreateMessageRequest{
|
msgJson, err := json.Marshal(CreateMessageRequest{
|
||||||
Content: messageContent,
|
Content: messageContent,
|
||||||
|
@ -137,5 +152,50 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
return oops.New(err, "failed to store persistent var for discord streams")
|
return oops.New(err, "failed to store persistent var for discord streams")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PostStreamHistory(ctx context.Context, history *models.TwitchStreamHistory) (string, error) {
|
||||||
|
if len(config.Config.Discord.StreamsChannelID) == 0 {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
approximated := ""
|
||||||
|
if history.EndApproximated {
|
||||||
|
approximated = "about "
|
||||||
|
}
|
||||||
|
duration := history.EndedAt.Sub(history.StartedAt).Truncate(time.Minute).String()
|
||||||
|
messageContent := fmt.Sprintf(
|
||||||
|
"**%s** was live: https://twitch.tv/%s\n> _%s_\n At <t:%d:F> For %s%s",
|
||||||
|
history.TwitchLogin,
|
||||||
|
history.TwitchLogin,
|
||||||
|
history.Title,
|
||||||
|
history.StartedAt.Unix(),
|
||||||
|
approximated,
|
||||||
|
duration,
|
||||||
|
)
|
||||||
|
msgJson, err := json.Marshal(CreateMessageRequest{
|
||||||
|
Content: messageContent,
|
||||||
|
Flags: FlagSuppressEmbeds,
|
||||||
|
AllowedMentions: &MessageAllowedMentions{},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return "", oops.New(err, "failed to marshal discord message")
|
||||||
|
}
|
||||||
|
messageID := ""
|
||||||
|
if history.DiscordMessageID != "" {
|
||||||
|
updatedMessage, err := EditMessage(ctx, config.Config.Discord.StreamsChannelID, history.DiscordMessageID, string(msgJson))
|
||||||
|
if err != nil {
|
||||||
|
return "", oops.New(err, "failed to update discord message for stream history")
|
||||||
|
}
|
||||||
|
messageID = updatedMessage.ID
|
||||||
|
} else {
|
||||||
|
msg, err := CreateMessage(ctx, config.Config.Discord.StreamsChannelID, string(msgJson))
|
||||||
|
if err != nil {
|
||||||
|
return "", oops.New(err, "failed to create discord message for stream history")
|
||||||
|
}
|
||||||
|
messageID = msg.ID
|
||||||
|
}
|
||||||
|
return messageID, nil
|
||||||
|
}
|
||||||
|
|
|
@ -84,3 +84,17 @@ func StorePersistentVar[T any](
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RemovePersistentVar(ctx context.Context, dbConn db.ConnOrTx, name PersistentVarName) error {
|
||||||
|
_, err := dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
DELETE FROM persistent_var
|
||||||
|
WHERE name = $1
|
||||||
|
`,
|
||||||
|
name,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to delete var")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
package migrations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.handmade.network/hmn/hmn/src/migration/types"
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerMigration(NewTwitchTracking{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type NewTwitchTracking struct{}
|
||||||
|
|
||||||
|
func (m NewTwitchTracking) Version() types.MigrationVersion {
|
||||||
|
return types.MigrationVersion(time.Date(2022, 9, 14, 11, 46, 45, 0, time.UTC))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m NewTwitchTracking) Name() string {
|
||||||
|
return "NewTwitchTracking"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m NewTwitchTracking) Description() string {
|
||||||
|
return "New table for twitch tracking"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m NewTwitchTracking) Up(ctx context.Context, tx pgx.Tx) error {
|
||||||
|
_, err := tx.Exec(ctx,
|
||||||
|
`
|
||||||
|
CREATE TABLE twitch_stream_history (
|
||||||
|
stream_id VARCHAR(255) NOT NULL PRIMARY KEY,
|
||||||
|
twitch_id VARCHAR(255) NOT NULL,
|
||||||
|
twitch_login VARCHAR(255) NOT NULL,
|
||||||
|
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||||
|
ended_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
|
||||||
|
title VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
category_id VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}',
|
||||||
|
discord_message_id VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
discord_needs_update BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
|
vod_id VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
vod_url VARCHAR(512) NOT NULL DEFAULT '',
|
||||||
|
vod_thumbnail VARCHAR(512) NOT NULL DEFAULT '',
|
||||||
|
last_verified_vod TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
|
||||||
|
vod_gone BOOLEAN NOT NULL DEFAULT FALSE
|
||||||
|
);
|
||||||
|
CREATE TABLE twitch_latest_status (
|
||||||
|
twitch_id VARCHAR(255) NOT NULL PRIMARY KEY,
|
||||||
|
twitch_login VARCHAR(255) NOT NULL,
|
||||||
|
stream_id VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
live BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
|
started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
|
||||||
|
title VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
category_id VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
tags VARCHAR(255) ARRAY NOT NULL DEFAULT '{}',
|
||||||
|
last_hook_live_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
|
||||||
|
last_hook_channel_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch',
|
||||||
|
last_rest_update TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT 'epoch'
|
||||||
|
);
|
||||||
|
|
||||||
|
DROP TABLE twitch_stream;
|
||||||
|
`,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m NewTwitchTracking) Down(ctx context.Context, tx pgx.Tx) error {
|
||||||
|
_, err := tx.Exec(ctx,
|
||||||
|
`
|
||||||
|
DROP TABLE twitch_stream_history;
|
||||||
|
DROP TABLE twitch_latest_status;
|
||||||
|
|
||||||
|
CREATE TABLE twitch_streams (
|
||||||
|
twitch_id VARCHAR(255) NOT NULL,
|
||||||
|
twitch_login VARCHAR(255) NOT NULL,
|
||||||
|
title VARCHAR(255) NOT NULL,
|
||||||
|
started_at TIMESTAMP WITH TIME ZONE
|
||||||
|
);
|
||||||
|
`,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
|
@ -2,16 +2,40 @@ package models
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type TwitchID struct {
|
type TwitchLatestStatus struct {
|
||||||
ID string `db:"id"`
|
TwitchID string `db:"twitch_id"`
|
||||||
Login string `db:"login"`
|
TwitchLogin string `db:"twitch_login"`
|
||||||
|
StreamID string `db:"stream_id"`
|
||||||
|
Live bool `db:"live"`
|
||||||
|
StartedAt time.Time `db:"started_at"`
|
||||||
|
Title string `db:"title"`
|
||||||
|
CategoryID string `db:"category_id"`
|
||||||
|
Tags []string `db:"tags"`
|
||||||
|
LastHookLiveUpdate time.Time `db:"last_hook_live_update"`
|
||||||
|
LastHookChannelUpdate time.Time `db:"last_hook_channel_update"`
|
||||||
|
LastRESTUpdate time.Time `db:"last_rest_update"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TwitchStream struct {
|
type TwitchStreamHistory struct {
|
||||||
ID string `db:"twitch_id"`
|
StreamID string `db:"stream_id"`
|
||||||
Login string `db:"twitch_login"`
|
TwitchID string `db:"twitch_id"`
|
||||||
Title string `db:"title"`
|
TwitchLogin string `db:"twitch_login"`
|
||||||
StartedAt time.Time `db:"started_at"`
|
StartedAt time.Time `db:"started_at"`
|
||||||
|
EndedAt time.Time `db:"ended_at"`
|
||||||
|
EndApproximated bool `db:"end_approximated"`
|
||||||
|
Title string `db:"title"`
|
||||||
|
CategoryID string `db:"category_id"`
|
||||||
|
Tags []string `db:"tags"`
|
||||||
|
DiscordMessageID string `db:"discord_message_id"`
|
||||||
|
DiscordNeedsUpdate bool `db:"discord_needs_update"`
|
||||||
|
VODID string `db:"vod_id"`
|
||||||
|
VODUrl string `db:"vod_url"`
|
||||||
|
VODThumbnail string `db:"vod_thumbnail"`
|
||||||
|
LastVerifiedVOD time.Time `db:"last_verified_vod"`
|
||||||
|
// NOTE(asaf): If we had a VOD for a while, and then it disappeared,
|
||||||
|
// assume it was removed from twitch and don't bother
|
||||||
|
// checking for it again.
|
||||||
|
VODGone bool `db:"vod_gone"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TwitchLogType int
|
type TwitchLogType int
|
||||||
|
|
|
@ -28,6 +28,7 @@ var MaxRetries = errors.New("hit max retries")
|
||||||
var httpClient = &http.Client{}
|
var httpClient = &http.Client{}
|
||||||
|
|
||||||
// NOTE(asaf): Access token is not thread-safe right now.
|
// NOTE(asaf): Access token is not thread-safe right now.
|
||||||
|
//
|
||||||
// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions.
|
// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions.
|
||||||
var activeAccessToken string
|
var activeAccessToken string
|
||||||
var rateLimitReset time.Time
|
var rateLimitReset time.Time
|
||||||
|
@ -87,16 +88,6 @@ func getTwitchUsersByLogin(ctx context.Context, logins []string) ([]twitchUser,
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamStatus struct {
|
|
||||||
TwitchID string
|
|
||||||
TwitchLogin string
|
|
||||||
Live bool
|
|
||||||
Title string
|
|
||||||
StartedAt time.Time
|
|
||||||
Category string
|
|
||||||
Tags []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) {
|
func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) {
|
||||||
result := make([]streamStatus, 0, len(twitchIDs))
|
result := make([]streamStatus, 0, len(twitchIDs))
|
||||||
numChunks := len(twitchIDs)/100 + 1
|
numChunks := len(twitchIDs)/100 + 1
|
||||||
|
@ -116,6 +107,7 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
|
||||||
}
|
}
|
||||||
|
|
||||||
type twitchStatus struct {
|
type twitchStatus struct {
|
||||||
|
StreamID string `json:"id"`
|
||||||
TwitchID string `json:"user_id"`
|
TwitchID string `json:"user_id"`
|
||||||
TwitchLogin string `json:"user_login"`
|
TwitchLogin string `json:"user_login"`
|
||||||
GameID string `json:"game_id"`
|
GameID string `json:"game_id"`
|
||||||
|
@ -150,12 +142,13 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
|
||||||
started = time.Now()
|
started = time.Now()
|
||||||
}
|
}
|
||||||
status := streamStatus{
|
status := streamStatus{
|
||||||
|
StreamID: d.StreamID,
|
||||||
TwitchID: d.TwitchID,
|
TwitchID: d.TwitchID,
|
||||||
TwitchLogin: d.TwitchLogin,
|
TwitchLogin: d.TwitchLogin,
|
||||||
Live: d.Type == "live",
|
Live: d.Type == "live",
|
||||||
Title: d.Title,
|
Title: d.Title,
|
||||||
StartedAt: started,
|
StartedAt: started,
|
||||||
Category: d.GameID,
|
CategoryID: d.GameID,
|
||||||
Tags: d.Tags,
|
Tags: d.Tags,
|
||||||
}
|
}
|
||||||
result = append(result, status)
|
result = append(result, status)
|
||||||
|
@ -165,6 +158,111 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type archivedVideo struct {
|
||||||
|
ID string
|
||||||
|
StreamID string
|
||||||
|
TwitchID string
|
||||||
|
TwitchLogin string
|
||||||
|
Title string
|
||||||
|
Description string
|
||||||
|
CreatedAt time.Time
|
||||||
|
Duration time.Duration
|
||||||
|
VODUrl string
|
||||||
|
VODThumbnail string
|
||||||
|
}
|
||||||
|
|
||||||
|
func getArchivedVideosForUser(ctx context.Context, twitchID string, numVODs int) ([]archivedVideo, error) {
|
||||||
|
query := url.Values{}
|
||||||
|
query.Add("user_id", twitchID)
|
||||||
|
query.Add("type", "archived")
|
||||||
|
query.Add("first", strconv.Itoa(numVODs))
|
||||||
|
|
||||||
|
return getArchivedVideosByQuery(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getArchivedVideos(ctx context.Context, videoIDs []string) ([]archivedVideo, error) {
|
||||||
|
query := url.Values{}
|
||||||
|
for _, vid := range videoIDs {
|
||||||
|
query.Add("id", vid)
|
||||||
|
}
|
||||||
|
return getArchivedVideosByQuery(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getArchivedVideosByQuery(ctx context.Context, query url.Values) ([]archivedVideo, error) {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/videos", query.Encode()), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to create request")
|
||||||
|
}
|
||||||
|
res, err := doRequest(ctx, true, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to fetch archived videos for user")
|
||||||
|
}
|
||||||
|
|
||||||
|
type twitchVideo struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
StreamID string `json:"stream_id"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
UserLogin string `json:"user_login"`
|
||||||
|
UserName string `json:"user_name"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
PublishedAt string `json:"published_at"`
|
||||||
|
Url string `json:"url"`
|
||||||
|
ThumbnailUrl string `json:"thumbnail_url"`
|
||||||
|
Viewable string `json:"viewable"`
|
||||||
|
ViewCount int `json:"view_count"`
|
||||||
|
Language string `json:"language"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Duration string `json:"duration"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type twitchResponse struct {
|
||||||
|
Data []twitchVideo `json:"data"`
|
||||||
|
}
|
||||||
|
body, err := io.ReadAll(res.Body)
|
||||||
|
res.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to read response body while processing archived videos")
|
||||||
|
}
|
||||||
|
log := logging.ExtractLogger(ctx)
|
||||||
|
log.Debug().Str("getArchivedVideosForUser response", string(body)).Msg("Got getArchivedVideosForUser response")
|
||||||
|
|
||||||
|
var resp twitchResponse
|
||||||
|
err = json.Unmarshal(body, &resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to parse twitch response while processing archived videos")
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []archivedVideo
|
||||||
|
|
||||||
|
for _, v := range resp.Data {
|
||||||
|
createdAt, err := time.Parse(time.RFC3339, v.CreatedAt)
|
||||||
|
if err != nil {
|
||||||
|
logging.ExtractLogger(ctx).Warn().Str("Time string", v.CreatedAt).Msg("Failed to parse twitch timestamp")
|
||||||
|
createdAt = time.Time{}
|
||||||
|
}
|
||||||
|
duration, err := time.ParseDuration(v.Duration)
|
||||||
|
if err != nil {
|
||||||
|
duration = 0
|
||||||
|
}
|
||||||
|
archived := archivedVideo{
|
||||||
|
ID: v.ID,
|
||||||
|
StreamID: v.StreamID,
|
||||||
|
TwitchID: v.UserID,
|
||||||
|
TwitchLogin: v.UserLogin,
|
||||||
|
Title: v.Title,
|
||||||
|
Description: v.Description,
|
||||||
|
CreatedAt: createdAt,
|
||||||
|
Duration: duration,
|
||||||
|
VODUrl: v.Url,
|
||||||
|
VODThumbnail: v.ThumbnailUrl,
|
||||||
|
}
|
||||||
|
result = append(result, archived)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
type twitchEventSub struct {
|
type twitchEventSub struct {
|
||||||
EventID string
|
EventID string
|
||||||
TwitchID string
|
TwitchID string
|
||||||
|
|
|
@ -19,6 +19,45 @@ import (
|
||||||
"github.com/jackc/pgx/v4/pgxpool"
|
"github.com/jackc/pgx/v4/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NOTE(asaf): The twitch api madness:
|
||||||
|
//
|
||||||
|
// | stream.online | stream.offline | channel.update[3] | REST[1][2]
|
||||||
|
// id[4] | YES | NO | NO | YES
|
||||||
|
// twitch_id | YES | YES | YES | YES
|
||||||
|
// twitch_login | YES | YES | YES | YES
|
||||||
|
// is_live | YES | IMPLICIT | NO | YES
|
||||||
|
// started_at | YES | NO | NO | YES
|
||||||
|
// title | NO | NO | YES | YES
|
||||||
|
// cat_id | NO | NO | YES | YES
|
||||||
|
// tags | NO | NO | NO | YES
|
||||||
|
//
|
||||||
|
// [1] REST returns nothing when user is not live
|
||||||
|
// [2] Information received from REST is ~3 minutes old.
|
||||||
|
// [3] channel.update also fires when the user changes their twitch channel settings when they're not live (i.e. as soon as they update it in twitch settings)
|
||||||
|
// [4] ID of the current livestream
|
||||||
|
|
||||||
|
type streamStatus struct {
|
||||||
|
StreamID string
|
||||||
|
TwitchID string
|
||||||
|
TwitchLogin string
|
||||||
|
Live bool
|
||||||
|
Title string
|
||||||
|
StartedAt time.Time
|
||||||
|
CategoryID string
|
||||||
|
Tags []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type twitchNotificationType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
notificationTypeNone twitchNotificationType = 0
|
||||||
|
notificationTypeOnline = 1
|
||||||
|
notificationTypeOffline = 2
|
||||||
|
notificationTypeChannelUpdate = 3
|
||||||
|
|
||||||
|
notificationTypeRevocation = 4
|
||||||
|
)
|
||||||
|
|
||||||
type twitchNotification struct {
|
type twitchNotification struct {
|
||||||
Status streamStatus
|
Status streamStatus
|
||||||
Type twitchNotificationType
|
Type twitchNotificationType
|
||||||
|
@ -75,10 +114,10 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
|
||||||
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
|
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
syncWithTwitch(ctx, dbConn, true)
|
syncWithTwitch(ctx, dbConn, true, true)
|
||||||
case <-monitorTicker.C:
|
case <-monitorTicker.C:
|
||||||
twitchLogClear(ctx, dbConn)
|
twitchLogClear(ctx, dbConn)
|
||||||
syncWithTwitch(ctx, dbConn, true)
|
syncWithTwitch(ctx, dbConn, true, true)
|
||||||
case <-linksChangedChannel:
|
case <-linksChangedChannel:
|
||||||
// NOTE(asaf): Since we update links inside transactions for users/projects
|
// NOTE(asaf): Since we update links inside transactions for users/projects
|
||||||
// we won't see the updated list of links until the transaction is committed.
|
// we won't see the updated list of links until the transaction is committed.
|
||||||
|
@ -87,13 +126,13 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
|
||||||
var timer *time.Timer
|
var timer *time.Timer
|
||||||
t := time.AfterFunc(5*time.Second, func() {
|
t := time.AfterFunc(5*time.Second, func() {
|
||||||
expiredTimers <- timer
|
expiredTimers <- timer
|
||||||
syncWithTwitch(ctx, dbConn, false)
|
syncWithTwitch(ctx, dbConn, false, false)
|
||||||
})
|
})
|
||||||
timer = t
|
timer = t
|
||||||
timers = append(timers, t)
|
timers = append(timers, t)
|
||||||
case notification := <-twitchNotificationChannel:
|
case notification := <-twitchNotificationChannel:
|
||||||
if notification.Type == notificationTypeRevocation {
|
if notification.Type == notificationTypeRevocation {
|
||||||
syncWithTwitch(ctx, dbConn, false)
|
syncWithTwitch(ctx, dbConn, false, false)
|
||||||
} else {
|
} else {
|
||||||
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
||||||
// would return old data if we called it immediately, so we process
|
// would return old data if we called it immediately, so we process
|
||||||
|
@ -123,17 +162,6 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
|
||||||
return job
|
return job
|
||||||
}
|
}
|
||||||
|
|
||||||
type twitchNotificationType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
notificationTypeNone twitchNotificationType = 0
|
|
||||||
notificationTypeOnline = 1
|
|
||||||
notificationTypeOffline = 2
|
|
||||||
notificationTypeChannelUpdate = 3
|
|
||||||
|
|
||||||
notificationTypeRevocation = 4
|
|
||||||
)
|
|
||||||
|
|
||||||
func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error {
|
func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType string, body []byte) error {
|
||||||
var notification twitchNotification
|
var notification twitchNotification
|
||||||
if messageType == "notification" {
|
if messageType == "notification" {
|
||||||
|
@ -142,6 +170,7 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
} `json:"subscription"`
|
} `json:"subscription"`
|
||||||
Event struct {
|
Event struct {
|
||||||
|
StreamID string `json:"id"`
|
||||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||||
Title string `json:"title"`
|
Title string `json:"title"`
|
||||||
|
@ -159,12 +188,13 @@ func QueueTwitchNotification(ctx context.Context, conn db.ConnOrTx, messageType
|
||||||
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
|
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
|
||||||
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
|
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
|
||||||
notification.Status.Title = incoming.Event.Title
|
notification.Status.Title = incoming.Event.Title
|
||||||
notification.Status.Category = incoming.Event.CategoryID
|
notification.Status.CategoryID = incoming.Event.CategoryID
|
||||||
notification.Status.StartedAt = time.Now()
|
notification.Status.StartedAt = time.Now()
|
||||||
switch incoming.Subscription.Type {
|
switch incoming.Subscription.Type {
|
||||||
case "stream.online":
|
case "stream.online":
|
||||||
notification.Type = notificationTypeOnline
|
notification.Type = notificationTypeOnline
|
||||||
notification.Status.Live = true
|
notification.Status.Live = true
|
||||||
|
notification.Status.StreamID = incoming.Event.StreamID
|
||||||
case "stream.offline":
|
case "stream.offline":
|
||||||
notification.Type = notificationTypeOffline
|
notification.Type = notificationTypeOffline
|
||||||
case "channel.update":
|
case "channel.update":
|
||||||
|
@ -206,7 +236,7 @@ func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool, updateVODs bool) {
|
||||||
log := logging.ExtractLogger(ctx)
|
log := logging.ExtractLogger(ctx)
|
||||||
log.Info().Msg("Running twitch sync")
|
log.Info().Msg("Running twitch sync")
|
||||||
p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
|
p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
|
||||||
|
@ -264,12 +294,6 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
}
|
}
|
||||||
p.EndBlock()
|
p.EndBlock()
|
||||||
|
|
||||||
const (
|
|
||||||
EventSubNone = 0 // No event of this type found
|
|
||||||
EventSubRefresh = 1 // Event found, but bad status. Need to unsubscribe and resubscribe.
|
|
||||||
EventSubGood = 2 // All is well.
|
|
||||||
)
|
|
||||||
|
|
||||||
type isSubbedByType map[string]bool
|
type isSubbedByType map[string]bool
|
||||||
|
|
||||||
streamerEventSubs := make(map[string]isSubbedByType)
|
streamerEventSubs := make(map[string]isSubbedByType)
|
||||||
|
@ -345,7 +369,7 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
}
|
}
|
||||||
p.StartBlock("SQL", "Remove untracked streamers")
|
p.StartBlock("SQL", "Remove untracked streamers")
|
||||||
_, err = tx.Exec(ctx,
|
_, err = tx.Exec(ctx,
|
||||||
`DELETE FROM twitch_stream WHERE twitch_id != ANY($1)`,
|
`DELETE FROM twitch_latest_status WHERE twitch_id != ANY($1)`,
|
||||||
allIDs,
|
allIDs,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -379,10 +403,29 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
|
twitchLog(ctx, tx, models.TwitchLogTypeOther, "", "Batch resync", fmt.Sprintf("%#v", statuses))
|
||||||
p.EndBlock()
|
p.EndBlock()
|
||||||
p.StartBlock("SQL", "Update stream statuses in db")
|
p.StartBlock("SQL", "Update stream statuses in db")
|
||||||
for _, status := range statuses {
|
for _, twitchId := range usersToUpdate {
|
||||||
log.Debug().Interface("Status", status).Msg("Got streamer")
|
var status *streamStatus
|
||||||
|
for idx, st := range statuses {
|
||||||
|
if st.TwitchID == twitchId {
|
||||||
|
status = &statuses[idx]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if status == nil {
|
||||||
|
twitchLogin := ""
|
||||||
|
for _, streamer := range validStreamers {
|
||||||
|
if streamer.TwitchID == twitchId {
|
||||||
|
twitchLogin = streamer.TwitchLogin
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
status = &streamStatus{
|
||||||
|
TwitchID: twitchId,
|
||||||
|
TwitchLogin: twitchLogin,
|
||||||
|
}
|
||||||
|
}
|
||||||
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
|
twitchLog(ctx, tx, models.TwitchLogTypeREST, status.TwitchLogin, "Resync", fmt.Sprintf("%#v", status))
|
||||||
err = updateStreamStatusInDB(ctx, tx, &status)
|
err = gotRESTUpdate(ctx, tx, status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
}
|
}
|
||||||
|
@ -395,6 +438,17 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
stats.NumStreamsChecked += len(usersToUpdate)
|
stats.NumStreamsChecked += len(usersToUpdate)
|
||||||
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
|
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
|
||||||
|
|
||||||
|
if updateVODs {
|
||||||
|
err = findMissingVODs(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to find missing twitch vods")
|
||||||
|
}
|
||||||
|
err = verifyHistoryVODs(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to verify twitch vods")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Debug().Msg("Notifying discord")
|
log.Debug().Msg("Notifying discord")
|
||||||
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -403,11 +457,56 @@ func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
streams, err := db.Query[models.TwitchStream](ctx, dbConn,
|
history, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
|
||||||
|
`
|
||||||
|
SELECT $columns
|
||||||
|
FROM twitch_stream_history
|
||||||
|
WHERE discord_needs_update = TRUE
|
||||||
|
ORDER BY started_at ASC
|
||||||
|
`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch twitch history")
|
||||||
|
}
|
||||||
|
|
||||||
|
updatedHistories := make([]*models.TwitchStreamHistory, 0)
|
||||||
|
for _, h := range history {
|
||||||
|
relevant := isStreamRelevant(h.CategoryID, h.Tags)
|
||||||
|
if relevant && !h.EndedAt.IsZero() {
|
||||||
|
msgId, err := discord.PostStreamHistory(ctx, h)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to post twitch history to discord")
|
||||||
|
}
|
||||||
|
h.DiscordNeedsUpdate = false
|
||||||
|
h.DiscordMessageID = msgId
|
||||||
|
updatedHistories = append(updatedHistories, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, h := range updatedHistories {
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_stream_history
|
||||||
|
SET
|
||||||
|
discord_needs_update = $2,
|
||||||
|
discord_message_id = $3,
|
||||||
|
WHERE stream_id = $1
|
||||||
|
`,
|
||||||
|
h.StreamID,
|
||||||
|
h.DiscordNeedsUpdate,
|
||||||
|
h.DiscordMessageID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update twitch history after posting to discord")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streams, err := db.Query[models.TwitchLatestStatus](ctx, dbConn,
|
||||||
`
|
`
|
||||||
SELECT $columns
|
SELECT $columns
|
||||||
FROM
|
FROM
|
||||||
twitch_stream
|
twitch_latest_status
|
||||||
|
WHERE live = TRUE
|
||||||
ORDER BY started_at ASC
|
ORDER BY started_at ASC
|
||||||
`,
|
`,
|
||||||
)
|
)
|
||||||
|
@ -417,12 +516,14 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
|
|
||||||
var streamDetails []hmndata.StreamDetails
|
var streamDetails []hmndata.StreamDetails
|
||||||
for _, s := range streams {
|
for _, s := range streams {
|
||||||
|
if isStreamRelevant(s.CategoryID, s.Tags) {
|
||||||
streamDetails = append(streamDetails, hmndata.StreamDetails{
|
streamDetails = append(streamDetails, hmndata.StreamDetails{
|
||||||
Username: s.Login,
|
Username: s.TwitchLogin,
|
||||||
StartTime: s.StartedAt,
|
StartTime: s.StartedAt,
|
||||||
Title: s.Title,
|
Title: s.Title,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
|
err = discord.UpdateStreamers(ctx, dbConn, streamDetails)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -455,6 +556,7 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string
|
||||||
|
|
||||||
status := streamStatus{
|
status := streamStatus{
|
||||||
TwitchID: twitchID,
|
TwitchID: twitchID,
|
||||||
|
TwitchLogin: twitchLogin,
|
||||||
Live: false,
|
Live: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -468,7 +570,7 @@ func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string
|
||||||
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
|
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
|
||||||
status = result[0]
|
status = result[0]
|
||||||
}
|
}
|
||||||
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
err = gotRESTUpdate(ctx, dbConn, &status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
return
|
return
|
||||||
|
@ -506,29 +608,21 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
|
||||||
}
|
}
|
||||||
|
|
||||||
twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification))
|
twitchLog(ctx, dbConn, models.TwitchLogTypeHook, notification.Status.TwitchLogin, "Processing hook", fmt.Sprintf("%#v", notification))
|
||||||
if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
|
switch notification.Type {
|
||||||
log.Debug().Interface("Status", notification.Status).Msg("Updating status")
|
case notificationTypeOnline:
|
||||||
err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status)
|
err := gotStreamOnline(ctx, dbConn, ¬ification.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
} else if notification.Type == notificationTypeChannelUpdate {
|
case notificationTypeOffline:
|
||||||
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
|
err := gotStreamOffline(ctx, dbConn, ¬ification.Status)
|
||||||
// So we just update the title if the user is live in our db, and we rely on the
|
if err != nil {
|
||||||
// 3-minute delayed status update to verify live status and category/tag requirements.
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
_, err = dbConn.Exec(ctx,
|
}
|
||||||
`
|
case notificationTypeChannelUpdate:
|
||||||
UPDATE twitch_stream
|
err := gotChannelUpdate(ctx, dbConn, ¬ification.Status)
|
||||||
SET title = $1
|
|
||||||
WHERE twitch_id = $2
|
|
||||||
`,
|
|
||||||
notification.Status.Title,
|
|
||||||
notification.Status.TwitchID,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -539,40 +633,472 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
func gotStreamOnline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
||||||
log := logging.ExtractLogger(ctx)
|
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
|
||||||
if isStatusRelevant(status) {
|
if err != nil {
|
||||||
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking online", fmt.Sprintf("%#v", status))
|
return err
|
||||||
log.Debug().Msg("Status relevant")
|
}
|
||||||
_, err := conn.Exec(ctx,
|
latest.Live = true
|
||||||
|
latest.StreamID = status.StreamID
|
||||||
|
latest.StartedAt = status.StartedAt
|
||||||
|
latest.LastHookLiveUpdate = time.Now()
|
||||||
|
err = saveLatestStreamStatus(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = updateStreamHistory(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func gotStreamOffline(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
||||||
|
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
latest.Live = false
|
||||||
|
latest.LastHookLiveUpdate = time.Now()
|
||||||
|
err = saveLatestStreamStatus(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = updateStreamHistory(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func gotChannelUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
||||||
|
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !latest.Live {
|
||||||
|
// NOTE(asaf): If the stream is live, this channel update applies
|
||||||
|
// to the current livestream. Otherwise, this will
|
||||||
|
// only apply to the next stream, so we clear out
|
||||||
|
// the stream info.
|
||||||
|
latest.StreamID = ""
|
||||||
|
latest.StartedAt = time.Time{}
|
||||||
|
}
|
||||||
|
latest.Title = status.Title
|
||||||
|
if latest.CategoryID != status.CategoryID {
|
||||||
|
latest.CategoryID = status.CategoryID
|
||||||
|
latest.Tags = []string{} // NOTE(asaf): We don't get tags here, but we can't assume they didn't change because some tags are automatic based on category
|
||||||
|
}
|
||||||
|
latest.LastHookChannelUpdate = time.Now()
|
||||||
|
err = saveLatestStreamStatus(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = updateStreamHistory(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func gotRESTUpdate(ctx context.Context, conn db.ConnOrTx, status *streamStatus) error {
|
||||||
|
latest, err := fetchLatestStreamStatus(ctx, conn, status.TwitchID, status.TwitchLogin)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if latest.LastHookLiveUpdate.Add(3 * time.Minute).Before(time.Now()) {
|
||||||
|
latest.Live = status.Live
|
||||||
|
if status.Live {
|
||||||
|
// NOTE(asaf): We don't get this information if the user isn't live
|
||||||
|
latest.StartedAt = status.StartedAt
|
||||||
|
latest.StreamID = status.StreamID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if latest.LastHookChannelUpdate.Add(3 * time.Minute).Before(time.Now()) {
|
||||||
|
if status.Live {
|
||||||
|
// NOTE(asaf): We don't get this information if the user isn't live
|
||||||
|
latest.Title = status.Title
|
||||||
|
latest.CategoryID = status.CategoryID
|
||||||
|
latest.Tags = status.Tags
|
||||||
|
}
|
||||||
|
}
|
||||||
|
latest.LastRESTUpdate = time.Now()
|
||||||
|
err = saveLatestStreamStatus(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = updateStreamHistory(ctx, conn, latest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, twitchID string, twitchLogin string) (*models.TwitchLatestStatus, error) {
|
||||||
|
tx, err := conn.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to begin transaction for stream status fetch")
|
||||||
|
}
|
||||||
|
defer tx.Rollback(ctx)
|
||||||
|
|
||||||
|
result, err := db.QueryOne[models.TwitchLatestStatus](ctx, conn,
|
||||||
`
|
`
|
||||||
INSERT INTO twitch_stream (twitch_id, twitch_login, title, started_at)
|
SELECT $columns
|
||||||
VALUES ($1, $2, $3, $4)
|
FROM twitch_latest_status
|
||||||
ON CONFLICT (twitch_id) DO UPDATE SET
|
WHERE twitch_id = $1
|
||||||
title = EXCLUDED.title,
|
|
||||||
started_at = EXCLUDED.started_at
|
|
||||||
`,
|
`,
|
||||||
status.TwitchID,
|
twitchID,
|
||||||
status.TwitchLogin,
|
)
|
||||||
status.Title,
|
if err == db.NotFound {
|
||||||
status.StartedAt,
|
_, err = conn.Exec(ctx,
|
||||||
|
`
|
||||||
|
INSERT INTO twitch_latest_status (twitch_id, twitch_login)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
`,
|
||||||
|
twitchID,
|
||||||
|
twitchLogin,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oops.New(err, "failed to insert twitch streamer into db")
|
return nil, err
|
||||||
|
}
|
||||||
|
result = &models.TwitchLatestStatus{
|
||||||
|
TwitchID: twitchID,
|
||||||
|
TwitchLogin: twitchLogin,
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to fetch existing twitch status")
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.TwitchLogin != twitchLogin {
|
||||||
|
_, err = conn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_latest_status
|
||||||
|
SET twitch_login = $2
|
||||||
|
WHERE twitch_id = $1
|
||||||
|
`,
|
||||||
|
twitchID,
|
||||||
|
twitchLogin,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to update twitch login")
|
||||||
|
}
|
||||||
|
result.TwitchLogin = twitchLogin
|
||||||
|
}
|
||||||
|
err = tx.Commit(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, oops.New(err, "failed to commit transaction")
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveLatestStreamStatus(ctx context.Context, conn db.ConnOrTx, latest *models.TwitchLatestStatus) error {
|
||||||
|
tx, err := conn.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to start transaction for stream status update")
|
||||||
|
}
|
||||||
|
defer tx.Rollback(ctx)
|
||||||
|
|
||||||
|
// NOTE(asaf): Ensure that we have a record for it in the db
|
||||||
|
_, err = fetchLatestStreamStatus(ctx, conn, latest.TwitchID, latest.TwitchLogin)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = conn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_latest_status
|
||||||
|
SET
|
||||||
|
live = $2,
|
||||||
|
started_at = $3,
|
||||||
|
title = $4,
|
||||||
|
category_id = $5,
|
||||||
|
tags = $6,
|
||||||
|
last_hook_live_update = $7,
|
||||||
|
last_hook_channel_update = $8,
|
||||||
|
last_rest_update = $9
|
||||||
|
WHERE
|
||||||
|
twitch_id = $1
|
||||||
|
`,
|
||||||
|
latest.TwitchID,
|
||||||
|
latest.Live,
|
||||||
|
latest.StartedAt,
|
||||||
|
latest.Title,
|
||||||
|
latest.CategoryID,
|
||||||
|
latest.Tags,
|
||||||
|
latest.LastHookLiveUpdate,
|
||||||
|
latest.LastHookChannelUpdate,
|
||||||
|
latest.LastRESTUpdate,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update twitch latest status")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to commit transaction")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateStreamHistory(ctx context.Context, dbConn db.ConnOrTx, status *models.TwitchLatestStatus) error {
|
||||||
|
if status.StreamID == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
tx, err := dbConn.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to begin transaction for stream history update")
|
||||||
|
}
|
||||||
|
defer tx.Rollback(ctx)
|
||||||
|
history, err := db.QueryOne[models.TwitchStreamHistory](ctx, tx,
|
||||||
|
`
|
||||||
|
SELECT $columns
|
||||||
|
FROM twitch_stream_history
|
||||||
|
WHERE stream_id = $1
|
||||||
|
`,
|
||||||
|
status.StreamID,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err == db.NotFound {
|
||||||
|
history = &models.TwitchStreamHistory{}
|
||||||
|
history.StreamID = status.StreamID
|
||||||
|
history.TwitchID = status.TwitchID
|
||||||
|
history.TwitchLogin = status.TwitchLogin
|
||||||
|
history.StartedAt = status.StartedAt
|
||||||
|
history.DiscordNeedsUpdate = true
|
||||||
|
} else if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch existing stream history")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !status.Live && history.EndedAt.IsZero() {
|
||||||
|
history.EndedAt = time.Now()
|
||||||
|
history.EndApproximated = true
|
||||||
|
history.DiscordNeedsUpdate = true
|
||||||
|
}
|
||||||
|
|
||||||
|
history.Title = status.Title
|
||||||
|
history.CategoryID = status.CategoryID
|
||||||
|
history.Tags = status.Tags
|
||||||
|
|
||||||
|
_, err = tx.Exec(ctx,
|
||||||
|
`
|
||||||
|
INSERT INTO
|
||||||
|
twitch_stream_history (stream_id, twitch_id, twitch_login, started_at, ended_at, title, category_id, tags)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
ON CONFLICT (stream_id) DO UPDATE SET
|
||||||
|
ended_at = EXCLUDED.ended_at,
|
||||||
|
title = EXCLUDED.title,
|
||||||
|
category_id = EXCLUDED.category_id,
|
||||||
|
tags = EXCLUDED.tags
|
||||||
|
`,
|
||||||
|
history.StreamID,
|
||||||
|
history.TwitchID,
|
||||||
|
history.TwitchLogin,
|
||||||
|
history.StartedAt,
|
||||||
|
history.EndedAt,
|
||||||
|
history.Title,
|
||||||
|
history.CategoryID,
|
||||||
|
history.Tags,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to insert/update twitch history")
|
||||||
|
}
|
||||||
|
err = tx.Commit(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to commit transaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !history.EndedAt.IsZero() {
|
||||||
|
err = findHistoryVOD(ctx, dbConn, history)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to look up twitch vod")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findHistoryVOD(ctx context.Context, dbConn db.ConnOrTx, history *models.TwitchStreamHistory) error {
|
||||||
|
if history.StreamID == "" || history.VODID != "" || history.VODGone {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
vods, err := getArchivedVideosForUser(ctx, history.TwitchID, 10)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch vods for streamer")
|
||||||
|
}
|
||||||
|
|
||||||
|
var vod *archivedVideo
|
||||||
|
for idx, v := range vods {
|
||||||
|
if v.StreamID == history.StreamID {
|
||||||
|
vod = &vods[idx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if vod != nil {
|
||||||
|
history.VODID = vod.ID
|
||||||
|
history.VODUrl = vod.VODUrl
|
||||||
|
history.VODThumbnail = vod.VODThumbnail
|
||||||
|
history.LastVerifiedVOD = time.Now()
|
||||||
|
history.VODGone = false
|
||||||
|
|
||||||
|
if vod.Duration.Minutes() > 0 {
|
||||||
|
history.EndedAt = history.StartedAt.Add(vod.Duration)
|
||||||
|
history.EndApproximated = false
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_stream_history
|
||||||
|
SET
|
||||||
|
vod_id = $2,
|
||||||
|
vod_url = $3,
|
||||||
|
vod_thumbnail = $4,
|
||||||
|
last_verified_vod = $5,
|
||||||
|
vod_gone = $6,
|
||||||
|
ended_at = $7,
|
||||||
|
end_approximated = $8
|
||||||
|
WHERE stream_id = $1
|
||||||
|
`,
|
||||||
|
history.StreamID,
|
||||||
|
history.VODID,
|
||||||
|
history.VODUrl,
|
||||||
|
history.VODThumbnail,
|
||||||
|
history.LastVerifiedVOD,
|
||||||
|
history.VODGone,
|
||||||
|
history.EndedAt,
|
||||||
|
history.EndApproximated,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update stream history with VOD")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debug().Msg("Stream not relevant")
|
if history.StartedAt.Add(14 * 24 * time.Hour).Before(time.Now()) {
|
||||||
twitchLog(ctx, conn, models.TwitchLogTypeOther, status.TwitchLogin, "Marking offline", fmt.Sprintf("%#v", status))
|
history.VODGone = true
|
||||||
_, err := conn.Exec(ctx,
|
_, err = dbConn.Exec(ctx, `
|
||||||
`
|
UPDATE twitch_stream_history
|
||||||
DELETE FROM twitch_stream WHERE twitch_id = $1
|
SET
|
||||||
|
vod_gone = $2,
|
||||||
|
WHERE stream_id = $1
|
||||||
`,
|
`,
|
||||||
status.TwitchID,
|
history.StreamID,
|
||||||
|
history.VODGone,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oops.New(err, "failed to remove twitch streamer from db")
|
return oops.New(err, "failed to update stream history")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findMissingVODs(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
|
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
|
||||||
|
`
|
||||||
|
SELECT $columns
|
||||||
|
FROM twitch_stream_history
|
||||||
|
WHERE
|
||||||
|
vod_gone = FALSE AND
|
||||||
|
vod_url = '' AND
|
||||||
|
ended_at != $1
|
||||||
|
`,
|
||||||
|
time.Time{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch stream history for vod updates")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, history := range histories {
|
||||||
|
err = findHistoryVOD(ctx, dbConn, history)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyHistoryVODs(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
|
histories, err := db.Query[models.TwitchStreamHistory](ctx, dbConn,
|
||||||
|
`
|
||||||
|
SELECT $columns
|
||||||
|
FROM twitch_stream_history
|
||||||
|
WHERE
|
||||||
|
vod_gone = FALSE AND
|
||||||
|
vod_id != '' AND
|
||||||
|
last_verified_vod < $1
|
||||||
|
ORDER BY last_verified_vod ASC
|
||||||
|
LIMIT 100
|
||||||
|
`,
|
||||||
|
time.Now().Add(-24*time.Hour),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch stream history for vod verification")
|
||||||
|
}
|
||||||
|
|
||||||
|
videoIDs := make([]string, 0, len(histories))
|
||||||
|
for _, h := range histories {
|
||||||
|
videoIDs = append(videoIDs, h.VODID)
|
||||||
|
}
|
||||||
|
|
||||||
|
VODs, err := getArchivedVideos(ctx, videoIDs)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to fetch vods from twitch")
|
||||||
|
}
|
||||||
|
|
||||||
|
vodGone := make([]string, 0, len(histories))
|
||||||
|
vodFound := make([]string, 0, len(histories))
|
||||||
|
for _, h := range histories {
|
||||||
|
found := false
|
||||||
|
for _, vod := range VODs {
|
||||||
|
if h.VODID == vod.ID {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
vodGone = append(vodGone, h.VODID)
|
||||||
|
} else {
|
||||||
|
vodFound = append(vodFound, h.VODID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(vodGone) > 0 {
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_stream_history
|
||||||
|
SET
|
||||||
|
discord_needs_update = TRUE,
|
||||||
|
vod_id = '',
|
||||||
|
vod_url = '',
|
||||||
|
vod_thumbnail = '',
|
||||||
|
last_verified_vod = $2,
|
||||||
|
vod_gone = TRUE
|
||||||
|
WHERE
|
||||||
|
vod_id = ANY($1)
|
||||||
|
`,
|
||||||
|
vodGone,
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update twitch history")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(vodFound) > 0 {
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_stream_history
|
||||||
|
SET
|
||||||
|
last_verified_vod = $2,
|
||||||
|
WHERE
|
||||||
|
vod_id = ANY($1)
|
||||||
|
`,
|
||||||
|
vodFound,
|
||||||
|
time.Now(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return oops.New(err, "failed to update twitch history")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,22 +1111,20 @@ var RelevantTags = []string{
|
||||||
"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
|
"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
|
||||||
}
|
}
|
||||||
|
|
||||||
func isStatusRelevant(status *streamStatus) bool {
|
func isStreamRelevant(catID string, tags []string) bool {
|
||||||
if status.Live {
|
|
||||||
for _, cat := range RelevantCategories {
|
for _, cat := range RelevantCategories {
|
||||||
if status.Category == cat {
|
if cat == catID {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tag := range RelevantTags {
|
for _, tag := range RelevantTags {
|
||||||
for _, streamTag := range status.Tags {
|
for _, streamTag := range tags {
|
||||||
if tag == streamTag {
|
if tag == streamTag {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,11 +97,13 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streamers"))
|
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streamers"))
|
||||||
}
|
}
|
||||||
live, err := db.Query[models.TwitchStream](c, c.Conn,
|
live, err := db.Query[models.TwitchLatestStatus](c, c.Conn,
|
||||||
`
|
`
|
||||||
SELECT $columns
|
SELECT $columns
|
||||||
FROM
|
FROM
|
||||||
twitch_stream
|
twitch_latest_status
|
||||||
|
WHERE
|
||||||
|
live = TRUE
|
||||||
`,
|
`,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -124,7 +126,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
|
||||||
user.Login = u.TwitchLogin
|
user.Login = u.TwitchLogin
|
||||||
user.Live = false
|
user.Live = false
|
||||||
for _, l := range live {
|
for _, l := range live {
|
||||||
if l.Login == u.TwitchLogin {
|
if l.TwitchLogin == u.TwitchLogin {
|
||||||
user.Live = true
|
user.Live = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue