Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type dump struct {
filtered []byte
cleanupPart []byte
indicesAndConstraints []byte
views []byte
sequences []string
roles map[string]role
eventTriggers []byte
Expand Down Expand Up @@ -240,9 +241,15 @@ func (s *SnapshotGenerator) CreateSnapshot(ctx context.Context, ss *snapshot.Sna

s.logger.Info("restoring schema indices and constraints", loglib.Fields{"schemaTables": ss.SchemaTables})
if s.snapshotTracker != nil {
return s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints)
if err := s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints); err != nil {
return err
}
} else if err := s.restoreDump(ctx, dump.indicesAndConstraints); err != nil {
return err
}
return s.restoreDump(ctx, dump.indicesAndConstraints)

s.logger.Info("restoring views")
return s.restoreDump(ctx, dump.views)
}

func (s *SnapshotGenerator) Close() error {
Expand Down Expand Up @@ -290,6 +297,7 @@ func (s *SnapshotGenerator) dumpSchema(ctx context.Context, schemaTables map[str

s.dumpToFile(s.getDumpFileName("-filtered"), pgdumpOpts, parsedDump.filtered)
s.dumpToFile(s.getDumpFileName("-indices-constraints"), pgdumpOpts, parsedDump.indicesAndConstraints)
s.dumpToFile(s.getDumpFileName("-views"), pgdumpOpts, parsedDump.views)

// only if clean is enabled, produce the clean up part of the dump
if s.optionGenerator.cleanTargetDB {
Expand Down Expand Up @@ -406,10 +414,12 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
indicesAndConstraints := strings.Builder{}
filteredDump := strings.Builder{}
eventTriggersDump := strings.Builder{}
viewsDump := strings.Builder{}
sequenceNames := []string{}
dumpRoles := make(map[string]role)
alterTable := ""
createEventTrigger := ""
createView := ""
for scanner.Scan() {
line := scanner.Text()
switch {
Expand Down Expand Up @@ -446,11 +456,27 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
eventTriggersDump.WriteString(line)
eventTriggersDump.WriteString("\n")

case strings.HasPrefix(line, "CREATE VIEW"),
strings.HasPrefix(line, "CREATE MATERIALIZED VIEW"):
createView = line
fallthrough
case createView != "":
if strings.HasSuffix(line, ";") {
viewsDump.WriteString(line)
viewsDump.WriteString("\n\n")
createView = ""
continue
}
viewsDump.WriteString(line)
viewsDump.WriteString("\n")

case strings.Contains(line, `\connect`):
indicesAndConstraints.WriteString(line)
indicesAndConstraints.WriteString("\n\n")
filteredDump.WriteString(line)
filteredDump.WriteString("\n")
viewsDump.WriteString(line)
viewsDump.WriteString("\n\n")
case strings.HasPrefix(line, "CREATE INDEX"),
strings.HasPrefix(line, "CREATE UNIQUE INDEX"),
strings.HasPrefix(line, "CREATE CONSTRAINT"),
Expand Down Expand Up @@ -515,6 +541,7 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump {
full: d,
filtered: []byte(filteredDump.String()),
indicesAndConstraints: []byte(indicesAndConstraints.String()),
views: []byte(viewsDump.String()),
sequences: sequenceNames,
roles: dumpRoles,
eventTriggers: []byte(eventTriggersDump.String()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) {
t.Parallel()

schemaDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n")
schemaDumpWithViews := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE VIEW public.test_view AS\n SELECT 1;\nCREATE INDEX a;\n")
schemaDumpNoSequences := []byte("schema dump\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n")
filteredDumpNoSequences := []byte("schema dump\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n")
filteredDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n")
sequenceDump := []byte("sequence dump\n")
indexDump := []byte("CREATE INDEX a;\n\n")
testViewsDump := []byte("CREATE VIEW public.test_view AS\n SELECT 1;\n\n")
rolesDumpOriginal := []byte("roles dump\nCREATE ROLE postgres\nCREATE ROLE test_role\nCREATE ROLE test_role2\nALTER ROLE test_role3 INHERIT FROM test_role;\n")
rolesDumpFiltered := []byte("roles dump\nCREATE ROLE test_role\nCREATE ROLE test_role2\nGRANT \"test_role\" TO CURRENT_USER;\n")
cleanupDump := []byte("cleanup dump\n")
Expand Down Expand Up @@ -281,6 +283,54 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) {

wantErr: nil,
},
{
name: "ok - with views",
snapshot: &snapshot.Snapshot{
SchemaTables: map[string][]string{
testSchema: {testTable},
},
},
conn: validQuerier(),
pgdumpFn: newMockPgdump(func(_ context.Context, i uint, po pglib.PGDumpOptions) ([]byte, error) {
switch i {
case 1:
return schemaDumpWithViews, nil
case 2:
return sequenceDump, nil
default:
return nil, fmt.Errorf("unexpected call to pgdumpFn: %d", i)
}
}),
pgdumpallFn: newMockPgdumpall(func(_ context.Context, i uint, po pglib.PGDumpAllOptions) ([]byte, error) {
switch i {
case 1:
return rolesDumpOriginal, nil
default:
return nil, fmt.Errorf("unexpected call to pgdumpallFn: %d", i)
}
}),
pgrestoreFn: newMockPgrestore(func(_ context.Context, i uint, po pglib.PGRestoreOptions, dump []byte) (string, error) {
switch i {
case 1:
require.Equal(t, string(schemaCreateDump), string(dump))
case 2:
require.Equal(t, string(rolesDumpFiltered), string(dump))
case 3:
require.Equal(t, string(filteredDump), string(dump))
case 4:
require.Equal(t, string(sequenceDump), string(dump))
case 5:
require.Equal(t, string(indexDump), string(dump))
case 6:
require.Equal(t, string(testViewsDump), string(dump))
default:
return "", fmt.Errorf("unexpected call to pgrestoreFn: %d", i)
}
return "", nil
}),

wantErr: nil,
},
{
name: "ok - no sequence dump",
snapshot: &snapshot.Snapshot{
Expand Down Expand Up @@ -1237,6 +1287,9 @@ func TestSnapshotGenerator_parseDump(t *testing.T) {
wantEventTriggersBytes, err := os.ReadFile("test/test_dump_event_triggers.sql")
require.NoError(t, err)

wantViewsBytes, err := os.ReadFile("test/test_dump_views.sql")
require.NoError(t, err)

sg := &SnapshotGenerator{
excludedSecurityLabels: []string{"anon"},
}
Expand All @@ -1248,12 +1301,15 @@ func TestSnapshotGenerator_parseDump(t *testing.T) {
wantConstraintsStr := strings.Trim(string(wantConstraintsBytes), "\n")
eventTriggersStr := strings.Trim(string(dump.eventTriggers), "\n")
wantEventTriggersStr := strings.Trim(string(wantEventTriggersBytes), "\n")
viewsStr := strings.Trim(string(dump.views), "\n")
wantViewsStr := strings.Trim(string(wantViewsBytes), "\n")
wantSequences := []string{`"musicbrainz"."alternative_medium_id_seq"`, `"musicbrainz"."Alternative_medium_id_seq"`}

require.Equal(t, wantFilteredStr, filteredStr)
require.Equal(t, wantConstraintsStr, constraintsStr)
require.Equal(t, wantSequences, dump.sequences)
require.Equal(t, wantEventTriggersStr, eventTriggersStr)
require.Equal(t, wantViewsStr, viewsStr)
}

func TestGetDumpsDiff(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,37 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next
ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass);


--
-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres
--

CREATE VIEW musicbrainz.edit_summary AS
SELECT e.id,
e.editor,
e.type,
e.status
FROM musicbrainz.edit e
GROUP BY e.id;


ALTER VIEW musicbrainz.edit_summary OWNER TO postgres;

--
-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres
--

CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS
SELECT am.id,
am.medium,
am.alternative_release,
am.name
FROM musicbrainz.alternative_medium am
GROUP BY am.id
WITH NO DATA;


ALTER MATERIALIZED VIEW musicbrainz.medium_summary OWNER TO postgres;

--
-- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next
ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass);


--
-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres
--




--
-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres
--




--
-- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres
--
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
\connect test

CREATE VIEW musicbrainz.edit_summary AS
SELECT e.id,
e.editor,
e.type,
e.status
FROM musicbrainz.edit e
GROUP BY e.id;

CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS
SELECT am.id,
am.medium,
am.alternative_release,
am.name
FROM musicbrainz.alternative_medium am
GROUP BY am.id
WITH NO DATA;
37 changes: 37 additions & 0 deletions pkg/stream/integration/pg_pg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type testTableColumn struct {
username string
}

// idNameRow holds one (id, name) pair scanned from a two-column test query.
type idNameRow struct {
	id   int  // first selected column, expected to be an integer id
	name string // second selected column, expected to be a text name
}

func Test_PostgresToPostgres(t *testing.T) {
if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
t.Skip("skipping integration test...")
Expand Down Expand Up @@ -807,6 +812,38 @@ func getTestTableColumns(t *testing.T, ctx context.Context, conn pglib.Querier,
return columns
}

// getIDRows executes the given single-integer-column query and returns the
// value of every result row, failing the test on any query, scan, or
// iteration error.
func getIDRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []int {
	rows, queryErr := conn.Query(ctx, query)
	require.NoError(t, queryErr)
	defer rows.Close()

	var ids []int
	for rows.Next() {
		var id int
		require.NoError(t, rows.Scan(&id))
		ids = append(ids, id)
	}
	require.NoError(t, rows.Err())
	return ids
}

// getIDNameRows executes the given two-column (id, name) query and returns
// every result row as an idNameRow, failing the test on any query, scan, or
// iteration error.
func getIDNameRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []idNameRow {
	rows, queryErr := conn.Query(ctx, query)
	require.NoError(t, queryErr)
	defer rows.Close()

	var result []idNameRow
	for rows.Next() {
		row := idNameRow{}
		require.NoError(t, rows.Scan(&row.id, &row.name))
		result = append(result, row)
	}
	require.NoError(t, rows.Err())
	return result
}

func getRoles(t *testing.T, ctx context.Context, conn pglib.Querier) []string {
rows, err := conn.Query(ctx, "select rolname from pg_roles where rolname not like 'pg_%' and rolname <> 'postgres'")
require.NoError(t, err)
Expand Down
Loading
Loading