Integrate Nick's local S3 server

Works like a charm!

Small tweak for clarity
This commit is contained in:
Ben Visness 2022-05-14 00:33:00 -05:00
parent 4187a3b6ca
commit c1fa6cae13
15 changed files with 179 additions and 115 deletions

1
.gitignore vendored
View File

@ -13,3 +13,4 @@ hmn.conf
adminmailer/config.go
adminmailer/adminmailer
local/backups
/tmp

View File

@ -67,7 +67,7 @@ func SanitizeFilename(filename string) string {
}
func AssetKey(id, filename string) string {
return fmt.Sprintf("%s%s/%s", config.Config.DigitalOcean.AssetsPathPrefix, id, filename)
return fmt.Sprintf("%s/%s", id, filename)
}
type InvalidAssetError error

View File

@ -14,6 +14,7 @@ import (
"time"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
@ -234,10 +235,10 @@ func DeleteExpiredPasswordResets(ctx context.Context, conn *pgxpool.Pool) (int64
return tag.RowsAffected(), nil
}
func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) <-chan struct{} {
done := make(chan struct{})
func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) jobs.Job {
job := jobs.New()
go func() {
defer close(done)
defer job.Done()
t := time.NewTicker(1 * time.Hour)
for {
@ -265,5 +266,5 @@ func PeriodicallyDeleteInactiveUsers(ctx context.Context, conn *pgxpool.Pool) <-
}
}
}()
return done
return job
}

View File

@ -11,6 +11,7 @@ import (
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
@ -132,10 +133,10 @@ func DeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) (int64, erro
return tag.RowsAffected(), nil
}
func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) <-chan struct{} {
done := make(chan struct{})
func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool) jobs.Job {
job := jobs.New()
go func() {
defer close(done)
defer job.Done()
t := time.NewTicker(1 * time.Minute)
for {
@ -154,5 +155,5 @@ func PeriodicallyDeleteExpiredSessions(ctx context.Context, conn *pgxpool.Pool)
}
}
}()
return done
return job
}

View File

