Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 109 additions & 6 deletions cmd/msgvault/cmd/remove_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/spf13/cobra"
Expand All @@ -26,8 +27,8 @@ and mbox), use --type to specify which one to remove.
The Parquet analytics cache is deleted because it is shared across accounts
and must be rebuilt. Run 'msgvault build-cache' afterward to rebuild it.

Orphaned participants and attachment files on disk are not cleaned up;
use 'msgvault gc' (when available) to reclaim that space.
Attachment files on disk that are not shared with another account are deleted.
Shared attachments (same content hash across multiple accounts) are kept.

Examples:
msgvault remove-account you@gmail.com
Expand Down Expand Up @@ -114,10 +115,113 @@ func runRemoveAccount(cmd *cobra.Command, args []string) error {
}
}

// Collect attachment paths unique to this source before the cascade deletes them.
attachmentPaths, err := s.AttachmentPathsUniqueToSource(source.ID)
if err != nil {
return fmt.Errorf("collect attachment paths: %w", err)
}

// Check for any running syncs BEFORE RemoveSource. sync_runs rows for this
// source are cascade-deleted along with the source, so checking afterward
// would miss a running sync on the account being removed.
//
// Attachment ingestion writes the file first (StoreAttachmentFile) and
// inserts the DB row second (UpsertAttachment). Deleting files while any
// sync is in that window can remove a blob that a concurrent writer has
// already placed on disk but not yet recorded. We skip file deletion
// entirely in that case; the DB removal still proceeds.
skipFileDeletion := false
skipReason := ""
if len(attachmentPaths) > 0 {
anySyncRunning, err := s.HasAnyActiveSync()
if err != nil {
skipFileDeletion = true
skipReason = fmt.Sprintf("could not check for active syncs: %v", err)
} else if anySyncRunning {
skipFileDeletion = true
skipReason = "a sync is in progress"
}
}

if err := s.RemoveSource(source.ID); err != nil {
return fmt.Errorf("remove account: %w", err)
}

// Delete attachment files that are no longer referenced by any source.
attachmentsDir := cfg.AttachmentsDir()
var deletedFiles int
if len(attachmentPaths) > 0 {
if skipFileDeletion {
// The account DB rows are gone but files remain. Since the account
// no longer exists, the user cannot re-run remove-account; direct
// them to the attachments directory instead.
fmt.Fprintf(os.Stderr,
"Warning: %s; attachment files were not deleted.\n"+
"Orphaned files may remain in %s\n",
skipReason, attachmentsDir,
)
} else {
// No syncs were running when we checked. Re-check each path
// immediately before deletion as a last-ditch guard: after
// RemoveSource the cascade has cleared this source's attachment
// rows, so IsAttachmentPathReferenced will catch any new reference
// a concurrent sync for a different source may have inserted.

// Resolve attachmentsDir to an absolute path once so the
// containment check below works correctly regardless of the
// process working directory.
cleanDir, err := filepath.Abs(attachmentsDir)
if err != nil {
fmt.Fprintf(os.Stderr,
"Warning: could not resolve attachments dir; skipping file deletion: %v\n", err,
)
cleanDir = ""
}

var failedFiles int
for _, relPath := range attachmentPaths {
absPath := filepath.Join(cleanDir, relPath)

// Guard against a corrupt or tampered DB entry that could
// otherwise cause os.Remove to target a file outside the
// attachments directory via path components such as "..".
if cleanDir != "" {
rel, err := filepath.Rel(cleanDir, absPath)
if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
fmt.Fprintf(os.Stderr,
"Warning: attachment path %q escapes attachments directory, skipping\n",
relPath,
)
failedFiles++
continue
}
}

if referenced, err := s.IsAttachmentPathReferenced(relPath); err != nil {
fmt.Fprintf(os.Stderr,
"Warning: could not verify attachment %s is unreferenced: %v\n",
relPath, err,
)
failedFiles++
continue
} else if referenced {
continue
}
if err := os.Remove(absPath); err != nil && !os.IsNotExist(err) {
failedFiles++
} else {
deletedFiles++
}
}
if failedFiles > 0 {
fmt.Fprintf(os.Stderr,
"Warning: could not remove %d attachment file(s) from disk.\n",
failedFiles,
)
}
}
}

// Remove credentials for the source type.
switch source.SourceType {
case "gmail":
Expand Down Expand Up @@ -183,13 +287,12 @@ func runRemoveAccount(cmd *cobra.Command, args []string) error {
}

fmt.Printf("\nAccount %s removed.\n", email)
if deletedFiles > 0 {
fmt.Printf("Deleted %d attachment file(s) from disk.\n", deletedFiles)
}
fmt.Println(
"Run 'msgvault build-cache' to rebuild the analytics cache.",
)
fmt.Println(
"Note: attachment files on disk were not removed." +
" Use 'msgvault gc' (when available) to reclaim space.",
)

return nil
}
Expand Down
56 changes: 56 additions & 0 deletions internal/store/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,62 @@ func (s *Store) UpsertMessageRawWithFormat(messageID int64, rawData []byte, form
return err
}

