Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 180 additions & 3 deletions lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,178 @@ defmodule Phoenix.Sync.Electric do
end
end

@subset_body_keys ~w(where order_by limit offset params where_expr order_by_expr)
# In flattened POST params, root offset may be the stream cursor, so treat offset
# as subset-only only when we can reliably separate query/body params.
@subset_body_keys_without_offset @subset_body_keys -- ["offset"]

@doc false
def normalize_subset_params(params, method \\ "GET")

def normalize_subset_params(%Plug.Conn{} = conn, params) when is_map(params) do
case conn.method do
"POST" ->
query_params = map_or_empty(conn.query_params) |> normalize_prefixed_subset_params()
body_params = map_or_empty(conn.body_params)
path_params = map_or_empty(conn.path_params)

# Match sync-service behavior by nesting subset body keys separately
# from query params, preserving stream offset/handle from query/path.
if map_size(query_params) == 0 and map_size(body_params) == 0 do
normalize_subset_params(params, conn.method)
else
query_params
|> merge_subset_body_params(body_params)
|> Map.merge(path_params)
end

_ ->
normalize_subset_params(params, conn.method)
end
end

def normalize_subset_params(params, method) when is_map(params) do
params
|> normalize_prefixed_subset_params()
|> maybe_normalize_subset_body_params(method)
end

defp normalize_prefixed_subset_params(params) when is_map(params) do
{subset_prefixed_params, rest} =
Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_param_name(key) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_prefixed_params) == 0 do
params
else
existing_subset = existing_subset(rest)

rest
|> Map.drop(["subset", :subset])
|> Map.put("subset", Map.merge(existing_subset, subset_prefixed_params))
end
end

# POST subset snapshots send params in the body as plain subset keys
# (where/params/limit/offset/order_by), which need nesting under "subset".
defp maybe_normalize_subset_body_params(params, method)
when method in ["POST", :post] and is_map(params) do
{subset_body_params, rest} =
Enum.reduce(params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_body_param_name(key, preserve_stream_offset?: true) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_body_params) == 0 do
params
else
existing_subset = existing_subset(rest)

rest
|> Map.drop(["subset", :subset])
|> Map.put("subset", Map.merge(existing_subset, subset_body_params))
end
end

defp maybe_normalize_subset_body_params(params, _method), do: params

defp subset_param_name(key) when is_binary(key) do
if String.starts_with?(key, "subset__") do
{:ok, String.replace_prefix(key, "subset__", "")}
else
:error
end
end

defp subset_param_name(key) when is_atom(key) do
key
|> Atom.to_string()
|> subset_param_name()
end

defp subset_param_name(_), do: :error

defp subset_body_param_name(key, opts \\ [])

defp subset_body_param_name(key, opts) when is_binary(key) do
keys =
if opts[:preserve_stream_offset?],
do: @subset_body_keys_without_offset,
else: @subset_body_keys

if key in keys, do: {:ok, key}, else: :error
end

defp subset_body_param_name(key, opts) when is_atom(key) do
key
|> Atom.to_string()
|> subset_body_param_name(opts)
end

defp subset_body_param_name(_, _opts), do: :error

defp existing_subset(params) when is_map(params) do
params
|> Map.take(["subset", :subset])
|> Map.values()
|> Enum.filter(&is_map/1)
|> Enum.reduce(%{}, &Map.merge(&2, &1))
end

defp merge_subset_body_params(query_params, body_params) when map_size(body_params) == 0 do
query_params
end

defp merge_subset_body_params(query_params, body_params) when is_map(body_params) do
case existing_subset(body_params) do
subset when map_size(subset) > 0 ->
existing_subset = existing_subset(query_params)

query_params
|> Map.merge(body_params)
|> Map.drop([:subset, "subset"])
|> Map.put("subset", Map.merge(existing_subset, subset))

_ ->
{subset_params, other_params} =
Enum.reduce(body_params, {%{}, %{}}, fn {key, value}, {subset_acc, rest_acc} ->
case subset_body_param_name(key) do
{:ok, subset_key} ->
{Map.put(subset_acc, subset_key, value), rest_acc}

:error ->
{subset_acc, Map.put(rest_acc, key, value)}
end
end)

if map_size(subset_params) > 0 do
existing_subset = existing_subset(query_params)

