From ced2e746611655a30451f8b7d3690ce614f07683 Mon Sep 17 00:00:00 2001 From: Gabor Pali Date: Mon, 12 Feb 2024 01:16:23 +0100 Subject: [PATCH] fabric: switch to maps for view rows The `#view_row{}` record that is used for capturing messages with row data is not flexible enough to have it extended easily. If one wanted to introduce a fresh field, the change would have to be propagated through many functions and modules. Especially, if support for mixed-version clusters is a concern, this would come with some degree of duplication. Leverage Erlang/OTP's built-in maps for mitigating this issue and offer the view callbacks the `view_row_map` Boolean key in `#mrargs.extra` to request this communication format. This way the old record-based format would still be in use unless requested otherwise. This facilitates the smooth interoperability of old coordinators and new workers. In parallel to that, the new coordinator could still receive view rows from old workers. --- .../src/couch_replicator_fabric.erl | 49 ++-- .../src/couch_replicator_fabric_rpc.erl | 22 +- src/fabric/src/fabric_rpc.erl | 11 +- src/fabric/src/fabric_util.erl | 212 +++++++++++++++--- src/fabric/src/fabric_view.erl | 59 +++-- src/fabric/src/fabric_view_all_docs.erl | 38 +++- src/fabric/src/fabric_view_map.erl | 94 +++++--- src/fabric/src/fabric_view_reduce.erl | 26 ++- src/mango/src/mango_cursor_view.erl | 98 +++++--- 9 files changed, 458 insertions(+), 151 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl index 6e5ebfc25a1..0827c0159fb 100644 --- a/src/couch_replicator/src/couch_replicator_fabric.erl +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -81,6 +81,32 @@ docs_int(DbName, Workers, QueryArgs, Callback, Acc0) -> {ok, Resp} end. 
+handle_row(Row0, {Worker, From} = Source, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + {Id, Doc} = + case Row0 of + #view_row{id = I, doc = D} -> {I, D}; + #{id := I, doc := D} -> {I, D} + end, + case maybe_fetch_and_filter_doc(Id, Doc, State) of + {[_ | _]} = NewDoc -> + Row = + case Row0 of + #view_row{} -> + Row0#view_row{doc = NewDoc, worker = Source}; + #{} -> + Row0#{doc => NewDoc, worker => Source} + end, + Dir = Args#mrargs.direction, + Rows = fabric_util:merge_row(Dir, Row, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1}, + fabric_view:maybe_send_row(State1); + skip -> + rexi:stream_ack(From), + {ok, State} + end. + handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -120,29 +146,14 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> user_acc = Acc }} end; -handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - case maybe_fetch_and_filter_doc(Id, Doc, State) of - {[_ | _]} = NewDoc -> - Row = Row0#view_row{doc = NewDoc}, - Dir = Args#mrargs.direction, - Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1}, - fabric_view:maybe_send_row(State1); - skip -> - rexi:stream_ack(From), - {ok, State} - end; +handle_message(#view_row{id = _, doc = _} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message(#{id := _, doc := _} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), 
fabric_view:maybe_send_row(State#collector{counters = Counters}). -merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). - maybe_fetch_and_filter_doc(Id, undecided, State) -> #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State, FilterStates = proplists:get_value(filter_states, Extra), diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl index daeb86e6058..196293dbf03 100644 --- a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl +++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl @@ -27,16 +27,22 @@ docs(DbName, Options, Args0) -> Args = Args0#mrargs{skip = 0, limit = Skip + Limit}, HealthThreshold = couch_replicator_scheduler:health_threshold(), {ok, Db} = couch_db:open_int(DbName, Options), - Acc = {DbName, FilterStates, HealthThreshold}, + Acc = + case couch_util:get_value(view_row_map, Extra, false) of + true -> + {DbName, FilterStates, HealthThreshold, view_row_map}; + false -> + {DbName, FilterStates, HealthThreshold, view_row_record} + end, couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc). 
docs_cb({meta, Meta}, Acc) -> ok = rexi:stream2({meta, Meta}), {ok, Acc}; -docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) -> +docs_cb({row, Row}, {DbName, States, HealthThreshold, Kind} = Acc) -> Id = couch_util:get_value(id, Row), Doc = couch_util:get_value(doc, Row), - ViewRow = #view_row{ + ViewRow0 = #view_row{ id = Id, key = couch_util:get_value(key, Row), value = couch_util:get_value(value, Row) @@ -45,7 +51,15 @@ docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) -> skip -> ok; Other -> - ok = rexi:stream2(ViewRow#view_row{doc = Other}) + ViewRow1 = + case Kind of + view_row_map -> + fabric_util:to_view_row_map(ViewRow0); + view_row_record -> + ViewRow0 + end, + ViewRow = fabric_util:row_embed_doc(ViewRow1, Other), + ok = rexi:stream2(ViewRow) end, {ok, Acc}; docs_cb(complete, Acc) -> diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index d01f1f5a749..ac615eacf0d 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -491,14 +491,21 @@ view_cb({meta, Meta}, Acc) -> % Map function starting ok = rexi:stream2({meta, Meta}), {ok, Acc}; -view_cb({row, Row}, Acc) -> +view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> % Adding another row - ViewRow = #view_row{ + ViewRow0 = #view_row{ id = couch_util:get_value(id, Row), key = couch_util:get_value(key, Row), value = couch_util:get_value(value, Row), doc = couch_util:get_value(doc, Row) }, + ViewRow = + case couch_util:get_value(view_row_map, Options, false) of + true -> + fabric_util:to_view_row_map(ViewRow0); + false -> + ViewRow0 + end, ok = rexi:stream2(ViewRow), {ok, Acc}; view_cb(complete, Acc) -> diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 4acb65c739a..3a25009d5c6 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -35,13 +35,17 @@ -export([worker_ranges/1]). -export([get_uuid_prefix_len/0]). -export([isolate/1, isolate/2]). +-export([row_embed_doc/2]). 
+-export([to_view_row_map/1, to_view_row_record/1]). +-export([row_get_worker/1, row_get_value/1]). +-export([merge_row/3]). -compile({inline, [{doc_id_and_rev, 1}]}). +-include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). --include_lib("eunit/include/eunit.hrl"). remove_down_workers(Workers, BadNode) -> remove_down_workers(Workers, BadNode, []). @@ -250,38 +254,6 @@ create_monitors(Shards) -> MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]), rexi_monitor:start(MonRefs). -%% verify only id and rev are used in key. -update_counter_test() -> - Reply = - {ok, #doc{ - id = <<"id">>, - revs = <<"rev">>, - body = <<"body">>, - atts = <<"atts">> - }}, - ?assertEqual( - [{{<<"id">>, <<"rev">>}, {Reply, 1}}], - update_counter(Reply, 1, []) - ). - -remove_ancestors_test() -> - Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, - Bar2 = {not_found, {1, <<"bar">>}}, - ?assertEqual( - [kv(Bar1, 1), kv(Foo1, 1)], - remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], []) - ), - ?assertEqual( - [kv(Bar1, 1), kv(Foo2, 2)], - remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], []) - ), - ?assertEqual( - [kv(Bar1, 2)], - remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], []) - ). - is_replicator_db(DbName) -> path_ends_with(DbName, <<"_replicator">>). @@ -423,6 +395,103 @@ do_isolate(Fun) -> {'$isolerr', Tag, Reason, Stack} end. +row_embed_doc(#view_row{} = Row, Doc) -> + Row#view_row{doc = Doc}; +row_embed_doc(#{} = Row, Doc) -> + Row#{doc => Doc}. + +add_if_defined(#{} = Map, Key, Value) -> + case Value of + undefined -> Map; + V -> Map#{Key => V} + end. 
+ +to_view_row_map(#view_row{key = Key, id = Id, value = Value, doc = Doc, worker = Worker}) -> + Row0 = #{}, + Row1 = add_if_defined(Row0, key, Key), + Row2 = add_if_defined(Row1, id, Id), + Row3 = add_if_defined(Row2, value, Value), + Row4 = add_if_defined(Row3, doc, Doc), + add_if_defined(Row4, worker, Worker); +to_view_row_map(#{} = Row) -> + Row. + +to_view_row_record(#view_row{} = Row) -> + Row; +to_view_row_record(#{} = Row) -> + Id = maps:get(id, Row, undefined), + Key = maps:get(key, Row, undefined), + Value = maps:get(value, Row, undefined), + Doc = maps:get(doc, Row, undefined), + Worker = maps:get(worker, Row, undefined), + #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker}. + +row_get_worker(#view_row{worker = Worker}) -> + Worker; +row_get_worker(#{worker := Worker}) -> + Worker; +row_get_worker(#{}) -> + undefined. + +row_get_value(#view_row{value = Value}) -> + Value; +row_get_value(#{value := Value}) -> + Value; +row_get_value(#{}) -> + undefined. + +insert_row(_Dir, Row, []) -> + [Row]; +insert_row(fwd, #{id := RowId} = Row, [#{id := HeadId} = Head | Tail] = List) -> + case RowId =< HeadId of + true -> [Row | List]; + false -> [Head | insert_row(fwd, Row, Tail)] + end; +insert_row(rev, Row, List) -> + lists:reverse(insert_row(fwd, Row, lists:reverse(List))). + +merge_row(fwd, #view_row{} = Row, Rows) -> + lists:keymerge(#view_row.id, [Row], Rows); +merge_row(rev, #view_row{} = Row, Rows) -> + lists:rkeymerge(#view_row.id, [Row], Rows); +merge_row(Dir, #{} = Row, Rows) -> + insert_row(Dir, Row, Rows). + +-ifdef(TEST). +-include_lib("couch/include/couch_eunit.hrl"). + +%% verify only id and rev are used in key. +update_counter_test() -> + Reply = + {ok, #doc{ + id = <<"id">>, + revs = <<"rev">>, + body = <<"body">>, + atts = <<"atts">> + }}, + ?assertEqual( + [{{<<"id">>, <<"rev">>}, {Reply, 1}}], + update_counter(Reply, 1, []) + ). 
+ +remove_ancestors_test() -> + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Bar2 = {not_found, {1, <<"bar">>}}, + ?assertEqual( + [kv(Bar1, 1), kv(Foo1, 1)], + remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], []) + ), + ?assertEqual( + [kv(Bar1, 1), kv(Foo2, 2)], + remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], []) + ), + ?assertEqual( + [kv(Bar1, 2)], + remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], []) + ). + get_db_timeout_test() -> % Q=1, N=1 ?assertEqual(20000, get_db_timeout(1, 2, 100, 60000)), @@ -466,3 +535,80 @@ get_db_timeout_test() -> % request_timeout was set to infinity, with enough shards it still gets to % 100 min timeout at the start from the exponential logic ?assertEqual(100, get_db_timeout(64, 2, 100, infinity)). + +merge_row_record_fwd_test() -> + RowX1 = #view_row{id = 4}, + Row1 = #view_row{id = 1}, + Row2 = #view_row{id = 3}, + Row3 = #view_row{id = 5}, + Row4 = #view_row{id = 7}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = #view_row{id = 0}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = #view_row{id = 8}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = #view_row{id = 5}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). 
+ +merge_row_record_rev_test() -> + RowX1 = #view_row{id = 5}, + Row1 = #view_row{id = 2}, + Row2 = #view_row{id = 4}, + Row3 = #view_row{id = 6}, + Row4 = #view_row{id = 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #view_row{id = 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #view_row{id = 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #view_row{id = 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). + +merge_row_map_fwd_test() -> + RowX1 = #{id => 4}, + Row1 = #{id => 1}, + Row2 = #{id => 3}, + Row3 = #{id => 5}, + Row4 = #{id => 7}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = #{id => 0}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = #{id => 8}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = #{id => 5}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). 
+ +merge_row_map_rev_test() -> + RowX1 = #{id => 5}, + Row1 = #{id => 2}, + Row2 = #{id => 4}, + Row3 = #{id => 6}, + Row4 = #{id => 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #{id => 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #{id => 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #{id => 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). +-endif. diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 87f464b365d..19bc2adf170 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -187,6 +187,11 @@ possibly_embed_doc( #view_row{id = reduced} = Row ) -> Row; +possibly_embed_doc( + _State, + #{id := reduced} = Row +) -> + Row; possibly_embed_doc( _State, #view_row{value = undefined} = Row @@ -194,16 +199,21 @@ possibly_embed_doc( Row; possibly_embed_doc( #collector{db_name = DbName, query_args = Args}, - #view_row{key = _Key, id = _Id, value = Value, doc = _Doc} = Row + Row ) -> #mrargs{include_docs = IncludeDocs} = Args, + Value = + case Row of + #view_row{value = V} -> V; + #{} -> maps:get(value, Row, undefined) + end, case IncludeDocs andalso is_tuple(Value) of true -> {Props} = Value, Rev0 = couch_util:get_value(<<"_rev">>, Props), case couch_util:get_value(<<"_id">>, Props) of null -> - Row#view_row{doc = null}; + fabric_util:row_embed_doc(Row, null); undefined -> Row; IncId -> @@ -215,21 +225,25 @@ possibly_embed_doc( undefined -> case fabric:open_doc(DbName, IncId, []) of {ok, NewDoc} -> - Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])}; + fabric_util:row_embed_doc( + Row, couch_doc:to_json_obj(NewDoc, []) + ); {not_found, _} -> - Row#view_row{doc = null}; + fabric_util:row_embed_doc(Row, null); 
Else -> - Row#view_row{doc = {error, Else}} + fabric_util:row_embed_doc(Row, {error, Else}) end; Rev0 -> Rev = couch_doc:parse_rev(Rev0), case fabric:open_revs(DbName, IncId, [Rev], []) of {ok, [{ok, NewDoc}]} -> - Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])}; + fabric_util:row_embed_doc( + Row, couch_doc:to_json_obj(NewDoc, []) + ); {ok, [{{not_found, _}, Rev}]} -> - Row#view_row{doc = null}; + fabric_util:row_embed_doc(Row, null); Else -> - Row#view_row{doc = {error, Else}} + fabric_util:row_embed_doc(Row, {error, Else}) end end ) @@ -239,13 +253,17 @@ possibly_embed_doc( Resp end end; - _ -> + false -> Row end. detach_partition(#view_row{key = {p, _Partition, Key}} = Row) -> Row#view_row{key = Key}; +detach_partition(#{key := {p, _Partition, Key}} = Row) -> + Row#{key => Key}; detach_partition(#view_row{} = Row) -> + Row; +detach_partition(#{key := _} = Row) -> Row. keydict(undefined) -> @@ -264,7 +282,7 @@ get_next_row(#collector{rows = []}) -> throw(complete); get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> #collector{ - query_args = #mrargs{direction = Dir}, + query_args = #mrargs{direction = Dir, extra = Options}, keys = Keys, rows = RowDict, lang = Lang, @@ -275,7 +293,8 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> case reduce_row_dict_take(Key, RowDict, Collation) of {Records, NewRowDict} -> Counters = lists:foldl( - fun(#view_row{worker = {Worker, From}}, CntrsAcc) -> + fun(Row, CntrsAcc) -> + {Worker, From} = fabric_util:row_get_worker(Row), case From of {Pid, _} when is_pid(Pid) -> gen_server:reply(From, ok); @@ -287,17 +306,25 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> Counters0, Records ), - Wrapped = [[V] || #view_row{value = V} <- Records], + Wrapped = [[fabric_util:row_get_value(R)] || R <- Records], {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped), {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced), 
NewSt = St#collector{keys = RestKeys, rows = NewRowDict, counters = Counters}, - {#view_row{key = Key, id = reduced, value = Finalized}, NewSt}; + ViewRow0 = #view_row{key = Key, id = reduced, value = Finalized}, + ViewRow = + case couch_util:get_value(view_row_map, Options, false) of + true -> + fabric_util:to_view_row_map(ViewRow0); + false -> + ViewRow0 + end, + {ViewRow, NewSt}; error -> get_next_row(St#collector{keys = RestKeys}) end; get_next_row(State) -> #collector{rows = [Row | Rest], counters = Counters0} = State, - {Worker, From} = Row#view_row.worker, + {Worker, From} = fabric_util:row_get_worker(Row), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), {Row, State#collector{rows = Rest, counters = Counters1}}. @@ -349,7 +376,9 @@ transform_row(#view_row{key = Key, id = Id, value = Value, doc = undefined}) -> transform_row(#view_row{key = Key, id = _Id, value = _Value, doc = {error, Reason}}) -> {row, [{id, error}, {key, Key}, {value, Reason}]}; transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}) -> - {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}. + {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}; +transform_row(#{} = Row) -> + transform_row(fabric_util:to_view_row_record(Row)). compare(fwd, <<"raw">>, A, B) -> A < B; compare(rev, <<"raw">>, A, B) -> B < A; diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index a6786bff788..76cb958b68b 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -187,6 +187,28 @@ shards(Db, Args) -> end, fabric_view:get_shards(Db, NewArgs). 
+handle_row(Row0, {Worker, _} = Source, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + #mrargs{extra = Options, direction = Dir} = Args, + Row1 = + case Row0 of + #view_row{} -> Row0#view_row{worker = Source}; + #{} -> Row0#{worker => Source} + end, + % It has to be ensured that rows of the same format are merged in case of + % mixed-version cluster scenarios. + Row = + case couch_util:get_value(view_row_map, Options, false) of + true -> + fabric_util:to_view_row_map(Row1); + false -> + fabric_util:to_view_row_record(Row1) + end, + Rows = fabric_util:merge_row(Dir, Row, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1}, + fabric_view:maybe_send_row(State1). + handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -257,13 +279,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> update_seq = UpdateSeq0 }} end; -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - Dir = Args#mrargs.direction, - Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1}, - fabric_view:maybe_send_row(State1); +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message(#{} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), fabric_view:maybe_send_row(State#collector{counters = Counters}); @@ -273,11 +292,6 @@ handle_message({execution_stats, _} = Msg, {_, From}, St) -> rexi:stream_ack(From), {Go, St#collector{user_acc = Acc}}. 
-merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). - all_docs_concurrency() -> Value = config:get("fabric", "all_docs_concurrency", "10"), try diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 43922d5d59e..d7bdb399bd0 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -121,6 +121,41 @@ go(DbName, Workers, {map, View, _}, Args, Callback, Acc0) -> {ok, Resp} end. +handle_stop(State) -> + #collector{callback = Callback} = State, + {_, Acc} = Callback(complete, State#collector.user_acc), + {stop, State#collector{user_acc = Acc}}. + +handle_non_sorted(Row, {_, From}, State) -> + #collector{callback = Callback, user_acc = AccIn, limit = Limit} = State, + {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), + rexi:stream_ack(From), + {Go, State#collector{user_acc = Acc, limit = Limit - 1}}. + +handle_sorted(Row0, {Worker, _} = Source, State) -> + #collector{ + query_args = #mrargs{direction = Dir}, + counters = Counters0, + rows = Rows0, + keys = KeyDict0, + collation = Collation + } = State, + Row = + case Row0 of + #view_row{} -> Row0#view_row{worker = Source}; + #{} -> Row0#{worker => Source} + end, + {Rows, KeyDict} = merge_row( + Dir, + Collation, + KeyDict0, + Row, + Rows0 + ), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict}, + fabric_view:maybe_send_row(State1). 
+ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -176,49 +211,39 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> }} end; handle_message(#view_row{}, {_, _}, #collector{sorted = false, limit = 0} = State) -> - #collector{callback = Callback} = State, - {_, Acc} = Callback(complete, State#collector.user_acc), - {stop, State#collector{user_acc = Acc}}; -handle_message(#view_row{} = Row, {_, From}, #collector{sorted = false} = St) -> - #collector{callback = Callback, user_acc = AccIn, limit = Limit} = St, - {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), - rexi:stream_ack(From), - {Go, St#collector{user_acc = Acc, limit = Limit - 1}}; -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{ - query_args = #mrargs{direction = Dir}, - counters = Counters0, - rows = Rows0, - keys = KeyDict0, - collation = Collation - } = State, - {Rows, KeyDict} = merge_row( - Dir, - Collation, - KeyDict0, - Row#view_row{worker = {Worker, From}}, - Rows0 - ), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict}, - fabric_view:maybe_send_row(State1); + handle_stop(State); +handle_message(#view_row{} = Row, {_, _} = Source, #collector{sorted = false} = State) -> + handle_non_sorted(Row, Source, State); +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_sorted(Row, Source, State); +handle_message(#{}, {_, _}, #collector{sorted = false, limit = 0} = State) -> + handle_stop(State); +handle_message(#{} = Row, {_, _} = Source, #collector{sorted = false} = State) -> + handle_non_sorted(Row, Source, State); +handle_message(#{} = Row, {_, _} = Source, State) -> + handle_sorted(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), 
fabric_view:maybe_send_row(State#collector{counters = Counters}); -handle_message({execution_stats, _} = Msg, {_, From}, St) -> - #collector{callback = Callback, user_acc = AccIn} = St, +handle_message({execution_stats, _} = Msg, {_, From}, State) -> + #collector{callback = Callback, user_acc = AccIn} = State, {Go, Acc} = Callback(Msg, AccIn), rexi:stream_ack(From), - {Go, St#collector{user_acc = Acc}}; + {Go, State#collector{user_acc = Acc}}; handle_message(ddoc_updated, _Worker, State) -> {stop, State}; handle_message(insufficient_storage, _Worker, State) -> {stop, State}. +key_id_from_row(#view_row{key = Key, id = Id}) -> + {Key, Id}; +key_id_from_row(#{key := Key, id := Id}) -> + {Key, Id}. + merge_row(Dir, Collation, undefined, Row, Rows0) -> Rows1 = lists:merge( - fun(#view_row{key = KeyA, id = IdA}, #view_row{key = KeyB, id = IdB}) -> - compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB}) + fun(RowA, RowB) -> + compare(Dir, Collation, key_id_from_row(RowA), key_id_from_row(RowB)) end, [Row], Rows0 @@ -240,12 +265,15 @@ merge_row(Dir, Collation, KeyDict0, Row, Rows0) -> _ -> fun couch_ejson_compare:less/2 end, - case maybe_update_keydict(Row#view_row.key, KeyDict0, CmpFun) of + {Key, _} = key_id_from_row(Row), + case maybe_update_keydict(Key, KeyDict0, CmpFun) of undefined -> {Rows0, KeyDict0}; KeyDict1 -> Rows1 = lists:merge( - fun(#view_row{key = A, id = IdA}, #view_row{key = B, id = IdB}) -> + fun(RowA, RowB) -> + {A, IdA} = key_id_from_row(RowA), + {B, IdB} = key_id_from_row(RowB), case {Dir, CmpFun(A, B)} of {fwd, 0} -> IdA < IdB; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index d4d17d5e1ee..d304e5fdd15 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -123,6 +123,21 @@ go2(DbName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Callback, Acc0) -> end end. 
+handle_row(Row0, {Worker, _} = Source, State) -> + #collector{counters = Counters0, rows = Rows0} = State, + true = fabric_dict:is_key(Worker, Counters0), + {Row, Key} = + case Row0 of + #view_row{key = K} -> + {Row0#view_row{worker = Source}, K}; + #{key := K} -> + {Row0#{worker => Source}, K} + end, + Rows = dict:append(Key, Row, Rows0), + C1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = C1}, + fabric_view:maybe_send_row(State1). + handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -165,13 +180,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> user_acc = Acc }} end; -handle_message(#view_row{key = Key} = Row, {Worker, From}, State) -> - #collector{counters = Counters0, rows = Rows0} = State, - true = fabric_dict:is_key(Worker, Counters0), - Rows = dict:append(Key, Row#view_row{worker = {Worker, From}}, Rows0), - C1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = C1}, - fabric_view:maybe_send_row(State1); +handle_message(#view_row{key = _} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message(#{key := _} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, #collector{counters = Counters0} = State) -> true = fabric_dict:is_key(Worker, Counters0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index b103d869da9..f96cd58c5b2 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -199,10 +199,12 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) - {ignore_partition_query_limit, true}, - % Request execution statistics in a map. 
The purpose of this option is - % to maintain interoperability on version upgrades. - % TODO remove this option in a later version. - {execution_stats_map, true} + % The purpose of the following options is to maintain + % interoperability on version upgrades: + % - Return execution statistics in a map + {execution_stats_map, true}, + % - Return view rows in a map + {view_row_map, true} ] }. @@ -350,7 +352,7 @@ view_cb({meta, Meta}, Acc) -> view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> mango_execution_stats:shard_incr_keys_examined(), couch_stats:increment_counter([mango, keys_examined]), - ViewRow = #view_row{ + ViewRow0 = #view_row{ id = couch_util:get_value(id, Row), key = couch_util:get_value(key, Row), doc = couch_util:get_value(doc, Row) @@ -380,21 +382,35 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> % However, this oddness is confined to being visible in this module. case match_and_extract_doc(Doc, Selector, Fields) of {match, FinalDoc} -> - FinalViewRow = ViewRow#view_row{doc = FinalDoc}, - ok = rexi:stream2(FinalViewRow), + ViewRow1 = + case couch_util:get_value(view_row_map, Options, false) of + true -> + fabric_util:to_view_row_map(ViewRow0); + false -> + ViewRow0 + end, + ViewRow = fabric_util:row_embed_doc(ViewRow1, FinalDoc), + ok = rexi:stream2(ViewRow), set_mango_msg_timestamp(); {no_match, undefined} -> maybe_send_mango_ping() end end, - case {ViewRow#view_row.doc, CoveringIndex} of + case {ViewRow0#view_row.doc, CoveringIndex} of {null, _} -> maybe_send_mango_ping(); {undefined, Index = #idx{}} -> - Doc = derive_doc_from_index(Index, ViewRow), + Doc = derive_doc_from_index(Index, ViewRow0), Process(Doc); {undefined, _} -> % include_docs=false. 
Use quorum fetch at coordinator + ViewRow = + case couch_util:get_value(view_row_map, Options, false) of + true -> + fabric_util:to_view_row_map(ViewRow0); + false -> + ViewRow0 + end, ok = rexi:stream2(ViewRow), set_mango_msg_timestamp(); {Doc, _} -> @@ -744,7 +760,8 @@ base_opts_test() -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], MRArgs = #mrargs{ @@ -1063,7 +1080,8 @@ t_execute_ok_all_docs(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1146,7 +1164,8 @@ t_execute_ok_query_view(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1239,7 +1258,8 @@ t_execute_ok_all_docs_with_execution_stats(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1314,7 +1334,9 @@ view_cb_test_() -> ?TDEF_FE(t_view_cb_row_missing_doc_triggers_quorum_fetch), ?TDEF_FE(t_view_cb_row_matching_covered_doc), ?TDEF_FE(t_view_cb_row_non_matching_covered_doc), - ?TDEF_FE(t_view_cb_row_backwards_compatible), + ?TDEF_FE(t_view_cb_row_backwards_compatible_callback_args), + ?TDEF_FE(t_view_cb_row_backwards_compatible_view_row_standard), + ?TDEF_FE(t_view_cb_row_backwards_compatible_view_row_quorum_fetch), ?TDEF_FE(t_view_cb_complete_shard_stats_v1), ?TDEF_FE(t_view_cb_complete_shard_stats_v2), ?TDEF_FE(t_view_cb_ok) @@ -1328,8 +1350,8 @@ t_view_cb_meta(_) -> t_view_cb_row_matching_regular_doc(_) -> Row = [{id, id}, {key, key}, {doc, doc}], - Result = #view_row{id = id, key = key, doc = doc}, - meck:expect(rexi, stream2, [Result], meck:val(ok)), + ViewRow = #{id => id, key => key, doc => doc}, + 
meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ extra = [ @@ -1337,7 +1359,8 @@ t_view_cb_row_matching_regular_doc(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1355,7 +1378,8 @@ t_view_cb_row_non_matching_regular_doc(_) -> selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1373,7 +1397,8 @@ t_view_cb_row_null_doc(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1383,7 +1408,7 @@ t_view_cb_row_null_doc(_) -> t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> Row = [{id, id}, {key, key}, {doc, undefined}], - ViewRow = #view_row{id = id, key = key, doc = undefined}, + ViewRow = #{id => id, key => key}, meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ @@ -1392,7 +1417,8 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1403,14 +1429,14 @@ t_view_cb_row_matching_covered_doc(_) -> Keys = [key1, key2], Row = [{id, id}, {key, Keys}, {doc, undefined}], Doc = {[{<<"field1">>, key1}, {<<"field2">>, key2}]}, - Result = #view_row{id = id, key = Keys, doc = Doc}, + ViewRow = #{id => id, key => Keys, doc => Doc}, Fields = [<<"field1">>, <<"field2">>], Index = #idx{ type = <<"json">>, def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]} }, - meck:expect(rexi, stream2, [Result], meck:val(ok)), + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ extra = [ @@ -1418,7 +1444,8 @@ t_view_cb_row_matching_covered_doc(_) -> selector => {[]}, fields => Fields, covering_index => Index - 
}} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1441,7 +1468,8 @@ t_view_cb_row_non_matching_covered_doc(_) -> selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]}, fields => Fields, covering_index => Index - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1449,7 +1477,7 @@ t_view_cb_row_non_matching_covered_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). -t_view_cb_row_backwards_compatible(_) -> +t_view_cb_row_backwards_compatible_callback_args(_) -> Row = [{id, id}, {key, key}, {doc, null}], meck:expect(rexi, stream2, ['_'], undefined), Accumulator = #mrargs{extra = [{selector, {[]}}]}, @@ -1458,6 +1486,24 @@ t_view_cb_row_backwards_compatible(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). +t_view_cb_row_backwards_compatible_view_row_standard(_) -> + Row = [{id, id}, {key, key}, {doc, doc}], + ViewRow = #view_row{id = id, key = key, doc = doc}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + Accumulator = #mrargs{extra = [{selector, {[]}}]}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')). + +t_view_cb_row_backwards_compatible_view_row_quorum_fetch(_) -> + Row = [{id, id}, {key, key}, {doc, undefined}], + ViewRow = #view_row{id = id, key = key, doc = undefined}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + Accumulator = #mrargs{extra = [{selector, {[]}}]}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')). + t_view_cb_complete_shard_stats_v1(_) -> meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)), meck:expect(rexi, stream_last, [complete], meck:val(ok)),