Skip to content

Commit

Permalink
Avoid read docs twice when filtered _changes is triggered (#4862)
Browse files Browse the repository at this point in the history
* Add more tests for couch_changes and fabric changes

Add more tests to ensure the response is the same before and after
the code change.

Code coverage improvements
- chttpd_changes_test.erl: 99% -> 100%
- couch_changes.erl: 80% -> 82%
- fabric_rpc.erl: 56% -> 61%

* Verify the number of calls to `open_doc/3` when requesting `_changes`

* Avoid read docs twice when filtered _changes is triggered

- When filered `_changes` is triggered, `open_revs()` will
read docs. If request parameter has `include_docs=true`,
then `doc_member()` will also read docs.

To avoid the above behavior, changed `filter/3` to `filter/5`.

- When using filtered `_changes` with `style=all_docs`,
`conflicts=true` and `include_docs=true`, the response should
contain the `_conflicts` field.

Add `open_all_revs_include_doc/2` to include `_conflicts` field.

- If conflicted docs contain a deleted doc, the deleted `rev` value
should not appear in the `doc` field.

Add `Doc#doc.deleted =:= false` filter to avoid this behaviour.

* Update tests to verify the number of calls to `open_doc/3`
  • Loading branch information
jiahuili430 authored Mar 12, 2024
1 parent 5c6aeab commit 2d12ab0
Show file tree
Hide file tree
Showing 5 changed files with 1,542 additions and 175 deletions.
81 changes: 68 additions & 13 deletions src/chttpd/test/eunit/chttpd_changes_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ changes_include_docs_test_() ->
]
}.

changes_open_doc_times_test_() ->
{
foreach,
fun setup_basic/0,
fun teardown_basic/1,
[
?TDEF_FE(t_selector_open_doc_times),
?TDEF_FE(t_js_filter_open_doc_times),
?TDEF_FE(t_view_open_doc_times)
]
}.

t_basic({_, DbUrl}) ->
Res = {Seq, Pending, Rows} = changes(DbUrl),
?assertEqual(8, Seq),
Expand Down Expand Up @@ -853,8 +865,39 @@ t_view_all_docs_conflicts_include_docs({_, DbUrl}) ->
?assertEqual(Res1, Res2),
delete_ddocs(DDocUrl, Rev).

% Utility functions
t_selector_open_doc_times({_, DbUrl}) ->
Body = #{<<"selector">> => #{<<"_id">> => ?DOC1}},
F = "?filter=_selector",
I = "&include_docs=true",
S = "&style=all_docs",
Filter = called_times(fun() -> changes_post(DbUrl, Body, F) end, DbUrl),
FilterDocs = called_times(fun() -> changes_post(DbUrl, Body, F ++ I) end, DbUrl),
FilterAllDocs = called_times(fun() -> changes_post(DbUrl, Body, F ++ I ++ S) end, DbUrl),
?assertEqual({3, 3, 5}, {Filter, FilterDocs, FilterAllDocs}).

t_js_filter_open_doc_times({_, DbUrl}) ->
{DDocUrl, Rev} = create_ddocs(DbUrl, ?DOC1, custom),
F = "?filter=filters/f",
I = "&include_docs=true",
S = "&style=all_docs",
Filter = called_times(fun() -> changes(DbUrl, F) end, DbUrl),
FilterDocs = called_times(fun() -> changes(DbUrl, F ++ I) end, DbUrl),
FilterAllDocs = called_times(fun() -> changes(DbUrl, F ++ I ++ S) end, DbUrl),
?assertEqual({4, 4, 6}, {Filter, FilterDocs, FilterAllDocs}),
delete_ddocs(DDocUrl, Rev).

t_view_open_doc_times({_, DbUrl}) ->
{DDocUrl, Rev} = create_ddocs(DbUrl, ?DOC1, view),
F = "?filter=_view&view=views/v",
I = "&include_docs=true",
S = "&style=all_docs",
Filter = called_times(fun() -> changes(DbUrl, F) end, DbUrl),
FilterDocs = called_times(fun() -> changes(DbUrl, F ++ I) end, DbUrl),
FilterAllDocs = called_times(fun() -> changes(DbUrl, F ++ S ++ I) end, DbUrl),
?assertEqual({4, 4, 6}, {Filter, FilterDocs, FilterAllDocs}),
delete_ddocs(DDocUrl, Rev).