query_params
|> Map.merge(other_params)
|> Map.put("subset", Map.merge(existing_subset, subset_params))
else
Map.merge(query_params, body_params)
end
end
end

defp map_or_empty(%Plug.Conn.Unfetched{}), do: %{}
defp map_or_empty(map) when is_map(map), do: map
defp map_or_empty(_), do: %{}

@json Phoenix.Sync.json_library()

@doc false
Expand Down Expand Up @@ -654,12 +826,14 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
end

def call(api, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

case Shapes.Api.validate(api, params) do
{:ok, request} ->
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
|> Shapes.Api.serve_shape_log(request)
|> Shapes.Api.serve_shape_response(request)

{:error, response} ->
conn
Expand Down Expand Up @@ -689,12 +863,15 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
Shapes.Api.options(conn)
end

def response(api, _conn, params) do
def response(api, conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

case Shapes.Api.validate(api, params) do
{:ok, request} ->
{
request,
Shapes.Api.serve_shape_log(request) |> Phoenix.Sync.Electric.consume_response_stream()
Shapes.Api.serve_shape_response(request)
|> Phoenix.Sync.Electric.consume_response_stream()
}

{:error, response} ->
Expand Down
9 changes: 7 additions & 2 deletions lib/phoenix/sync/electric/api_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
end

def call(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

if transform_fun = PredefinedShape.transform_fun(shape) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
response = Shapes.Api.serve_shape_log(request)
response = Shapes.Api.serve_shape_response(request)
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))

conn
Expand All @@ -47,15 +49,18 @@ if Code.ensure_loaded?(Electric.Shapes.Api) do
end

def call(%ApiAdapter{api: api}, conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)
Phoenix.Sync.Adapter.PlugApi.call(api, conn, params)
end

# only works if method is GET...
def response(%ApiAdapter{api: api, shape: shape}, %{method: "GET"} = conn, params) do
params = Phoenix.Sync.Electric.normalize_subset_params(conn, params)

if transform_fun = PredefinedShape.transform_fun(shape) do
case Shapes.Api.validate(api, params) do
{:ok, request} ->
response = Shapes.Api.serve_shape_log(request)
response = Shapes.Api.serve_shape_response(request)
response = Map.update!(response, :body, &apply_transform(&1, transform_fun))
{request, response}

Expand Down
40 changes: 37 additions & 3 deletions lib/phoenix/sync/electric/client_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do

alias Phoenix.Sync.PredefinedShape

# Predefined-shape routes must not allow overriding server-defined shape
# attributes or stream-position keys (which are passed via dedicated fields).
@blocked_passthrough_keys ~w(
table
where
columns
params
replica
where_expr
order_by_expr
order_by
limit
offset
handle
live
cursor
)

def predefined_shape(sync_client, %PredefinedShape{} = predefined_shape) do
shape_client = PredefinedShape.client(sync_client.client, predefined_shape)

Expand Down Expand Up @@ -37,8 +55,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
end

# this is the server-defined shape route, so we want to only pass on the
# per-request/stream position params leaving the shape-definition params
# from the configured client.
# per-request/stream position params and protocol-level params, leaving
# the shape-definition params from the configured client.
defp request(%{shape_definition: %PredefinedShape{} = shape} = sync_client, _conn, params) do
{
Client.request(
Expand All @@ -47,7 +65,8 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
offset: params["offset"],
shape_handle: params["handle"],
live: live?(params["live"]),
next_cursor: params["cursor"]
next_cursor: params["cursor"],
params: protocol_request_params(sync_client, params)
),
shape
}
Expand All @@ -68,6 +87,21 @@ defmodule Phoenix.Sync.Electric.ClientAdapter do
defp normalise_method(method), do: method |> String.downcase() |> String.to_atom()
defp live?(live), do: live == "true"

defp protocol_request_params(%{client: %{params: client_params}}, params) do
client_param_keys =
client_params
|> stringify_keys()
|> Map.keys()

params
|> stringify_keys()
|> Map.drop(@blocked_passthrough_keys ++ client_param_keys)
end

defp stringify_keys(params) do
Map.new(params, fn {key, value} -> {to_string(key), value} end)
end

defp fetch_upstream(sync_client, conn, request, shape) do
response = make_request(sync_client, conn, request, shape)

Expand Down