diff --git a/.gitignore b/.gitignore index ffcf48e..a3b42dd 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ hmn.conf adminmailer/config.go adminmailer/adminmailer local/backups +/tmp diff --git a/src/assets/assets.go b/src/assets/assets.go index 21be9e6..a8fccd5 100644 --- a/src/assets/assets.go +++ b/src/assets/assets.go @@ -67,7 +67,7 @@ func SanitizeFilename(filename string) string { } func AssetKey(id, filename string) string { - return fmt.Sprintf("%s%s/%s", config.Config.DigitalOcean.AssetsPathPrefix, id, filename) + return fmt.Sprintf("%s/%s", id, filename) } type InvalidAssetError error diff --git a/src/auth/auth.go b/src/auth/auth.go index fa9aea9..5000041 100644 --- a/src/auth/auth.go +++ b/src/auth/auth.go @@ -14,6 +14,7 @@ import ( "time" "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" @@ -234,10 +235,10 @@ func DeleteExpiredPasswordResets(ctx context.Context, conn *pgxpool.Pool) (int64 return tag.RowsAffected(), nil } -func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) <-chan struct{} { - done := make(chan struct{}) +func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) jobs.Job { + job := jobs.New() go func() { - defer close(done) + defer job.Done() t := time.NewTicker(1 * time.Hour) for { @@ -265,5 +266,5 @@ func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) <- } } }() - return done + return job } diff --git a/src/auth/session.go b/src/auth/session.go index c163db4..a0d7d46 100644 --- a/src/auth/session.go +++ b/src/auth/session.go @@ -11,6 +11,7 @@ import ( "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" @@ -132,10 +133,10 @@ func DeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) (int64, erro return tag.RowsAffected(), nil } -func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) <-chan struct{} { - done := make(chan struct{}) +func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) jobs.Job { + job := jobs.New() go func() { - defer close(done) + defer job.Done() t := time.NewTicker(1 * time.Minute) for { @@ -154,5 +155,5 @@ func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) } } }() - return done + return job } diff --git a/src/config/config.go.example b/src/config/config.go.example index a72f44c..3044378 100644 --- a/src/config/config.go.example +++ b/src/config/config.go.example @@ -38,13 +38,20 @@ var Config = HMNConfig{ OverrideRecipientEmail: "override@handmade.network", // NOTE(asaf): If this is not empty, all emails will be redirected to this address. }, DigitalOcean: DigitalOceanConfig{ - AssetsSpacesKey: "", - AssetsSpacesSecret: "", - AssetsSpacesRegion: "", - AssetsSpacesEndpoint: "", - AssetsSpacesBucket: "", - AssetsPathPrefix: "", // Empty is fine for production, but may be necessary for dev - AssetsPublicUrlRoot: "", // e.g. "https://bucket-name.region.cdn.digitaloceanspaces.com/". Note the trailing slash... + AssetsSpacesKey: "dummy", + AssetsSpacesSecret: "dummy", + AssetsSpacesRegion: "dummy", + AssetsSpacesEndpoint: "http://handmade.local:9003/", + AssetsSpacesBucket: "assets", + AssetsPublicUrlRoot: "http://handmade.local:9003/assets/", + // In prod, AssetsPublicUrlRoot will probably look something like: + // + // "https://bucket-name.region.cdn.digitaloceanspaces.com/" + // + // Note the trailing slash... + + RunFakeServer: true, + FakeAddr: "localhost:9003", }, Discord: DiscordConfig{ BotToken: "", diff --git a/src/config/types.go b/src/config/types.go index cdfeb71..01f12d2 100644 --- a/src/config/types.go +++ b/src/config/types.go @@ -53,8 +53,10 @@ type DigitalOceanConfig struct { AssetsSpacesRegion string AssetsSpacesEndpoint string AssetsSpacesBucket string - AssetsPathPrefix string AssetsPublicUrlRoot string + + RunFakeServer bool + FakeAddr string } type EmailConfig struct { diff --git a/src/discord/gateway.go b/src/discord/gateway.go index f3188e5..b22beb9 100644 --- a/src/discord/gateway.go +++ b/src/discord/gateway.go @@ -13,6 +13,7 @@ import ( "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" @@ -22,22 +23,20 @@ import ( "github.com/jpillora/backoff" ) -func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} { +func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { log := logging.ExtractLogger(ctx).With().Str("module", "discord").Logger() ctx = logging.AttachLoggerToContext(&log, ctx) if config.Config.Discord.BotToken == "" { log.Warn().Msg("No Discord bot token was provided, so the Discord bot cannot run.") - done := make(chan struct{}, 1) - done <- struct{}{} - return done + return jobs.Noop() } - done := make(chan struct{}) + job := jobs.New() go func() { defer func() { log.Debug().Msg("shut down Discord bot") - done <- struct{}{} + job.Done() }() boff := backoff.Backoff{ @@ -88,7 +87,7 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} { }() } }() - return done + return job } var outgoingMessagesReady = make(chan struct{}, 1) diff --git a/src/discord/history.go b/src/discord/history.go index ee879cc..8a8b32c 100644 --- a/src/discord/history.go +++ b/src/discord/history.go @@ -7,27 +7,26 @@ import ( "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "github.com/jackc/pgx/v4/pgxpool" ) -func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} { +func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { log := logging.ExtractLogger(ctx).With().Str("discord goroutine", "history watcher").Logger() ctx = logging.AttachLoggerToContext(&log, ctx) if config.Config.Discord.BotToken == "" { log.Warn().Msg("No Discord bot token was provided, so the Discord history bot cannot run.") - done := make(chan struct{}, 1) - done <- struct{}{} - return done + return jobs.Noop() } - done := make(chan struct{}) + job := jobs.New() go func() { defer func() { log.Debug().Msg("shut down Discord history watcher") - done <- struct{}{} + job.Done() }() newUserTicker := time.NewTicker(5 * time.Second) @@ -67,7 +66,7 @@ func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{ } }() - return done + return job } func fetchMissingContent(ctx context.Context, dbConn *pgxpool.Pool) { diff --git a/src/discord/message_handling.go b/src/discord/message_handling.go index 3f78b95..1fdf82d 100644 --- a/src/discord/message_handling.go +++ b/src/discord/message_handling.go @@ -803,7 +803,7 @@ func HandleSnippetForInternedMessage(ctx context.Context, dbConn db.ConnOrTx, in ` SELECT $columns{tag} FROM - tags + tag JOIN project ON project.tag = tag.id WHERE project.id = ANY ($1) diff --git a/src/hmns3/hmns3.go b/src/hmns3/hmns3.go index c762e9c..91db660 100644 --- a/src/hmns3/hmns3.go +++ b/src/hmns3/hmns3.go @@ -1,78 +1,101 @@ package hmns3 import ( - _ "embed" + "context" + "errors" "fmt" - "git.handmade.network/hmn/hmn/src/website" - "github.com/spf13/cobra" "io" "io/fs" - "log" "net/http" "os" + "path/filepath" "strings" + + "git.handmade.network/hmn/hmn/src/config" + "git.handmade.network/hmn/hmn/src/jobs" + "git.handmade.network/hmn/hmn/src/logging" + "git.handmade.network/hmn/hmn/src/utils" + "github.com/rs/zerolog" ) -func init() { - s3Command := &cobra.Command{ - Use: "hmns3 [storage folder]", - Short: "Run a local s3 server that stores in the filesystem", - Run: func(cmd *cobra.Command, args []string) { - targetFolder := "./tmp" - if len(args) > 0 { - targetFolder = args[0] - } - err := os.MkdirAll(targetFolder, fs.ModePerm) - if err != nil { - panic(err) - } +const dir = "./tmp/s3" - handler := func(w http.ResponseWriter, r *http.Request) { - bucket, key := bucket_key(r) - - fmt.Println("\n\nIncoming request path:", r.URL.Path) - bodyBytes, err := io.ReadAll(r.Body) - fmt.Println("Bucket: ", bucket, " key: ", key, " method: ", r.Method, " len(body): ", len(bodyBytes)) - if err != nil { - panic(err) - } - if r.Method == http.MethodPut { - w.Header().Set("Location", fmt.Sprintf("/%s", bucket)) - err := os.MkdirAll(fmt.Sprintf("%s/%s", targetFolder, bucket), fs.ModePerm) - if err != nil { - panic(err) - } - if key != "" { - err = os.WriteFile(fmt.Sprintf("%s/%s/%s",targetFolder, bucket, key), bodyBytes, fs.ModePerm) - if err != nil { - panic(err) - } - } - } else if r.Method == http.MethodGet { - fileBytes, err := os.ReadFile(fmt.Sprintf("%s/%s/%s", targetFolder, bucket, key)) - if err != nil { - panic(err) - } - w.Write(fileBytes) - } else { - panic("Unimplemented method!") - } - } - - http.HandleFunc("/", handler) - log.Fatal(http.ListenAndServe(":80", nil)) - }, - } - - website.WebsiteCommand.AddCommand(s3Command) +type server struct { + log zerolog.Logger } +func StartServer(ctx context.Context) jobs.Job { + if !config.Config.DigitalOcean.RunFakeServer { + return jobs.Noop() + } -func bucket_key(r *http.Request) (string, string) { + utils.Must0(os.MkdirAll(dir, fs.ModePerm)) + + s := server{ + log: logging.ExtractLogger(ctx).With(). + Str("module", "S3 server"). + Logger(), + } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet { + s.getObject(w, r) + } else if r.Method == http.MethodPut { + s.putObject(w, r) + } else { + panic("Unimplemented method!") + } + }) + + job := jobs.New() + srv := http.Server{ + Addr: config.Config.DigitalOcean.FakeAddr, + } + + s.log.Info().Msg("Starting local S3 server") + go func() { + defer job.Done() + err := srv.ListenAndServe() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + // This is normal and fine + } else { + panic(err) + } + } + }() + + go func() { + <-ctx.Done() + s.log.Info().Msg("Shutting down local S3 server") + srv.Shutdown(context.Background()) + }() + + return job +} + +func (s *server) getObject(w http.ResponseWriter, r *http.Request) { + bucket, key := bucketKey(r) + + file := utils.Must1(os.Open(filepath.Join(dir, bucket, key))) + io.Copy(w, file) +} + +func (s *server) putObject(w http.ResponseWriter, r *http.Request) { + bucket, key := bucketKey(r) + + w.Header().Set("Location", fmt.Sprintf("/%s", bucket)) + utils.Must0(os.MkdirAll(filepath.Join(dir, bucket), fs.ModePerm)) + if key != "" { + file := utils.Must1(os.Create(filepath.Join(dir, bucket, key))) + io.Copy(file, r.Body) + } +} + +func bucketKey(r *http.Request) (string, string) { slashIdx := strings.IndexByte(r.URL.Path[1:], '/') if slashIdx == -1 { return r.URL.Path[1:], "" } else { - return r.URL.Path[1 : 1+slashIdx], strings.Replace(r.URL.Path[2+slashIdx:], "/", "~", -1) + return r.URL.Path[1 : 1+slashIdx], strings.ReplaceAll(r.URL.Path[2+slashIdx:], "/", "~") } } diff --git a/src/jobs/jobs.go b/src/jobs/jobs.go new file mode 100644 index 0000000..257deb3 --- /dev/null +++ b/src/jobs/jobs.go @@ -0,0 +1,40 @@ +package jobs + +type Job struct { + C <-chan struct{} + rawC chan struct{} +} + +func New() Job { + return newFromChannel(make(chan struct{})) +} + +func (j *Job) Done() { + close(j.rawC) +} + +// Combines multiple jobs into one. +func Zip(jobs ...Job) Job { + out := make(chan struct{}) + go func() { + for _, job := range jobs { + <-job.C + } + close(out) + }() + return newFromChannel(out) +} + +// Returns a job that is already done. +func Noop() Job { + job := New() + job.Done() + return job +} + +func newFromChannel(c chan struct{}) Job { + return Job{ + C: c, + rawC: c, + } +} diff --git a/src/main.go b/src/main.go index 1820e0d..2377cbc 100644 --- a/src/main.go +++ b/src/main.go @@ -2,7 +2,6 @@ package main import ( _ "git.handmade.network/hmn/hmn/src/admintools" - _ "git.handmade.network/hmn/hmn/src/hmns3" _ "git.handmade.network/hmn/hmn/src/assets" _ "git.handmade.network/hmn/hmn/src/buildscss" _ "git.handmade.network/hmn/hmn/src/discord/cmd" diff --git a/src/perf/perf.go b/src/perf/perf.go index d749e11..c4c5b00 100644 --- a/src/perf/perf.go +++ b/src/perf/perf.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "git.handmade.network/hmn/hmn/src/jobs" "github.com/rs/zerolog" ) @@ -109,20 +110,20 @@ type PerfStorage struct { type PerfCollector struct { In chan<- RequestPerf - Done <-chan struct{} + Job jobs.Job RequestCopy chan<- (chan<- PerfStorage) } func RunPerfCollector(ctx context.Context) *PerfCollector { in := make(chan RequestPerf) - done := make(chan struct{}) + job := jobs.New() requestCopy := make(chan (chan<- PerfStorage)) var storage PerfStorage // TODO(asaf): Load history from file go func() { - defer close(done) + defer job.Done() for { select { @@ -139,7 +140,7 @@ func RunPerfCollector(ctx context.Context) *PerfCollector { perfCollector := PerfCollector{ In: in, - Done: done, + Job: job, RequestCopy: requestCopy, } return &perfCollector diff --git a/src/twitch/twitch.go b/src/twitch/twitch.go index 62eaf96..5d5a4cc 100644 --- a/src/twitch/twitch.go +++ b/src/twitch/twitch.go @@ -10,6 +10,7 @@ import ( "git.handmade.network/hmn/hmn/src/db" "git.handmade.network/hmn/hmn/src/discord" "git.handmade.network/hmn/hmn/src/hmndata" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/models" "git.handmade.network/hmn/hmn/src/oops" @@ -25,25 +26,23 @@ type twitchNotification struct { var twitchNotificationChannel chan twitchNotification var linksChangedChannel chan struct{} -func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} { +func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job { log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger() ctx = logging.AttachLoggerToContext(&log, ctx) if config.Config.Twitch.ClientID == "" { log.Warn().Msg("No twitch config provided.") - done := make(chan struct{}, 1) - done <- struct{}{} - return done + return jobs.Noop() } twitchNotificationChannel = make(chan twitchNotification, 100) linksChangedChannel = make(chan struct{}, 10) - done := make(chan struct{}) + job := jobs.New() go func() { defer func() { log.Info().Msg("Shutting down twitch monitor") - done <- struct{}{} + job.Done() }() log.Info().Msg("Running twitch monitor...") @@ -114,7 +113,7 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-cha } }() - return done + return job } type twitchNotificationType int diff --git a/src/website/website.go b/src/website/website.go index 7aec3cd..2ed8337 100644 --- a/src/website/website.go +++ b/src/website/website.go @@ -14,6 +14,8 @@ import ( "git.handmade.network/hmn/hmn/src/config" "git.handmade.network/hmn/hmn/src/db" "git.handmade.network/hmn/hmn/src/discord" + "git.handmade.network/hmn/hmn/src/hmns3" + "git.handmade.network/hmn/hmn/src/jobs" "git.handmade.network/hmn/hmn/src/logging" "git.handmade.network/hmn/hmn/src/perf" "git.handmade.network/hmn/hmn/src/templates" @@ -41,13 +43,14 @@ var WebsiteCommand = &cobra.Command{ Handler: NewWebsiteRoutes(longRequestContext, conn), } - backgroundJobsDone := zipJobs( + backgroundJobsDone := jobs.Zip( auth.PeriodicallyDeleteExpiredSessions(backgroundJobContext, conn), auth.PeriodicallyDeleteInactiveUsers(backgroundJobContext, conn), - perfCollector.Done, + perfCollector.Job, discord.RunDiscordBot(backgroundJobContext, conn), discord.RunHistoryWatcher(backgroundJobContext, conn), twitch.MonitorTwitchSubscriptions(backgroundJobContext, conn), + hmns3.StartServer(backgroundJobContext), ) signals := make(chan os.Signal, 1) @@ -81,17 +84,6 @@ var WebsiteCommand = &cobra.Command{ logging.Error().Err(serverErr).Msg("Server shut down unexpectedly") } - <-backgroundJobsDone + <-backgroundJobsDone.C }, } - -func zipJobs(cs ...<-chan struct{}) <-chan struct{} { - out := make(chan struct{}) - go func() { - for _, c := range cs { - <-c - } - close(out) - }() - return out -}