diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 1f5755de09e..a9a7269d82e 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -22,7 +22,10 @@
     doc_count,
     w,
     grouped_docs,
-    reply
+    reply,
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -33,10 +36,8 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(
-        fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
-            Docs1 = untag_docs(Docs),
-            Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
-            {Shard#shard{ref = Ref}, Docs}
+        fun({#shard{} = Shard, Docs}) ->
+            %% only assign the ref here; the actual casts happen in stages,
+            %% via start_leaders/1 and start_followers/2 below
+            {Shard#shard{ref = make_ref()}, Docs}
         end,
         group_docs_by_shard(DbName, AllDocs)
     ),
@@ -44,6 +45,7 @@
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = #acc{
+        update_options = Options,
         waiting_count = length(Workers),
         doc_count = length(AllDocs),
         w = list_to_integer(W),
@@ -51,7 +53,8 @@ go(DbName, AllDocs0, Opts) ->
         reply = dict:new()
     },
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
+    Acc1 = start_leaders(Acc0),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
         ->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
         rexi_monitor:stop(RexiMon)
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
     NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    %% rexi_utils:recv/6 passes nil as the worker argument for rexi_DOWN, so
+    %% derive the lost workers from the node and start their followers here
+    Dead = [S || {#shard{node = N} = S, _} <- GroupedDocs, N =:= NodeRef],
+    Acc2 = lists:foldl(fun start_followers/2, Acc1, Dead),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, in the hope that at least one copy returns successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
     #acc{
-        waiting_count = WaitingCount,
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
-    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
-    case {WaitingCount, dict:size(DocReplyDict)} of
-        {1, _} ->
-            % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
-                fun force_reply/3,
-                {ok, W, []},
-                DocReplyDict
-            ),
-            {stop, {Health, Reply}};
-        {_, DocCount} ->
-            % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
-                continue ->
-                    {ok, Acc0#acc{
-                        waiting_count = WaitingCount - 1,
-                        grouped_docs = NewGrpDocs,
-                        reply = DocReplyDict
-                    }};
-                {stop, W, FinalReplies} ->
-                    {stop, {ok, FinalReplies}}
-            end;
-        _ ->
-            {ok, Acc0#acc{
-                waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
-            }}
-    end;
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    IsLeader = lists:member(Worker#shard.ref, Acc0#acc.leaders),
+    DocReplyDict = append_update_replies(Docs, Replies, W, IsLeader, DocReplyDict0),
+    Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
+    Acc2 = remove_conflicts(Docs, Replies, Acc1),
+    NewGrpDocs = Acc2#acc.grouped_docs,
+    case skip_message(Acc2) of
+        {stop, Msg} ->
+            {stop, Msg};
+        {ok, Acc3} ->
+            Acc4 = start_followers(Worker, Acc3),
+            case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
+                {1, _} ->
+                    % last message has arrived, we need to conclude things
+                    {Health, W, Reply} = dict:fold(
+                        fun force_reply/3,
+                        {ok, W, []},
+                        DocReplyDict
+                    ),
+                    {stop, {Health, Reply}};
+                {_, DocCount} ->
+                    % we've got at least one reply for each document, let's take a look
+                    case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+                        continue ->
+                            {ok, Acc4#acc{
+                                waiting_count = Acc4#acc.waiting_count - 1,
+                                grouped_docs = NewGrpDocs
+                            }};
+                        {stop, W, FinalReplies} ->
+                            {stop, {ok, FinalReplies}}
+                    end;
+                _ ->
+                    {ok, Acc4#acc{
+                        waiting_count = Acc4#acc.waiting_count - 1,
+                        grouped_docs = NewGrpDocs
+                    }}
+            end
+    end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
@@ -318,13 +338,91 @@ group_docs_by_shard(DbName, Docs) ->
         )
     ).
 
-append_update_replies([], [], DocReplyDict) ->
+%% use the 'lowest' node that hosts this shard range as its leader
+is_leader(Worker, Workers) ->
+    Worker#shard.node ==
+        lists:min([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]).
+
+start_leaders(#acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
+
+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        Worker#shard.range == Leader#shard.range,
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc0)
+        end,
+        Followers
+    ),
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = Started ++ Acc0#acc.started}.
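+
+%% Staged startup: go/3 pre-assigns every worker its ref, then casts only to
+%% the per-range leaders via start_leaders/1; the remaining copies are
+%% contacted through start_followers/2 once their leader has replied or failed.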
+
+%% cast the update to the worker's node; the pre-assigned ref lets
+%% rexi_utils:recv/6 match the reply against #shard.ref
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc0,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for the unit tests below, which build shards without refs
+    ok.
+
+append_update_replies([], [], _W, _IsLeader, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, IsLeader, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+    append_update_replies(Rest, [], W, IsLeader, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, true, Dict0) ->
+    %% fake W conflict replies for the followers, as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, true, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, IsLeader, Dict0) ->
+    append_update_replies(Rest1, Rest2, W, IsLeader, dict:append(Doc, Reply, Dict0)).
+
+%% the leader found a conflict: remove that doc from the remaining (follower)
+%% workers, dropping a worker entirely (and no longer waiting on it) once it
+%% has no docs left
+remove_conflicts([], [], #acc{} = Acc0) ->
+    Acc0;
+remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs0} = Acc0,
+    {GroupedDocs1, Acc1} = lists:foldl(
+        fun({Worker, Docs}, {GrpAcc, AccAcc}) ->
+            case lists:delete(Doc, Docs) of
+                [] ->
+                    {GrpAcc, AccAcc#acc{waiting_count = AccAcc#acc.waiting_count - 1}};
+                Rest ->
+                    {[{Worker, Rest} | GrpAcc], AccAcc}
+            end
+        end,
+        {[], Acc0},
+        GroupedDocs0
+    ),
+    Acc2 = Acc1#acc{grouped_docs = GroupedDocs1},
+    remove_conflicts(DocRest, ReplyRest, Acc2);
+remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, ReplyRest, Acc0);
+remove_conflicts([_Doc | DocRest], [], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, [], Acc0).
 
 skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
     {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
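
Note for reviewers, not part of the patch: the crux of the change is that only one copy per shard range, the leader, is written first; the other copies are written only after the leader responds, which is what allows a leader-side `conflict` to short-circuit the followers. Below is a minimal standalone sketch of the leader-selection rule that `is_leader/2` encodes, runnable in an Erlang shell. The module name `leader_sketch` and the local `#shard{}` record are illustrative stand-ins; the real record comes from `mem3.hrl`.

```erlang
-module(leader_sketch).
-export([leaders/1]).

%% Illustrative stand-in for the mem3 #shard{} record.
-record(shard, {name, node, range}).

%% One leader per distinct range: the copy on the lowest node,
%% mirroring is_leader/2's lists:min/1 over the nodes hosting that range.
leaders(Workers) ->
    Ranges = lists:usort([W#shard.range || W <- Workers]),
    [leader_for(Range, Workers) || Range <- Ranges].

leader_for(Range, Workers) ->
    Copies = [W || W <- Workers, W#shard.range =:= Range],
    %% records are tuples, so keysort on the node field and take the minimum
    hd(lists:keysort(#shard.node, Copies)).
```

For example, with three copies of one range on `node1@host`, `node2@host` and `node3@host`, `leaders/1` returns the `node1@host` copy; the other two are only contacted by `start_followers/2` after that copy replies or its node goes down.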