Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clients): expose waitForTasks to batch helpers [skip-bc] #4030

Merged
merged 28 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
runs-on: ubuntu-22.04
timeout-minutes: 10
env:
CACHE_VERSION: 1.02 # bump this to run all clients on the CI.
CACHE_VERSION: 1.10 # bump this to run all clients on the CI.
steps:
- name: debugging - dump GitHub context
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,35 +166,38 @@ public partial interface ISearchClient
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="SaveObjectsAsync{T}(string, IEnumerable{T}, RequestOptions, CancellationToken)"/>
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Deletes every records for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objectIDs in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null, CancellationToken cancellationToken = default);
Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);
/// <inheritdoc cref="DeleteObjectsAsync(string, IEnumerable{String}, RequestOptions, CancellationToken)"/>
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null, CancellationToken cancellationToken = default);
List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default);

/// <summary>
/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objects in it.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to update in the given Algolia `indexName`.</param>
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="waitForTasks">Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable..</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
/// <inheritdoc cref="PartialUpdateObjectsAsync{T}(string, IEnumerable{T}, bool, RequestOptions, CancellationToken)"/>
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;
List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class;

/// <summary>
/// Helper: Check if an index exists.
Expand Down Expand Up @@ -564,42 +567,44 @@ public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> obje

/// <inheritdoc/>
public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects,
bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, RequestOptions options = null,
public List<BatchResponse> SaveObjects<T>(string indexName, IEnumerable<T> objects, bool waitForTasks = false, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, options, cancellationToken));
AsyncHelper.RunSync(() => SaveObjectsAsync(indexName, objects, waitForTasks, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs,
bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, RequestOptions options = null,
public List<BatchResponse> DeleteObjects(string indexName, IEnumerable<String> objectIDs, bool waitForTasks = false, RequestOptions options = null,
CancellationToken cancellationToken = default) =>
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, options, cancellationToken));
AsyncHelper.RunSync(() => DeleteObjectsAsync(indexName, objectIDs, waitForTasks, options, cancellationToken));

/// <inheritdoc/>
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, false, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, waitForTasks, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
public List<BatchResponse> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists, bool waitForTasks = false,
RequestOptions options = null, CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, options, cancellationToken));
AsyncHelper.RunSync(() => PartialUpdateObjectsAsync(indexName, objects, createIfNotExists, waitForTasks, options, cancellationToken));

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Func<TU, bool> stopCondition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,22 @@ public suspend fun SearchClient.chunkedBatch(
*
* @param indexName The index in which to perform the request.
* @param objects The list of objects to index.
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
*/
public suspend fun SearchClient.saveObjects(
indexName: String,
objects: List<JsonObject>,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = Action.AddObject,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand All @@ -393,20 +395,22 @@ public suspend fun SearchClient.saveObjects(
*
* @param indexName The index in which to perform the request.
* @param objectIDs The list of objectIDs to delete from the index.
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
*/
public suspend fun SearchClient.deleteObjects(
indexName: String,
objectIDs: List<String>,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objectIDs.map { id -> JsonObject(mapOf("objectID" to Json.encodeToJsonElement(id))) },
action = Action.DeleteObject,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand All @@ -418,6 +422,7 @@ public suspend fun SearchClient.deleteObjects(
* @param indexName The index in which to perform the request.
* @param objects The list of objects to update in the index.
* @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail..
* @param waitForTask If true, wait for the task to complete.
* @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
* @return The list of responses from the batch requests.
*
Expand All @@ -426,13 +431,14 @@ public suspend fun SearchClient.partialUpdateObjects(
indexName: String,
objects: List<JsonObject>,
createIfNotExists: Boolean,
waitForTask: Boolean = false,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
return this.chunkedBatch(
indexName = indexName,
objects = objects,
action = if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
waitForTask = false,
waitForTask = waitForTask,
batchSize = 1000,
requestOptions = requestOptions,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ package object extension {
* The index in which to perform the request.
* @param objects
* The list of objects to save.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -259,9 +261,10 @@ package object extension {
def saveObjects(
indexName: String,
objects: Seq[Any],
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(indexName, objects, Action.AddObject, false, 1000, requestOptions)
chunkedBatch(indexName, objects, Action.AddObject, waitForTasks, 1000, requestOptions)
}

/** Helper: Deletes every objects for the given objectIDs. The `chunkedBatch` helper is used under the hood, which
Expand All @@ -271,6 +274,8 @@ package object extension {
* The index in which to perform the request.
* @param objectIDs
* The list of objectIDs to delete.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -279,13 +284,14 @@ package object extension {
def deleteObjects(
indexName: String,
objectIDs: Seq[String],
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objectIDs.map(id => new { val objectID: String = id }),
Action.DeleteObject,
false,
waitForTasks,
1000,
requestOptions
)
Expand All @@ -300,6 +306,8 @@ package object extension {
* The list of objects to save.
* @param createIfNotExists
* To be provided if non-existing objects are passed, otherwise, the call will fail.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param requestOptions
* Additional request configuration.
* @return
Expand All @@ -309,13 +317,14 @@ package object extension {
indexName: String,
objects: Seq[Any],
createIfNotExists: Boolean = false,
waitForTasks: Boolean = false,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
chunkedBatch(
indexName,
objects,
if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
false,
waitForTasks,
1000,
requestOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,18 +463,20 @@ public extension SearchClient {
/// which creates a `batch` requests with at most 1000 objects in it.
/// - parameter indexName: The name of the index where to save the objects
/// - parameter objects: The new objects
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func saveObjects(
indexName: String,
objects: [some Encodable],
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: .addObject,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand All @@ -484,18 +486,20 @@ public extension SearchClient {
/// creates a `batch` requests with at most 1000 objectIDs in it.
/// - parameter indexName: The name of the index to delete objectIDs from
/// - parameter objectIDs: The objectIDs to delete
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func deleteObjects(
indexName: String,
objectIDs: [String],
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objectIDs.map { AnyCodable(["objectID": $0]) },
action: .deleteObject,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand All @@ -507,19 +511,21 @@ public extension SearchClient {
/// - parameter objects: The objects to update
/// - parameter createIfNotExists: To be provided if non-existing objects are passed, otherwise, the call will
/// fail..
/// - parameter waitForTasks: If we should wait for the batch task to be finished before processing the next one
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func partialUpdateObjects(
indexName: String,
objects: [some Encodable],
createIfNotExists: Bool = false,
waitForTasks: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: createIfNotExists ? .partialUpdateObject : .partialUpdateObjectNoCreate,
waitForTasks: false,
waitForTasks: waitForTasks,
batchSize: 1000,
requestOptions: requestOptions
)
Expand Down
20 changes: 10 additions & 10 deletions playground/python/app/search.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
from asyncio import run
from os import environ

from algoliasearch.search.client import SearchClient
from algoliasearch.search.client import SearchClientSync
from algoliasearch.search import __version__
from dotenv import load_dotenv

load_dotenv("../.env")


async def main():
def main():
print("SearchClient version", __version__)

client = SearchClient(
client = SearchClientSync(
environ.get("ALGOLIA_APPLICATION_ID"), environ.get("ALGOLIA_ADMIN_KEY")
)
print("client initialized", client)

try:
resp = await client.search(search_method_params={
"requests": [{"indexName": "api-clients-automation"}]
})
print(resp.to_dict())
resp = client.save_objects("foo", [{"foo": "bar"}])
print(resp)

for r in resp:
client.wait_for_task(index_name="foo", task_id=r.task_id)
finally:
await client.close()
client.close()

print("client closed")


run(main())
main()
Loading
Loading