diff --git a/lib/phoenix/sync/electric.ex b/lib/phoenix/sync/electric.ex index 87a2b06..5fe1710 100644 --- a/lib/phoenix/sync/electric.ex +++ b/lib/phoenix/sync/electric.ex @@ -590,6 +590,178 @@ defmodule Phoenix.Sync.Electric do end end + @subset_body_keys ~w(where order_by limit offset params where_expr order_by_expr) + # In flattened POST params, root offset may be the stream cursor, so treat offset + # as subset-only only when we can reliably separate query/body params. + @subset_body_keys_without_offset @subset_body_keys -- ["offset"] + + @doc false + def normalize_subset_params(params, method \\ "GET") + + def normalize_subset_params(%Plug.Conn{} = conn, params) when is_map(params) do + case conn.method do + "POST" -> + query_params = map_or_empty(conn.query_params) |> normalize_prefixed_subset_params() + body_params = map_or_empty(conn.body_params) + path_params = map_or_empty(conn.path_params) + + # Match sync-service behavior by nesting subset body keys separately + # from query params, preserving stream offset/handle from query/path. + if map_size(query_params) == 0 and map_size(body_params) == 0 do + normalize_subset_params(params, conn.method) + else + query_params + |> merge_subset_body_params(body_params) + |> Map.merge(path_params) + end + + _ -> + normalize_subset_params(params, conn.method) + end + end + + def normalize_subset_params(params, method) when is_map(params) do + params + |> normalize_prefixed_subset_params() + |> maybe_normalize_subset_body_params(method) + end + + defp normalize_prefixed_subset_params(params) when is_map(params) do + {subset_prefixed_params, rest} = + Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} -> + case subset_param_name(key) do + {:ok, subset_key} -> + {Map.put(subset_acc, subset_key, value), rest_acc} + + :error -> + {subset_acc, Map.put(rest_acc, key, value)} + end + end) + + if map_size(subset_prefixed_params) == 0 do + params + else + existing_subset = existing_subset(rest) + + rest + |> Map.drop(["subset", :subset]) + |> Map.put("subset", Map.merge(existing_subset, subset_prefixed_params)) + end + end + + # POST subset snapshots send params in the body as plain subset keys + # (where/params/limit/offset/order_by), which need nesting under "subset". + defp maybe_normalize_subset_body_params(params, method) + when method in ["POST", :post] and is_map(params) do + {subset_body_params, rest} = + Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} -> + case subset_body_param_name(key, preserve_stream_offset?: true) do + {:ok, subset_key} -> + {Map.put(subset_acc, subset_key, value), rest_acc} + + :error -> + {subset_acc, Map.put(rest_acc, key, value)} + end + end) + + if map_size(subset_body_params) == 0 do + params + else + existing_subset = existing_subset(rest) + + rest + |> Map.drop(["subset", :subset]) + |> Map.put("subset", Map.merge(existing_subset, subset_body_params)) + end + end + + defp maybe_normalize_subset_body_params(params, _method), do: params + + defp subset_param_name(key) when is_binary(key) do + if String.starts_with?(key, "subset__") do + {:ok, String.replace_prefix(key, "subset__", "")} + else + :error + end + end + + defp subset_param_name(key) when is_atom(key) do + key + |> Atom.to_string() + |> subset_param_name() + end + + defp subset_param_name(_), do: :error + + defp subset_body_param_name(key, opts \\ []) + + defp subset_body_param_name(key, opts) when is_binary(key) do + keys = + if opts[:preserve_stream_offset?], + do: @subset_body_keys_without_offset, + else: @subset_body_keys + + if key in keys, do: {:ok, key}, else: :error + end + + defp subset_body_param_name(key, opts) when is_atom(key) do + key + |> Atom.to_string() + |> subset_body_param_name(opts) + end + + defp subset_body_param_name(_, _opts), do: :error + + defp existing_subset(params) when is_map(params) do + params + |> Map.take(["subset", :subset]) + |> Map.values() + |> Enum.filter(&is_map/1) + |> Enum.reduce(%{}, &Map.merge(&2, &1)) + end + + defp merge_subset_body_params(query_params, body_params) when map_size(body_params) == 0 do + query_params + end + + defp merge_subset_body_params(query_params, body_params) when is_map(body_params) do + case existing_subset(body_params) do + subset when map_size(subset) > 0 -> + existing_subset = existing_subset(query_params) + + query_params + |> Map.merge(body_params) + |> Map.drop([:subset, "subset"]) + |> Map.put("subset", Map.merge(existing_subset, subset)) + + _ -> + {subset_params, other_params} = + Enum.reduce(body_params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} -> + case subset_body_param_name(key) do + {:ok, subset_key} -> + {Map.put(subset_acc, subset_key, value), rest_acc} + + :error -> + {subset_acc, Map.put(rest_acc, key, value)} + end + end) + + if map_size(subset_params) > 0 do + existing_subset = existing_subset(query_params) + + query_params + |> Map.merge(other_params) + |> Map.put("subset", Map.merge(existing_subset, subset_params)) + else + Map.merge(query_params, body_params) + end + end + end + + defp map_or_empty(%Plug.Conn.Unfetched{}), do: %{} + defp map_or_empty(map) when is_map(map), do: map + defp map_or_empty(_), do: %{} + @json Phoenix.Sync.json_library() @doc false @@ -654,12 +826,14 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do end def call(api, %{method: "GET"} = conn, params) do + params = Phoenix.Sync.Electric.normalize_subset_params(conn, params) + case Shapes.Api.validate(api, params) do {:ok, request} -> conn |> content_type() |> Plug.Conn.assign(:request, request) - |> Shapes.Api.serve_shape_log(request) + |> Shapes.Api.serve_shape_response(request) {:error, response} -> conn @@ -689,12 +863,15 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do Shapes.Api.options(conn) end - def response(api, _conn, params) do + def response(api, conn, params) do + params = Phoenix.Sync.Electric.normalize_subset_params(conn, params) + case Shapes.Api.validate(api, params) do {:ok, request} -> { request, - Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream() + Shapes.Api.serve_shape_response(request) + |> Phoenix.Sync.Electric.consume_response_stream() } {:error, response} -> diff --git a/lib/phoenix/sync/electric/api_adapter.ex b/lib/phoenix/sync/electric/api_adapter.ex index 84671fd..4906312 100644 --- a/lib/phoenix/sync/electric/api_adapter.ex +++ b/lib/phoenix/sync/electric/api_adapter.ex @@ -23,10 +23,12 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do end def call(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do + params = Phoenix.Sync.Electric.normalize_subset_params(conn, params) + if transform_fun = PredefinedShape.transform_fun(shape) do case Shapes.Api.validate(api, params) do {:ok, request} -> - response = Shapes.Api.serve_shape_log(request) + response = Shapes.Api.serve_shape_response(request) response = Map.update!(response, :body, &apply_transform(&1, transform_fun)) conn @@ -47,15 +49,18 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do end def call(%ApiAdapter{api: api}, conn, params) do + params = Phoenix.Sync.Electric.normalize_subset_params(conn, params) Phoenix.Sync.Adapter.PlugApi.call(api, conn, params) end # only works if method is GET... def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do + params = Phoenix.Sync.Electric.normalize_subset_params(conn, params) + if transform_fun = PredefinedShape.transform_fun(shape) do case Shapes.Api.validate(api, params) do {:ok, request} -> - response = Shapes.Api.serve_shape_log(request) + response = Shapes.Api.serve_shape_response(request) response = Map.update!(response, :body, &apply_transform(&1, transform_fun)) {request, response} diff --git a/lib/phoenix/sync/electric/client_adapter.ex b/lib/phoenix/sync/electric/client_adapter.ex index 14545e7..dfa270c 100644 --- a/lib/phoenix/sync/electric/client_adapter.ex +++ b/lib/phoenix/sync/electric/client_adapter.ex @@ -8,6 +8,24 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do alias Phoenix.Sync.PredefinedShape + # Predefined-shape routes must not allow overriding server-defined shape + # attributes or stream-position keys (which are passed via dedicated fields). + @blocked_passthrough_keys ~w( + table + where + columns + params + replica + where_expr + order_by_expr + order_by + limit + offset + handle + live + cursor + ) + def predefined_shape(sync_client, %PredefinedShape{} = predefined_shape) do shape_client = PredefinedShape.client(sync_client.client, predefined_shape) @@ -37,8 +55,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do end # this is the server-defined shape route, so we want to only pass on the - # per-request/stream position params leaving the shape-definition params - # from the configured client. + # per-request/stream position params and protocol-level params, leaving + # the shape-definition params from the configured client. defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do { Client.request( @@ -47,7 +65,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do offset: params["offset"], shape_handle: params["handle"], live: live?(params["live"]), - next_cursor: params["cursor"] + next_cursor: params["cursor"], + params: protocol_request_params(sync_client, params) ), shape } @@ -68,6 +87,21 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do defp normalise_method(method), do: method |> String.downcase() |> String.to_atom() defp live?(live), do: live == "true" + defp protocol_request_params(%{client: %{params: client_params}}, params) do + client_param_keys = + client_params + |> stringify_keys() + |> Map.keys() + + params + |> stringify_keys() + |> Map.drop(@blocked_passthrough_keys ++ client_param_keys) + end + + defp stringify_keys(params) do + Map.new(params, fn {key, value} -> {to_string(key), value} end) + end + defp fetch_upstream(sync_client, conn, request, shape) do response = make_request(sync_client, conn, request, shape)