Skip to content

Commit

Permalink
fabric: switch to maps for view rows
Browse files Browse the repository at this point in the history
The `#view_row{}` record that is used for capturing messages with
row data is not flexible enough to be extended easily.  If
one wanted to introduce a fresh field, the change would have to be
propagated through many functions and modules.  Especially, if
support for mixed-version clusters is a concern, this would come
with some degree of duplication.

Leverage Erlang/OTP's built-in maps for mitigating this issue and
offer the view callbacks the `view_row_map` Boolean key in
`#mrargs.extra` to request this communication format.  This way the
old record-based format would still be in use unless requested
otherwise.  This facilitates the smooth interoperability of old
coordinators and new workers.  In parallel to that, the new
coordinator could still receive view rows from old workers.
  • Loading branch information
pgj committed Feb 13, 2024
1 parent d546447 commit ced2e74
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 151 deletions.
49 changes: 30 additions & 19 deletions src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
{ok, Resp}
end.

%% Merge one streamed view row into the collector state.  Rows may arrive
%% either as the legacy #view_row{} record or as a map (the `view_row_map`
%% wire format); both shapes are handled transparently.  The row's doc is
%% fetched/filtered first; filtered-out rows are acknowledged so the worker
%% keeps streaming.
handle_row(Row0, {Worker, From} = Source, State) ->
    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
    {Id, Doc} = row_id_and_doc(Row0),
    case maybe_fetch_and_filter_doc(Id, Doc, State) of
        skip ->
            %% Row was filtered out; ack the worker and keep current state.
            rexi:stream_ack(From),
            {ok, State};
        {[_ | _]} = NewDoc ->
            Row = embed_doc_and_worker(Row0, NewDoc, Source),
            Rows = fabric_util:merge_row(Args#mrargs.direction, Row, Rows0),
            Counters = fabric_dict:update_counter(Worker, 1, Counters0),
            State1 = State#collector{rows = Rows, counters = Counters},
            fabric_view:maybe_send_row(State1)
    end.

%% Extract the row id and doc regardless of representation.
row_id_and_doc(#view_row{id = Id, doc = Doc}) -> {Id, Doc};
row_id_and_doc(#{id := Id, doc := Doc}) -> {Id, Doc}.

%% Attach the filtered doc and the originating worker, preserving the
%% row's original representation (record or map).
embed_doc_and_worker(#view_row{} = Row, Doc, Source) ->
    Row#view_row{doc = Doc, worker = Source};
embed_doc_and_worker(#{} = Row, Doc, Source) ->
    Row#{doc => Doc, worker => Source}.

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
fabric_view:check_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
Expand Down Expand Up @@ -120,29 +146,14 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
user_acc = Acc
}}
end;
handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
#collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
case maybe_fetch_and_filter_doc(Id, Doc, State) of
{[_ | _]} = NewDoc ->
Row = Row0#view_row{doc = NewDoc},
Dir = Args#mrargs.direction,
Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
State1 = State#collector{rows = Rows, counters = Counters1},
fabric_view:maybe_send_row(State1);
skip ->
rexi:stream_ack(From),
{ok, State}
end;
handle_message(#view_row{id = _, doc = _} = Row, {_, _} = Source, State) ->
handle_row(Row, Source, State);
handle_message(#{id := _, doc := _} = Row, {_, _} = Source, State) ->
handle_row(Row, Source, State);
handle_message(complete, Worker, State) ->
Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
fabric_view:maybe_send_row(State#collector{counters = Counters}).

%% Insert a single #view_row{} into a list of rows kept sorted by row id,
%% honoring the traversal direction: `fwd` lists are ascending and merged
%% with lists:keymerge/3, `rev` lists are descending and merged with
%% lists:rkeymerge/3.  Both sides must already be sorted accordingly.
merge_row(fwd, Row, Rows) ->
lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, Row, Rows) ->
lists:rkeymerge(#view_row.id, [Row], Rows).

maybe_fetch_and_filter_doc(Id, undecided, State) ->
#collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
FilterStates = proplists:get_value(filter_states, Extra),
Expand Down
22 changes: 18 additions & 4 deletions src/couch_replicator/src/couch_replicator_fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,22 @@ docs(DbName, Options, Args0) ->
Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
HealthThreshold = couch_replicator_scheduler:health_threshold(),
{ok, Db} = couch_db:open_int(DbName, Options),
Acc = {DbName, FilterStates, HealthThreshold},
Acc =
case couch_util:get_value(view_row_map, Extra, false) of
true ->
{DbName, FilterStates, HealthThreshold, view_row_map};
false ->
{DbName, FilterStates, HealthThreshold, view_row_record}
end,
couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).

docs_cb({meta, Meta}, Acc) ->
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
docs_cb({row, Row}, {DbName, States, HealthThreshold, Kind} = Acc) ->
Id = couch_util:get_value(id, Row),
Doc = couch_util:get_value(doc, Row),
ViewRow = #view_row{
ViewRow0 = #view_row{
id = Id,
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row)
Expand All @@ -45,7 +51,15 @@ docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
skip ->
ok;
Other ->
ok = rexi:stream2(ViewRow#view_row{doc = Other})
ViewRow1 =
case Kind of
view_row_map ->
fabric_util:to_view_row_map(ViewRow0);
view_row_record ->
ViewRow0
end,
ViewRow = fabric_util:row_embed_doc(ViewRow1, Other),
ok = rexi:stream2(ViewRow)
end,
{ok, Acc};
docs_cb(complete, Acc) ->
Expand Down
11 changes: 9 additions & 2 deletions src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,21 @@ view_cb({meta, Meta}, Acc) ->
% Map function starting
ok = rexi:stream2({meta, Meta}),
{ok, Acc};
view_cb({row, Row}, Acc) ->
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
% Adding another row
ViewRow = #view_row{
ViewRow0 = #view_row{
id = couch_util:get_value(id, Row),
key = couch_util:get_value(key, Row),
value = couch_util:get_value(value, Row),
doc = couch_util:get_value(doc, Row)
},
ViewRow =
case couch_util:get_value(view_row_map, Options, false) of
true ->
fabric_util:to_view_row_map(ViewRow0);
false ->
ViewRow0
end,
ok = rexi:stream2(ViewRow),
{ok, Acc};
view_cb(complete, Acc) ->
Expand Down
212 changes: 179 additions & 33 deletions src/fabric/src/fabric_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@
-export([worker_ranges/1]).
-export([get_uuid_prefix_len/0]).
-export([isolate/1, isolate/2]).
-export([row_embed_doc/2]).
-export([to_view_row_map/1, to_view_row_record/1]).
-export([row_get_worker/1, row_get_value/1]).
-export([merge_row/3]).

-compile({inline, [{doc_id_and_rev, 1}]}).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("eunit/include/eunit.hrl").

remove_down_workers(Workers, BadNode) ->
remove_down_workers(Workers, BadNode, []).
Expand Down Expand Up @@ -250,38 +254,6 @@ create_monitors(Shards) ->
MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]),
rexi_monitor:start(MonRefs).

%% verify only id and rev are used in key.
%% The reply intentionally carries body and atts as well; the generated
%% counter key must still contain only the {Id, Rev} pair.
update_counter_test() ->
Reply =
{ok, #doc{
id = <<"id">>,
revs = <<"rev">>,
body = <<"body">>,
atts = <<"atts">>
}},
?assertEqual(
[{{<<"id">>, <<"rev">>}, {Reply, 1}}],
update_counter(Reply, 1, [])
).

%% remove_ancestors/2 should drop replies whose revision is an ancestor of
%% another reply, folding the dropped reply's count into the survivor.
remove_ancestors_test() ->
Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
Bar2 = {not_found, {1, <<"bar">>}},
%% Unrelated revisions are left untouched.
?assertEqual(
[kv(Bar1, 1), kv(Foo1, 1)],
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], [])
),
%% Foo1 is an ancestor of Foo2: Foo2 absorbs Foo1's count.
?assertEqual(
[kv(Bar1, 1), kv(Foo2, 2)],
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], [])
),
%% A not_found reply for an existing rev collapses into the ok reply.
?assertEqual(
[kv(Bar1, 2)],
remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], [])
).