// AttachmentPathsUniqueToSource lists the distinct storage_path values of
// attachments owned by sourceID whose content_hash does not appear on any
// attachment belonging to a different source. Rows with a NULL content_hash or
// a NULL/empty storage_path are ignored.
//
// The result is computed from the attachments and messages tables as they
// stand at call time, so it must be collected before RemoveSource runs;
// afterwards the cascade has already removed the rows this query reads.
//
// Note: thumbnail_path is deliberately not covered. No sync/import code
// currently writes thumbnail files to disk, so there is nothing to clean up.
// If thumbnail storage is introduced later, both this query and the delete
// loop in remove_account.go need to handle thumbnail_path too.
func (s *Store) AttachmentPathsUniqueToSource(sourceID int64) ([]string, error) {
	// sourceID is bound twice: once to select this source's attachments and
	// once to exclude hashes that are also held by any other source.
	const uniquePathsQuery = `
SELECT DISTINCT a.storage_path
FROM attachments a
JOIN messages m ON m.id = a.message_id
WHERE m.source_id = ?
AND a.content_hash IS NOT NULL
AND a.storage_path IS NOT NULL
AND a.storage_path != ''
AND NOT EXISTS (
SELECT 1 FROM attachments a2
JOIN messages m2 ON m2.id = a2.message_id
WHERE a2.content_hash = a.content_hash
AND m2.source_id != ?
)
`

	cursor, queryErr := s.db.Query(uniquePathsQuery, sourceID, sourceID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer cursor.Close()

	// Left as a nil slice when the query yields no rows.
	var uniquePaths []string
	for cursor.Next() {
		var storagePath string
		if scanErr := cursor.Scan(&storagePath); scanErr != nil {
			return nil, scanErr
		}
		uniquePaths = append(uniquePaths, storagePath)
	}

	// An iteration error is reported alongside whatever was gathered so far.
	if iterErr := cursor.Err(); iterErr != nil {
		return uniquePaths, iterErr
	}
	return uniquePaths, nil
}

// IsAttachmentPathReferenced reports whether at least one row in the
// attachments table has the given storage_path.
//
// It is meant to be called right before a file is removed from disk, so that
// a reference inserted by a concurrent sync after the candidate list was
// gathered is still noticed.
//
// On a query error the boolean result is true: callers that look only at the
// flag will treat the path as still in use and leave the file alone.
func (s *Store) IsAttachmentPathReferenced(storagePath string) (bool, error) {
	const referenceCountQuery = `SELECT COUNT(*) FROM attachments WHERE storage_path = ?`

	var references int
	if err := s.db.QueryRow(referenceCountQuery, storagePath).Scan(&references); err != nil {
		// Fail safe: an unverifiable path is reported as referenced.
		return true, err
	}
	return references > 0, nil
}

// UpsertAttachment stores an attachment record.
func (s *Store) UpsertAttachment(messageID int64, filename, mimeType, storagePath, contentHash string, size int) error {
// Check if attachment already exists (by message_id and content_hash)
Expand Down
1 change: 1 addition & 0 deletions internal/store/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ CREATE INDEX IF NOT EXISTS idx_reactions_message ON reactions(message_id);
-- Attachments
-- Index on the owning message; attachments are joined to messages on message_id.
CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id);
-- Index on the content hash, which is compared across attachments to detect
-- content shared between sources.
CREATE INDEX IF NOT EXISTS idx_attachments_hash ON attachments(content_hash);
-- Index on the on-disk path, matching equality lookups of the form
-- "WHERE storage_path = ?" issued before an attachment file is deleted.
CREATE INDEX IF NOT EXISTS idx_attachments_storage_path ON attachments(storage_path);

-- Labels
CREATE INDEX IF NOT EXISTS idx_labels_source ON labels(source_id);
Expand Down
14 changes: 14 additions & 0 deletions internal/store/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ func (s *Store) GetActiveSync(sourceID int64) (*SyncRun, error) {
return run, err
}

// HasAnyActiveSync reports whether the sync_runs table contains at least one
// row whose status is 'running', regardless of which source it belongs to.
//
// It is intended as a safety gate ahead of destructive file operations that
// could otherwise race with attachment ingestion by a concurrent sync.
//
// On a query error the boolean result is true, so callers that look only at
// the flag will behave as if a sync were in progress.
func (s *Store) HasAnyActiveSync() (bool, error) {
	const runningSyncsQuery = `SELECT COUNT(*) FROM sync_runs WHERE status = 'running'`

	var running int
	if err := s.db.QueryRow(runningSyncsQuery).Scan(&running); err != nil {
		// Fail safe: when the state is unknown, assume a sync is active.
		return true, err
	}
	return running > 0, nil
}

// GetLastSuccessfulSync returns the most recent successful sync for a source.
func (s *Store) GetLastSuccessfulSync(sourceID int64) (*SyncRun, error) {
row := s.db.QueryRow(`
Expand Down