|
|
|
@ -18,8 +18,8 @@ import (
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type twitchNotification struct {
|
|
|
|
|
TwitchID string
|
|
|
|
|
Type twitchNotificationType
|
|
|
|
|
Status streamStatus
|
|
|
|
|
Type twitchNotificationType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var twitchNotificationChannel chan twitchNotification
|
|
|
|
@ -91,22 +91,19 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.
|
|
|
|
|
if notification.Type == notificationTypeRevocation {
|
|
|
|
|
syncWithTwitch(ctx, dbConn, false)
|
|
|
|
|
} else {
|
|
|
|
|
if notification.Type == notificationTypeChannelUpdate {
|
|
|
|
|
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
|
|
|
|
// would return old data if we called it immediately, so we have to
|
|
|
|
|
// wait a bit before we process the notification. We can get the
|
|
|
|
|
// category from the notification, but not the tags (or the up-to-date title),
|
|
|
|
|
// so we can't really skip this.
|
|
|
|
|
var timer *time.Timer
|
|
|
|
|
t := time.AfterFunc(3*time.Minute, func() {
|
|
|
|
|
expiredTimers <- timer
|
|
|
|
|
processEventSubNotification(ctx, dbConn, ¬ification)
|
|
|
|
|
})
|
|
|
|
|
timer = t
|
|
|
|
|
timers = append(timers, t)
|
|
|
|
|
} else {
|
|
|
|
|
processEventSubNotification(ctx, dbConn, ¬ification)
|
|
|
|
|
}
|
|
|
|
|
// NOTE(asaf): The twitch API (getStreamStatus) lags behind the notification and
|
|
|
|
|
// would return old data if we called it immediately, so we process
|
|
|
|
|
// the notification to the extent we can, and later do a full update. We can get the
|
|
|
|
|
// category from the notification, but not the tags (or the up-to-date title),
|
|
|
|
|
// so we can't really skip this.
|
|
|
|
|
var timer *time.Timer
|
|
|
|
|
t := time.AfterFunc(3*time.Minute, func() {
|
|
|
|
|
expiredTimers <- timer
|
|
|
|
|
updateStreamStatus(ctx, dbConn, notification.Status.TwitchID, notification.Status.TwitchLogin)
|
|
|
|
|
})
|
|
|
|
|
timer = t
|
|
|
|
|
timers = append(timers, t)
|
|
|
|
|
processEventSubNotification(ctx, dbConn, ¬ification)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -136,6 +133,8 @@ func QueueTwitchNotification(messageType string, body []byte) error {
|
|
|
|
|
Event struct {
|
|
|
|
|
BroadcasterUserID string `json:"broadcaster_user_id"`
|
|
|
|
|
BroadcasterUserLogin string `json:"broadcaster_user_login"`
|
|
|
|
|
Title string `json:"title"`
|
|
|
|
|
CategoryID string `json:"category_id"`
|
|
|
|
|
} `json:"event"`
|
|
|
|
|
}
|
|
|
|
|
var incoming notificationJson
|
|
|
|
@ -144,14 +143,20 @@ func QueueTwitchNotification(messageType string, body []byte) error {
|
|
|
|
|
return oops.New(err, "failed to parse notification body")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notification.TwitchID = incoming.Event.BroadcasterUserID
|
|
|
|
|
notification.Status.TwitchID = incoming.Event.BroadcasterUserID
|
|
|
|
|
notification.Status.TwitchLogin = incoming.Event.BroadcasterUserLogin
|
|
|
|
|
notification.Status.Title = incoming.Event.Title
|
|
|
|
|
notification.Status.Category = incoming.Event.CategoryID
|
|
|
|
|
notification.Status.StartedAt = time.Now()
|
|
|
|
|
switch incoming.Subscription.Type {
|
|
|
|
|
case "stream.online":
|
|
|
|
|
notification.Type = notificationTypeOnline
|
|
|
|
|
notification.Status.Live = true
|
|
|
|
|
case "stream.offline":
|
|
|
|
|
notification.Type = notificationTypeOffline
|
|
|
|
|
case "channel.update":
|
|
|
|
|
notification.Type = notificationTypeChannelUpdate
|
|
|
|
|
// NOTE(asaf): Can't tell if the user is live here.
|
|
|
|
|
default:
|
|
|
|
|
return oops.New(nil, "unknown subscription type received")
|
|
|
|
|
}
|
|
|
|
@ -410,6 +415,55 @@ func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func updateStreamStatus(ctx context.Context, dbConn db.ConnOrTx, twitchID string, twitchLogin string) {
|
|
|
|
|
log := logging.ExtractLogger(ctx)
|
|
|
|
|
log.Debug().Str("TwitchID", twitchID).Msg("Updating stream status")
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
|
|
|
|
|
foundStreamer := false
|
|
|
|
|
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, streamer := range allStreamers {
|
|
|
|
|
if streamer.TwitchLogin == twitchLogin {
|
|
|
|
|
foundStreamer = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !foundStreamer {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
status := streamStatus{
|
|
|
|
|
TwitchID: twitchID,
|
|
|
|
|
Live: false,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result, err := getStreamStatus(ctx, []string{twitchID})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Str("TwitchID", twitchID).Err(err).Msg("failed to fetch stream status")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(result) > 0 {
|
|
|
|
|
log.Debug().Interface("Got status", result[0]).Msg("Got streamer status from twitch")
|
|
|
|
|
status = result[0]
|
|
|
|
|
}
|
|
|
|
|
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug().Msg("Notifying discord")
|
|
|
|
|
err = notifyDiscordOfLiveStream(ctx, dbConn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to notify discord")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
|
|
|
|
|
log := logging.ExtractLogger(ctx)
|
|
|
|
|
log.Debug().Interface("Notification", notification).Msg("Processing twitch notification")
|
|
|
|
@ -417,34 +471,47 @@ func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notifi
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
status := streamStatus{
|
|
|
|
|
TwitchID: notification.TwitchID,
|
|
|
|
|
Live: false,
|
|
|
|
|
// NOTE(asaf): Verifying that the streamer we're processing hasn't been removed from our db in the meantime.
|
|
|
|
|
foundStreamer := false
|
|
|
|
|
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var err error
|
|
|
|
|
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline {
|
|
|
|
|
result, err := getStreamStatus(ctx, []string{notification.TwitchID})
|
|
|
|
|
if err != nil || len(result) == 0 {
|
|
|
|
|
log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to fetch hmn streamers")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, streamer := range allStreamers {
|
|
|
|
|
if streamer.TwitchLogin == result[0].TwitchLogin {
|
|
|
|
|
status = result[0]
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
for _, streamer := range allStreamers {
|
|
|
|
|
if streamer.TwitchLogin == notification.Status.TwitchLogin {
|
|
|
|
|
foundStreamer = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !foundStreamer {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug().Interface("Status", status).Msg("Updating status")
|
|
|
|
|
err = updateStreamStatusInDB(ctx, dbConn, &status)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
|
|
|
|
if notification.Type == notificationTypeOnline || notification.Type == notificationTypeOffline {
|
|
|
|
|
log.Debug().Interface("Status", notification.Status).Msg("Updating status")
|
|
|
|
|
err = updateStreamStatusInDB(ctx, dbConn, ¬ification.Status)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
} else if notification.Type == notificationTypeChannelUpdate {
|
|
|
|
|
// NOTE(asaf): Channel updates can happen wether or not the streamer is live.
|
|
|
|
|
// So we just update the title if the user is live in our db, and we rely on the
|
|
|
|
|
// 3-minute delayed status update to verify live status and category/tag requirements.
|
|
|
|
|
_, err = dbConn.Exec(ctx,
|
|
|
|
|
`
|
|
|
|
|
UPDATE twitch_stream
|
|
|
|
|
SET title = $1
|
|
|
|
|
WHERE twitch_id = $2
|
|
|
|
|
`,
|
|
|
|
|
notification.Status.Title,
|
|
|
|
|
notification.Status.TwitchID,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to update twitch stream status")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug().Msg("Notifying discord")
|
|
|
|
|