diff --git a/src/config/config.go.example b/src/config/config.go.example index 203faa3..563b9ad 100644 --- a/src/config/config.go.example +++ b/src/config/config.go.example @@ -57,6 +57,14 @@ var Config = HMNConfig{ MemberRoleID: "", ShowcaseChannelID: "", LibraryChannelID: "", + StreamsChannelID: "", + }, + Twitch: TwitchConfig{ + ClientID: "", + ClientSecret: "", + EventSubSecret: "", + BaseUrl: "https://api.twitch.tv/helix", + BaseIDUrl: "https://id.twitch.tv/oauth2", }, EpisodeGuide: EpisodeGuide{ CineraOutputPath: "./annotations/", diff --git a/src/config/types.go b/src/config/types.go index eccd2d0..cdfeb71 100644 --- a/src/config/types.go +++ b/src/config/types.go @@ -27,6 +27,7 @@ type HMNConfig struct { Email EmailConfig DigitalOcean DigitalOceanConfig Discord DiscordConfig + Twitch TwitchConfig EpisodeGuide EpisodeGuide } @@ -76,9 +77,18 @@ type DiscordConfig struct { MemberRoleID string ShowcaseChannelID string LibraryChannelID string + StreamsChannelID string JamShowcaseChannelID string } +type TwitchConfig struct { + ClientID string + ClientSecret string + EventSubSecret string // NOTE(asaf): Between 10-100 chars long. Anything will do. + BaseUrl string + BaseIDUrl string +} + type EpisodeGuide struct { CineraOutputPath string Projects map[string]string // NOTE(asaf): Maps from slugs to default episode guide topic diff --git a/src/hmndata/twitch.go b/src/hmndata/twitch.go new file mode 100644 index 0000000..60bdd21 --- /dev/null +++ b/src/hmndata/twitch.go @@ -0,0 +1,97 @@ +package hmndata + +import ( + "context" + "regexp" + "strings" + + "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/models" + "git.handmade.network/hmn/hmn/src/oops" +) + +const InvalidUserTwitchID = "INVALID_USER" + +type TwitchStreamer struct { + TwitchID string + TwitchLogin string + UserID *int + ProjectID *int +} + +var twitchRegex = regexp.MustCompile(`twitch\.tv/(?P[^/]+)$`) + +func FetchTwitchStreamers(ctx context.Context, dbConn db.ConnOrTx) ([]TwitchStreamer, error) { + streamers, err := db.Query(ctx, dbConn, models.Link{}, + ` + SELECT $columns + FROM + handmade_links AS link + WHERE + url ~* 'twitch\.tv/([^/]+)$' + `, + ) + if err != nil { + return nil, oops.New(err, "failed to fetch twitch links") + } + + result := make([]TwitchStreamer, 0, len(streamers)) + for _, s := range streamers { + dbStreamer := s.(*models.Link) + + streamer := TwitchStreamer{ + UserID: dbStreamer.UserID, + ProjectID: dbStreamer.ProjectID, + } + + match := twitchRegex.FindStringSubmatch(dbStreamer.URL) + if match != nil { + login := strings.ToLower(match[twitchRegex.SubexpIndex("login")]) + streamer.TwitchLogin = login + } + if len(streamer.TwitchLogin) > 0 { + duplicate := false + for _, r := range result { + if r.TwitchLogin == streamer.TwitchLogin { + duplicate = true + break + } + } + if !duplicate { + result = append(result, streamer) + } + } + } + + return result, nil +} + +func FetchTwitchLoginsForUserOrProject(ctx context.Context, dbConn db.ConnOrTx, userId *int, projectId *int) ([]string, error) { + links, err := db.Query(ctx, dbConn, models.Link{}, + ` + SELECT $columns + FROM + handmade_links AS link + WHERE + url ~* 'twitch\.tv/([^/]+)$' + AND ((user_id = $1 AND project_id IS NULL) OR (user_id IS NULL AND project_id = $2)) + ORDER BY url ASC + `, + userId, + projectId, + ) + if err != nil { + return nil, oops.New(err, "failed to fetch twitch links") + } + result := make([]string, 0, len(links)) + + for _, l := range links { + url := l.(*models.Link).URL + match := twitchRegex.FindStringSubmatch(url) + if match != nil { + login := strings.ToLower(match[twitchRegex.SubexpIndex("login")]) + result = append(result, login) + } + } + return result, nil +} diff --git a/src/hmnurl/urls.go b/src/hmnurl/urls.go index b0f0dba..337c0c0 100644 --- a/src/hmnurl/urls.go +++ b/src/hmnurl/urls.go @@ -687,6 +687,18 @@ func BuildAPICheckUsername() string { return Url("/api/check_username", nil) } +/* +* Twitch stuff + */ + +var RegexTwitchEventSubCallback = regexp.MustCompile("^/twitch_eventsub$") + +func BuildTwitchEventSubCallback() string { + return Url("/twitch_eventsub", nil) +} + +var RegexTwitchDebugPage = regexp.MustCompile("^/twitch_debug$") + /* * User assets */ diff --git a/src/migration/migrations/2022-03-15T012144Z_TwitchTables.go b/src/migration/migrations/2022-03-15T012144Z_TwitchTables.go new file mode 100644 index 0000000..5a69733 --- /dev/null +++ b/src/migration/migrations/2022-03-15T012144Z_TwitchTables.go @@ -0,0 +1,60 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "git.handmade.network/hmn/hmn/src/oops" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(TwitchTables{}) +} + +type TwitchTables struct{} + +func (m TwitchTables) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 3, 15, 1, 21, 44, 0, time.UTC)) +} + +func (m TwitchTables) Name() string { + return "TwitchTables" +} + +func (m TwitchTables) Description() string { + return "Create tables for live twitch streams and twitch ID cache" +} + +func (m TwitchTables) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + CREATE TABLE twitch_streams ( + twitch_id VARCHAR(255) NOT NULL, + twitch_login VARCHAR(255) NOT NULL, + title VARCHAR(255) NOT NULL, + started_at TIMESTAMP WITH TIME ZONE + ); + `, + ) + + if err != nil { + return oops.New(err, "failed to create twitch tables") + } + return err +} + +func (m TwitchTables) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + DROP TABLE twitch_ids; + DROP TABLE twitch_streams; + `, + ) + + if err != nil { + return oops.New(err, "failed to create twitch tables") + } + return err +} diff --git a/src/migration/migrations/2022-03-15T063506Z_AddIndexOnTwitchStreams.go b/src/migration/migrations/2022-03-15T063506Z_AddIndexOnTwitchStreams.go new file mode 100644 index 0000000..3ea728b --- /dev/null +++ b/src/migration/migrations/2022-03-15T063506Z_AddIndexOnTwitchStreams.go @@ -0,0 +1,45 @@ +package migrations + +import ( + "context" + "time" + + "git.handmade.network/hmn/hmn/src/migration/types" + "github.com/jackc/pgx/v4" +) + +func init() { + registerMigration(AddIndexOnTwitchStreams{}) +} + +type AddIndexOnTwitchStreams struct{} + +func (m AddIndexOnTwitchStreams) Version() types.MigrationVersion { + return types.MigrationVersion(time.Date(2022, 3, 15, 6, 35, 6, 0, time.UTC)) +} + +func (m AddIndexOnTwitchStreams) Name() string { + return "AddIndexOnTwitchStreams" +} + +func (m AddIndexOnTwitchStreams) Description() string { + return "Add unique index on twitch streams" +} + +func (m AddIndexOnTwitchStreams) Up(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + CREATE UNIQUE INDEX twitch_streams_twitch_id ON twitch_streams (twitch_id); + `, + ) + return err +} + +func (m AddIndexOnTwitchStreams) Down(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, + ` + DROP INDEX twitch_streams_twitch_id; + `, + ) + return err +} diff --git a/src/models/twitch.go b/src/models/twitch.go new file mode 100644 index 0000000..9b6caac --- /dev/null +++ b/src/models/twitch.go @@ -0,0 +1,15 @@ +package models + +import "time" + +type TwitchID struct { + ID string `db:"id"` + Login string `db:"login"` +} + +type TwitchStream struct { + ID string `db:"twitch_id"` + Login string `db:"twitch_login"` + Title string `db:"title"` + StartedAt time.Time `db:"started_at"` +} diff --git a/src/twitch/rest.go b/src/twitch/rest.go new file mode 100644 index 0000000..c697ac2 --- /dev/null +++ b/src/twitch/rest.go @@ -0,0 +1,450 @@ +package twitch + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "git.handmade.network/hmn/hmn/src/config" + "git.handmade.network/hmn/hmn/src/hmnurl" + "git.handmade.network/hmn/hmn/src/logging" + "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/utils" +) + +var twitchAPIBaseUrl = config.Config.Twitch.BaseUrl + +var HitRateLimit = errors.New("hit rate limit") +var MaxRetries = errors.New("hit max retries") + +var httpClient = &http.Client{} + +// NOTE(asaf): Access token is not thread-safe right now. +// All twitch requests are made through the goroutine in MonitorTwitchSubscriptions. +var activeAccessToken string +var rateLimitReset time.Time + +type twitchUser struct { + TwitchID string + TwitchLogin string +} + +func getTwitchUsersByLogin(ctx context.Context, logins []string) ([]twitchUser, error) { + result := make([]twitchUser, 0, len(logins)) + numChunks := len(logins)/100 + 1 + for i := 0; i < numChunks; i++ { + query := url.Values{} + query.Add("first", "100") + for _, login := range logins[i*100 : utils.IntMin((i+1)*100, len(logins))] { + query.Add("login", login) + } + req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/users", query.Encode()), nil) + if err != nil { + return nil, oops.New(err, "failed to create requset") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return nil, oops.New(err, "failed to fetch twitch users") + } + + type user struct { + ID string `json:"id"` + Login string `json:"login"` + } + + type twitchResponse struct { + Data []user `json:"data"` + } + + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return nil, oops.New(err, "failed to read response body while fetching twitch users") + } + + var userResponse twitchResponse + err = json.Unmarshal(body, &userResponse) + if err != nil { + return nil, oops.New(err, "failed to parse twitch response while fetching twitch users") + } + + for _, u := range userResponse.Data { + result = append(result, twitchUser{ + TwitchID: u.ID, + TwitchLogin: u.Login, + }) + } + } + + return result, nil +} + +type streamStatus struct { + TwitchID string + TwitchLogin string + Live bool + Title string + StartedAt time.Time + Category string + Tags []string +} + +func getStreamStatus(ctx context.Context, twitchIDs []string) ([]streamStatus, error) { + result := make([]streamStatus, 0, len(twitchIDs)) + numChunks := len(twitchIDs)/100 + 1 + for i := 0; i < numChunks; i++ { + query := url.Values{} + query.Add("first", "100") + for _, tid := range twitchIDs[i*100 : utils.IntMin((i+1)*100, len(twitchIDs))] { + query.Add("user_id", tid) + } + req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/streams", query.Encode()), nil) + if err != nil { + return nil, oops.New(err, "failed to create request") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return nil, oops.New(err, "failed to fetch stream statuses") + } + + type twitchStatus struct { + TwitchID string `json:"user_id"` + TwitchLogin string `json:"user_login"` + GameID string `json:"game_id"` + Type string `json:"type"` + Title string `json:"title"` + StartedAt string `json:"started_at"` + Thumbnail string `json:"thumbnail_url"` + Tags []string `json:"tag_ids"` + } + + type twitchResponse struct { + Data []twitchStatus `json:"data"` + } + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return nil, oops.New(err, "failed to read response body while processing stream statuses") + } + + var streamResponse twitchResponse + err = json.Unmarshal(body, &streamResponse) + if err != nil { + return nil, oops.New(err, "failed to parse twitch response while processing stream statuses") + } + + for _, d := range streamResponse.Data { + started, err := time.Parse(time.RFC3339, d.StartedAt) + if err != nil { + logging.ExtractLogger(ctx).Warn().Str("Time string", d.StartedAt).Msg("Failed to parse twitch timestamp") + started = time.Now() + } + status := streamStatus{ + TwitchID: d.TwitchID, + TwitchLogin: d.TwitchLogin, + Live: d.Type == "live", + Title: d.Title, + StartedAt: started, + Category: d.GameID, + Tags: d.Tags, + } + result = append(result, status) + } + } + + return result, nil +} + +type twitchEventSub struct { + EventID string + TwitchID string + Type string + GoodStatus bool +} + +func getEventSubscriptions(ctx context.Context) ([]twitchEventSub, error) { + result := make([]twitchEventSub, 0) + after := "" + for { + query := url.Values{} + if len(after) > 0 { + query.Add("after", after) + } + req, err := http.NewRequestWithContext(ctx, "GET", buildUrl("/eventsub/subscriptions", query.Encode()), nil) + if err != nil { + return nil, oops.New(err, "failed to create request") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return nil, oops.New(err, "failed to fetch twitch event subscriptions") + } + + type eventSub struct { + ID string `json:"id"` + Status string `json:"status"` + Type string `json:"type"` + Condition struct { + TwitchID string `json:"broadcaster_user_id"` + } `json:"condition"` + } + + type twitchResponse struct { + Data []eventSub `json:"data"` + Pagination *struct { + After string `json:"after"` + } `json:"pagination"` + } + + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return nil, oops.New(err, "failed to read response body while fetching twitch eventsubs") + } + + var eventSubResponse twitchResponse + err = json.Unmarshal(body, &eventSubResponse) + if err != nil { + return nil, oops.New(err, "failed to parse twitch response while fetching twitch eventsubs") + } + + for _, es := range eventSubResponse.Data { + result = append(result, twitchEventSub{ + EventID: es.ID, + TwitchID: es.Condition.TwitchID, + Type: es.Type, + GoodStatus: es.Status == "enabled" || es.Status == "webhook_callback_verification_pending", + }) + } + + if eventSubResponse.Pagination == nil || eventSubResponse.Pagination.After == "" { + return result, nil + } else { + after = eventSubResponse.Pagination.After + } + } +} + +func subscribeToEvent(ctx context.Context, eventType string, twitchID string) error { + type eventBody struct { + Type string `json:"type"` + Version string `json:"version"` + Condition struct { + TwitchID string `json:"broadcaster_user_id"` + } `json:"condition"` + Transport struct { + Method string `json:"method"` + Callback string `json:"callback"` + Secret string `json:"secret"` + } `json:"transport"` + } + + ev := eventBody{ + Type: eventType, + Version: "1", + } + ev.Condition.TwitchID = twitchID + ev.Transport.Method = "webhook" + // NOTE(asaf): Twitch has special treatment for localhost. We can keep this around for live/beta because it just won't replace anything. + ev.Transport.Callback = strings.ReplaceAll(hmnurl.BuildTwitchEventSubCallback(), "handmade.local:9001", "localhost") + ev.Transport.Secret = config.Config.Twitch.EventSubSecret + + evJson, err := json.Marshal(ev) + if err != nil { + return oops.New(err, "failed to marshal event sub data") + } + req, err := http.NewRequestWithContext(ctx, "POST", buildUrl("/eventsub/subscriptions", ""), bytes.NewReader(evJson)) + req.Header.Set("Content-Type", "application/json") + if err != nil { + return oops.New(err, "failed to create request") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return oops.New(err, "failed to create new event subscription") + } + defer readAndClose(res) + + if res.StatusCode != 201 { + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return oops.New(err, "failed to read response body while creating twitch eventsubs") + } + logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Failed to create twitch event sub") + return oops.New(nil, "failed to create new event subscription") + } + return nil +} + +func unsubscribeFromEvent(ctx context.Context, eventID string) error { + query := url.Values{} + query.Add("id", eventID) + req, err := http.NewRequestWithContext(ctx, "DELETE", buildUrl("/eventsub/subscriptions", query.Encode()), nil) + if err != nil { + return oops.New(err, "failed to create request") + } + res, err := doRequest(ctx, true, req) + if err != nil { + return oops.New(err, "failed to delete new event subscription") + } + defer readAndClose(res) + + if res.StatusCode != 204 { + body, err := io.ReadAll(res.Body) + res.Body.Close() + if err != nil { + return oops.New(err, "failed to read response body while deleting twitch eventsubs") + } + logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Failed to delete twitch event sub") + return oops.New(nil, "failed to delete new event subscription") + } + return nil +} + +func doRequest(ctx context.Context, waitOnRateLimit bool, req *http.Request) (*http.Response, error) { + serviceUnavailable := false + numRetries := 5 + + for { + if numRetries == 0 { + return nil, MaxRetries + } + numRetries -= 1 + + now := time.Now() + if rateLimitReset.After(now) { + if waitOnRateLimit { + timer := time.NewTimer(rateLimitReset.Sub(now)) + select { + case <-timer.C: + case <-ctx.Done(): + return nil, errors.New("request interrupted during rate limiting") + } + } else { + return nil, HitRateLimit + } + } + + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", activeAccessToken)) + req.Header.Set("Client-Id", config.Config.Twitch.ClientID) + res, err := httpClient.Do(req) + if err != nil { + return nil, oops.New(err, "twitch request failed") + } + + if res.StatusCode != 503 { + serviceUnavailable = false + } + + if res.StatusCode >= 200 && res.StatusCode < 300 { + return res, nil + } else if res.StatusCode == 503 { + readAndClose(res) + if serviceUnavailable { + // NOTE(asaf): The docs say we should retry once if we receive 503 + return nil, oops.New(nil, "got 503 Service Unavailable twice in a row") + } else { + serviceUnavailable = true + } + } else if res.StatusCode == 429 { + logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Msg("Hit Twitch rate limit") + err = updateRateLimitReset(res) + if err != nil { + return nil, err + } + } else if res.StatusCode == 401 { + logging.ExtractLogger(ctx).Warn().Msg("Twitch refresh token is invalid. Renewing...") + readAndClose(res) + err = refreshAccessToken(ctx) + if err != nil { + return nil, err + } + } else { + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, oops.New(err, "failed to read response body") + } + logging.ExtractLogger(ctx).Warn().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("Body", string(body[:])).Msg("Unexpected status code from twitch") + res.Body.Close() + return res, oops.New(nil, "got an unexpected status code from twitch") + } + } +} + +func updateRateLimitReset(res *http.Response) error { + defer readAndClose(res) + + resetStr := res.Header.Get("Ratelimit-Reset") + if len(resetStr) == 0 { + return oops.New(nil, "no ratelimit data on response") + } + + resetUnix, err := strconv.Atoi(resetStr) + if err != nil { + return oops.New(err, "failed to parse reset time") + } + + rateLimitReset = time.Unix(int64(resetUnix), 0) + return nil +} + +type AccessTokenResponse struct { + AccessToken string `json:"access_token"` +} + +func refreshAccessToken(ctx context.Context) error { + logging.ExtractLogger(ctx).Info().Msg("Refreshing twitch token") + query := url.Values{} + query.Add("client_id", config.Config.Twitch.ClientID) + query.Add("client_secret", config.Config.Twitch.ClientSecret) + query.Add("grant_type", "client_credentials") + url := fmt.Sprintf("%s/token?%s", config.Config.Twitch.BaseIDUrl, query.Encode()) + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) + if err != nil { + return oops.New(err, "failed to create request") + } + + res, err := httpClient.Do(req) + if err != nil { + return oops.New(err, "failed to request new access token") + } + defer readAndClose(res) + + if res.StatusCode >= 400 { + // NOTE(asaf): The docs don't specify the error cases for this call. + // NOTE(asaf): According to the docs rate limiting is per-token, and we don't use a token for this call, + // so who knows how rate limiting works here. + body, _ := io.ReadAll(res.Body) + logging.ExtractLogger(ctx).Error().Interface("Headers", res.Header).Int("Status code", res.StatusCode).Str("body", string(body[:])).Msg("Got bad status code from twitch access token refresh") + return oops.New(nil, "received unexpected status code from twitch access token refresh") + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return oops.New(err, "failed to read response body") + } + var accessTokenResponse AccessTokenResponse + err = json.Unmarshal(body, &accessTokenResponse) + if err != nil { + return oops.New(err, "failed to unmarshal access token response") + } + + activeAccessToken = accessTokenResponse.AccessToken + return nil +} + +func readAndClose(res *http.Response) { + io.ReadAll(res.Body) + res.Body.Close() +} + +func buildUrl(path string, queryParams string) string { + return fmt.Sprintf("%s%s?%s", config.Config.Twitch.BaseUrl, path, queryParams) +} diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go new file mode 100644 index 0000000..a6f1ebe --- /dev/null +++ b/src/twitch/twitch.go @@ -0,0 +1,462 @@ +package twitch + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "git.handmade.network/hmn/hmn/src/config" + "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/discord" + "git.handmade.network/hmn/hmn/src/hmndata" + "git.handmade.network/hmn/hmn/src/logging" + "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/perf" + "github.com/jackc/pgx/v4/pgxpool" +) + +type twitchNotification struct { + TwitchID string + Type twitchNotificationType +} + +var twitchNotificationChannel chan twitchNotification +var linksChangedChannel chan struct{} + +func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} { + log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger() + ctx = logging.AttachLoggerToContext(&log, ctx) + + if config.Config.Twitch.ClientID == "" { + log.Warn().Msg("No twitch config provided.") + done := make(chan struct{}, 1) + done <- struct{}{} + return done + } + + twitchNotificationChannel = make(chan twitchNotification, 100) + linksChangedChannel = make(chan struct{}, 10) + done := make(chan struct{}) + + go func() { + defer func() { + log.Info().Msg("Shutting down twitch monitor") + done <- struct{}{} + }() + log.Info().Msg("Running twitch monitor...") + + err := refreshAccessToken(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch refresh token on start") + return + } + + monitorTicker := time.NewTicker(2 * time.Hour) + firstRunChannel := make(chan struct{}, 1) + firstRunChannel <- struct{}{} + for { + select { + case <-ctx.Done(): + return + case <-firstRunChannel: + syncWithTwitch(ctx, dbConn, true) + case <-monitorTicker.C: + syncWithTwitch(ctx, dbConn, true) + case <-linksChangedChannel: + syncWithTwitch(ctx, dbConn, false) + case notification := <-twitchNotificationChannel: + if notification.Type == notificationTypeRevocation { + syncWithTwitch(ctx, dbConn, false) + } else { + processEventSubNotification(ctx, dbConn, ¬ification) + } + } + } + }() + + return done +} + +type twitchNotificationType int + +const ( + notificationTypeNone twitchNotificationType = 0 + notificationTypeOnline = 1 + notificationTypeOffline = 2 + notificationTypeChannelUpdate = 3 + + notificationTypeRevocation = 4 +) + +func QueueTwitchNotification(messageType string, body []byte) error { + var notification twitchNotification + if messageType == "notification" { + type notificationJson struct { + Subscription struct { + Type string `json:"type"` + } `json:"subscription"` + Event struct { + BroadcasterUserID string `json:"broadcaster_user_id"` + BroadcasterUserLogin string `json:"broadcaster_user_login"` + } `json:"event"` + } + var incoming notificationJson + err := json.Unmarshal(body, &incoming) + if err != nil { + return oops.New(err, "failed to parse notification body") + } + + notification.TwitchID = incoming.Event.BroadcasterUserID + switch incoming.Subscription.Type { + case "stream.online": + notification.Type = notificationTypeOnline + case "stream.offline": + notification.Type = notificationTypeOffline + case "channel.update": + notification.Type = notificationTypeChannelUpdate + default: + return oops.New(nil, "unknown subscription type received") + } + } else if messageType == "revocation" { + notification.Type = notificationTypeRevocation + } + + if twitchNotificationChannel != nil && notification.Type != notificationTypeNone { + select { + case twitchNotificationChannel <- notification: + default: + return oops.New(nil, "twitch notification channel is full") + } + } + return nil +} + +func UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange []string) { + if linksChangedChannel != nil { + twitchChanged := (len(twitchLoginsPreChange) != len(twitchLoginsPostChange)) + if !twitchChanged { + for idx, _ := range twitchLoginsPreChange { + if twitchLoginsPreChange[idx] != twitchLoginsPostChange[idx] { + twitchChanged = true + break + } + } + } + if twitchChanged { + // NOTE(asaf): Since we update links inside transactions for users/projects + // we won't see the updated list of links until the transaction is committed. + // Waiting 10 seconds is just a quick workaround for that. It's not + // convenient to only trigger this after the transaction is committed. + time.AfterFunc(10*time.Second, func() { + linksChangedChannel <- struct{}{} + }) + } + } +} + +func syncWithTwitch(ctx context.Context, dbConn *pgxpool.Pool, updateAll bool) { + log := logging.ExtractLogger(ctx) + log.Info().Msg("Running twitch sync") + p := perf.MakeNewRequestPerf("Background job", "", "syncWithTwitch") + defer func() { + p.EndRequest() + perf.LogPerf(p, log.Info()) + }() + + type twitchSyncStats struct { + NumSubbed int + NumUnsubbed int + NumStreamsChecked int + } + var stats twitchSyncStats + + p.StartBlock("SQL", "Fetch list of streamers") + streamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("Error while monitoring twitch") + return + } + p.EndBlock() + + needID := make([]string, 0) + streamerMap := make(map[string]*hmndata.TwitchStreamer) + for idx, streamer := range streamers { + needID = append(needID, streamer.TwitchLogin) + streamerMap[streamer.TwitchLogin] = &streamers[idx] + } + + p.StartBlock("TwitchAPI", "Fetch twitch user info") + twitchUsers, err := getTwitchUsersByLogin(ctx, needID) + if err != nil { + log.Error().Err(err).Msg("Error while monitoring twitch") + return + } + p.EndBlock() + + for _, tu := range twitchUsers { + streamerMap[tu.TwitchLogin].TwitchID = tu.TwitchID + } + + validStreamers := make([]hmndata.TwitchStreamer, 0, len(streamers)) + for _, streamer := range streamers { + if len(streamer.TwitchID) > 0 { + validStreamers = append(validStreamers, streamer) + } + } + + p.StartBlock("TwitchAPI", "Fetch event subscriptions") + subscriptions, err := getEventSubscriptions(ctx) + if err != nil { + log.Error().Err(err).Msg("Error while monitoring twitch") + return + } + p.EndBlock() + + const ( + EventSubNone = 0 // No event of this type found + EventSubRefresh = 1 // Event found, but bad status. Need to unsubscribe and resubscribe. + EventSubGood = 2 // All is well. + ) + + type isSubbedByType map[string]bool + + streamerEventSubs := make(map[string]isSubbedByType) + for _, streamer := range validStreamers { + streamerEventSubs[streamer.TwitchID] = make(isSubbedByType) + streamerEventSubs[streamer.TwitchID]["channel.update"] = false + streamerEventSubs[streamer.TwitchID]["stream.online"] = false + streamerEventSubs[streamer.TwitchID]["stream.offline"] = false + } + + type unsubEvent struct { + TwitchID string + EventID string + } + + toUnsub := make([]unsubEvent, 0) + + for _, sub := range subscriptions { + handled := false + if eventSubs, ok := streamerEventSubs[sub.TwitchID]; ok { + if _, ok := eventSubs[sub.Type]; ok { // Make sure it's a known type + if !sub.GoodStatus { + log.Debug().Str("TwitchID", sub.TwitchID).Str("Event Type", sub.Type).Msg("Twitch doesn't like our sub") + toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID}) + } else { + eventSubs[sub.Type] = true + } + handled = true + } + } + if !handled { + // NOTE(asaf): Found an unknown type or an event subscription that we don't have a matching user for. + // Make sure we unsubscribe. + toUnsub = append(toUnsub, unsubEvent{TwitchID: sub.TwitchID, EventID: sub.EventID}) + } + } + + if config.Config.Env != config.Dev { // NOTE(asaf): Can't subscribe to events from dev. We need a non-localhost callback url. + p.StartBlock("TwitchAPI", "Sync subscriptions with twitch") + for _, ev := range toUnsub { + err = unsubscribeFromEvent(ctx, ev.EventID) + if err != nil { + log.Error().Err(err).Msg("Error while unsubscribing events") + // NOTE(asaf): Soft error. Don't care if it fails. + } + stats.NumUnsubbed += 1 + } + + for twitchID, evStatuses := range streamerEventSubs { + for evType, isSubbed := range evStatuses { + if !isSubbed { + err = subscribeToEvent(ctx, evType, twitchID) + if err != nil { + log.Error().Err(err).Msg("Error while monitoring twitch") + return + } + stats.NumSubbed += 1 + } + } + } + p.EndBlock() + } + + tx, err := dbConn.Begin(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to start transaction") + } + defer tx.Rollback(ctx) + + allIDs := make([]string, 0, len(validStreamers)) + for _, streamer := range validStreamers { + allIDs = append(allIDs, streamer.TwitchID) + } + p.StartBlock("SQL", "Remove untracked streamers") + _, err = tx.Exec(ctx, + `DELETE FROM twitch_streams WHERE twitch_id != ANY($1)`, + allIDs, + ) + if err != nil { + log.Error().Err(err).Msg("Failed to remove untracked twitch ids from streamer list in db") + return + } + p.EndBlock() + + usersToUpdate := make([]string, 0) + if updateAll { + usersToUpdate = allIDs + } else { + // NOTE(asaf): Twitch can revoke our subscriptions, so we need to + // update users whose subs were revoked or missing since last time we checked. + for twitchID, evStatuses := range streamerEventSubs { + for _, isSubbed := range evStatuses { + if !isSubbed { + usersToUpdate = append(usersToUpdate, twitchID) + break + } + } + } + } + + p.StartBlock("TwitchAPI", "Fetch twitch stream statuses") + statuses, err := getStreamStatus(ctx, usersToUpdate) + if err != nil { + log.Error().Err(err).Msg("failed to fetch stream statuses") + return + } + p.EndBlock() + p.StartBlock("SQL", "Update stream statuses in db") + for _, status := range statuses { + log.Debug().Interface("Status", status).Msg("Got streamer") + _, err = updateStreamStatusInDB(ctx, tx, &status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + } + } + p.EndBlock() + err = tx.Commit(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to commit transaction") + } + stats.NumStreamsChecked += len(usersToUpdate) + log.Info().Interface("Stats", stats).Msg("Twitch sync done") +} + +func notifyDiscordOfLiveStream(ctx context.Context, dbConn db.ConnOrTx, twitchLogin string) error { + var err error + if config.Config.Discord.StreamsChannelID != "" { + err = discord.SendMessages(ctx, dbConn, discord.MessageToSend{ + ChannelID: config.Config.Discord.StreamsChannelID, + Req: discord.CreateMessageRequest{ + Content: fmt.Sprintf("%s is live: https://twitch.tv/%s", twitchLogin, twitchLogin), + }, + }) + } + return err +} + +func processEventSubNotification(ctx context.Context, dbConn db.ConnOrTx, notification *twitchNotification) { + if notification.Type == notificationTypeNone { + return + } + + log := logging.ExtractLogger(ctx) + status := streamStatus{ + TwitchID: notification.TwitchID, + Live: false, + } + var err error + if notification.Type == notificationTypeChannelUpdate || notification.Type == notificationTypeOnline { + result, err := getStreamStatus(ctx, []string{notification.TwitchID}) + if err != nil || len(result) == 0 { + log.Error().Str("TwitchID", notification.TwitchID).Err(err).Msg("failed to fetch stream status") + return + } + allStreamers, err := hmndata.FetchTwitchStreamers(ctx, dbConn) + if err != nil { + log.Error().Err(err).Msg("failed to fetch hmn streamers") + return + } + for _, streamer := range allStreamers { + if streamer.TwitchLogin == result[0].TwitchLogin { + status = result[0] + break + } + } + } + + inserted, err := updateStreamStatusInDB(ctx, dbConn, &status) + if err != nil { + log.Error().Err(err).Msg("failed to update twitch stream status") + } + if inserted && notification.Type == notificationTypeOnline { + err = notifyDiscordOfLiveStream(ctx, dbConn, status.TwitchLogin) + if err != nil { + log.Error().Err(err).Msg("failed to notify discord") + } + } +} + +func updateStreamStatusInDB(ctx context.Context, conn db.ConnOrTx, status *streamStatus) (bool, error) { + inserted := false + if isStatusRelevant(status) { + _, err := conn.Exec(ctx, + ` + INSERT INTO twitch_streams (twitch_id, twitch_login, title, started_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (twitch_id) DO UPDATE SET + title = EXCLUDED.title, + started_at = EXCLUDED.started_at + `, + status.TwitchID, + status.TwitchLogin, + status.Title, + status.StartedAt, + ) + if err != nil { + return false, oops.New(err, "failed to insert twitch streamer into db") + } + inserted = true + } else { + _, err := conn.Exec(ctx, + ` + DELETE FROM twitch_streams WHERE twitch_id = $1 + `, + status.TwitchID, + ) + if err != nil { + return false, oops.New(err, "failed to remove twitch streamer from db") + } + inserted = false + } + return inserted, nil +} + +var RelevantCategories = []string{ + "1469308723", // Software and Game Development +} + +var RelevantTags = []string{ + "a59f1e4e-257b-4bd0-90c7-189c3efbf917", // Programming + "6f86127d-6051-4a38-94bb-f7b475dde109", // Software Development +} + +func isStatusRelevant(status *streamStatus) bool { + if status.Live { + for _, cat := range RelevantCategories { + if status.Category == cat { + return true + } + } + + for _, tag := range RelevantTags { + for _, streamTag := range status.Tags { + if tag == streamTag { + return true + } + } + } + } + return false +} diff --git a/src/website/projects.go b/src/website/projects.go index db60449..84f113c 100644 --- a/src/website/projects.go +++ b/src/website/projects.go @@ -22,6 +22,7 @@ import ( "git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/parsing" "git.handmade.network/hmn/hmn/src/templates" + "git.handmade.network/hmn/hmn/src/twitch" "git.handmade.network/hmn/hmn/src/utils" "github.com/google/uuid" "github.com/jackc/pgx/v4" @@ -864,6 +865,7 @@ func updateProject(ctx context.Context, tx pgx.Tx, user *models.User, payload *P } } + twitchLoginsPreChange, preErr := hmndata.FetchTwitchLoginsForUserOrProject(ctx, tx, nil, &payload.ProjectID) _, err = tx.Exec(ctx, `DELETE FROM handmade_links WHERE project_id = $1`, payload.ProjectID) if err != nil { return oops.New(err, "Failed to delete project links") @@ -883,6 +885,10 @@ func updateProject(ctx context.Context, tx pgx.Tx, user *models.User, payload *P return oops.New(err, "Failed to insert new project link") } } + twitchLoginsPostChange, postErr := hmndata.FetchTwitchLoginsForUserOrProject(ctx, tx, nil, &payload.ProjectID) + if preErr == nil && postErr == nil { + twitch.UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange) + } return nil } diff --git a/src/website/routes.go b/src/website/routes.go index 20062fd..7d89099 100644 --- a/src/website/routes.go +++ b/src/website/routes.go @@ -205,6 +205,9 @@ func NewWebsiteRoutes(longRequestContext context.Context, conn *pgxpool.Pool) ht hmnOnly.POST(hmnurl.RegexDiscordUnlink, authMiddleware(csrfMiddleware(DiscordUnlink))) hmnOnly.POST(hmnurl.RegexDiscordShowcaseBacklog, authMiddleware(csrfMiddleware(DiscordShowcaseBacklog))) + hmnOnly.POST(hmnurl.RegexTwitchEventSubCallback, TwitchEventSubCallback) + hmnOnly.GET(hmnurl.RegexTwitchDebugPage, TwitchDebugPage) + hmnOnly.GET(hmnurl.RegexUserProfile, UserProfile) hmnOnly.GET(hmnurl.RegexUserSettings, authMiddleware(UserSettings)) hmnOnly.POST(hmnurl.RegexUserSettings, authMiddleware(csrfMiddleware(UserSettingsSave))) diff --git a/src/website/twitch.go b/src/website/twitch.go new file mode 100644 index 0000000..85b8830 --- /dev/null +++ b/src/website/twitch.go @@ -0,0 +1,92 @@ +package website + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + + "git.handmade.network/hmn/hmn/src/config" + "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/models" + "git.handmade.network/hmn/hmn/src/oops" + "git.handmade.network/hmn/hmn/src/twitch" +) + +func TwitchEventSubCallback(c *RequestContext) ResponseData { + secret := config.Config.Twitch.EventSubSecret + messageId := c.Req.Header.Get("Twitch-Eventsub-Message-Id") + timestamp := c.Req.Header.Get("Twitch-Eventsub-Message-Timestamp") + signature := c.Req.Header.Get("Twitch-Eventsub-Message-Signature") + messageType := c.Req.Header.Get("Twitch-Eventsub-Message-Type") + + body, err := io.ReadAll(c.Req.Body) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to read request body")) + } + + hmacMessage := fmt.Sprintf("%s%s%s", messageId, timestamp, string(body[:])) + hmac := hmac.New(sha256.New, []byte(secret)) + hmac.Write([]byte(hmacMessage)) + hash := hmac.Sum(nil) + hmacStr := "sha256=" + hex.EncodeToString(hash) + + if hmacStr != signature { + var res ResponseData + res.StatusCode = 403 + return res + } + + if messageType == "webhook_callback_verification" { + type challengeReq struct { + Challenge string `json:"challenge"` + } + var data challengeReq + err = json.Unmarshal(body, &data) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to unmarshal twitch verification")) + } + var res ResponseData + res.StatusCode = 200 + + res.Header().Set("Content-Type", "text/plain") // NOTE(asaf): No idea why, but the twitch-cli fails when we don't set this. + res.Write([]byte(data.Challenge)) + return res + } else { + err := twitch.QueueTwitchNotification(messageType, body) + if err != nil { + c.Logger.Error().Err(err).Msg("Failed to process twitch callback") + // NOTE(asaf): Returning 200 either way here + } + var res ResponseData + res.StatusCode = 200 + return res + } +} + +func TwitchDebugPage(c *RequestContext) ResponseData { + streams, err := db.Query(c.Context(), c.Conn, models.TwitchStream{}, + ` + SELECT $columns + FROM + twitch_streams + ORDER BY started_at DESC + `, + ) + if err != nil { + return c.ErrorResponse(http.StatusInternalServerError, oops.New(err, "failed to fetch twitch streams")) + } + + html := "" + for _, stream := range streams { + s := stream.(*models.TwitchStream) + html += fmt.Sprintf(`%s%s
`, s.Login, s.Login, s.Title) + } + var res ResponseData + res.StatusCode = 200 + res.Write([]byte(html)) + return res +} diff --git a/src/website/user.go b/src/website/user.go index defb9c1..395695b 100644 --- a/src/website/user.go +++ b/src/website/user.go @@ -18,6 +18,7 @@ import ( "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" "git.handmade.network/hmn/hmn/src/templates" + "git.handmade.network/hmn/hmn/src/twitch" "github.com/google/uuid" "github.com/jackc/pgx/v4" ) @@ -378,6 +379,7 @@ func UserSettingsSave(c *RequestContext) ResponseData { } // Process links + twitchLoginsPreChange, preErr := hmndata.FetchTwitchLoginsForUserOrProject(c.Context(), tx, &c.CurrentUser.ID, nil) linksText := form.Get("links") links := ParseLinks(linksText) _, err = tx.Exec(c.Context(), `DELETE FROM handmade_links WHERE user_id = $1`, c.CurrentUser.ID) @@ -401,6 +403,10 @@ func UserSettingsSave(c *RequestContext) ResponseData { } } } + twitchLoginsPostChange, postErr := hmndata.FetchTwitchLoginsForUserOrProject(c.Context(), tx, &c.CurrentUser.ID, nil) + if preErr == nil && postErr == nil { + twitch.UserOrProjectLinksUpdated(twitchLoginsPreChange, twitchLoginsPostChange) + } // Update password oldPassword := form.Get("old_password") diff --git a/src/website/website.go b/src/website/website.go index 65bf929..0303a92 100644 --- a/src/website/website.go +++ b/src/website/website.go @@ -17,6 +17,7 @@ import ( "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/perf" "git.handmade.network/hmn/hmn/src/templates" + "git.handmade.network/hmn/hmn/src/twitch" "github.com/spf13/cobra" ) @@ -46,6 +47,7 @@ var WebsiteCommand = &cobra.Command{ perfCollector.Done, discord.RunDiscordBot(backgroundJobContext, conn), discord.RunHistoryWatcher(backgroundJobContext, conn), + twitch.MonitorTwitchSubscriptions(backgroundJobContext, conn), ) signals := make(chan os.Signal, 1)