Compare commits
5 Commits
02d51a8bfe
...
fe545ff0f3
Author | SHA1 | Date |
---|---|---|
|
fe545ff0f3 | |
|
8c47590b99 | |
|
4c296c9ddd | |
|
9fcc2321ca | |
|
ca614a781b |
|
@ -70,8 +70,8 @@ fi
|
||||||
|
|
||||||
# Install Go
|
# Install Go
|
||||||
if [ $checkpoint -lt 40 ]; then
|
if [ $checkpoint -lt 40 ]; then
|
||||||
wget https://golang.org/dl/go1.17.linux-amd64.tar.gz
|
wget https://go.dev/dl/go1.18.2.linux-amd64.tar.gz
|
||||||
tar -C /usr/local -xzf go1.17.linux-amd64.tar.gz
|
tar -C /usr/local -xzf go1.18.2.linux-amd64.tar.gz
|
||||||
|
|
||||||
export PATH=$PATH:/usr/local/go/bin:/root/go/bin
|
export PATH=$PATH:/usr/local/go/bin:/root/go/bin
|
||||||
echo 'export PATH=$PATH:/usr/local/go/bin:/root/go/bin' >> ~/.bashrc
|
echo 'export PATH=$PATH:/usr/local/go/bin:/root/go/bin' >> ~/.bashrc
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.handmade.network/hmn/hmn/src/config"
|
"git.handmade.network/hmn/hmn/src/config"
|
||||||
"git.handmade.network/hmn/hmn/src/db"
|
"git.handmade.network/hmn/hmn/src/db"
|
||||||
"git.handmade.network/hmn/hmn/src/hmndata"
|
"git.handmade.network/hmn/hmn/src/hmndata"
|
||||||
|
"git.handmade.network/hmn/hmn/src/logging"
|
||||||
"git.handmade.network/hmn/hmn/src/oops"
|
"git.handmade.network/hmn/hmn/src/oops"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -114,7 +115,8 @@ func UpdateStreamers(ctx context.Context, dbConn db.ConnOrTx, streamers []hmndat
|
||||||
if livestreamMessage != nil {
|
if livestreamMessage != nil {
|
||||||
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
err = DeleteMessage(ctx, config.Config.Discord.StreamsChannelID, livestreamMessage.MessageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oops.New(err, "failed to delete existing discord message from streams channel")
|
log := logging.ExtractLogger(ctx)
|
||||||
|
log.Error().Err(err).Msg("failed to delete existing discord message from streams channel")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,6 +134,8 @@ func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, oops.New(err, "failed to read response body while processing stream statuses")
|
return nil, oops.New(err, "failed to read response body while processing stream statuses")
|
||||||
}
|
}
|
||||||
|
log := logging.ExtractLogger(ctx)
|
||||||
|
log.Debug().Str("getStreamStatus response", string(body)).Msg("Got getStreamStatus response")
|
||||||
|
|
||||||
var streamResponse twitchResponse
|
var streamResponse twitchResponse
|
||||||
err = json.Unmarshal(body, &streamResponse)
|
err = json.Unmarshal(body, &streamResponse)
|
||||||
|
|
|
@ -18,8 +18,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type twitchNotification struct {
|
type twitchNotification struct {
|
||||||
TwitchID string
|
Status streamStatus
|
||||||
Type twitchNotificationType
|
Type twitchNotificationType
|
||||||
}
|
}
|
||||||
|
|
||||||
var twitchNotificationChannel chan twitchNotification
|
var twitchNotificationChannel chan twitchNotification
|
||||||
|
@ -91,22 +91,19 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
|
||||||
if notification.Type == notificationTypeRevocation {
|
if notification.Type == notificationTypeRevocation {
|
||||||
syncWithTwitch(ctx, dbConn, false)
|
syncWithTwitch(ctx, dbConn, false)
|
||||||
} else {
|
} else {
|
||||||
if notification.Type == notificationTypeChannelUpdate {
|
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
||||||
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
// would return old data if we called it immediately, so we process
|
||||||
// would return old data if we called it immediately, so we have to
|
// the notification to the extent we can, and later do a full update. We can get the
|
||||||
// wait a bit before we process the notification. We can get the
|
// category from the notification, but not the tags (or the up-to-date title),
|
||||||
// category from the notification, but not the tags (or the up-to-date title),
|
// so we can't really skip this.
|
||||||
// so we can't really skip this.
|
var timer *time.Timer
|
||||||
var timer *time.Timer
|
t := time.AfterFunc(3*time.Minute, func() {
|
||||||
t := time.AfterFunc(3*time.Minute, func() {
|
expiredTimers <- timer
|
||||||
expiredTimers <- timer
|
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
|
||||||
processEventSubNotification(ctx, dbConn, ¬ification)
|
})
|
||||||
})
|
timer = t
|
||||||
timer = t
|
timers = append(timers, t)
|
||||||
timers = append(timers, t)
|
processEventSubNotification(ctx, dbConn, ¬ification)
|
||||||
} else {
|
|
||||||
processEventSubNotification(ctx, dbConn, ¬ification)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -136,6 +133,8 @@ func QueueTwitchNotification(messageType string, body []byte) error {
|
||||||
Event struct {
|
Event struct {
|
||||||
BroadcasterUserID string `json:"broadcaster_user_id"`
|
BroadcasterUserID string `json:"broadcaster_user_id"`
|
||||||
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
CategoryID string `json:"category_id"`
|
||||||
} `json:"event"`
|
} `json:"event"`
|
||||||
}
|
}
|
||||||
var incoming notificationJson
|
var incoming notificationJson
|
||||||
|
@ -144,14 +143,20 @@ func QueueTwitchNotification(messageType string, body []byte) error {
|
||||||
return oops.New(err, "failed to parse notification body")
|
return oops.New(err, "failed to parse notification body")
|
||||||
}
|
}
|
||||||
|
|
||||||
notification.TwitchID = incoming.Event.BroadcasterUserID
|
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
|
||||||
|
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
|
||||||
|
notification.Status.Title = incoming.Event.Title
|
||||||
|
notification.Status.Category = incoming.Event.CategoryID
|
||||||
|
notification.Status.StartedAt = time.Now()
|
||||||
switch incoming.Subscription.Type {
|
switch incoming.Subscription.Type {
|
||||||
case "stream.online":
|
case "stream.online":
|
||||||
notification.Type = notificationTypeOnline
|
notification.Type = notificationTypeOnline
|
||||||
|
notification.Status.Live = true
|
||||||
case "stream.offline":
|
case "stream.offline":
|
||||||
notification.Type = notificationTypeOffline
|
notification.Type = notificationTypeOffline
|
||||||
case "channel.update":
|
case "channel.update":
|
||||||
notification.Type = notificationTypeChannelUpdate
|
notification.Type = notificationTypeChannelUpdate
|
||||||
|
// NOTE(asaf): Can't tell if the user is live here.
|
||||||
default:
|
default:
|
||||||
return oops.New(nil, "unknown subscription type received")
|
return oops.New(nil, "unknown subscription type received")
|
||||||
}
|
}
|
||||||
|
@ -410,6 +415,55 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
|
||||||
|
log := logging.ExtractLogger(ctx)
|
||||||
|
log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
|
||||||
|
foundStreamer := false
|
||||||
|
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, streamer := range allStreamers {
|
||||||
|
if streamer.TwitchLogin == twitchLogin {
|
||||||
|
foundStreamer = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundStreamer {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
status := streamStatus{
|
||||||
|
TwitchID: twitchID,
|
||||||
|
Live: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := getStreamStatus(ctx, []string{twitchID})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(result) > 0 {
|
||||||
|
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
|
||||||
|
status = result[0]
|
||||||
|
}
|
||||||
|
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().Msg("Notifying discord")
|
||||||
|
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to notify discord")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
|
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
|
||||||
log := logging.ExtractLogger(ctx)
|
log := logging.ExtractLogger(ctx)
|
||||||
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
|
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
|
||||||
|
@ -417,34 +471,47 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
status := streamStatus{
|
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
|
||||||
TwitchID: notification.TwitchID,
|
foundStreamer := false
|
||||||
Live: false,
|
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
var err error
|
for _, streamer := range allStreamers {
|
||||||
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline {
|
if streamer.TwitchLogin == notification.Status.TwitchLogin {
|
||||||
result, err := getStreamStatus(ctx, []string{notification.TwitchID})
|
foundStreamer = true
|
||||||
if err != nil || len(result) == 0 {
|
break
|
||||||
log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, streamer := range allStreamers {
|
|
||||||
if streamer.TwitchLogin == result[0].TwitchLogin {
|
|
||||||
status = result[0]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !foundStreamer {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Debug().Interface("Status", status).Msg("Updating status")
|
if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
|
||||||
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
log.Debug().Interface("Status", notification.Status).Msg("Updating status")
|
||||||
if err != nil {
|
err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status)
|
||||||
log.Error().Err(err).Msg("failed to update twitch stream status")
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else if notification.Type == notificationTypeChannelUpdate {
|
||||||
|
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
|
||||||
|
// So we just update the title if the user is live in our db, and we rely on the
|
||||||
|
// 3-minute delayed status update to verify live status and category/tag requirements.
|
||||||
|
_, err = dbConn.Exec(ctx,
|
||||||
|
`
|
||||||
|
UPDATE twitch_stream
|
||||||
|
SET title = $1
|
||||||
|
WHERE twitch_id = $2
|
||||||
|
`,
|
||||||
|
notification.Status.Title,
|
||||||
|
notification.Status.TwitchID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Msg("Notifying discord")
|
log.Debug().Msg("Notifying discord")
|
||||||
|
|
|
@ -74,7 +74,7 @@ func TwitchDebugPage(c *RequestContext) ResponseData {
|
||||||
`
|
`
|
||||||
SELECT $columns
|
SELECT $columns
|
||||||
FROM
|
FROM
|
||||||
twitch_streams
|
twitch_stream
|
||||||
ORDER BY started_at DESC
|
ORDER BY started_at DESC
|
||||||
`,
|
`,
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue