Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 93 additions & 15 deletions modules/file/service_minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,67 @@
// itself is never deleted from — bucket count is bounded by the
// allow-list, so growth is O(allowed buckets).
bucketLocks sync.Map

// bucketReady records buckets whose policy has been successfully
// applied within this process lifetime. ensureBucket consults it
// inside `bucketLocks` so the first caller after process start runs
// the full BucketExists / MakeBucket / SetBucketPolicy sequence and
// every subsequent caller short-circuits. Together with bucketLocks
// this means: one bootstrap round-trip per bucket per process,
// regardless of upload concurrency. The set never shrinks — drift
// recovery only happens on process restart, which matches the
// `minio-init`-pre-creates-bucket failure mode that motivated #77.
bucketReady sync.Map
}

// NewServiceMinio NewServiceMinio
func NewServiceMinio(ctx *config.Context) *ServiceMinio {
return &ServiceMinio{
sm := &ServiceMinio{
Log: log.NewTLog("File"),
ctx: ctx,
downloadClient: &http.Client{
Timeout: time.Second * 30,
},
}
// Self-heal MinIO bucket policy on every process start (#77). Without
// this proactive sweep, a pre-provisioned bucket whose policy was never
// applied (e.g. `minio-init` excluded `group` from its anonymous-read
// loop) only gets repaired the next time someone happens to upload to
// it — read-only buckets (group avatars exist but no fresh writes
// happen) stay broken across restarts. The sweep is async + best
// effort: ensureBucket failures are logged but don't block startup,
// and the upload path retries via the same ensureBucket on demand.
// Skipped under cfg.Test=true to keep unit/integration tests
// deterministic — they exercise ensureBucket explicitly via the upload
// entry points.
if !ctx.GetConfig().Test {
go sm.bootstrapAllowedBuckets()
}
return sm
}

// bootstrapAllowedBuckets runs `ensureBucket` against every bucket in the
// allow-list once at process start. Errors are logged at WARN — the upload
// path will retry on demand via the same ensureBucket call, so a transient
// MinIO outage during boot does not require a restart to recover. A 30s
// total budget keeps the goroutine from leaking if MinIO is unreachable
// indefinitely (10 buckets × ~3s per BucketExists is generous; later
// buckets short-circuit on the per-process memo if earlier ones succeed
// after a slow start).
func (sm *ServiceMinio) bootstrapAllowedBuckets() {
client, err := sm.newClient()
if err != nil {
sm.Warn("MinIO startup bootstrap: 创建 client 失败,跳过预扫", zap.Error(err))
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for bucket := range allowedMinioBuckets {
if err := sm.ensureBucket(ctx, client, bucket); err != nil {
sm.Warn("MinIO startup bootstrap: bucket 初始化失败,下次上传时会重试",
zap.String("bucket", bucket), zap.Error(err))
}
}
}

// ensureBucket guarantees that `bucket` exists on the MinIO server and has
Expand All @@ -82,40 +132,68 @@
// race the policy update. The `BucketAlreadyOwnedByYou` S3 response is
// swallowed as a benign no-op for the case where another process (or another
// node sharing these credentials) won the create race.
//
// The full bootstrap (BucketExists + optional MakeBucket + SetBucketPolicy)
// runs once per bucket per process. Deployers frequently pre-provision
// buckets via `mc mb` / `minio-init` containers before this process starts;
// the original implementation skipped SetBucketPolicy when BucketExists
// returned true, so those pre-existing buckets stayed policy-less and any
// browser-direct GET (e.g. group avatars) returned 403 (#77). The fix is to
// apply the policy on the first ensureBucket call of each bucket regardless
// of who created it, then memoize via `bucketReady` so the hot upload path
// is not paying an extra HTTP round-trip per request. The memo is process-
// local: a restart re-runs the bootstrap, which is exactly the self-healing
// behaviour wanted for the minio-init-pre-creates-bucket failure mode.
func (sm *ServiceMinio) ensureBucket(ctx context.Context, client *minio.Client, bucket string) error {
if _, ok := sm.bucketReady.Load(bucket); ok {
return nil
}

mtxIface, _ := sm.bucketLocks.LoadOrStore(bucket, &sync.Mutex{})
mtx := mtxIface.(*sync.Mutex)
mtx.Lock()
defer mtx.Unlock()

// Re-check inside the lock: a parallel cold-start caller may have
// finished the bootstrap while we were waiting on the mutex.
if _, ok := sm.bucketReady.Load(bucket); ok {
return nil
}

exists, err := client.BucketExists(ctx, bucket)
if err != nil {
sm.Error(fmt.Sprintf("检测 %s目录是否存在错误", bucket), zap.Error(err))
return err
}
if exists {
return nil
}

if err := client.MakeBucket(ctx, bucket, minio.MakeBucketOptions{Region: minioDefaultRegion}); err != nil {
// Another caller (different process / different node sharing the
// same credentials) may have created the bucket between our
// BucketExists call and our MakeBucket call. Treat that specific
// S3 response as a no-op rather than a hard failure.
if minio.ToErrorResponse(err).Code == minioBucketAlreadyOwnedByYou {
sm.Info("bucket already owned by us, skipping create", zap.String("bucket", bucket))
} else {
sm.Error(fmt.Sprintf("创建 %s目录失败", bucket), zap.Error(err))
return err
if !exists {
if err := client.MakeBucket(ctx, bucket, minio.MakeBucketOptions{Region: minioDefaultRegion}); err != nil {
// Another caller (different process / different node sharing
// the same credentials) may have created the bucket between
// our BucketExists call and our MakeBucket call. Treat that
// specific S3 response as a no-op rather than a hard failure
// and fall through to SetBucketPolicy — the race winner may
// not have applied the policy yet, and a redundant Set is
// cheap.
if minio.ToErrorResponse(err).Code == minioBucketAlreadyOwnedByYou {
sm.Info("bucket already owned by us, skipping create", zap.String("bucket", bucket))

Check warning

Code scanning / CodeQL

Log entries created from user input Medium

This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
} else {
sm.Error(fmt.Sprintf("创建 %s目录失败", bucket), zap.Error(err))

Check warning

Code scanning / CodeQL

Log entries created from user input Medium

This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
This log entry depends on a
user-provided value
.
return err
}
}
}

// Read-only public policy: allow anonymous download only. Upload and
// delete go through authenticated server-side credentials.
// delete go through authenticated server-side credentials. Applied
// whether the bucket was just created or pre-existed — see function
// doc for the self-healing rationale.
if err := client.SetBucketPolicy(ctx, bucket, fmt.Sprintf(readOnlyAnonymousPolicy, bucket)); err != nil {
sm.Error("设置minio文件读写权限错误", zap.Error(err))
return err
}

sm.bucketReady.Store(bucket, struct{}{})
return nil
}

Expand Down
149 changes: 147 additions & 2 deletions modules/file/service_minio_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
// newFakeMinioServer returns an httptest.Server that answers just enough of
// the MinIO HTTP surface to let `ensureBucket` succeed:
//
// - HEAD /<bucket>/ → 200 (BucketExists returns true, skipping MakeBucket
// and SetBucketPolicy entirely)
// - HEAD /<bucket>/ → 200 (BucketExists returns true; ensureBucket then
// reapplies SetBucketPolicy unconditionally per the #77 self-heal fix)
// - everything else → 200 with empty body, so the test never panics on an
// unexpected request shape
//
Expand Down Expand Up @@ -336,3 +336,148 @@ func TestPresignedPutURL_ConcurrentBucketBootstrap(t *testing.T) {
assert.Equal(t, int32(1), policyCount.Load(),
"SetBucketPolicy should run exactly once for a fresh shared bucket; ran %d times", policyCount.Load())
}

// TestEnsureBucket_SelfHealsPolicyOnExistingBucket reproduces issue #77: when
// a bucket already exists at startup (typically because a `minio-init`
// container or `mc mb` ran before octo-server), ensureBucket must still apply
// the read-only anonymous policy. The old behaviour returned early on
// `exists=true` and never called SetBucketPolicy, leaving pre-provisioned
// buckets policy-less — anonymous GETs against them then returned 403, which
// breaks any browser-direct asset (e.g. group avatars in the `group` bucket).
//
// The fake MinIO server returns 200 on HEAD (bucket already exists) and
// records every PUT request. After triggering ensureBucket via PresignedPutURL
// we assert:
// - MakeBucket was NOT called (no PUT without `policy` query key)
// - SetBucketPolicy WAS called at least once (PUT with `policy` query key)
//
// Before the fix this test fails (policyCount == 0). After the fix it passes
// and the bootstrap is self-healing on every process start.
func TestEnsureBucket_SelfHealsPolicyOnExistingBucket(t *testing.T) {
var (
makeCount atomic.Int32
policyCount atomic.Int32
)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodHead:
// Bucket already exists from the deployer's perspective.
w.WriteHeader(http.StatusOK)
case http.MethodPut:
if _, ok := r.URL.Query()["policy"]; ok {
policyCount.Add(1)
} else {
makeCount.Add(1)
}
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusOK)
}
}))
t.Cleanup(srv.Close)

cfg := config.New()
cfg.Test = true
cfg.Minio.URL = srv.URL
cfg.Minio.UploadURL = srv.URL
cfg.Minio.DownloadURL = "https://public.example.com"
cfg.Minio.AccessKeyID = "test-access-key"
cfg.Minio.SecretAccessKey = "test-secret-access-key-1234567890"

ctx := testutil.NewTestContext(cfg)
svc := file.NewServiceMinio(ctx)

// PresignedPutURL invokes ensureBucket on the resolved bucket. Use the
// `group` prefix specifically — that is the bucket #77 reported as
// broken on real deployments (group avatars served from /group/...).
_, _, err := svc.PresignedPutURL("group/avatar/abc.png", "image/png", "", 1024, time.Minute)
require.NoError(t, err)

assert.Equal(t, int32(0), makeCount.Load(),
"MakeBucket must not run when the bucket already exists; ran %d times", makeCount.Load())
assert.GreaterOrEqual(t, policyCount.Load(), int32(1),
"SetBucketPolicy must run on every ensureBucket call to self-heal pre-provisioned buckets; ran %d times", policyCount.Load())
}

// TestNewServiceMinio_BootstrapsAllAllowedBucketsAtStartup is the proof that
// `NewServiceMinio` actually performs the policy repair *at process start*,
// not just on the first upload (PR#87 review feedback from Jerry-Xin).
//
// The earlier #77 fix made ensureBucket reapply the policy whenever it ran,
// but ensureBucket only runs from UploadFile / PresignedPutURL. A bucket
// that is read-only after seed (e.g. `group` avatars uploaded once at group
// creation and never written to again) would never see a repair across
// restarts. This test pins the new behaviour: NewServiceMinio kicks off an
// async bootstrap goroutine that walks the full `allowedMinioBuckets` set
// and runs ensureBucket against each one.
//
// Setup: fake MinIO answers HEAD with 200 (every bucket pre-exists) and
// records every PUT. After constructing the service we poll until policy
// count reaches the expected total (= 10 = len(allowedMinioBuckets), kept
// as a literal because the var is unexported; helpers.go is the source of
// truth — bump this number if the allow-list changes). MakeBucket must
// stay at zero because BucketExists returned true for all of them.
//
// cfg.Test must be false so the bootstrap goroutine actually runs; the
// other integration tests in this file set cfg.Test=true precisely to
// suppress this side effect and keep their counters deterministic.
func TestNewServiceMinio_BootstrapsAllAllowedBucketsAtStartup(t *testing.T) {
var (
headCount atomic.Int32
makeCount atomic.Int32
policyCount atomic.Int32
)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodHead:
headCount.Add(1)
w.WriteHeader(http.StatusOK)
case http.MethodPut:
if _, ok := r.URL.Query()["policy"]; ok {
policyCount.Add(1)
} else {
makeCount.Add(1)
}
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusOK)
}
}))
t.Cleanup(srv.Close)

cfg := config.New()
// cfg.Test left as the zero value (false) so NewServiceMinio runs the
// bootstrap goroutine. The other tests in this file pin it true.
cfg.Minio.URL = srv.URL
cfg.Minio.UploadURL = srv.URL
cfg.Minio.DownloadURL = "https://public.example.com"
cfg.Minio.AccessKeyID = "test-access-key"
cfg.Minio.SecretAccessKey = "test-secret-access-key-1234567890"

ctx := testutil.NewTestContext(cfg)
// testutil.NewTestContext force-sets cfg.Test=true, which by design
// short-circuits the bootstrap goroutine. Flip it back so this test
// can observe the production code path.
ctx.GetConfig().Test = false
_ = file.NewServiceMinio(ctx)

// Bump this if `allowedMinioBuckets` in helpers.go grows or shrinks.
const wantBuckets int32 = 10

deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
if policyCount.Load() >= wantBuckets {
break
}
time.Sleep(20 * time.Millisecond)
}

assert.Equal(t, wantBuckets, policyCount.Load(),
"startup bootstrap must run SetBucketPolicy once per allowed bucket; got %d, want %d", policyCount.Load(), wantBuckets)
assert.Equal(t, int32(0), makeCount.Load(),
"startup bootstrap must not call MakeBucket when buckets already exist; got %d", makeCount.Load())
assert.GreaterOrEqual(t, headCount.Load(), wantBuckets,
"startup bootstrap must HEAD each bucket; got %d, want >= %d", headCount.Load(), wantBuckets)
}
Loading