
add concurrency to the discover #29

Open · wants to merge 3 commits into master
Conversation

@keyn4 commented Mar 11, 2024

Description of change

Add concurrency to the discover step to make it faster.
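The run_concurrently helper itself isn't shown in this view. As a rough sketch only (assuming a concurrent.futures thread pool and the all_tasks / fn_args_list names that appear in the review snippets below), it could look something like this:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_concurrently(fn, fn_args_list, max_workers=25):
        # Sketch, not the PR's actual helper: run fn once per argument list
        # and return the results in submission order.
        results = [None] * len(fn_args_list)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            all_tasks = [
                executor.submit(lambda i=i, args=args: (i, fn(*args)))
                for i, args in enumerate(fn_args_list)
            ]
            for future in as_completed(all_tasks):
                index, result = future.result()
                results[index] = result
        return results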

            ]
            for sobject_name in chunk]
        run_concurrently(discover_stream, chunk_args)
        start_counter = end_counter

    for sobject_name in sorted(objects_to_discover):
Member:

Shouldn't we get rid of the for loop now since the work is being done above?

Author (@keyn4):

old loop deleted

@butkeraites-hotglue left a comment:

Excellent work! I left two suggestions; feel free to apply them or not 👍

Comment on lines +318 to +323
    results = []

    for future in as_completed(all_tasks):
        (index, result) = future.result()
        # Insert the result in the right index of the list
        results.insert(index, result)


Suggested change

    - results = []
    - for future in as_completed(all_tasks):
    -     (index, result) = future.result()
    -     # Insert the result in the right index of the list
    -     results.insert(index, result)
    + results = [None] * len(fn_args_list)  # Preallocate list for correct ordering
    + for future in as_completed(all_tasks):
    +     index, result = future.result()
    +     results[index] = result

It's just a suggestion; it isn't necessary to grow the list dynamically here.
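For illustration only (this snippet is not from the PR), list.insert past the current end of the list simply appends, so results that complete out of order can land in the wrong slots:

    results = []
    results.insert(2, "c")   # index 2 is past the end, so this appends: ["c"]
    results.insert(1, "b")   # ["c", "b"]
    results.insert(0, "a")   # ["a", "c", "b"] -- the items for indexes 1 and 2 are swapped
    # Assigning into a preallocated list (results[index] = result) is order-independent.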

Comment on lines +346 to +366
    objects_list = sorted(objects_to_discover)
    start_counter = 0
    concurrency_limit = 25

    while start_counter < len(objects_list):
        end_counter = start_counter + concurrency_limit
        if end_counter >= len(objects_list):
            end_counter = len(objects_list)

        chunk = objects_list[start_counter:end_counter]
        chunk_args = [
            [
                sf,
                sobject_name,
                entries,
                sf_custom_setting_objects,
                object_to_tag_references,
            ]
            for sobject_name in chunk]
        run_concurrently(discover_stream, chunk_args)
        start_counter = end_counter


Suggested change

    - objects_list = sorted(objects_to_discover)
    - start_counter = 0
    - concurrency_limit = 25
    - while start_counter < len(objects_list):
    -     end_counter = start_counter + concurrency_limit
    -     if end_counter >= len(objects_list):
    -         end_counter = len(objects_list)
    -     chunk = objects_list[start_counter:end_counter]
    -     chunk_args = [
    -         [
    -             sf,
    -             sobject_name,
    -             entries,
    -             sf_custom_setting_objects,
    -             object_to_tag_references,
    -         ]
    -         for sobject_name in chunk]
    -     run_concurrently(discover_stream, chunk_args)
    -     start_counter = end_counter
    + objects_list = sorted(objects_to_discover)
    + concurrency_limit = 25
    + for start_counter in range(0, len(objects_list), concurrency_limit):
    +     chunk = objects_list[start_counter:start_counter + concurrency_limit]
    +     chunk_args = [
    +         (sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references)
    +         for sobject_name in chunk
    +     ]
    +     run_concurrently(discover_stream, chunk_args)

Seems a little clearer that way.
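As a quick sanity check (illustrative only, with made-up object names), slicing past the end of the list handles the final partial chunk without the explicit end_counter bookkeeping:

    objects_list = ["Account", "Contact", "Lead"]  # hypothetical sObject names
    concurrency_limit = 2
    chunks = [objects_list[i:i + concurrency_limit]
              for i in range(0, len(objects_list), concurrency_limit)]
    assert chunks == [["Account", "Contact"], ["Lead"]]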