% Utility functions
setup_ctx(DbCreateParams) ->
Ctx = test_util:start_couch([chttpd]),
Hashed = couch_passwords:hash_admin_password(?PASS),
Expand Down Expand Up @@ -883,6 +926,7 @@ setup_basic() ->
CfgKey = "changes_doc_ids_optimization_threshold",
ok = config:set("couchdb", CfgKey, "2", _Persist = false),
meck:new(couch_changes, [passthrough]),
meck:new(couch_db, [passthrough]),
{Ctx, DbUrl}.

teardown_basic({Ctx, DbUrl}) ->
Expand All @@ -891,20 +935,11 @@ teardown_basic({Ctx, DbUrl}) ->
teardown_ctx({Ctx, DbUrl}).

create_db(Top, Db, Params) ->
case req(put, Top ++ Db ++ Params) of
{201, #{}} ->
ok;
Error ->
error({failed_to_create_test_db, Db, Error})
end.
{201, #{}} = req(put, Top ++ Db ++ Params),
ok.

delete_db(DbUrl) ->
case req(delete, DbUrl) of
{200, #{}} ->
ok;
Error ->
error({failed_to_delete_test_db, DbUrl, Error})
end.
{200, #{}} = req(delete, DbUrl).

doc_fun({Id, Revs, Deleted}) ->
Doc = #{
Expand Down Expand Up @@ -1033,3 +1068,23 @@ seq(<<_/binary>> = Seq) ->
binary_to_integer(NumStr);
seq(null) ->
null.

called_times(ReqFun, DbUrl) ->
meck:reset(couch_db),
ReqFun(),
open_doc_calls(DbUrl).

open_doc_calls(DbUrl) ->
#{path := "/" ++ DbName0} = uri_string:parse(DbUrl),
DbName = ?l2b(DbName0),
FoldFun =
fun([Db, IdOrDocInfo, _Opts], Acc) ->
case {mem3:dbname(couch_db:name(Db)), IdOrDocInfo} of
{DbName, #doc_info{}} -> Acc + 1;
_ -> Acc
end
end,
lists:foldl(FoldFun, 0, meck_history(couch_db, open_doc, 3)).

meck_history(Mod, Fun, Arity) ->
[A || {_Pid, {_M, F, A}, _R} <- meck:history(Mod), F =:= Fun, length(A) =:= Arity].
112 changes: 78 additions & 34 deletions src/couch/src/couch_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
wait_updated/3,
get_rest_updated/1,
configure_filter/4,
filter/3,
filter/5,
handle_db_event/3,
handle_view_event/3,
send_changes_doc_ids/6,
Expand Down Expand Up @@ -225,44 +225,71 @@ configure_filter(FilterName, Style, Req, Db) ->
throw({bad_request, Msg})
end.

filter(Db, #full_doc_info{} = FDI, Filter) ->
filter(Db, couch_doc:to_doc_info(FDI), Filter);
filter(_Db, DocInfo, {default, Style}) ->
apply_style(DocInfo, Style);
filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
case lists:member(DocInfo#doc_info.id, DocIds) of
true ->
apply_style(DocInfo, Style);
false ->
[]
end;
filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
Docs = open_revs(Db, DocInfo, Style),
Passes = [
mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
|| Doc <- Docs
],
filter_revs(Passes, Docs);
filter(_Db, DocInfo, {design_docs, Style}) ->
case DocInfo#doc_info.id of
<<"_design", _/binary>> ->
apply_style(DocInfo, Style);
_ ->
[]
filter(Db, #full_doc_info{} = FDI, Filter, IncludeDocs, Conflicts) ->
filter(Db, couch_doc:to_doc_info(FDI), Filter, IncludeDocs, Conflicts);
filter(_Db, DocInfo, {default, Style}, _IncludeDocs, _Conflicts) ->
{[], apply_style(DocInfo, Style)};
filter(_Db, DocInfo, {doc_ids, Style, DocIds}, _IncludeDocs, _Conflicts) ->
Revs =
case lists:member(DocInfo#doc_info.id, DocIds) of
true ->
apply_style(DocInfo, Style);
false ->
[]
end,
{[], Revs};
filter(_Db, DocInfo, {design_docs, Style}, _IncludeDocs, _Conflicts) ->
Revs =
case DocInfo#doc_info.id of
<<"_design", _/binary>> ->
apply_style(DocInfo, Style);
_ ->
[]
end,
{[], Revs};
filter(Db, DocInfo, {selector, all_docs, {Selector, _Fields}}, true, true) ->
{DocWin, Docs} = open_all_revs_include_doc(Db, DocInfo),
Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) || Doc <- Docs],
{DocWin, filter_revs(Passes, Docs)};
filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}, IncludeDocs, Conflicts) ->
Docs = open_revs(Db, DocInfo, Style, Conflicts),
Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) || Doc <- Docs],
case IncludeDocs of
true -> {Docs, filter_revs(Passes, Docs)};
false -> {[], filter_revs(Passes, Docs)}
end;
filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
Docs = open_revs(Db, DocInfo, Style),
filter(Db, DocInfo, {view, all_docs, DDoc, VName}, true, true) ->
{DocWin, Docs} = open_all_revs_include_doc(Db, DocInfo),
{ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs),
filter_revs(Passes, Docs);
filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
{DocWin, filter_revs(Passes, Docs)};
filter(Db, DocInfo, {view, Style, DDoc, VName}, IncludeDocs, Conflicts) ->
Docs = open_revs(Db, DocInfo, Style, Conflicts),
{ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs),
case IncludeDocs of
true -> {Docs, filter_revs(Passes, Docs)};
false -> {[], filter_revs(Passes, Docs)}
end;
filter(Db, DocInfo, {custom, all_docs, Req0, DDoc, FName}, true, true) ->
Req =
case Req0 of
{json_req, _} -> Req0;
#httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
end,
{DocWin, Docs} = open_all_revs_include_doc(Db, DocInfo),
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
{DocWin, filter_revs(Passes, Docs)};
filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs, Conflicts) ->
Req =
case Req0 of
{json_req, _} -> Req0;
#httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
end,
Docs = open_revs(Db, DocInfo, Style),
Docs = open_revs(Db, DocInfo, Style, Conflicts),
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
filter_revs(Passes, Docs).
case IncludeDocs of
true -> {Docs, filter_revs(Passes, Docs)};
false -> {[], filter_revs(Passes, Docs)}
end.

get_view_qs({json_req, {Props}}) ->
{Query} = couch_util:get_value(<<"query">>, Props, {[]}),
Expand Down Expand Up @@ -357,17 +384,32 @@ apply_style(#doc_info{revs = Revs}, main_only) ->
apply_style(#doc_info{revs = Revs}, all_docs) ->
[{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev = R} <- Revs].

open_revs(Db, DocInfo, Style) ->
open_revs(Db, DocInfo, Style, Conflicts) ->
DocInfos =
case Style of
main_only -> [DocInfo];
all_docs -> [DocInfo#doc_info{revs = [R]} || R <- DocInfo#doc_info.revs]
end,
OpenOpts = [deleted, conflicts],
OpenOpts =
case Conflicts of
true -> [deleted, conflicts];
false -> [deleted]
end,
% Relying on list comprehensions to silence errors
OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
[Doc || {ok, Doc} <- OpenResults].

open_all_revs_include_doc(Db, DocInfo) ->
DocInfos = [DocInfo#doc_info{revs = [R]} || R <- DocInfo#doc_info.revs],
OpenOpts = [deleted, conflicts],
OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
Docs = [Doc || {ok, Doc} <- OpenResults],
[Doc1 | RestDocs] = Docs,
RestRevs = [Doc#doc.revs || Doc <- RestDocs, Doc#doc.deleted =:= false],
Conflicts = [{conflicts, [{Pos, RevId} || {Pos, [RevId]} <- RestRevs]}],
Doc2 = Doc1#doc{meta = Conflicts},
{[Doc2], Docs}.

filter_revs(Passes, Docs) ->
lists:flatmap(
fun
Expand Down Expand Up @@ -611,12 +653,14 @@ changes_enumerator(Value, Acc) ->
prepend = Prepend,
user_acc = UserAcc,
limit = Limit,
include_docs = IncludeDocs,
conflicts = Conflicts,
resp_type = ResponseType,
db = Db,
timeout = Timeout,
timeout_fun = TimeoutFun
} = maybe_upgrade_changes_acc(Acc),
Results0 = filter(Db, Value, Filter),
{_, Results0} = filter(Db, Value, Filter, IncludeDocs, Conflicts),
Results = [Result || Result <- Results0, Result /= null],
Seq =
case Value of
Expand Down
Loading

0 comments on commit 2d12ab0

Please sign in to comment.