Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
enable-ct: true
enable-ex-doc: true
enable-hank: true
enable-sheldon: true
enable-sheldon: false
enable-audit: false
enable-coverage: true
enable-sbom: true
Expand All @@ -26,6 +26,6 @@ jobs:
enable-summary: true
enable-mutate: false
enable-elp-lint: true
enable-elp-eqwalize: true
enable-elp-eqwalize: false

extra-services-compose: 'docker-compose.ci.yml'
165 changes: 156 additions & 9 deletions src/kura_migrator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ across multiple nodes.
migrate/1,
rollback/1, rollback/2,
status/1,
ensure_database/1,
ensure_schema_migrations/1,
compile_operation/1,
check_unsafe_operations/2
Expand All @@ -41,7 +42,8 @@ across multiple nodes.
to_module_list/1,
parse_version/1,
exec_operations/3,
check_alter_ops/3
check_alter_ops/3,
topo_sort_ops/1
]).
-endif.

Expand All @@ -51,6 +53,7 @@ across multiple nodes.
-doc "Run all pending migrations in order.".
-spec migrate(module()) -> {ok, [integer()]} | {error, term()}.
migrate(RepoMod) ->
ensure_database(RepoMod),
ensure_schema_migrations(RepoMod),
with_migration_lock(RepoMod, fun(PoolOpts) ->
Applied = get_applied_versions(RepoMod),
Expand Down Expand Up @@ -93,6 +96,54 @@ tag_status(V, M, Applied) ->
false -> {V, M, pending}
end.

%%----------------------------------------------------------------------
%% Database creation
%%----------------------------------------------------------------------

-spec ensure_database(module()) -> ok.
ensure_database(RepoMod) ->
Config = kura_repo:config(RepoMod),
case maps:find(database, Config) of
error -> ok;
{ok, DbName} -> do_ensure_database(Config, binary_to_list(DbName))
end.

do_ensure_database(Config, Database) ->
Host = binary_to_list(maps:get(hostname, Config, ~"localhost")),
Port = maps:get(port, Config, 5432),
User = binary_to_list(maps:get(username, Config, ~"postgres")),
Password = binary_to_list(maps:get(password, Config, <<>>)),
TmpPool = kura_migrator_tmp_pool,
TmpConfig = #{
host => Host,
port => Port,
database => "postgres",
user => User,
password => Password,
pool_size => 1,
decode_opts => [return_rows_as_maps, column_name_as_atom]
},
case pgo_sup:start_child(TmpPool, TmpConfig) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok
end,
PoolOpts = #{pool => TmpPool},
DbBin = list_to_binary(Database),
case
pgo:query(
~"SELECT 1 FROM pg_database WHERE datname = $1", [DbBin], PoolOpts
)
of
#{rows := []} ->
QuotedDb = iolist_to_binary([<<"\"">>, DbBin, <<"\"">>]),
SQL = iolist_to_binary([~"CREATE DATABASE ", QuotedDb]),
_ = pgo:query(SQL, [], PoolOpts),
logger:info("Kura: created database ~s", [Database]),
ok;
#{rows := [_ | _]} ->
ok
end.

%%----------------------------------------------------------------------
%% Schema migrations table
%%----------------------------------------------------------------------
Expand Down Expand Up @@ -162,11 +213,21 @@ with_migration_lock(RepoMod, Fun) ->
)
)
catch
error:{migration_failed, Version, MigReason}:_ ->
logger:error("Kura: migration ~p failed: ~p", [Version, MigReason]),
error:{migration_failed, Version, MigReason}:Stack ->
logger:error(#{
msg => ~"migration_failed",
version => Version,
reason => MigReason,
stacktrace => Stack
}),
{error, {migration_failed, Version, MigReason}};
_:ExReason:_ ->
logger:error("Kura: migration failed: ~p", [ExReason]),
Class:ExReason:Stack ->
logger:error(#{
msg => ~"migration_failed",
class => Class,
reason => ExReason,
stacktrace => Stack
}),
{error, ExReason}
end.

