Skip to content

Commit

Permalink
Merge pull request #36 from rabbitmq/concurrency-step
Browse files Browse the repository at this point in the history
Add support for `step` concurrency option
  • Loading branch information
max-au authored May 17, 2024
2 parents 93509bc + 1dda45f commit 1640b14
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
13 changes: 8 additions & 5 deletions src/erlperf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@
-type concurrency_test() :: #{
threshold => pos_integer(),
min => pos_integer(),
step => pos_integer(),
max => pos_integer()
}.
%% Concurrency estimation mode options.
%%
%% <ul>
%% <li>`min': initial number of workers, default is 1</li>
%% <li>`step': increase the number of workers by this value on each iteration, default is 1</li>
%% <li>`max': maximum number of workers, defaults to `erlang:system_info(process_limit) - 1000'</li>
%% <li>`threshold': stop concurrency run when adding this amount of workers does
%% not result in further total throughput increase. Default is 3</li>
Expand Down Expand Up @@ -418,7 +420,7 @@ record(Module, Function, Arity, TimeMs) ->
concurrency_mode_defaults(undefined) ->
undefined;
concurrency_mode_defaults(ConOpts) ->
maps:merge(#{min => 1, max => erlang:system_info(process_limit) - 1000, threshold => 3}, ConOpts).
maps:merge(#{min => 1, step => 1, max => erlang:system_info(process_limit) - 1000, threshold => 3}, ConOpts).

run_options_defaults(RunOptions) ->
maps:merge(#{
Expand Down Expand Up @@ -649,7 +651,7 @@ difference([S, F | Tail]) ->
%% Test considered complete when either:
%% * maximum number of workers reached
%% * last 'threshold' added workers did not increase throughput
estimate_concurrency(Jobs, Options, #{threshold := Threshold, max := Max} = ConOpts, Handles, Current, History, QMax) ->
estimate_concurrency(Jobs, Options, #{threshold := Threshold, step := Step, max := Max} = ConOpts, Handles, Current, History, QMax) ->
RunOptions = Options#{concurrency => Current},
[Report] = benchmark_impl(Jobs, RunOptions, undefined, Handles),
#{result := Result0} = Report,
Expand All @@ -658,6 +660,8 @@ estimate_concurrency(Jobs, Options, #{threshold := Threshold, max := Max} = ConO
QPS = lists:sum(difference(Samples)) div (length(Samples) - 1),
Result = Result0#{average => QPS},
NewHistory = [{Current, Result} | History],
%% this gives us nice round numbers (eg. with step of 10, we'll have [1, 10, 20...])
Next = (Current + Step) div Step * Step,
%% test if we are at Max concurrency, or saturated the node
case maxed(QPS, Current, QMax, Threshold) of
true ->
Expand All @@ -667,13 +671,13 @@ estimate_concurrency(Jobs, Options, #{threshold := Threshold, max := Max} = ConO
{BestConcurrency, BestResult} = lists:keyfind(BestConcurrency, 1, History),
#{mode => concurrency, result => BestResult, history => NewHistory, sleep => SleepMethod,
concurrency_options => ConOpts, run_options => Options#{concurrency => BestConcurrency}};
_NewQMax when Current =:= Max ->
_NewQMax when Next > Max ->
#{sleep := SleepMethod} = Report,
#{mode => concurrency, result => Result, history => NewHistory, sleep => SleepMethod,
concurrency_options => ConOpts, run_options => RunOptions};
NewQMax ->
% need more workers
estimate_concurrency(Jobs, RunOptions, ConOpts, Handles, Current + 1, NewHistory, NewQMax)
estimate_concurrency(Jobs, RunOptions, ConOpts, Handles, Next, NewHistory, NewQMax)
end.

maxed(QPS, Current, {Q, _}, _) when QPS > Q ->
Expand All @@ -683,7 +687,6 @@ maxed(_, Current, {_, W}, Count) when Current - W >= Count ->
maxed(_, _, QMax, _) ->
QMax.


multicall_result([], Acc) ->
lists:reverse(Acc);
multicall_result([{Pid, Ref} | Proxies], Acc) ->
Expand Down
5 changes: 4 additions & 1 deletion src/erlperf_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ determine_mode(#{concurrency_estimation := true} = ParsedOpts) ->
length(maps:get(code, ParsedOpts)) > 1 andalso
erlang:error({generic, "Parallel concurrency estimation runs are not supported~n"}),
RunOpts = maps:with([sample_duration, samples, warmup, cv], ParsedOpts),
{RunOpts, maps:with([min, max, threshold], ParsedOpts)};
{RunOpts, maps:with([min, step, max, threshold], ParsedOpts)};

%% continuous mode
determine_mode(ParsedOpts) ->
Expand Down Expand Up @@ -330,6 +330,9 @@ arguments() ->
#{name => min, long => "-min",
help => "start with this amount of processes (1)",
type => {int, [{min, 1}]}},
#{name => step, long => "-step",
help => "increase the number of processes by this value on each iteration (1)",
type => {int, [{min, 1}]}},
#{name => max, long => "-max",
help => "do not exceed this number of processes",
type => {int, [{max, erlang:system_info(process_limit) - 1000}]}},
Expand Down
13 changes: 10 additions & 3 deletions test/erlperf_cli_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
double/1, triple/1, pg/1, mfa/1,
full_report/1, basic_timed_report/1, full_timed_report/1,
recorded/1,
squeeze/0, squeeze/1,
squeeze/0, squeeze/1, step/1,
init_all/0, init_all/1
]).

Expand All @@ -26,7 +26,7 @@ suite() ->
[{timetrap, {seconds, 20}}].

all() ->
[simple, concurrent, verbose, compare, squeeze, usage, init, double,
[simple, concurrent, verbose, compare, squeeze, step, usage, init, double,
triple, pg, mfa, full_report, basic_timed_report, full_timed_report, recorded, init_all].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -167,14 +167,21 @@ compare(Config) when is_list(Config) ->
squeeze() ->
[{doc, "Tests concurrency test via command line"}, {timetrap, {seconds, 30}}].

% erlperf 'timer:sleep(1).' --sample_duration 50 --squeeze --min 2 --max 4 --threshold 2
% erlperf 'timer:sleep(1).' --duration 50 --squeeze --min 2 --max 4 --threshold 2
squeeze(Config) when is_list(Config) ->
Out = capture_io(
fun () -> erlperf_cli:main(["timer:sleep(1).", "--duration", "50", "--squeeze", "--min", "2", "--max", "4", "--threshold", "2"]) end),
[{_Code, 4, C, T}] = parse_out(Out),
?assert(C > 50 andalso C < 220, {qps, C}),
?assert(T > 1000000 andalso T < 3000000, {time, T}).

% erlperf 'timer:sleep(1).' --duration 50 --squeeze --min 1 --max 25 --step 10
step(Config) when is_list(Config) ->
Out = capture_io(
fun () -> erlperf_cli:main(["timer:sleep(1).", "--duration", "50", "--squeeze", "--min", "1", "--max", "25", "--step", "10"]) end),
[{_Code, 20, C, T}] = parse_out(Out),
?assert(C > 400 andalso C < 600, {qps, C}),
?assert(T > 1000000 andalso T < 3000000, {time, T}).
% erlperf -q
usage(Config) when is_list(Config) ->
Out = capture_io(fun () -> erlperf_cli:main(["-q"]) end),
Expand Down

0 comments on commit 1640b14

Please sign in to comment.