Twitch monitoring

This commit is contained in:
Asaf Gartner 2022-03-22 20:07:43 +02:00
parent 5c29f3f814
commit 11dd75ad03
14 changed files with 1268 additions and 0 deletions

View File

@ -57,6 +57,14 @@ var Config = HMNConfig{
MemberRoleID: "",
ShowcaseChannelID: "",
LibraryChannelID: "",
StreamsChannelID: "",
},
Twitch: TwitchConfig{
ClientID: "",
ClientSecret: "",
EventSubSecret: "",
BaseUrl: "https://api.twitch.tv/helix",
BaseIDUrl: "https://id.twitch.tv/oauth2",
},
EpisodeGuide: EpisodeGuide{
CineraOutputPath: "./annotations/",

View File

@ -27,6 +27,7 @@ type HMNConfig struct {
Email EmailConfig
DigitalOcean DigitalOceanConfig
Discord DiscordConfig
Twitch TwitchConfig
EpisodeGuide EpisodeGuide
}
@ -76,9 +77,18 @@ type DiscordConfig struct {
MemberRoleID string
ShowcaseChannelID string
LibraryChannelID string
StreamsChannelID string
JamShowcaseChannelID string
}
type TwitchConfig struct {
ClientID string
ClientSecret string
EventSubSecret string // NOTE(asaf): Between 10-100 chars long. Anything will do.
BaseUrl string
BaseIDUrl string
}
type EpisodeGuide struct {
CineraOutputPath string
Projects map[string]string // NOTE(asaf): Maps from slugs to default episode guide topic

97
src/hmndata/twitch.go Normal file
View File

@ -0,0 +1,97 @@
package hmndata
import (
"context"
"regexp"
"strings"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
)
const InvalidUserTwitchID = "INVALID_USER"
type TwitchStreamer struct {
TwitchID string
TwitchLogin string
UserID *int
ProjectID *int
}
var twitchRegex = regexp.MustCompile(`twitch\.tv/(?P<login>[^/]+)$`)
func FetchTwitchStreamers(ctx context.Context, dbConn db.ConnOrTx) ([]TwitchStreamer, error) {
streamers, err := db.Query(ctx, dbConn, models.Link{},
`
SELECT $columns
FROM
handmade_links AS link
WHERE
url ~* 'twitch\.tv/([^/]+)$'
`,
)
if err != nil {
return nil, oops.New(err, "failed to fetch twitch links")
}
result := make([]TwitchStreamer, 0, len(streamers))
for _, s := range streamers {
dbStreamer := s.(*models.Link)
streamer := TwitchStreamer{
UserID: dbStreamer.UserID,
ProjectID: dbStreamer.ProjectID,
}
match := twitchRegex.FindStringSubmatch(dbStreamer.URL)
if match != nil {
login := strings.ToLower(match[twitchRegex.SubexpIndex("login")])
streamer.TwitchLogin = login
}
if len(streamer.TwitchLogin) > 0 {
duplicate := false
for _, r := range result {
if r.TwitchLogin == streamer.TwitchLogin {
duplicate = true
break
}
}
if !duplicate {
result = append(result, streamer)
}
}
}
return result, nil
}
func FetchTwitchLoginsForUserOrProject(ctx context.Context, dbConn db.ConnOrTx, userId *int, projectId *int) ([]string, error) {
links, err := db.Query(ctx, dbConn, models.Link{},
`
SELECT $columns
FROM
handmade_links AS link
WHERE
url ~* 'twitch\.tv/([^/]+)$'
AND ((user_id = $1 AND project_id IS NULL) OR (user_id IS NULL AND project_id = $2))
ORDER BY url ASC
`,
userId,
projectId,
)
if err != nil {
return nil, oops.New(err, "failed to fetch twitch links")
}
result := make([]string, 0, len(links))
for _, l := range links {
url := l.(*models.Link).URL
match := twitchRegex.FindStringSubmatch(url)
if match != nil {
login := strings.ToLower(match[twitchRegex.SubexpIndex("login")])
result = append(result, login)
}
}
return result, nil
}

View File

@ -687,6 +687,18 @@ func BuildAPICheckUsername() string {
return Url("/api/check_username", nil)
}
/*
* Twitch stuff
*/
var RegexTwitchEventSubCallback = regexp.MustCompile("^/twitch_eventsub$")
func BuildTwitchEventSubCallback() string {
return Url("/twitch_eventsub", nil)
}
var RegexTwitchDebugPage = regexp.MustCompile("^/twitch_debug$")
/*
* User assets
*/

View File

@ -0,0 +1,60 @@
package migrations
import (
"context"
"time"
"git.handmade.network/hmn/hmn/src/migration/types"
"git.handmade.network/hmn/hmn/src/oops"
"github.com/jackc/pgx/v4"
)
func init() {
registerMigration(TwitchTables{})
}
type TwitchTables struct{}
func (m TwitchTables) Version() types.MigrationVersion {
return types.MigrationVersion(time.Date(2022, 3, 15, 1, 21, 44, 0, time.UTC))
}
func (m TwitchTables) Name() string {
return "TwitchTables"
}
func (m TwitchTables) Description() string {
return "Create tables for live twitch streams and twitch ID cache"
}
func (m TwitchTables) Up(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
CREATE TABLE twitch_streams (
twitch_id VARCHAR(255) NOT NULL,
twitch_login VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE
);
`,
)
if err != nil {
return oops.New(err, "failed to create twitch tables")
}
return err
}
func (m TwitchTables) Down(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
DROP TABLE twitch_ids;
DROP TABLE twitch_streams;
`,
)
if err != nil {
return oops.New(err, "failed to create twitch tables")
}
return err
}

View File

@ -0,0 +1,45 @@
package migrations
import (
"context"
"time"
"git.handmade.network/hmn/hmn/src/migration/types"
"github.com/jackc/pgx/v4"
)
func init() {
registerMigration(AddIndexOnTwitchStreams{})
}
type AddIndexOnTwitchStreams struct{}
func (m AddIndexOnTwitchStreams) Version() types.MigrationVersion {
return types.MigrationVersion(time.Date(2022, 3, 15, 6, 35, 6, 0, time.UTC))
}
func (m AddIndexOnTwitchStreams) Name() string {
return "AddIndexOnTwitchStreams"
}
func (m AddIndexOnTwitchStreams) Description() string {
return "Add unique index on twitch streams"
}
func (m AddIndexOnTwitchStreams) Up(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
CREATE UNIQUE INDEX twitch_streams_twitch_id ON twitch_streams (twitch_id);
`,
)
return err
}
func (m AddIndexOnTwitchStreams) Down(ctx context.Context, tx pgx.Tx) error {
_, err := tx.Exec(ctx,
`
DROP INDEX twitch_streams_twitch_id;
`,
)
return err
}

15
src/models/twitch.go Normal file
View File

@ -0,0 +1,15 @@
package models
import "time"
type TwitchID struct {
ID string `db:"id"`
Login string `db:"login"`
}
type TwitchStream struct {
ID string `db:"twitch_id"`
Login string `db:"twitch_login"`
Title string `db:"title"`
StartedAt time.Time `db:"started_at"`
}

450
src/twitch/rest.go Normal file
View File

@ -0,0 +1,450 @@
package twitch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/hmnurl"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/utils"
)
var twitchAPIBaseUrl = config.Config.Twitch.BaseUrl
var HitRateLimit = errors.New("hit rate limit")
var MaxRetries = errors.New("hit max retries")
var httpClient = &http.Client{}
// NOTE(asaf): Access token is not thread-safe right now.
// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions.
var activeAccessToken string
var rateLimitReset time.Time
type twitchUser struct {
TwitchID string
TwitchLogin string
}
func getTwitchUsersByLogin(ctx context.Context, logins []string) ([]twitchUser, error) {
result := make([]twitchUser, 0, len(logins))
numChunks := len(logins)/100 + 1
for i := 0; i < numChunks; i++ {
query := url.Values{}
query.Add("first", "100")
for _, login := range logins[i*100 : utils.IntMin((i+1)*100, len(logins))] {
query.Add("login", login)
}
req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/users", query.Encode()), nil)
if err != nil {
return nil, oops.New(err, "failed to create requset")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return nil, oops.New(err, "failed to fetch twitch users")
}
type user struct {
ID string `json:"id"`
Login string `json:"login"`
}
type twitchResponse struct {
Data []user `json:"data"`
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, oops.New(err, "failed to read response body while fetching twitch users")
}
var userResponse twitchResponse
err = json.Unmarshal(body, &userResponse)
if err != nil {
return nil, oops.New(err, "failed to parse twitch response while fetching twitch users")
}
for _, u := range userResponse.Data {
result = append(result, twitchUser{
TwitchID: u.ID,
TwitchLogin: u.Login,
})
}
}
return result, nil
}
type streamStatus struct {
TwitchID string
TwitchLogin string
Live bool
Title string
StartedAt time.Time
Category string
Tags []string
}
func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) {
result := make([]streamStatus, 0, len(twitchIDs))
numChunks := len(twitchIDs)/100 + 1
for i := 0; i < numChunks; i++ {
query := url.Values{}
query.Add("first", "100")
for _, tid := range twitchIDs[i*100 : utils.IntMin((i+1)*100, len(twitchIDs))] {
query.Add("user_id", tid)
}
req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/streams", query.Encode()), nil)
if err != nil {
return nil, oops.New(err, "failed to create request")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return nil, oops.New(err, "failed to fetch stream statuses")
}
type twitchStatus struct {
TwitchID string `json:"user_id"`
TwitchLogin string `json:"user_login"`
GameID string `json:"game_id"`
Type string `json:"type"`
Title string `json:"title"`
StartedAt string `json:"started_at"`
Thumbnail string `json:"thumbnail_url"`
Tags []string `json:"tag_ids"`
}
type twitchResponse struct {
Data []twitchStatus `json:"data"`
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, oops.New(err, "failed to read response body while processing stream statuses")
}
var streamResponse twitchResponse
err = json.Unmarshal(body, &streamResponse)
if err != nil {
return nil, oops.New(err, "failed to parse twitch response while processing stream statuses")
}
for _, d := range streamResponse.Data {
started, err := time.Parse(time.RFC3339, d.StartedAt)
if err != nil {
logging.ExtractLogger(ctx).Warn().Str("Time string", d.StartedAt).Msg("Failed to parse twitch timestamp")
started = time.Now()
}
status := streamStatus{
TwitchID: d.TwitchID,
TwitchLogin: d.TwitchLogin,
Live: d.Type == "live",
Title: d.Title,
StartedAt: started,
Category: d.GameID,
Tags: d.Tags,
}
result = append(result, status)
}
}
return result, nil
}
type twitchEventSub struct {
EventID string
TwitchID string
Type string
GoodStatus bool
}
func getEventSubscriptions(ctx context.Context) ([]twitchEventSub, error) {
result := make([]twitchEventSub, 0)
after := ""
for {
query := url.Values{}
if len(after) > 0 {
query.Add("after", after)
}
req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/eventsub/subscriptions", query.Encode()), nil)
if err != nil {
return nil, oops.New(err, "failed to create request")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return nil, oops.New(err, "failed to fetch twitch event subscriptions")
}
type eventSub struct {
ID string `json:"id"`
Status string `json:"status"`
Type string `json:"type"`
Condition struct {
TwitchID string `json:"broadcaster_user_id"`
} `json:"condition"`
}
type twitchResponse struct {
Data []eventSub `json:"data"`
Pagination *struct {
After string `json:"after"`
} `json:"pagination"`
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return nil, oops.New(err, "failed to read response body while fetching twitch eventsubs")
}
var eventSubResponse twitchResponse
err = json.Unmarshal(body, &eventSubResponse)
if err != nil {
return nil, oops.New(err, "failed to parse twitch response while fetching twitch eventsubs")
}
for _, es := range eventSubResponse.Data {
result = append(result, twitchEventSub{
EventID: es.ID,
TwitchID: es.Condition.TwitchID,
Type: es.Type,
GoodStatus: es.Status == "enabled" || es.Status == "webhook_callback_verification_pending",
})
}
if eventSubResponse.Pagination == nil || eventSubResponse.Pagination.After == "" {
return result, nil
} else {
after = eventSubResponse.Pagination.After
}
}
}
func subscribeToEvent(ctx context.Context, eventType string, twitchID string) error {
type eventBody struct {
Type string `json:"type"`
Version string `json:"version"`
Condition struct {
TwitchID string `json:"broadcaster_user_id"`
} `json:"condition"`
Transport struct {
Method string `json:"method"`
Callback string `json:"callback"`
Secret string `json:"secret"`
} `json:"transport"`
}
ev := eventBody{
Type: eventType,
Version: "1",
}
ev.Condition.TwitchID = twitchID
ev.Transport.Method = "webhook"
// NOTE(asaf): Twitch has special treatment for localhost. We can keep this around for live/beta because it just won't replace anything.
ev.Transport.Callback = strings.ReplaceAll(hmnurl.BuildTwitchEventSubCallback(), "handmade.local:9001", "localhost")
ev.Transport.Secret = config.Config.Twitch.EventSubSecret
evJson, err := json.Marshal(ev)
if err != nil {
return oops.New(err, "failed to marshal event sub data")
}
req, err := http.NewRequestWithContext(ctx, "POST", buildUrl("/eventsub/subscriptions", ""), bytes.NewReader(evJson))
req.Header.Set("Content-Type", "application/json")
if err != nil {
return oops.New(err, "failed to create request")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return oops.New(err, "failed to create new event subscription")
}
defer readAndClose(res)
if res.StatusCode != 201 {
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return oops.New(err, "failed to read response body while creating twitch eventsubs")
}
logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Failed to create twitch event sub")
return oops.New(nil, "failed to create new event subscription")
}
return nil
}
func unsubscribeFromEvent(ctx context.Context, eventID string) error {
query := url.Values{}
query.Add("id", eventID)
req, err := http.NewRequestWithContext(ctx, "DELETE", buildUrl("/eventsub/subscriptions", query.Encode()), nil)
if err != nil {
return oops.New(err, "failed to create request")
}
res, err := doRequest(ctx, true, req)
if err != nil {
return oops.New(err, "failed to delete new event subscription")
}
defer readAndClose(res)
if res.StatusCode != 204 {
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
return oops.New(err, "failed to read response body while deleting twitch eventsubs")
}
logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Failed to delete twitch event sub")
return oops.New(nil, "failed to delete new event subscription")
}
return nil
}
func doRequest(ctx context.Context, waitOnRateLimit bool, req *http.Request) (*http.Response, error) {
serviceUnavailable := false
numRetries := 5
for {
if numRetries == 0 {
return nil, MaxRetries
}
numRetries -= 1
now := time.Now()
if rateLimitReset.After(now) {
if waitOnRateLimit {
timer := time.NewTimer(rateLimitReset.Sub(now))
select {
case <-timer.C:
case <-ctx.Done():
return nil, errors.New("request interrupted during rate limiting")
}
} else {
return nil, HitRateLimit
}
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", activeAccessToken))
req.Header.Set("Client-Id", config.Config.Twitch.ClientID)
res, err := httpClient.Do(req)
if err != nil {
return nil, oops.New(err, "twitch request failed")
}
if res.StatusCode != 503 {
serviceUnavailable = false
}
if res.StatusCode >= 200 && res.StatusCode < 300 {
return res, nil
} else if res.StatusCode == 503 {
readAndClose(res)
if serviceUnavailable {
// NOTE(asaf): The docs say we should retry once if we receive 503
return nil, oops.New(nil, "got 503 Service Unavailable twice in a row")
} else {
serviceUnavailable = true
}
} else if res.StatusCode == 429 {
logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Msg("Hit Twitch rate limit")
err = updateRateLimitReset(res)
if err != nil {
return nil, err
}
} else if res.StatusCode == 401 {
logging.ExtractLogger(ctx).Warn().Msg("Twitch refresh token is invalid. Renewing...")
readAndClose(res)
err = refreshAccessToken(ctx)
if err != nil {
return nil, err
}
} else {
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, oops.New(err, "failed to read response body")
}
logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch")
res.Body.Close()
return res, oops.New(nil, "got an unexpected status code from twitch")
}
}
}
func updateRateLimitReset(res *http.Response) error {
defer readAndClose(res)
resetStr := res.Header.Get("Ratelimit-Reset")
if len(resetStr) == 0 {
return oops.New(nil, "no ratelimit data on response")
}
resetUnix, err := strconv.Atoi(resetStr)
if err != nil {
return oops.New(err, "failed to parse reset time")
}
rateLimitReset = time.Unix(int64(resetUnix), 0)
return nil
}
type AccessTokenResponse struct {
AccessToken string `json:"access_token"`
}
func refreshAccessToken(ctx context.Context) error {
logging.ExtractLogger(ctx).Info().Msg("Refreshing twitch token")
query := url.Values{}
query.Add("client_id", config.Config.Twitch.ClientID)
query.Add("client_secret", config.Config.Twitch.ClientSecret)
query.Add("grant_type", "client_credentials")
url := fmt.Sprintf("%s/token?%s", config.Config.Twitch.BaseIDUrl, query.Encode())
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return oops.New(err, "failed to create request")
}
res, err := httpClient.Do(req)
if err != nil {
return oops.New(err, "failed to request new access token")
}
defer readAndClose(res)
if res.StatusCode >= 400 {
// NOTE(asaf): The docs don't specify the error cases for this call.
// NOTE(asaf): According to the docs rate limiting is per-token, and we don't use a token for this call,
// so who knows how rate limiting works here.
body, _ := io.ReadAll(res.Body)
logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("body", string(body[:])).Msg("Got bad status code from twitch access token refresh")
return oops.New(nil, "received unexpected status code from twitch access token refresh")
}
body, err := io.ReadAll(res.Body)
if err != nil {
return oops.New(err, "failed to read response body")
}
var accessTokenResponse AccessTokenResponse
err = json.Unmarshal(body, &accessTokenResponse)
if err != nil {
return oops.New(err, "failed to unmarshal access token response")
}
activeAccessToken = accessTokenResponse.AccessToken
return nil
}
func readAndClose(res *http.Response) {
io.ReadAll(res.Body)
res.Body.Close()
}
func buildUrl(path string, queryParams string) string {
return fmt.Sprintf("%s%s?%s", config.Config.Twitch.BaseUrl, path, queryParams)
}

462
src/twitch/twitch.go Normal file
View File

@ -0,0 +1,462 @@
package twitch
import (
"context"
"encoding/json"
"fmt"
"time"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/discord"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/perf"
"github.com/jackc/pgx/v4/pgxpool"
)
type twitchNotification struct {
TwitchID string
Type twitchNotificationType
}
var twitchNotificationChannel chan twitchNotification
var linksChangedChannel chan struct{}
func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} {
log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger()
ctx = logging.AttachLoggerToContext(&log, ctx)
if config.Config.Twitch.ClientID == "" {
log.Warn().Msg("No twitch config provided.")
done := make(chan struct{}, 1)
done <- struct{}{}
return done
}
twitchNotificationChannel = make(chan twitchNotification, 100)
linksChangedChannel = make(chan struct{}, 10)
done := make(chan struct{})
go func() {
defer func() {
log.Info().Msg("Shutting down twitch monitor")
done <- struct{}{}
}()
log.Info().Msg("Running twitch monitor...")
err := refreshAccessToken(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch refresh token on start")
return
}
monitorTicker := time.NewTicker(2 * time.Hour)
firstRunChannel := make(chan struct{}, 1)
firstRunChannel <- struct{}{}
for {
select {
case <-ctx.Done():
return
case <-firstRunChannel:
syncWithTwitch(ctx, dbConn, true)
case <-monitorTicker.C:
syncWithTwitch(ctx, dbConn, true)
case <-linksChangedChannel:
syncWithTwitch(ctx, dbConn, false)
case notification := <-twitchNotificationChannel:
if notification.Type == notificationTypeRevocation {
syncWithTwitch(ctx, dbConn, false)
} else {
processEventSubNotification(ctx, dbConn, &notification)
}
}
}
}()
return done
}
type twitchNotificationType int
const (
notificationTypeNone twitchNotificationType = 0
notificationTypeOnline = 1
notificationTypeOffline = 2
notificationTypeChannelUpdate = 3
notificationTypeRevocation = 4
)
func QueueTwitchNotification(messageType string, body []byte) error {
var notification twitchNotification
if messageType == "notification" {
type notificationJson struct {
Subscription struct {
Type string `json:"type"`
} `json:"subscription"`
Event struct {
BroadcasterUserID string `json:"broadcaster_user_id"`
BroadcasterUserLogin string `json:"broadcaster_user_login"`
} `json:"event"`
}
var incoming notificationJson
err := json.Unmarshal(body, &incoming)
if err != nil {
return oops.New(err, "failed to parse notification body")
}
notification.TwitchID = incoming.Event.BroadcasterUserID
switch incoming.Subscription.Type {
case "stream.online":
notification.Type = notificationTypeOnline
case "stream.offline":
notification.Type = notificationTypeOffline
case "channel.update":
notification.Type = notificationTypeChannelUpdate
default:
return oops.New(nil, "unknown subscription type received")
}
} else if messageType == "revocation" {
notification.Type = notificationTypeRevocation
}
if twitchNotificationChannel != nil && notification.Type != notificationTypeNone {
select {
case twitchNotificationChannel <- notification:
default:
return oops.New(nil, "twitch notification channel is full")
}
}
return nil
}
func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []string) {
if linksChangedChannel != nil {
twitchChanged := (len(twitchLoginsPreChange) != len(twitchLoginsPostChange))
if !twitchChanged {
for idx, _ := range twitchLoginsPreChange {
if twitchLoginsPreChange[idx] != twitchLoginsPostChange[idx] {
twitchChanged = true
break
}
}
}
if twitchChanged {
// NOTE(asaf): Since we update links inside transactions for users/projects
// we won't see the updated list of links until the transaction is committed.
// Waiting 10 seconds is just a quick workaround for that. It's not
// convenient to only trigger this after the transaction is committed.
time.AfterFunc(10*time.Second, func() {
linksChangedChannel <- struct{}{}
})
}
}
}
func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) {
log := logging.ExtractLogger(ctx)
log.Info().Msg("Running twitch sync")
p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch")
defer func() {
p.EndRequest()
perf.LogPerf(p, log.Info())
}()
type twitchSyncStats struct {
NumSubbed int
NumUnsubbed int
NumStreamsChecked int
}
var stats twitchSyncStats
p.StartBlock("SQL", "Fetch list of streamers")
streamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
needID := make([]string, 0)
streamerMap := make(map[string]*hmndata.TwitchStreamer)
for idx, streamer := range streamers {
needID = append(needID, streamer.TwitchLogin)
streamerMap[streamer.TwitchLogin] = &streamers[idx]
}
p.StartBlock("TwitchAPI", "Fetch twitch user info")
twitchUsers, err := getTwitchUsersByLogin(ctx, needID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
for _, tu := range twitchUsers {
streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID
}
validStreamers := make([]hmndata.TwitchStreamer, 0, len(streamers))
for _, streamer := range streamers {
if len(streamer.TwitchID) > 0 {
validStreamers = append(validStreamers, streamer)
}
}
p.StartBlock("TwitchAPI", "Fetch event subscriptions")
subscriptions, err := getEventSubscriptions(ctx)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
p.EndBlock()
const (
EventSubNone = 0 // No event of this type found
EventSubRefresh = 1 // Event found, but bad status. Need to unsubscribe and resubscribe.
EventSubGood = 2 // All is well.
)
type isSubbedByType map[string]bool
streamerEventSubs := make(map[string]isSubbedByType)
for _, streamer := range validStreamers {
streamerEventSubs[streamer.TwitchID] = make(isSubbedByType)
streamerEventSubs[streamer.TwitchID]["channel.update"] = false
streamerEventSubs[streamer.TwitchID]["stream.online"] = false
streamerEventSubs[streamer.TwitchID]["stream.offline"] = false
}
type unsubEvent struct {
TwitchID string
EventID string
}
toUnsub := make([]unsubEvent, 0)
for _, sub := range subscriptions {
handled := false
if eventSubs, ok := streamerEventSubs[sub.TwitchID]; ok {
if _, ok := eventSubs[sub.Type]; ok { // Make sure it's a known type
if !sub.GoodStatus {
log.Debug().Str("TwitchID", sub.TwitchID).Str("Event Type", sub.Type).Msg("Twitch doesn't like our sub")
toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
} else {
eventSubs[sub.Type] = true
}
handled = true
}
}
if !handled {
// NOTE(asaf): Found an unknown type or an event subscription that we don't have a matching user for.
// Make sure we unsubscribe.
toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID})
}
}
if config.Config.Env != config.Dev { // NOTE(asaf): Can't subscribe to events from dev. We need a non-localhost callback url.
p.StartBlock("TwitchAPI", "Sync subscriptions with twitch")
for _, ev := range toUnsub {
err = unsubscribeFromEvent(ctx, ev.EventID)
if err != nil {
log.Error().Err(err).Msg("Error while unsubscribing events")
// NOTE(asaf): Soft error. Don't care if it fails.
}
stats.NumUnsubbed += 1
}
for twitchID, evStatuses := range streamerEventSubs {
for evType, isSubbed := range evStatuses {
if !isSubbed {
err = subscribeToEvent(ctx, evType, twitchID)
if err != nil {
log.Error().Err(err).Msg("Error while monitoring twitch")
return
}
stats.NumSubbed += 1
}
}
}
p.EndBlock()
}
tx, err := dbConn.Begin(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to start transaction")
}
defer tx.Rollback(ctx)
allIDs := make([]string, 0, len(validStreamers))
for _, streamer := range validStreamers {
allIDs = append(allIDs, streamer.TwitchID)
}
p.StartBlock("SQL", "Remove untracked streamers")
_, err = tx.Exec(ctx,
`DELETE FROM twitch_streams WHERE twitch_id != ANY($1)`,
allIDs,
)
if err != nil {
log.Error().Err(err).Msg("Failed to remove untracked twitch ids from streamer list in db")
return
}
p.EndBlock()
usersToUpdate := make([]string, 0)
if updateAll {
usersToUpdate = allIDs
} else {
// NOTE(asaf): Twitch can revoke our subscriptions, so we need to
// update users whose subs were revoked or missing since last time we checked.
for twitchID, evStatuses := range streamerEventSubs {
for _, isSubbed := range evStatuses {
if !isSubbed {
usersToUpdate = append(usersToUpdate, twitchID)
break
}
}
}
}
p.StartBlock("TwitchAPI", "Fetch twitch stream statuses")
statuses, err := getStreamStatus(ctx, usersToUpdate)
if err != nil {
log.Error().Err(err).Msg("failed to fetch stream statuses")
return
}
p.EndBlock()
p.StartBlock("SQL", "Update stream statuses in db")
for _, status := range statuses {
log.Debug().Interface("Status", status).Msg("Got streamer")
_, err = updateStreamStatusInDB(ctx, tx, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
}
p.EndBlock()
err = tx.Commit(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to commit transaction")
}
stats.NumStreamsChecked += len(usersToUpdate)
log.Info().Interface("Stats", stats).Msg("Twitch sync done")
}
func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string) error {
var err error
if config.Config.Discord.StreamsChannelID != "" {
err = discord.SendMessages(ctx, dbConn, discord.MessageToSend{
ChannelID: config.Config.Discord.StreamsChannelID,
Req: discord.CreateMessageRequest{
Content: fmt.Sprintf("%s is live: https://twitch.tv/%s", twitchLogin, twitchLogin),
},
})
}
return err
}
func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) {
if notification.Type == notificationTypeNone {
return
}
log := logging.ExtractLogger(ctx)
status := streamStatus{
TwitchID: notification.TwitchID,
Live: false,
}
var err error
if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline {
result, err := getStreamStatus(ctx, []string{notification.TwitchID})
if err != nil || len(result) == 0 {
log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status")
return
}
allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn)
if err != nil {
log.Error().Err(err).Msg("failed to fetch hmn streamers")
return
}
for _, streamer := range allStreamers {
if streamer.TwitchLogin == result[0].TwitchLogin {
status = result[0]
break
}
}
}
inserted, err := updateStreamStatusInDB(ctx, dbConn, &status)
if err != nil {
log.Error().Err(err).Msg("failed to update twitch stream status")
}
if inserted && notification.Type == notificationTypeOnline {
err = notifyDiscordOfLiveStream(ctx, dbConn, status.TwitchLogin)
if err != nil {
log.Error().Err(err).Msg("failed to notify discord")
}
}
}
func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) (bool, error) {
inserted := false
if isStatusRelevant(status) {
_, err := conn.Exec(ctx,
`
INSERT INTO twitch_streams (twitch_id, twitch_login, title, started_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (twitch_id) DO UPDATE SET
title = EXCLUDED.title,
started_at = EXCLUDED.started_at
`,
status.TwitchID,
status.TwitchLogin,
status.Title,
status.StartedAt,
)
if err != nil {
return false, oops.New(err, "failed to insert twitch streamer into db")
}
inserted = true
} else {
_, err := conn.Exec(ctx,
`
DELETE FROM twitch_streams WHERE twitch_id = $1
`,
status.TwitchID,
)
if err != nil {
return false, oops.New(err, "failed to remove twitch streamer from db")
}
inserted = false
}
return inserted, nil
}
var RelevantCategories = []string{
"1469308723", // Software and Game Development
}
var RelevantTags = []string{
"a59f1e4e-257b-4bd0-90c7-189c3efbf917", // Programming
"6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development
}
func isStatusRelevant(status *streamStatus) bool {
if status.Live {
for _, cat := range RelevantCategories {
if status.Category == cat {
return true
}
}
for _, tag := range RelevantTags {
for _, streamTag := range status.Tags {
if tag == streamTag {
return true
}
}
}
}
return false
}

View File

@ -22,6 +22,7 @@ import (
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/parsing"
"git.handmade.network/hmn/hmn/src/templates"
"git.handmade.network/hmn/hmn/src/twitch"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
@ -864,6 +865,7 @@ func updateProject(ctx context.Context, tx pgx.Tx, user *models.User, payload *P
}
}
twitchLoginsPreChange, preErr := hmndata.FetchTwitchLoginsForUserOrProject(ctx, tx, nil, &payload.ProjectID)
_, err = tx.Exec(ctx, `DELETE FROM handmade_links WHERE project_id = $1`, payload.ProjectID)
if err != nil {
return oops.New(err, "Failed to delete project links")
@ -883,6 +885,10 @@ func updateProject(ctx context.Context, tx pgx.Tx, user *models.User, payload *P
return oops.New(err, "Failed to insert new project link")
}
}
twitchLoginsPostChange, postErr := hmndata.FetchTwitchLoginsForUserOrProject(ctx, tx, nil, &payload.ProjectID)
if preErr == nil && postErr == nil {
twitch.UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange)
}
return nil
}

View File

@ -205,6 +205,9 @@ func NewWebsiteRoutes(longRequestContext context.Context, conn *pgxpool.Pool) ht
hmnOnly.POST(hmnurl.RegexDiscordUnlink, authMiddleware(csrfMiddleware(DiscordUnlink)))
hmnOnly.POST(hmnurl.RegexDiscordShowcaseBacklog, authMiddleware(csrfMiddleware(DiscordShowcaseBacklog)))
hmnOnly.POST(hmnurl.RegexTwitchEventSubCallback, TwitchEventSubCallback)
hmnOnly.GET(hmnurl.RegexTwitchDebugPage, TwitchDebugPage)
hmnOnly.GET(hmnurl.RegexUserProfile, UserProfile)
hmnOnly.GET(hmnurl.RegexUserSettings, authMiddleware(UserSettings))
hmnOnly.POST(hmnurl.RegexUserSettings, authMiddleware(csrfMiddleware(UserSettingsSave)))

92
src/website/twitch.go Normal file
View File

@ -0,0 +1,92 @@
package website
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/twitch"
)
func TwitchEventSubCallback(c *RequestContext) ResponseData {
secret := config.Config.Twitch.EventSubSecret
messageId := c.Req.Header.Get("Twitch-Eventsub-Message-Id")
timestamp := c.Req.Header.Get("Twitch-Eventsub-Message-Timestamp")
signature := c.Req.Header.Get("Twitch-Eventsub-Message-Signature")
messageType := c.Req.Header.Get("Twitch-Eventsub-Message-Type")
body, err := io.ReadAll(c.Req.Body)
if err != nil {
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to read request body"))
}
hmacMessage := fmt.Sprintf("%s%s%s", messageId, timestamp, string(body[:]))
hmac := hmac.New(sha256.New, []byte(secret))
hmac.Write([]byte(hmacMessage))
hash := hmac.Sum(nil)
hmacStr := "sha256=" + hex.EncodeToString(hash)
if hmacStr != signature {
var res ResponseData
res.StatusCode = 403
return res
}
if messageType == "webhook_callback_verification" {
type challengeReq struct {
Challenge string `json:"challenge"`
}
var data challengeReq
err = json.Unmarshal(body, &data)
if err != nil {
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to unmarshal twitch verification"))
}
var res ResponseData
res.StatusCode = 200
res.Header().Set("Content-Type", "text/plain") // NOTE(asaf): No idea why, but the twitch-cli fails when we don't set this.
res.Write([]byte(data.Challenge))
return res
} else {
err := twitch.QueueTwitchNotification(messageType, body)
if err != nil {
c.Logger.Error().Err(err).Msg("Failed to process twitch callback")
// NOTE(asaf): Returning 200 either way here
}
var res ResponseData
res.StatusCode = 200
return res
}
}
func TwitchDebugPage(c *RequestContext) ResponseData {
streams, err := db.Query(c.Context(), c.Conn, models.TwitchStream{},
`
SELECT $columns
FROM
twitch_streams
ORDER BY started_at DESC
`,
)
if err != nil {
return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streams"))
}
html := ""
for _, stream := range streams {
s := stream.(*models.TwitchStream)
html += fmt.Sprintf(`<a href="https://twitch.tv/%s">%s</a>%s<br />`, s.Login, s.Login, s.Title)
}
var res ResponseData
res.StatusCode = 200
res.Write([]byte(html))
return res
}

View File

@ -18,6 +18,7 @@ import (
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
"git.handmade.network/hmn/hmn/src/templates"
"git.handmade.network/hmn/hmn/src/twitch"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
)
@ -378,6 +379,7 @@ func UserSettingsSave(c *RequestContext) ResponseData {
}
// Process links
twitchLoginsPreChange, preErr := hmndata.FetchTwitchLoginsForUserOrProject(c.Context(), tx, &c.CurrentUser.ID, nil)
linksText := form.Get("links")
links := ParseLinks(linksText)
_, err = tx.Exec(c.Context(), `DELETE FROM handmade_links WHERE user_id = $1`, c.CurrentUser.ID)
@ -401,6 +403,10 @@ func UserSettingsSave(c *RequestContext) ResponseData {
}
}
}
twitchLoginsPostChange, postErr := hmndata.FetchTwitchLoginsForUserOrProject(c.Context(), tx, &c.CurrentUser.ID, nil)
if preErr == nil && postErr == nil {
twitch.UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange)
}
// Update password
oldPassword := form.Get("old_password")

View File

@ -17,6 +17,7 @@ import (
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/perf"
"git.handmade.network/hmn/hmn/src/templates"
"git.handmade.network/hmn/hmn/src/twitch"
"github.com/spf13/cobra"
)
@ -46,6 +47,7 @@ var WebsiteCommand = &cobra.Command{
perfCollector.Done,
discord.RunDiscordBot(backgroundJobContext, conn),
discord.RunHistoryWatcher(backgroundJobContext, conn),
twitch.MonitorTwitchSubscriptions(backgroundJobContext, conn),
)
signals := make(chan os.Signal, 1)