Expand Down Expand Up @@ -203,12 +264,16 @@ run_migrations([{Version, Module} | Rest], Dir, PoolOpts, Acc) ->
run_migrations(Rest, Dir, PoolOpts, [Version | Acc]).

-spec exec_operations([kura_migration:operation()], integer(), map()) -> ok.
exec_operations([], _Version, _PoolOpts) ->
exec_operations(Ops, Version, PoolOpts) ->
Sorted = topo_sort_ops(Ops),
exec_operations_seq(Sorted, Version, PoolOpts).

exec_operations_seq([], _Version, _PoolOpts) ->
ok;
exec_operations([Op | Rest], Version, PoolOpts) ->
exec_operations_seq([Op | Rest], Version, PoolOpts) ->
SQL = compile_operation(Op),
exec(SQL, [], Version, PoolOpts),
exec_operations(Rest, Version, PoolOpts).
exec_operations_seq(Rest, Version, PoolOpts).

-spec exec(binary(), list(), integer(), map()) -> ok.
exec(SQL, Params, Version, PoolOpts) ->
Expand Down Expand Up @@ -567,7 +632,11 @@ format_default(Val) when is_binary(Val) ->
format_default(true) ->
~"TRUE";
format_default(false) ->
~"FALSE".
~"FALSE";
format_default(Val) when is_map(Val) ->
<<"'", (json:encode(Val))/binary, "'::jsonb">>;
format_default(Val) when is_list(Val) ->
<<"'", (json:encode(Val))/binary, "'::jsonb">>.

%%----------------------------------------------------------------------
%% Internal: type narrowing helpers for eqWAlizer
Expand Down Expand Up @@ -669,3 +738,81 @@ check_alter_ops([], _Table, _SafeEntries) ->
[];
check_alter_ops([AltOp | Rest], Table, SafeEntries) ->
check_alter_op(AltOp, Table, SafeEntries) ++ check_alter_ops(Rest, Table, SafeEntries).

%%----------------------------------------------------------------------
%% Topological sort for create_table operations
%%----------------------------------------------------------------------

-spec topo_sort_ops([kura_migration:operation()]) -> [kura_migration:operation()].
topo_sort_ops(Ops) ->
Creates = [Op || Op <- Ops, is_create_table(Op)],
case Creates of
[] ->
Ops;
_ ->
Sorted = topo_sort_creates(Creates),
replace_creates(Ops, Sorted)
end.

is_create_table({create_table, _, _}) -> true;
is_create_table({create_table, _, _, _}) -> true;
is_create_table(_) -> false.

table_name({create_table, Name, _}) -> Name;
table_name({create_table, Name, _, _}) -> Name.

table_columns({create_table, _, Cols}) -> Cols;
table_columns({create_table, _, Cols, _}) -> Cols.