is_replicator_db(DbName) ->
path_ends_with(DbName, <<"_replicator">>).

Expand Down Expand Up @@ -423,6 +395,103 @@ do_isolate(Fun) ->
{'$isolerr', Tag, Reason, Stack}
end.

%% Store Doc in the row's doc field, whichever representation (record or
%% map) the row uses.  Map rows gain a doc key if they lacked one.
row_embed_doc(#{} = Row, Doc) ->
    maps:put(doc, Doc, Row);
row_embed_doc(#view_row{} = Row, Doc) ->
    Row#view_row{doc = Doc}.

%% Put Key => Value into Map unless Value is `undefined`, in which case
%% the map is returned unchanged.  Used to build sparse view-row maps.
add_if_defined(#{} = Map, _Key, undefined) ->
    Map;
add_if_defined(#{} = Map, Key, Value) ->
    Map#{Key => Value}.

%% Convert a #view_row{} record into the sparse map wire format, keeping
%% only fields that are set (`undefined` fields are omitted).  Rows that
%% are already maps pass through unchanged.
to_view_row_map(#view_row{key = Key, id = Id, value = Value, doc = Doc, worker = Worker}) ->
    Fields = [{key, Key}, {id, Id}, {value, Value}, {doc, Doc}, {worker, Worker}],
    lists:foldl(
        fun
            ({_K, undefined}, Acc) -> Acc;
            ({K, V}, Acc) -> Acc#{K => V}
        end,
        #{},
        Fields
    );
to_view_row_map(#{} = Row) ->
    Row.

%% Convert a map-format view row back into a #view_row{} record; missing
%% keys become `undefined`.  Records pass through unchanged.
to_view_row_record(#view_row{} = Row) ->
    Row;
to_view_row_record(#{} = Row) ->
    #view_row{
        id = maps:get(id, Row, undefined),
        key = maps:get(key, Row, undefined),
        value = maps:get(value, Row, undefined),
        doc = maps:get(doc, Row, undefined),
        worker = maps:get(worker, Row, undefined)
    }.

%% Read the row's worker field; map rows without a worker key yield
%% `undefined`, matching the record's default.
row_get_worker(#view_row{worker = Worker}) ->
    Worker;
row_get_worker(#{} = Row) ->
    maps:get(worker, Row, undefined).

%% Read the row's value field; map rows without a value key yield
%% `undefined`, matching the record's default.
row_get_value(#view_row{value = Value}) ->
    Value;
row_get_value(#{} = Row) ->
    maps:get(value, Row, undefined).

%% Insert a map-format row into an id-sorted list of map rows.  `fwd`
%% lists are ascending; on an equal id the new row lands before the
%% existing one.  `rev` (descending) is handled by reversing, inserting
%% forward, and reversing back.
insert_row(_Dir, Row, []) ->
    [Row];
insert_row(fwd, #{id := RowId} = Row, Rows) ->
    {Smaller, Rest} = lists:splitwith(fun(#{id := Id}) -> Id < RowId end, Rows),
    Smaller ++ [Row | Rest];
insert_row(rev, Row, Rows) ->
    lists:reverse(insert_row(fwd, Row, lists:reverse(Rows))).

%% Merge one view row into a direction-sorted row list, dispatching on
%% representation: map rows use the local insert_row/3, record rows use
%% the classic keymerge on the #view_row.id position.
merge_row(Dir, #{} = Row, Rows) ->
    insert_row(Dir, Row, Rows);
merge_row(fwd, #view_row{} = Row, Rows) ->
    lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, #view_row{} = Row, Rows) ->
    lists:rkeymerge(#view_row.id, [Row], Rows).

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

%% verify only id and rev are used in key.
update_counter_test() ->
    %% The reply deliberately carries body and atts too; only {Id, Rev}
    %% may appear in the resulting counter key.
    Doc = #doc{
        id = <<"id">>,
        revs = <<"rev">>,
        body = <<"body">>,
        atts = <<"atts">>
    },
    Reply = {ok, Doc},
    Expected = [{{<<"id">>, <<"rev">>}, {Reply, 1}}],
    ?assertEqual(Expected, update_counter(Reply, 1, [])).

%% remove_ancestors/2 drops replies whose revision is an ancestor of a
%% later reply, folding their counts into the surviving entry.
remove_ancestors_test() ->
    FooRev1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
    FooRev2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
    BarRev1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
    BarMissing = {not_found, {1, <<"bar">>}},
    %% Unrelated revisions are kept untouched.
    ?assertEqual(
        [kv(BarRev1, 1), kv(FooRev1, 1)],
        remove_ancestors([kv(BarRev1, 1), kv(FooRev1, 1)], [])
    ),
    %% FooRev1 is an ancestor of FooRev2, so FooRev2 absorbs its count.
    ?assertEqual(
        [kv(BarRev1, 1), kv(FooRev2, 2)],
        remove_ancestors([kv(BarRev1, 1), kv(FooRev1, 1), kv(FooRev2, 1)], [])
    ),
    %% A not_found reply collapses into the matching ok reply.
    ?assertEqual(
        [kv(BarRev1, 2)],
        remove_ancestors([kv(BarMissing, 1), kv(BarRev1, 1)], [])
    ).

get_db_timeout_test() ->
% Q=1, N=1
?assertEqual(20000, get_db_timeout(1, 2, 100, 60000)),
Expand Down Expand Up @@ -466,3 +535,80 @@ get_db_timeout_test() ->
% request_timeout was set to infinity, with enough shards it still gets to
% 100 min timeout at the start from the exponential logic
?assertEqual(100, get_db_timeout(64, 2, 100, infinity)).

%% Forward (ascending) merges of #view_row{} records: middle, front,
%% back, and equal-id insertions.
merge_row_record_fwd_test() ->
    [Row1, Row2, Row3, Row4] = [#view_row{id = Id} || Id <- [1, 3, 5, 7]],
    Rows = [Row1, Row2, Row3, Row4],
    Insert = fun(Id) -> merge_row(fwd, #view_row{id = Id}, Rows) end,
    %% id 4 lands between 3 and 5.
    ?assertEqual([Row1, Row2, #view_row{id = 4}, Row3, Row4], Insert(4)),
    %% id 0 lands in front.
    ?assertEqual([#view_row{id = 0}, Row1, Row2, Row3, Row4], Insert(0)),
    %% id 8 lands at the back.
    ?assertEqual([Row1, Row2, Row3, Row4, #view_row{id = 8}], Insert(8)),
    %% equal id 5 lands before the existing row with that id.
    ?assertEqual([Row1, Row2, #view_row{id = 5}, Row3, Row4], Insert(5)).

%% Reverse (descending) merges of #view_row{} records: middle, back,
%% front, and equal-id insertions.
merge_row_record_rev_test() ->
    [Row1, Row2, Row3, Row4] = [#view_row{id = Id} || Id <- [2, 4, 6, 8]],
    Rows = [Row4, Row3, Row2, Row1],
    Insert = fun(Id) -> merge_row(rev, #view_row{id = Id}, Rows) end,
    %% id 5 lands between 6 and 4.
    ?assertEqual([Row4, Row3, #view_row{id = 5}, Row2, Row1], Insert(5)),
    %% id 1 lands at the back of a descending list.
    ?assertEqual([Row4, Row3, Row2, Row1, #view_row{id = 1}], Insert(1)),
    %% id 9 lands in front of a descending list.
    ?assertEqual([#view_row{id = 9}, Row4, Row3, Row2, Row1], Insert(9)),
    %% equal id 6 lands after the existing row with that id.
    ?assertEqual([Row4, Row3, #view_row{id = 6}, Row2, Row1], Insert(6)).

%% Forward (ascending) merges of map-format rows must mirror the record
%% behavior: middle, front, back, and equal-id insertions.
merge_row_map_fwd_test() ->
    [Row1, Row2, Row3, Row4] = [#{id => Id} || Id <- [1, 3, 5, 7]],
    Rows = [Row1, Row2, Row3, Row4],
    Insert = fun(Id) -> merge_row(fwd, #{id => Id}, Rows) end,
    %% id 4 lands between 3 and 5.
    ?assertEqual([Row1, Row2, #{id => 4}, Row3, Row4], Insert(4)),
    %% id 0 lands in front.
    ?assertEqual([#{id => 0}, Row1, Row2, Row3, Row4], Insert(0)),
    %% id 8 lands at the back.
    ?assertEqual([Row1, Row2, Row3, Row4, #{id => 8}], Insert(8)),
    %% equal id 5 lands before the existing row with that id.
    ?assertEqual([Row1, Row2, #{id => 5}, Row3, Row4], Insert(5)).

%% Reverse (descending) merges of map-format rows must mirror the record
%% behavior: middle, back, front, and equal-id insertions.
merge_row_map_rev_test() ->
    [Row1, Row2, Row3, Row4] = [#{id => Id} || Id <- [2, 4, 6, 8]],
    Rows = [Row4, Row3, Row2, Row1],
    Insert = fun(Id) -> merge_row(rev, #{id => Id}, Rows) end,
    %% id 5 lands between 6 and 4.
    ?assertEqual([Row4, Row3, #{id => 5}, Row2, Row1], Insert(5)),
    %% id 1 lands at the back of a descending list.
    ?assertEqual([Row4, Row3, Row2, Row1, #{id => 1}], Insert(1)),
    %% id 9 lands in front of a descending list.
    ?assertEqual([#{id => 9}, Row4, Row3, Row2, Row1], Insert(9)),
    %% equal id 6 lands after the existing row with that id.
    ?assertEqual([Row4, Row3, #{id => 6}, Row2, Row1], Insert(6)).
-endif.
Loading

0 comments on commit ced2e74

Please sign in to comment.