Skip to content

Commit

Permalink
Extract code into run_tasks_in_parallel helper
Browse files Browse the repository at this point in the history
  • Loading branch information
arcusfelis committed May 14, 2024
1 parent 29775e9 commit e25fee6
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions src/mod_last_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,28 @@ sessions_cleanup(HostType, Sessions) ->
Seconds = erlang:system_time(second),
%% server, username, seconds, state
Records = [[S, U, Seconds, <<>>] || #session{usr = {U, S, _}} <- Sessions],
UpdateParams = [Seconds, <<>>],
AllTasks = prepare_cleanup_tasks(Records),
RunTaskF = fun({Count, QueryName, InsertParams}) ->
run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams)
end,
run_tasks_in_parallel(RunTaskF, AllTasks).

prepare_cleanup_tasks(Records) ->
%% PgSQL would complain if there are duplicates (i.e. when there are two sessions
%% with the same name but different resources)
Records2 = lists:usort(Records),
UpdateParams = [Seconds, <<>>],
{Singles, Many100} = bucketize(100, Records2),
{Singles2, Many10} = bucketize(10, Singles),
%% Prepare data for queries
AllTasks =
[{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++
[{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++
[{1, last_upsert, Rec} || Rec <- Singles2],
RunTask = fun({Count, QueryName, InsertParams}) ->
run_upsert(HostType, Count, QueryName, InsertParams, UpdateParams)
end,
%% Run tasks in parallel
RunTasks = fun(Tasks) -> lists:map(RunTask, Tasks) end,
[{100, last_upsert_many100, lists:append(Batch)} || Batch <- Many100] ++
[{10, last_upsert_many10, lists:append(Batch)} || Batch <- Many10] ++
[{1, last_upsert, Rec} || Rec <- Singles2].

run_tasks_in_parallel(RunTaskF, AllTasks) ->
Workers = 8,
TasksForWorkers = spread(Workers, AllTasks),
Results = mongoose_lib:pmap(RunTasks, TasksForWorkers, timer:minutes(1)),
RunTasksF = fun(Tasks) -> lists:map(RunTaskF, Tasks) end,
Results = mongoose_lib:pmap(RunTasksF, TasksForWorkers, timer:minutes(1)),
[check_result(Res) || Res <- Results],
ok.

Expand Down

0 comments on commit e25fee6

Please sign in to comment.