@ -38,13 +38,20 @@ var Config = HMNConfig{
OverrideRecipientEmail: "override@handmade.network", // NOTE(asaf): If this is not empty, all emails will be redirected to this address.
},
DigitalOcean: DigitalOceanConfig{
AssetsSpacesKey: "",
AssetsSpacesSecret: "",
AssetsSpacesRegion: "",
AssetsSpacesEndpoint: "",
AssetsSpacesBucket: "",
AssetsPathPrefix: "", // Empty is fine for production, but may be necessary for dev
AssetsPublicUrlRoot: "", // e.g. "https://bucket-name.region.cdn.digitaloceanspaces.com/". Note the trailing slash...
AssetsSpacesKey: "dummy",
AssetsSpacesSecret: "dummy",
AssetsSpacesRegion: "dummy",
AssetsSpacesEndpoint: "http://handmade.local:9003/",
AssetsSpacesBucket: "assets",
AssetsPublicUrlRoot: "http://handmade.local:9003/assets/",
// In prod, AssetsPublicUrlRoot will probably look something like:
//
// "https://bucket-name.region.cdn.digitaloceanspaces.com/"
//
// Note the trailing slash...
RunFakeServer: true,
FakeAddr: "localhost:9003",
},
Discord: DiscordConfig{
BotToken: "",

View File

@ -53,8 +53,10 @@ type DigitalOceanConfig struct {
AssetsSpacesRegion string
AssetsSpacesEndpoint string
AssetsSpacesBucket string
AssetsPathPrefix string
AssetsPublicUrlRoot string
RunFakeServer bool
FakeAddr string
}
type EmailConfig struct {

View File

@ -13,6 +13,7 @@ import (
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
@ -22,22 +23,20 @@ import (
"github.com/jpillora/backoff"
)
func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} {
func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
log := logging.ExtractLogger(ctx).With().Str("module", "discord").Logger()
ctx = logging.AttachLoggerToContext(&log, ctx)
if config.Config.Discord.BotToken == "" {
log.Warn().Msg("No Discord bot token was provided, so the Discord bot cannot run.")
done := make(chan struct{}, 1)
done <- struct{}{}
return done
return jobs.Noop()
}
done := make(chan struct{})
job := jobs.New()
go func() {
defer func() {
log.Debug().Msg("shut down Discord bot")
done <- struct{}{}
job.Done()
}()
boff := backoff.Backoff{
@ -88,7 +87,7 @@ func RunDiscordBot(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} {
}()
}
}()
return done
return job
}
var outgoingMessagesReady = make(chan struct{}, 1)

View File

@ -7,27 +7,26 @@ import (
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"github.com/jackc/pgx/v4/pgxpool"
)
func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} {
func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
log := logging.ExtractLogger(ctx).With().Str("discord goroutine", "history watcher").Logger()
ctx = logging.AttachLoggerToContext(&log, ctx)
if config.Config.Discord.BotToken == "" {
log.Warn().Msg("No Discord bot token was provided, so the Discord history bot cannot run.")
done := make(chan struct{}, 1)
done <- struct{}{}
return done
return jobs.Noop()
}
done := make(chan struct{})
job := jobs.New()
go func() {
defer func() {
log.Debug().Msg("shut down Discord history watcher")
done <- struct{}{}
job.Done()
}()
newUserTicker := time.NewTicker(5 * time.Second)
@ -67,7 +66,7 @@ func RunHistoryWatcher(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{
}
}()
return done
return job
}
func fetchMissingContent(ctx context.Context, dbConn *pgxpool.Pool) {

View File

@ -803,7 +803,7 @@ func HandleSnippetForInternedMessage(ctx context.Context, dbConn db.ConnOrTx, in
`
SELECT $columns{tag}
FROM
tags
tag
JOIN project ON project.tag = tag.id
WHERE
project.id = ANY ($1)

View File

@ -1,78 +1,101 @@
package hmns3
import (
_ "embed"
"context"
"errors"
"fmt"
"git.handmade.network/hmn/hmn/src/website"
"github.com/spf13/cobra"
"io"
"io/fs"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/utils"
"github.com/rs/zerolog"
)
func init() {
s3Command := &cobra.Command{
Use: "hmns3 [storage folder]",
Short: "Run a local s3 server that stores in the filesystem",
Run: func(cmd *cobra.Command, args []string) {
targetFolder := "./tmp"
if len(args) > 0 {
targetFolder = args[0]
}
err := os.MkdirAll(targetFolder, fs.ModePerm)
if err != nil {
panic(err)
}
const dir = "./tmp/s3"
handler := func(w http.ResponseWriter, r *http.Request) {
bucket, key := bucket_key(r)
fmt.Println("\n\nIncoming request path:", r.URL.Path)
bodyBytes, err := io.ReadAll(r.Body)
fmt.Println("Bucket: ", bucket, " key: ", key, " method: ", r.Method, " len(body): ", len(bodyBytes))
if err != nil {
panic(err)
}
if r.Method == http.MethodPut {
w.Header().Set("Location", fmt.Sprintf("/%s", bucket))
err := os.MkdirAll(fmt.Sprintf("%s/%s", targetFolder, bucket), fs.ModePerm)
if err != nil {
panic(err)
}
if key != "" {
err = os.WriteFile(fmt.Sprintf("%s/%s/%s",targetFolder, bucket, key), bodyBytes, fs.ModePerm)
if err != nil {
panic(err)
}
}
} else if r.Method == http.MethodGet {
fileBytes, err := os.ReadFile(fmt.Sprintf("%s/%s/%s", targetFolder, bucket, key))
if err != nil {
panic(err)
}
w.Write(fileBytes)
} else {
panic("Unimplemented method!")
}
}
http.HandleFunc("/", handler)
log.Fatal(http.ListenAndServe(":80", nil))
},
}
website.WebsiteCommand.AddCommand(s3Command)
type server struct {
log zerolog.Logger
}
func StartServer(ctx context.Context) jobs.Job {
if !config.Config.DigitalOcean.RunFakeServer {
return jobs.Noop()
}
func bucket_key(r *http.Request) (string, string) {
utils.Must0(os.MkdirAll(dir, fs.ModePerm))
s := server{
log: logging.ExtractLogger(ctx).With().
Str("module", "S3 server").
Logger(),
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
s.getObject(w, r)
} else if r.Method == http.MethodPut {
s.putObject(w, r)
} else {
panic("Unimplemented method!")
}
})
job := jobs.New()
srv := http.Server{
Addr: config.Config.DigitalOcean.FakeAddr,
}
s.log.Info().Msg("Starting local S3 server")
go func() {
defer job.Done()
err := srv.ListenAndServe()
if err != nil {
if errors.Is(err, http.ErrServerClosed) {
// This is normal and fine
} else {
panic(err)
}
}
}()
go func() {
<-ctx.Done()
s.log.Info().Msg("Shutting down local S3 server")
srv.Shutdown(context.Background())
}()
return job
}
func (s *server) getObject(w http.ResponseWriter, r *http.Request) {
bucket, key := bucketKey(r)
file := utils.Must1(os.Open(filepath.Join(dir, bucket, key)))
io.Copy(w, file)
}
func (s *server) putObject(w http.ResponseWriter, r *http.Request) {
bucket, key := bucketKey(r)
w.Header().Set("Location", fmt.Sprintf("/%s", bucket))
utils.Must0(os.MkdirAll(filepath.Join(dir, bucket), fs.ModePerm))
if key != "" {
file := utils.Must1(os.Create(filepath.Join(dir, bucket, key)))
io.Copy(file, r.Body)
}
}
func bucketKey(r *http.Request) (string, string) {
slashIdx := strings.IndexByte(r.URL.Path[1:], '/')
if slashIdx == -1 {
return r.URL.Path[1:], ""
} else {
return r.URL.Path[1 : 1+slashIdx], strings.Replace(r.URL.Path[2+slashIdx:], "/", "~", -1)
return r.URL.Path[1 : 1+slashIdx], strings.ReplaceAll(r.URL.Path[2+slashIdx:], "/", "~")
}
}

40
src/jobs/jobs.go Normal file
View File

@ -0,0 +1,40 @@
package jobs
type Job struct {
C <-chan struct{}
rawC chan struct{}
}
func New() Job {
return newFromChannel(make(chan struct{}))
}
func (j *Job) Done() {
close(j.rawC)
}
// Combines multiple jobs into one.
func Zip(jobs ...Job) Job {
out := make(chan struct{})
go func() {
for _, job := range jobs {
<-job.C
}
close(out)
}()
return newFromChannel(out)
}
// Returns a job that is already done.
func Noop() Job {
job := New()
job.Done()
return job
}
func newFromChannel(c chan struct{}) Job {
return Job{
C: c,
rawC: c,
}
}

View File

@ -2,7 +2,6 @@ package main
import (
_ "git.handmade.network/hmn/hmn/src/admintools"
_ "git.handmade.network/hmn/hmn/src/hmns3"
_ "git.handmade.network/hmn/hmn/src/assets"
_ "git.handmade.network/hmn/hmn/src/buildscss"
_ "git.handmade.network/hmn/hmn/src/discord/cmd"

View File

@ -5,6 +5,7 @@ import (
"fmt"
"time"
"git.handmade.network/hmn/hmn/src/jobs"
"github.com/rs/zerolog"
)
@ -109,20 +110,20 @@ type PerfStorage struct {
type PerfCollector struct {
In chan<- RequestPerf
Done <-chan struct{}
Job jobs.Job
RequestCopy chan<- (chan<- PerfStorage)
}
func RunPerfCollector(ctx context.Context) *PerfCollector {
in := make(chan RequestPerf)
done := make(chan struct{})
job := jobs.New()
requestCopy := make(chan (chan<- PerfStorage))
var storage PerfStorage
// TODO(asaf): Load history from file
go func() {
defer close(done)
defer job.Done()
for {
select {
@ -139,7 +140,7 @@ func RunPerfCollector(ctx context.Context) *PerfCollector {
perfCollector := PerfCollector{
In: in,
Done: done,
Job: job,
RequestCopy: requestCopy,
}
return &perfCollector

View File

@ -10,6 +10,7 @@ import (
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/discord"
"git.handmade.network/hmn/hmn/src/hmndata"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/models"
"git.handmade.network/hmn/hmn/src/oops"
@ -25,25 +26,23 @@ type twitchNotification struct {
var twitchNotificationChannel chan twitchNotification
var linksChangedChannel chan struct{}
func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-chan struct{} {
func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) jobs.Job {
log := logging.ExtractLogger(ctx).With().Str("twitch goroutine", "stream monitor").Logger()
ctx = logging.AttachLoggerToContext(&log, ctx)
if config.Config.Twitch.ClientID == "" {
log.Warn().Msg("No twitch config provided.")
done := make(chan struct{}, 1)
done <- struct{}{}
return done
return jobs.Noop()
}
twitchNotificationChannel = make(chan twitchNotification, 100)
linksChangedChannel = make(chan struct{}, 10)
done := make(chan struct{})
job := jobs.New()
go func() {
defer func() {
log.Info().Msg("Shutting down twitch monitor")
done <- struct{}{}
job.Done()
}()
log.Info().Msg("Running twitch monitor...")
@ -114,7 +113,7 @@ func MonitorTwitchSubscriptions(ctx context.Context, dbConn *pgxpool.Pool) <-cha
}
}()
return done
return job
}
type twitchNotificationType int

View File

@ -14,6 +14,8 @@ import (
"git.handmade.network/hmn/hmn/src/config"
"git.handmade.network/hmn/hmn/src/db"
"git.handmade.network/hmn/hmn/src/discord"
"git.handmade.network/hmn/hmn/src/hmns3"
"git.handmade.network/hmn/hmn/src/jobs"
"git.handmade.network/hmn/hmn/src/logging"
"git.handmade.network/hmn/hmn/src/perf"
"git.handmade.network/hmn/hmn/src/templates"
@ -41,13 +43,14 @@ var WebsiteCommand = &cobra.Command{
Handler: NewWebsiteRoutes(longRequestContext, conn),
}
backgroundJobsDone := zipJobs(
backgroundJobsDone := jobs.Zip(
auth.PeriodicallyDeleteExpiredSessions(backgroundJobContext, conn),
auth.PeriodicallyDeleteInactiveUsers(backgroundJobContext, conn),
perfCollector.Done,
perfCollector.Job,
discord.RunDiscordBot(backgroundJobContext, conn),
discord.RunHistoryWatcher(backgroundJobContext, conn),
twitch.MonitorTwitchSubscriptions(backgroundJobContext, conn),
hmns3.StartServer(backgroundJobContext),
)
signals := make(chan os.Signal, 1)
@ -81,17 +84,6 @@ var WebsiteCommand = &cobra.Command{
logging.Error().Err(serverErr).Msg("Server shut down unexpectedly")
}
<-backgroundJobsDone
<-backgroundJobsDone.C
},
}
func zipJobs(cs ...<-chan struct{}) <-chan struct{} {
out := make(chan struct{})
go func() {
for _, c := range cs {
<-c
}
close(out)
}()
return out
}