table_deps(Op) ->
Name = table_name(Op),
Cols = table_columns(Op),
lists:usort([T || #kura_column{references = {T, _}} <- Cols, T =/= Name]).

topo_sort_creates(Creates) ->
ByName = maps:from_list([{table_name(C), C} || C <- Creates]),
DepMap = maps:from_list([{table_name(C), table_deps(C)} || C <- Creates]),
AllNames = maps:keys(ByName),
InDeg = lists:foldl(
fun(Name, Acc) ->
Deps = maps:get(Name, DepMap),
Count = length([D || D <- Deps, maps:is_key(D, ByName)]),
Acc#{Name => Count}
end,
#{},
AllNames
),
Queue = [N || N <- AllNames, maps:get(N, InDeg) =:= 0],
kahn_loop(Queue, InDeg, DepMap, ByName, []).

kahn_loop([], _InDeg, _DepMap, _ByName, Acc) ->
lists:reverse(Acc);
kahn_loop([Name | Rest], InDeg, DepMap, ByName, Acc) ->
Op = maps:get(Name, ByName),
Dependents = [
N
|| N <- maps:keys(ByName),
lists:member(Name, maps:get(N, DepMap, []))
],
{NewQueue, NewInDeg} = lists:foldl(
fun(Dep, {Q, ID}) ->
NewDeg = maps:get(Dep, ID) - 1,
ID1 = ID#{Dep => NewDeg},
case NewDeg of
0 -> {Q ++ [Dep], ID1};
_ -> {Q, ID1}
end
end,
{Rest, InDeg},
Dependents
),
kahn_loop(NewQueue, NewInDeg, DepMap, ByName, [Op | Acc]).

replace_creates([], _Sorted) ->
[];
replace_creates([{create_table, _, _} | Rest], [Next | Sorted]) ->
[Next | replace_creates(Rest, Sorted)];
replace_creates([{create_table, _, _, _} | Rest], [Next | Sorted]) ->
[Next | replace_creates(Rest, Sorted)];
replace_creates([Op | Rest], Sorted) ->
[Op | replace_creates(Rest, Sorted)].
26 changes: 24 additions & 2 deletions src/kura_repo_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ insert(RepoMod, CS = #kura_changeset{schema = SchemaMod}) ->
insert(_RepoMod, CS = #kura_changeset{valid = false}, _Opts) ->
{error, CS#kura_changeset{action = insert}};
insert(RepoMod, CS = #kura_changeset{schema = SchemaMod, changes = Changes}, Opts) ->
Changes1 = maybe_add_timestamps(SchemaMod, Changes, insert),
Changes0 = maybe_generate_pk(SchemaMod, Changes),
Changes1 = maybe_add_timestamps(SchemaMod, Changes0, insert),
DumpedChanges = dump_changes(SchemaMod, Changes1),
Fields = maps:keys(DumpedChanges),
{SQL, Params} = kura_query_compiler:insert(SchemaMod, Fields, DumpedChanges, Opts),
Expand Down Expand Up @@ -501,7 +502,8 @@ insert_record(RepoMod, CS0 = #kura_changeset{schema = SchemaMod}) ->
{error, ErrCS#kura_changeset{action = insert}};
{ok, CS} ->
Changes = maybe_apply_tenant_changes(CS#kura_changeset.changes),
Changes1 = maybe_add_timestamps(SchemaMod, Changes, insert),
Changes0 = maybe_generate_pk(SchemaMod, Changes),
Changes1 = maybe_add_timestamps(SchemaMod, Changes0, insert),
DumpedChanges = dump_changes(SchemaMod, Changes1),
Fields = maps:keys(DumpedChanges),
{SQL, Params} = kura_query_compiler:insert(SchemaMod, Fields, DumpedChanges),
Expand Down Expand Up @@ -818,6 +820,26 @@ maybe_add_timestamps(SchemaMod, Changes, Action) ->
false -> Changes1
end.

maybe_generate_pk(SchemaMod, Changes) ->
PK = kura_schema:primary_key(SchemaMod),
case maps:is_key(PK, Changes) of
true ->
Changes;
false ->
Types = kura_schema:field_types(SchemaMod),
case maps:get(PK, Types, undefined) of
uuid -> Changes#{PK => generate_uuid_v4()};
_ -> Changes
end
end.

generate_uuid_v4() ->
<<A:48, _:4, B:12, _:2, C:62>> = crypto:strong_rand_bytes(16),
Bytes = <<A:48, 4:4, B:12, 2:2, C:62>>,
Hex = binary:encode_hex(Bytes, lowercase),
<<P1:8/binary, P2:4/binary, P3:4/binary, P4:4/binary, P5:12/binary>> = Hex,
<<P1/binary, "-", P2/binary, "-", P3/binary, "-", P4/binary, "-", P5/binary>>.

handle_pg_error(CS, {pgsql_error, Fields}) when is_map(Fields) ->
handle_pg_error(CS, Fields);
handle_pg_error(CS, {pgsql_error, Fields}) when is_list(Fields) ->
Expand Down
Loading
Loading