From 78f5ff2b7b085a9ad117e701ae0c59e1c9e7b27e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 20 Nov 2024 14:17:02 +0100 Subject: [PATCH] feat(*): add namespace as a parameter of the internal storage --- .../statestore/StateStoreMigrateCommand.java | 6 +- .../StateStoreMigrateCommandTest.java | 5 +- .../core/runners/DefaultRunContext.java | 4 +- .../io/kestra/core/runners/RunContext.java | 4 + .../pebble/functions/FileSizeFunction.java | 2 +- .../pebble/functions/ReadFileFunction.java | 4 +- .../core/services/ExecutionService.java | 4 +- .../core/storages/InternalNamespace.java | 16 +- .../kestra/core/storages/InternalStorage.java | 20 +- .../core/storages/StorageInterface.java | 37 +-- .../core/storages/kv/InternalKVStore.java | 10 +- .../kestra/plugin/core/flow/ForEachItem.java | 10 +- .../java/io/kestra/plugin/core/kv/Delete.java | 4 +- .../java/io/kestra/plugin/core/kv/Get.java | 2 +- .../io/kestra/plugin/core/kv/GetKeys.java | 2 +- .../io/kestra/plugin/core/storage/Delete.java | 2 +- .../io/kestra/plugin/core/storage/Size.java | 2 +- .../io/kestra/plugin/core/trigger/Toggle.java | 6 +- .../core/models/property/PropertyTest.java | 2 +- .../core/runners/FlowInputOutputTest.java | 2 +- .../io/kestra/core/runners/InputsTest.java | 2 +- .../io/kestra/core/runners/LogToFileTest.java | 2 +- .../kestra/core/runners/RunContextTest.java | 2 +- .../runners/TaskWithAllowFailureTest.java | 1 + .../runners/TaskWithAllowWarningTest.java | 1 + .../functions/FileSizeFunctionTest.java | 2 +- .../pebble/functions/KvFunctionTest.java | 2 +- .../functions/ReadFileFunctionTest.java | 10 +- .../core/storages/InternalKVStoreTest.java | 4 +- .../storages/StorageInterfaceFactoryTest.java | 1 - .../plugin/core/flow/ForEachItemCaseTest.java | 4 +- .../io/kestra/plugin/core/flow/PauseTest.java | 2 +- .../core/flow/WorkingDirectoryTest.java | 5 +- .../kestra/plugin/core/http/DownloadTest.java | 4 +- .../kestra/plugin/core/http/RequestTest.java | 2 + .../core/namespace/UploadFilesTest.java | 2 + .../plugin/core/storage/ConcatTest.java | 3 +- .../plugin/core/storage/DeleteTest.java | 1 + .../plugin/core/storage/LocalFilesTest.java | 13 +- .../plugin/core/storage/ReverseTest.java | 3 +- .../kestra/plugin/core/storage/SizeTest.java | 1 + .../kestra/plugin/core/storage/SplitTest.java | 3 +- .../io/kestra/storage/local/LocalStorage.java | 27 +- .../tasks/runners/AbstractTaskRunnerTest.java | 6 +- .../kestra/core/storage/StorageTestSuite.java | 247 +++++++++--------- .../controllers/api/ExecutionController.java | 40 +-- .../api/NamespaceFileController.java | 32 +-- .../controllers/api/KVControllerTest.java | 18 +- .../api/NamespaceFileControllerTest.java | 70 ++--- 49 files changed, 347 insertions(+), 307 deletions(-) diff --git a/cli/src/main/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommand.java b/cli/src/main/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommand.java index aee9c37c61..417fb99884 100644 --- a/cli/src/main/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommand.java +++ b/cli/src/main/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommand.java @@ -44,7 +44,7 @@ public Integer call() throws Exception { URI.create("/" + flow.getNamespace().replace(".", "/") + "/states") ))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> { try { - return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), uri, false).stream(); + return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream(); } catch (IOException e) { return Stream.empty(); } @@ -59,9 +59,9 @@ public Integer call() throws Exception { boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId()); StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false); - try (InputStream is = storageInterface.get(flow.getTenantId(), stateStoreFileUri)) { + try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) { stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes()); - storageInterface.delete(flow.getTenantId(), stateStoreFileUri); + storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/cli/src/test/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommandTest.java b/cli/src/test/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommandTest.java index 026b8b6e34..01eb03621a 100644 --- a/cli/src/test/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommandTest.java +++ b/cli/src/test/java/io/kestra/cli/commands/sys/statestore/StateStoreMigrateCommandTest.java @@ -52,11 +52,12 @@ void runMigration() throws IOException, ResourceExpiredException { URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name"); storage.put( tenantId, + flow.getNamespace(), oldStateStoreUri, new ByteArrayInputStream("my-value".getBytes()) ); assertThat( - storage.exists(tenantId, oldStateStoreUri), + storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri), is(true) ); @@ -73,7 +74,7 @@ void runMigration() throws IOException, ResourceExpiredException { assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes()), is("my-value")); assertThat( - storage.exists(tenantId, oldStateStoreUri), + storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri), is(false) ); diff --git a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java index 81ab0ee524..c73906a5af 100644 --- a/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java +++ b/core/src/main/java/io/kestra/core/runners/DefaultRunContext.java @@ -495,7 +495,7 @@ public String tenantId() { public FlowInfo flowInfo() { Map flow = (Map) this.getVariables().get("flow"); // normally only tests should not have the flow variable - return flow == null ? null : new FlowInfo( + return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo( (String) flow.get("tenantId"), (String) flow.get("namespace"), (String) flow.get("id"), @@ -531,7 +531,7 @@ public String version() { @Override public KVStore namespaceKv(String namespace) { - return kvStoreService.get(tenantId(), namespace, this.flowInfo().namespace()); + return kvStoreService.get(this.flowInfo().tenantId(), namespace, this.flowInfo().namespace()); } /** diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 54f1db9d70..37cb491cd7 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -139,6 +139,10 @@ public abstract class RunContext { */ public abstract void cleanup(); + /** + * @deprecated use flowInfo().tenantId() instead + */ + @Deprecated(forRemoval = true) public abstract String tenantId(); public abstract FlowInfo flowInfo(); diff --git a/core/src/main/java/io/kestra/core/runners/pebble/functions/FileSizeFunction.java b/core/src/main/java/io/kestra/core/runners/pebble/functions/FileSizeFunction.java index d55312ac36..7772b81d3c 100644 --- a/core/src/main/java/io/kestra/core/runners/pebble/functions/FileSizeFunction.java +++ b/core/src/main/java/io/kestra/core/runners/pebble/functions/FileSizeFunction.java @@ -70,7 +70,7 @@ private long getFileSizeFromInternalStorageUri(EvaluationContext context, URI pa checkIfFileFromParentExecution(context, path); } - FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), path); + FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), flow.get("namespace"), path); return fileAttributes.getSize(); } diff --git a/core/src/main/java/io/kestra/core/runners/pebble/functions/ReadFileFunction.java b/core/src/main/java/io/kestra/core/runners/pebble/functions/ReadFileFunction.java index b171c1ddfa..90c3ca40bd 100644 --- a/core/src/main/java/io/kestra/core/runners/pebble/functions/ReadFileFunction.java +++ b/core/src/main/java/io/kestra/core/runners/pebble/functions/ReadFileFunction.java @@ -82,7 +82,7 @@ private boolean calledOnWorker() { private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException { Map flow = (Map) context.getVariable("flow"); URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path); - try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), namespaceFile)) { + try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), namespaceFile)) { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } } @@ -107,7 +107,7 @@ private String readFromInternalStorageUri(EvaluationContext context, URI path) t } } - try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), path)) { + try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), path)) { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } } diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index 8dbf68eb0e..906a158eb5 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -401,7 +401,7 @@ public PurgeResult purge( if (purgeStorage) { URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME); - builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), uri).size()); + builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size()); } return (PurgeResult) builder.build(); @@ -441,7 +441,7 @@ public void delete( if (deleteStorage) { URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME); - storageInterface.deleteByPrefix(execution.getTenantId(), uri); + storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri); } } diff --git a/core/src/main/java/io/kestra/core/storages/InternalNamespace.java b/core/src/main/java/io/kestra/core/storages/InternalNamespace.java index 9b1921027f..2cd059e57c 100644 --- a/core/src/main/java/io/kestra/core/storages/InternalNamespace.java +++ b/core/src/main/java/io/kestra/core/storages/InternalNamespace.java @@ -85,7 +85,7 @@ public List all(final boolean includeDirectories) throws IOExcept @Override public List all(final String prefix, final boolean includeDirectories) throws IOException { URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\","/") + "/"); - return storage.allByPrefix(tenant, namespacePrefix, includeDirectories) + return storage.allByPrefix(tenant, namespace, namespacePrefix, includeDirectories) .stream() .map(uri -> new NamespaceFile(relativize(uri), uri, namespace)) .toList(); @@ -119,7 +119,7 @@ public List findAllFilesMatching(final Predicate predicate) @Override public InputStream getFileContent(final Path path) throws IOException { Path namespaceFilePath = NamespaceFile.of(namespace, path).storagePath(); - return storage.get(tenant, namespaceFilePath.toUri()); + return storage.get(tenant, namespace, namespaceFilePath.toUri()); } /** @@ -130,11 +130,11 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C Path namespaceFilesPrefix = NamespaceFile.of(namespace, path).storagePath(); // Remove Windows letter URI cleanUri = new URI(namespaceFilesPrefix.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", "")); - final boolean exists = storage.exists(tenant, cleanUri); + final boolean exists = storage.exists(tenant, namespace, cleanUri); return switch (onAlreadyExist) { case OVERWRITE -> { - URI uri = storage.put(tenant, cleanUri, content); + URI uri = storage.put(tenant, namespace, cleanUri, content); NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace); if (exists) { logger.debug(String.format( @@ -153,7 +153,7 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C } case ERROR -> { if (!exists) { - URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content); + URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content); yield new NamespaceFile(relativize(uri), uri, namespace); } else { throw new IOException(String.format( @@ -166,7 +166,7 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C } case SKIP -> { if (!exists) { - URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content); + URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content); NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace); logger.debug(String.format( "File '%s' added to namespace '%s'.", @@ -193,7 +193,7 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C **/ @Override public URI createDirectory(Path path) throws IOException { - return storage.createDirectory(tenant, NamespaceFile.of(namespace, path).storagePath().toUri()); + return storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, path).storagePath().toUri()); } /** @@ -201,6 +201,6 @@ public URI createDirectory(Path path) throws IOException { **/ @Override public boolean delete(Path path) throws IOException { - return storage.delete(tenant, URI.create(path.toString().replace("\\","/"))); + return storage.delete(tenant, namespace, URI.create(path.toString().replace("\\","/"))); } } diff --git a/core/src/main/java/io/kestra/core/storages/InternalStorage.java b/core/src/main/java/io/kestra/core/storages/InternalStorage.java index 347aca44bc..7f35f2784f 100644 --- a/core/src/main/java/io/kestra/core/storages/InternalStorage.java +++ b/core/src/main/java/io/kestra/core/storages/InternalStorage.java @@ -88,7 +88,7 @@ public Namespace namespace(String namespace) { **/ @Override public boolean isFileExist(URI uri) { - return this.storage.exists(context.getTenantId(), uri); + return this.storage.exists(context.getTenantId(), context.getNamespace(), uri); } /** @@ -98,7 +98,7 @@ public boolean isFileExist(URI uri) { public InputStream getFile(final URI uri) throws IOException { uriGuard(uri); - return this.storage.get(context.getTenantId(), uri); + return this.storage.get(context.getTenantId(), context.getNamespace(), uri); } @@ -109,7 +109,7 @@ public InputStream getFile(final URI uri) throws IOException { public boolean deleteFile(final URI uri) throws IOException { uriGuard(uri); - return this.storage.delete(context.getTenantId(), uri); + return this.storage.delete(context.getTenantId(), context.getNamespace(), uri); } @@ -133,7 +133,7 @@ private static void uriGuard(URI uri) { **/ @Override public List deleteExecutionFiles() throws IOException { - return this.storage.deleteByPrefix(context.getTenantId(), context.getExecutionStorageURI()); + return this.storage.deleteByPrefix(context.getTenantId(), context.getNamespace(), context.getExecutionStorageURI()); } /** @@ -151,7 +151,7 @@ public URI getContextBaseURI() { public URI putFile(InputStream inputStream, String name) throws IOException { URI uri = context.getContextStorageURI(); URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + name); - return this.storage.put(context.getTenantId(), resolved, new BufferedInputStream(inputStream)); + return this.storage.put(context.getTenantId(), context.getNamespace(), resolved, new BufferedInputStream(inputStream)); } /** @@ -159,7 +159,7 @@ public URI putFile(InputStream inputStream, String name) throws IOException { **/ @Override public URI putFile(InputStream inputStream, URI uri) throws IOException { - return this.storage.put(context.getTenantId(), uri, new BufferedInputStream(inputStream)); + return this.storage.put(context.getTenantId(), context.getNamespace(), uri, new BufferedInputStream(inputStream)); } /** @@ -211,14 +211,14 @@ public Optional getCacheFile(final String cacheId, } URI uri = context.getCacheURI(cacheId, objectId); return isFileExist(uri) ? - Optional.of(storage.get(context.getTenantId(), uri)) : + Optional.of(storage.get(context.getTenantId(), context.getNamespace(), uri)) : Optional.empty(); } private Optional getCacheFileLastModifiedTime(String cacheId, @Nullable String objectId) throws IOException { URI uri = context.getCacheURI(cacheId, objectId); return isFileExist(uri) ? - Optional.of(this.storage.getAttributes(context.getTenantId(), uri).getLastModifiedTime()) : + Optional.of(this.storage.getAttributes(context.getTenantId(), context.getNamespace(), uri).getLastModifiedTime()) : Optional.empty(); } @@ -238,7 +238,7 @@ public URI putCacheFile(File file, String cacheId, @Nullable String objectId) th public Optional deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException { URI uri = context.getCacheURI(cacheId, objectId); return isFileExist(uri) ? - Optional.of(this.storage.delete(context.getTenantId(), uri)) : + Optional.of(this.storage.delete(context.getTenantId(), context.getNamespace(), uri)) : Optional.empty(); } @@ -263,7 +263,7 @@ private URI putFileAndDelete(File file, String prefix, String name) throws IOExc private URI putFile(InputStream inputStream, String prefix, String name) throws IOException { URI uri = URI.create(prefix); URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name); - return this.storage.put(context.getTenantId(), resolve, new BufferedInputStream(inputStream)); + return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream)); } public Optional getTaskStorageContext() { diff --git a/core/src/main/java/io/kestra/core/storages/StorageInterface.java b/core/src/main/java/io/kestra/core/storages/StorageInterface.java index bafe9b0344..5c5fdd7ba2 100644 --- a/core/src/main/java/io/kestra/core/storages/StorageInterface.java +++ b/core/src/main/java/io/kestra/core/storages/StorageInterface.java @@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.Plugin; +import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -12,8 +13,12 @@ import java.io.InputStream; import java.net.URI; import java.util.List; -import java.util.Map; +/** + * @implNote Most methods (except lifecycle on) took a namespace as parameter, this namespace parameter MUST NOT BE USED to denote the path of the storage URI in any sort, + * the URI must never be modified by a storage implementation. + * This is only used by storage implementation that must enforce namespace isolation. + */ public interface StorageInterface extends AutoCloseable, Plugin { /** @@ -34,10 +39,10 @@ default void close() { } @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - InputStream get(String tenantId, URI uri) throws IOException; + InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException; @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - StorageObject getWithMetadata(String tenantId, URI uri) throws IOException; + StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException; /** * Returns all objects that start with the given prefix @@ -46,10 +51,10 @@ default void close() { * @return Kestra's internal storage uris of the found objects */ @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - List allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException; + List allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException; @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - List list(String tenantId, URI uri) throws IOException; + List list(String tenantId, @Nullable String namespace, URI uri) throws IOException; /** * Whether the uri points to a file/object that exist in the internal storage. @@ -59,8 +64,8 @@ default void close() { * @return true if the uri points to a file/object that exist in the internal storage. */ @SuppressWarnings("try") - default boolean exists(String tenantId, URI uri) { - try (InputStream ignored = get(tenantId, uri)) { + default boolean exists(String tenantId, @Nullable String namespace, URI uri) { + try (InputStream ignored = get(tenantId, namespace, uri)) { return true; } catch (IOException ieo) { return false; @@ -68,31 +73,31 @@ default boolean exists(String tenantId, URI uri) { } @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - FileAttributes getAttributes(String tenantId, URI uri) throws IOException; + FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException; @Retryable(includes = {IOException.class}) - default URI put(String tenantId, URI uri, InputStream data) throws IOException { - return this.put(tenantId, uri, new StorageObject(null, data)); + default URI put(String tenantId, @Nullable String namespace, URI uri, InputStream data) throws IOException { + return this.put(tenantId, namespace, uri, new StorageObject(null, data)); } @Retryable(includes = {IOException.class}) - URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException; + URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException; @Retryable(includes = {IOException.class}) - boolean delete(String tenantId, URI uri) throws IOException; + boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException; @Retryable(includes = {IOException.class}) - URI createDirectory(String tenantId, URI uri) throws IOException; + URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException; @Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class}) - URI move(String tenantId, URI from, URI to) throws IOException; + URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException; @Retryable(includes = {IOException.class}) - List deleteByPrefix(String tenantId, URI storagePrefix) throws IOException; + List deleteByPrefix(String tenantId, @Nullable String namespace, URI storagePrefix) throws IOException; @Retryable(includes = {IOException.class}) default URI from(Execution execution, String input, File file) throws IOException { URI uri = StorageContext.forInput(execution, input, file.getName()).getContextStorageURI(); - return this.put(execution.getTenantId(), uri, new BufferedInputStream(new FileInputStream(file))); + return this.put(execution.getTenantId(), execution.getNamespace(), uri, new BufferedInputStream(new FileInputStream(file))); } } diff --git a/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java b/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java index 53809fbc32..7f55f38b8a 100644 --- a/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java +++ b/core/src/main/java/io/kestra/core/storages/kv/InternalKVStore.java @@ -67,7 +67,7 @@ public void put(String key, KVValueAndMetadata value, boolean overwrite) throws byte[] serialized = JacksonMapper.ofIon().writeValueAsBytes(value.value()); - this.storage.put(this.tenant, this.storageUri(key), new StorageObject( + this.storage.put(this.tenant, this.namespace, this.storageUri(key), new StorageObject( value.metadataAsMap(), new ByteArrayInputStream(serialized) )); @@ -92,7 +92,7 @@ public Optional getRawValue(String key) throws IOException, ResourceExpi StorageObject withMetadata; try { - withMetadata = this.storage.getWithMetadata(this.tenant, this.storageUri(key)); + withMetadata = this.storage.getWithMetadata(this.tenant, this.namespace, this.storageUri(key)); } catch (FileNotFoundException e) { return Optional.empty(); } @@ -112,7 +112,7 @@ public Optional getRawValue(String key) throws IOException, ResourceExpi @Override public boolean delete(String key) throws IOException { KVStore.validateKey(key); - return this.storage.delete(this.tenant, this.storageUri(key)); + return this.storage.delete(this.tenant, this.namespace, this.storageUri(key)); } /** @@ -122,7 +122,7 @@ public boolean delete(String key) throws IOException { public List list() throws IOException { List list; try { - list = this.storage.list(this.tenant, this.storageUri(null)); + list = this.storage.list(this.tenant, this.namespace, this.storageUri(null)); } catch (FileNotFoundException e) { return Collections.emptyList(); } @@ -140,7 +140,7 @@ public Optional get(final String key) throws IOException { KVStore.validateKey(key); try { - KVEntry entry = KVEntry.from(this.storage.getAttributes(this.tenant, this.storageUri(key))); + KVEntry entry = KVEntry.from(this.storage.getAttributes(this.tenant, this.namespace, this.storageUri(key))); if (entry.expirationDate() != null && Instant.now().isAfter(entry.expirationDate())) { this.delete(key); return Optional.empty(); diff --git a/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java b/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java index 33b954931c..7b5a3aec7d 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/ForEachItem.java @@ -2,7 +2,6 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.InternalException; -import io.kestra.core.models.Label; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; @@ -34,12 +33,10 @@ import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; -import org.apache.commons.lang3.stream.Streams; import java.io.*; import java.net.URI; import java.time.ZonedDateTime; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -47,7 +44,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -81,7 +77,7 @@ tasks: - id: read_file type: io.kestra.plugin.scripts.shell.Commands - taskRunner: + taskRunner: type: io.kestra.plugin.core.runner.Process commands: - cat "{{ inputs.order }}" @@ -579,8 +575,8 @@ public ForEachItemMergeOutputs.Output run(RunContext runContext) throws Exceptio URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/"); StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class); - if (storage.exists(runContext.tenantId(), subflowOutputsBaseUri)) { - List list = storage.list(runContext.tenantId(), subflowOutputsBaseUri); + if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) { + List list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri); if (!list.isEmpty()) { // Merge outputs from each sub-flow into a single stored in the internal storage. diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Delete.java b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java index a815f40139..3646436248 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/Delete.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/Delete.java @@ -31,7 +31,7 @@ code = """ id: kv_store_delete namespace: company.team - + tasks: - id: kv_delete type: io.kestra.plugin.core.kv.Delete @@ -70,7 +70,7 @@ public Output run(RunContext runContext) throws Exception { String renderedNamespace = runContext.render(this.namespace); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); - flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); String renderedKey = runContext.render(this.key); diff --git a/core/src/main/java/io/kestra/plugin/core/kv/Get.java b/core/src/main/java/io/kestra/plugin/core/kv/Get.java index 94a7021f27..f54a9b1123 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/Get.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/Get.java @@ -76,7 +76,7 @@ public Output run(RunContext runContext) throws Exception { String renderedNamespace = runContext.render(this.namespace); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); - flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); String renderedKey = runContext.render(this.key); diff --git a/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java index d104cfcc96..ff60137e7f 100644 --- a/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java +++ b/core/src/main/java/io/kestra/plugin/core/kv/GetKeys.java @@ -67,7 +67,7 @@ public Output run(RunContext runContext) throws Exception { String renderedNamespace = runContext.render(this.namespace); FlowService flowService = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowService.class); - flowService.checkAllowedNamespace(runContext.tenantId(), renderedNamespace, runContext.tenantId(), runContext.flowInfo().namespace()); + flowService.checkAllowedNamespace(runContext.flowInfo().tenantId(), renderedNamespace, runContext.flowInfo().tenantId(), runContext.flowInfo().namespace()); String renderedPrefix = runContext.render(this.prefix); Predicate filter = renderedPrefix == null ? key -> true : key -> key.startsWith(renderedPrefix); diff --git a/core/src/main/java/io/kestra/plugin/core/storage/Delete.java b/core/src/main/java/io/kestra/plugin/core/storage/Delete.java index 0bd474ad82..cc9dd3dec7 100644 --- a/core/src/main/java/io/kestra/plugin/core/storage/Delete.java +++ b/core/src/main/java/io/kestra/plugin/core/storage/Delete.java @@ -55,7 +55,7 @@ public Delete.Output run(RunContext runContext) throws Exception { StorageInterface storageInterface = ((DefaultRunContext)runContext).getApplicationContext().getBean(StorageInterface.class); URI render = URI.create(runContext.render(this.uri)); - boolean delete = storageInterface.delete(runContext.tenantId(), render); + boolean delete = storageInterface.delete(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), render); if (errorOnMissing && !delete) { throw new NoSuchElementException("Unable to find file '" + render + "'"); diff --git a/core/src/main/java/io/kestra/plugin/core/storage/Size.java b/core/src/main/java/io/kestra/plugin/core/storage/Size.java index 5a3ddfa083..80388b75ee 100644 --- a/core/src/main/java/io/kestra/plugin/core/storage/Size.java +++ b/core/src/main/java/io/kestra/plugin/core/storage/Size.java @@ -47,7 +47,7 @@ public Size.Output run(RunContext runContext) throws Exception { StorageInterface storageInterface = ((DefaultRunContext)runContext).getApplicationContext().getBean(StorageInterface.class); URI render = URI.create(runContext.render(this.uri)); - Long size = storageInterface.getAttributes(runContext.tenantId(), render).getSize(); + Long size = storageInterface.getAttributes(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), render).getSize(); return Output.builder() .size(size) diff --git a/core/src/main/java/io/kestra/plugin/core/trigger/Toggle.java b/core/src/main/java/io/kestra/plugin/core/trigger/Toggle.java index d684ad0fd4..a2c79beadf 100644 --- a/core/src/main/java/io/kestra/plugin/core/trigger/Toggle.java +++ b/core/src/main/java/io/kestra/plugin/core/trigger/Toggle.java @@ -111,11 +111,11 @@ public VoidOutput run(RunContext runContext) throws Exception { final ApplicationContext applicationContext = ((DefaultRunContext) runContext).getApplicationContext(); FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class); flowExecutor.findByIdFromTask( - runContext.tenantId(), + runContext.flowInfo().tenantId(), realNamespace, realFlowId, Optional.empty(), - runContext.tenantId(), + runContext.flowInfo().tenantId(), flowVariables.get("namespace"), flowVariables.get("id") ) @@ -124,7 +124,7 @@ public VoidOutput run(RunContext runContext) throws Exception { // load the trigger from the database TriggerContext triggerContext = TriggerContext.builder() - .tenantId(runContext.tenantId()) + .tenantId(runContext.flowInfo().tenantId()) .namespace(realNamespace) .flowId(realFlowId) .triggerId(realTrigger) diff --git a/core/src/test/java/io/kestra/core/models/property/PropertyTest.java b/core/src/test/java/io/kestra/core/models/property/PropertyTest.java index 9e4953bda4..479cc5ce7f 100644 --- a/core/src/test/java/io/kestra/core/models/property/PropertyTest.java +++ b/core/src/test/java/io/kestra/core/models/property/PropertyTest.java @@ -153,7 +153,7 @@ void withMessagesFromURI() throws Exception { FileSerde.writeAll(Files.newBufferedWriter(messages), Flux.fromIterable(inputValues)).block(); URI uri; try (var input = new FileInputStream(messages.toFile())) { - uri = storage.put(null, URI.create("/messages.ion"), input); + uri = storage.put(null, null, URI.create("/messages.ion"), input); } var task = DynamicPropertyExampleTask.builder() diff --git a/core/src/test/java/io/kestra/core/runners/FlowInputOutputTest.java b/core/src/test/java/io/kestra/core/runners/FlowInputOutputTest.java index 9bac1e7780..4b76ba1632 100644 --- a/core/src/test/java/io/kestra/core/runners/FlowInputOutputTest.java +++ b/core/src/test/java/io/kestra/core/runners/FlowInputOutputTest.java @@ -215,7 +215,7 @@ void shouldNotUploadFileInputAfterValidation() throws IOException { // Then Assertions.assertNull(values.getFirst().exception()); - Assertions.assertFalse(storageInterface.exists(null, URI.create(values.getFirst().value().toString()))); + Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString()))); } @Test diff --git a/core/src/test/java/io/kestra/core/runners/InputsTest.java b/core/src/test/java/io/kestra/core/runners/InputsTest.java index be83cc70b1..5c647d28bc 100644 --- a/core/src/test/java/io/kestra/core/runners/InputsTest.java +++ b/core/src/test/java/io/kestra/core/runners/InputsTest.java @@ -134,7 +134,7 @@ void allValidInputs() throws URISyntaxException, IOException { assertThat(typeds.get("duration"), is(Duration.parse("PT5M6S"))); assertThat((URI) typeds.get("file"), is(new URI("kestra:///io/kestra/tests/inputs/executions/test/inputs/file/application-test.yml"))); assertThat( - CharStreams.toString(new InputStreamReader(storageInterface.get(null, (URI) typeds.get("file")))), + CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, (URI) typeds.get("file")))), is(CharStreams.toString(new InputStreamReader(new FileInputStream((String) inputs.get("file"))))) ); assertThat(typeds.get("json"), is(Map.of("a", "b"))); diff --git a/core/src/test/java/io/kestra/core/runners/LogToFileTest.java b/core/src/test/java/io/kestra/core/runners/LogToFileTest.java index 4e07519680..32d41e6b33 100644 --- a/core/src/test/java/io/kestra/core/runners/LogToFileTest.java +++ b/core/src/test/java/io/kestra/core/runners/LogToFileTest.java @@ -31,7 +31,7 @@ void task() throws Exception { TaskRunAttempt attempt = taskRun.getAttempts().getFirst(); assertThat(attempt.getLogFile(), notNullValue()); - InputStream inputStream = storage.get(null, attempt.getLogFile()); + InputStream inputStream = storage.get(null, "io.kestra.tests", attempt.getLogFile()); List strings = IOUtils.readLines(inputStream, StandardCharsets.UTF_8); assertThat(strings, notNullValue()); assertThat(strings.size(), is(1)); diff --git a/core/src/test/java/io/kestra/core/runners/RunContextTest.java b/core/src/test/java/io/kestra/core/runners/RunContextTest.java index 097e5d3604..0b8ad40e3a 100644 --- a/core/src/test/java/io/kestra/core/runners/RunContextTest.java +++ b/core/src/test/java/io/kestra/core/runners/RunContextTest.java @@ -183,7 +183,7 @@ void largeInput() throws IOException, InterruptedException { p.destroy(); URI uri = runContext.storage().putFile(path.toFile()); - assertThat(storageInterface.getAttributes(null, uri).getSize(), is(size + 1)); + assertThat(storageInterface.getAttributes(null, null, uri).getSize(), is(size + 1)); } @Test diff --git a/core/src/test/java/io/kestra/core/runners/TaskWithAllowFailureTest.java b/core/src/test/java/io/kestra/core/runners/TaskWithAllowFailureTest.java index 6bd47d0bb2..89638ca79d 100644 --- a/core/src/test/java/io/kestra/core/runners/TaskWithAllowFailureTest.java +++ b/core/src/test/java/io/kestra/core/runners/TaskWithAllowFailureTest.java @@ -71,6 +71,7 @@ private URI storageUpload() throws URISyntaxException, IOException { Files.write(tempFile.toPath(), content()); return storageInterface.put( + null, null, new URI("/file/storage/file.txt"), new FileInputStream(tempFile) diff --git a/core/src/test/java/io/kestra/core/runners/TaskWithAllowWarningTest.java b/core/src/test/java/io/kestra/core/runners/TaskWithAllowWarningTest.java index 5049b4c155..7c086132e6 100644 --- a/core/src/test/java/io/kestra/core/runners/TaskWithAllowWarningTest.java +++ b/core/src/test/java/io/kestra/core/runners/TaskWithAllowWarningTest.java @@ -71,6 +71,7 @@ private URI storageUpload() throws URISyntaxException, IOException { Files.write(tempFile.toPath(), content()); return storageInterface.put( + null, null, new URI("/file/storage/file.txt"), new FileInputStream(tempFile) diff --git a/core/src/test/java/io/kestra/core/runners/pebble/functions/FileSizeFunctionTest.java b/core/src/test/java/io/kestra/core/runners/pebble/functions/FileSizeFunctionTest.java index 2a689df10a..411bdfd85b 100644 --- a/core/src/test/java/io/kestra/core/runners/pebble/functions/FileSizeFunctionTest.java +++ b/core/src/test/java/io/kestra/core/runners/pebble/functions/FileSizeFunctionTest.java @@ -165,6 +165,6 @@ private URI getInternalStorageURI(String executionId) { } private URI getInternalStorageFile(URI internalStorageURI) throws IOException { - return storageInterface.put(null, internalStorageURI, new ByteArrayInputStream(FILE_TEXT.getBytes())); + return storageInterface.put(null, null, internalStorageURI, new ByteArrayInputStream(FILE_TEXT.getBytes())); } } diff --git a/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java b/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java index 87226ac7cc..d3488aa055 100644 --- a/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java +++ b/core/src/test/java/io/kestra/core/runners/pebble/functions/KvFunctionTest.java @@ -41,7 +41,7 @@ public class KvFunctionTest extends AbstractMemoryRunnerTest { @BeforeEach void reset() throws IOException { - storageInterface.deleteByPrefix(null, URI.create(StorageContext.kvPrefix("io.kestra.tests"))); + storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests"))); } @Test diff --git a/core/src/test/java/io/kestra/core/runners/pebble/functions/ReadFileFunctionTest.java b/core/src/test/java/io/kestra/core/runners/pebble/functions/ReadFileFunctionTest.java index 68c1284ffb..d106179a56 100644 --- a/core/src/test/java/io/kestra/core/runners/pebble/functions/ReadFileFunctionTest.java +++ b/core/src/test/java/io/kestra/core/runners/pebble/functions/ReadFileFunctionTest.java @@ -36,8 +36,8 @@ class ReadFileFunctionTest { void readNamespaceFile() throws IllegalVariableEvaluationException, IOException { String namespace = "io.kestra.tests"; String filePath = "file.txt"; - storageInterface.createDirectory(null, URI.create(StorageContext.namespaceFilePrefix(namespace))); - storageInterface.put(null, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes())); + storageInterface.createDirectory(null, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace))); + storageInterface.put(null, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes())); String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace))); assertThat(render, is("Hello from " + namespace)); @@ -56,7 +56,7 @@ void readInternalStorageFile() throws IOException, IllegalVariableEvaluationExce String flowId = "flow"; String executionId = IdUtils.create(); URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion"); - URI internalStorageFile = storageInterface.put(null, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); + URI internalStorageFile = storageInterface.put(null, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); // test for an authorized execution Map variables = Map.of( @@ -93,7 +93,7 @@ void readInternalStorageURI() throws IOException, IllegalVariableEvaluationExcep String flowId = "flow"; String executionId = IdUtils.create(); URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion"); - URI internalStorageFile = storageInterface.put(null, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); + URI internalStorageFile = storageInterface.put(null, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); // test for an authorized execution Map variables = Map.of( @@ -130,7 +130,7 @@ void readUnauthorizedInternalStorageFile() throws IOException { String flowId = "flow"; String executionId = IdUtils.create(); URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion"); - URI internalStorageFile = storageInterface.put(null, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); + URI internalStorageFile = storageInterface.put(null, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes())); // test for an un-authorized execution with no trigger Map variables = Map.of( diff --git a/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java b/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java index 66727a06c1..c69beea653 100644 --- a/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java +++ b/core/src/test/java/io/kestra/core/storages/InternalKVStoreTest.java @@ -94,7 +94,7 @@ void put() throws IOException { kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(Duration.ofMinutes(5)), complexValue)); // Then - StorageObject withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); + StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); String valueFile = new String(withMetadata.inputStream().readAllBytes()); Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate")); assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6))), is(true)); @@ -104,7 +104,7 @@ void put() throws IOException { kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(Duration.ofMinutes(10)), "some-value")); // Then - withMetadata = storageInterface.getWithMetadata(null, URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); + withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion")); valueFile = new String(withMetadata.inputStream().readAllBytes()); expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate")); assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11))), is(true)); diff --git a/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java index a8ca96cf88..ce87eacf25 100644 --- a/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java +++ b/core/src/test/java/io/kestra/core/storages/StorageInterfaceFactoryTest.java @@ -5,7 +5,6 @@ import io.kestra.storage.local.LocalStorage; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; -import jakarta.validation.ConstraintViolation; import jakarta.validation.ConstraintViolationException; import jakarta.validation.Validator; import org.junit.jupiter.api.Assertions; diff --git a/core/src/test/java/io/kestra/plugin/core/flow/ForEachItemCaseTest.java b/core/src/test/java/io/kestra/plugin/core/flow/ForEachItemCaseTest.java index 8990f6095d..6b3a82a5ed 100644 --- a/core/src/test/java/io/kestra/plugin/core/flow/ForEachItemCaseTest.java +++ b/core/src/test/java/io/kestra/plugin/core/flow/ForEachItemCaseTest.java @@ -261,7 +261,7 @@ public void forEachItemWithSubflowOutputs() throws TimeoutException, Interrupted // asserts for subflow merged outputs Map mergeTaskOutputs = execution.getTaskRunList().get(3).getOutputs(); assertThat(mergeTaskOutputs.get("subflowOutputs"), notNullValue()); - InputStream stream = storageInterface.get(null, URI.create((String) mergeTaskOutputs.get("subflowOutputs"))); + InputStream stream = storageInterface.get(null, execution.getNamespace(), URI.create((String) mergeTaskOutputs.get("subflowOutputs"))); try (var br = new BufferedReader(new InputStreamReader(stream))) { // one line per sub-flows @@ -275,6 +275,7 @@ private URI storageUpload() throws URISyntaxException, IOException { Files.write(tempFile.toPath(), content()); return storageInterface.put( + null, null, new URI("/file/storage/file.txt"), new FileInputStream(tempFile) @@ -285,6 +286,7 @@ private URI emptyItems() throws URISyntaxException, IOException { File tempFile = File.createTempFile("file", ".txt"); return storageInterface.put( + null, null, new URI("/file/storage/file.txt"), new FileInputStream(tempFile) diff --git a/core/src/test/java/io/kestra/plugin/core/flow/PauseTest.java b/core/src/test/java/io/kestra/plugin/core/flow/PauseTest.java index 55cd70ea9d..912d0b3626 100644 --- a/core/src/test/java/io/kestra/plugin/core/flow/PauseTest.java +++ b/core/src/test/java/io/kestra/plugin/core/flow/PauseTest.java @@ -230,7 +230,7 @@ public void runOnResume(RunnerUtils runnerUtils) throws Exception { assertThat(outputs.get("asked"), is("restarted")); assertThat((String) outputs.get("data"), startsWith("kestra://")); assertThat( - CharStreams.toString(new InputStreamReader(storageInterface.get(null, URI.create((String) outputs.get("data"))))), + CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, URI.create((String) outputs.get("data"))))), is(executionId) ); } diff --git a/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java b/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java index 0d2434d3d1..aadb334cae 100644 --- a/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java +++ b/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java @@ -195,7 +195,7 @@ public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException, storage.deleteCacheFile("workingDir", null); URI cacheURI = storageContext.getCacheURI("workingdir", null); - assertFalse(storageInterface.exists(null, cacheURI)); + assertFalse(storageInterface.exists(null, null, cacheURI)); Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache"); @@ -207,7 +207,7 @@ public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException, is(Map.of("uris", Collections.emptyMap())) ); assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); - assertTrue(storageInterface.exists(null, cacheURI)); + assertTrue(storageInterface.exists(null, null, cacheURI)); // a second run should use the cache so the task `exists` should output the cached file execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-cache"); @@ -271,6 +271,7 @@ public void encryption(RunnerUtils runnerUtils, RunContextFactory runContextFact private void put(String path, String content) throws IOException { storageInterface.put( + null, null, URI.create(StorageContext.namespaceFilePrefix("io.kestra.tests") + path), new ByteArrayInputStream(content.getBytes()) diff --git a/core/src/test/java/io/kestra/plugin/core/http/DownloadTest.java b/core/src/test/java/io/kestra/plugin/core/http/DownloadTest.java index e2217c9a10..59a24dac29 100644 --- a/core/src/test/java/io/kestra/plugin/core/http/DownloadTest.java +++ b/core/src/test/java/io/kestra/plugin/core/http/DownloadTest.java @@ -52,7 +52,7 @@ void run() throws Exception { Download.Output output = task.run(runContext); assertThat( - IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8), + IOUtils.toString(this.storageInterface.get(null, null, output.getUri()), StandardCharsets.UTF_8), is(IOUtils.toString(new URI(FILE).toURL().openStream(), StandardCharsets.UTF_8)) ); assertThat(output.getUri().toString(), endsWith(".csv")); @@ -95,7 +95,7 @@ void allowNoResponse() throws IOException { Download.Output output = assertDoesNotThrow(() -> task.run(runContext)); assertThat(output.getLength(), is(0L)); - assertThat(IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8), is("")); + assertThat(IOUtils.toString(this.storageInterface.get(null, null, output.getUri()), StandardCharsets.UTF_8), is("")); } @Test diff --git a/core/src/test/java/io/kestra/plugin/core/http/RequestTest.java b/core/src/test/java/io/kestra/plugin/core/http/RequestTest.java index 54caa12a13..851d9d1480 100644 --- a/core/src/test/java/io/kestra/plugin/core/http/RequestTest.java +++ b/core/src/test/java/io/kestra/plugin/core/http/RequestTest.java @@ -168,6 +168,7 @@ void multipart() throws Exception { File file = new File(Objects.requireNonNull(RequestTest.class.getClassLoader().getResource("application-test.yml")).toURI()); URI fileStorage = storageInterface.put( + null, null, new URI("/" + FriendlyId.createFriendlyId()), new FileInputStream(file) @@ -201,6 +202,7 @@ void multipartCustomFilename() throws Exception { File file = new File(Objects.requireNonNull(RequestTest.class.getClassLoader().getResource("application-test.yml")).toURI()); URI fileStorage = storageInterface.put( + null, null, new URI("/" + FriendlyId.createFriendlyId()), new FileInputStream(file) diff --git a/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java b/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java index c363277ebc..2e7cf7e24c 100644 --- a/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java +++ b/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java @@ -44,6 +44,7 @@ void shouldThrowExceptionGivenAlreadyExistingFileWhenConflictError() throws Exce File file = new File(Objects.requireNonNull(UploadFilesTest.class.getClassLoader().getResource("application-test.yml")).toURI()); URI fileStorage = storageInterface.put( + null, null, new URI("/" + FriendlyId.createFriendlyId()), new FileInputStream(file) @@ -168,6 +169,7 @@ private URI addToStorage(String fileToLoad) throws IOException, URISyntaxExcepti File file = new File(Objects.requireNonNull(UploadFilesTest.class.getClassLoader().getResource(fileToLoad)).toURI()); return storageInterface.put( + null, null, new URI("/" + FriendlyId.createFriendlyId()), new FileInputStream(file) diff --git a/core/src/test/java/io/kestra/plugin/core/storage/ConcatTest.java b/core/src/test/java/io/kestra/plugin/core/storage/ConcatTest.java index 014471b1a3..0c22e2e5f8 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/ConcatTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/ConcatTest.java @@ -39,6 +39,7 @@ void run(Boolean json) throws Exception { .toURI()); URI put = storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new FileInputStream(Objects.requireNonNull(resource).getFile()) @@ -57,7 +58,7 @@ void run(Boolean json) throws Exception { assertThat( - CharStreams.toString(new InputStreamReader(storageInterface.get(null, run.getUri()))), + CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, run.getUri()))), is(s + "\n" + s + "\n") ); assertThat(run.getUri().getPath(), endsWith(".yml")); diff --git a/core/src/test/java/io/kestra/plugin/core/storage/DeleteTest.java b/core/src/test/java/io/kestra/plugin/core/storage/DeleteTest.java index 28446907fe..d9862ddc4c 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/DeleteTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/DeleteTest.java @@ -31,6 +31,7 @@ void run() throws Exception { URL resource = DeleteTest.class.getClassLoader().getResource("application-test.yml"); URI put = storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new FileInputStream(Objects.requireNonNull(resource).getFile()) diff --git a/core/src/test/java/io/kestra/plugin/core/storage/LocalFilesTest.java b/core/src/test/java/io/kestra/plugin/core/storage/LocalFilesTest.java index f723985687..c939cd84ca 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/LocalFilesTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/LocalFilesTest.java @@ -36,6 +36,7 @@ private URI internalFiles() throws IOException, URISyntaxException { var resource = ConcatTest.class.getClassLoader().getResource("application-test.yml"); return storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new FileInputStream(Objects.requireNonNull(resource).getFile()) @@ -64,14 +65,14 @@ void run() throws Exception { assertThat(outputs.getUris(), notNullValue()); assertThat(outputs.getUris().size(), is(1)); assertThat( - new String(storageInterface.get(null, outputs.getUris().get("hello-input.txt")).readAllBytes()), + new String(storageInterface.get(null, null, outputs.getUris().get("hello-input.txt")).readAllBytes()), is("Hello Input") ); assertThat(runContext.workingDir().path().toFile().list().length, is(2)); assertThat(Files.readString(runContext.workingDir().path().resolve("execution.txt")), is("tata")); assertThat( Files.readString(runContext.workingDir().path().resolve("application-test.yml")), - is(new String(storageInterface.get(null, storageFile).readAllBytes())) + is(new String(storageInterface.get(null, null, storageFile).readAllBytes())) ); runContext.cleanup(); @@ -98,18 +99,18 @@ void recursive() throws Exception { assertThat(outputs.getUris(), notNullValue()); assertThat(outputs.getUris().size(), is(3)); assertThat( - new String(storageInterface.get(null, outputs.getUris().get("test/hello-input.txt")).readAllBytes()), + new String(storageInterface.get(null, null, outputs.getUris().get("test/hello-input.txt")).readAllBytes()), is("Hello Input") ); assertThat( - new String(storageInterface.get(null, outputs.getUris().get("test/sub/dir/2/execution.txt")) + new String(storageInterface.get(null, null, outputs.getUris().get("test/sub/dir/2/execution.txt")) .readAllBytes()), is("tata") ); assertThat( - new String(storageInterface.get(null, outputs.getUris().get( "test/sub/dir/3/application-test.yml")) + new String(storageInterface.get(null, null, outputs.getUris().get( "test/sub/dir/3/application-test.yml")) .readAllBytes()), - is(new String(storageInterface.get(null, storageFile).readAllBytes())) + is(new String(storageInterface.get(null, null, storageFile).readAllBytes())) ); runContext.cleanup(); } diff --git a/core/src/test/java/io/kestra/plugin/core/storage/ReverseTest.java b/core/src/test/java/io/kestra/plugin/core/storage/ReverseTest.java index 5bb68bcebb..b97f3f27eb 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/ReverseTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/ReverseTest.java @@ -29,6 +29,7 @@ void run() throws Exception { RunContext runContext = runContextFactory.of(); URI put = storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new ByteArrayInputStream("1\n2\n3\n".getBytes()) @@ -42,6 +43,6 @@ void run() throws Exception { Reverse.Output run = result.run(runContext); assertThat(run.getUri().getPath(), endsWith(".yml")); - assertThat(CharStreams.toString(new InputStreamReader(storageInterface.get(null, run.getUri()))), is("3\n2\n1\n")); + assertThat(CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, run.getUri()))), is("3\n2\n1\n")); } } \ No newline at end of file diff --git a/core/src/test/java/io/kestra/plugin/core/storage/SizeTest.java b/core/src/test/java/io/kestra/plugin/core/storage/SizeTest.java index 9a41fa94f0..f560fb031b 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/SizeTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/SizeTest.java @@ -32,6 +32,7 @@ void run() throws Exception { new Random().nextBytes(randomBytes); URI put = storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new ByteArrayInputStream(randomBytes) diff --git a/core/src/test/java/io/kestra/plugin/core/storage/SplitTest.java b/core/src/test/java/io/kestra/plugin/core/storage/SplitTest.java index b79310181e..15bee0ea60 100644 --- a/core/src/test/java/io/kestra/plugin/core/storage/SplitTest.java +++ b/core/src/test/java/io/kestra/plugin/core/storage/SplitTest.java @@ -93,7 +93,7 @@ private List content(int count) { private String readAll(List uris) throws IOException { return uris .stream() - .map(Rethrow.throwFunction(uri -> CharStreams.toString(new InputStreamReader(storageInterface.get(null, uri))))) + .map(Rethrow.throwFunction(uri -> CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, uri))))) .collect(Collectors.joining()); } @@ -104,6 +104,7 @@ URI storageUpload(int count) throws URISyntaxException, IOException { Files.write(tempFile.toPath(), content(count)); return storageInterface.put( + null, null, new URI("/file/storage/get.yml"), new FileInputStream(tempFile) diff --git a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java index 90946dd599..f3c1a448da 100644 --- a/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java +++ b/storage-local/src/main/java/io/kestra/storage/local/LocalStorage.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.*; import java.net.URI; import java.nio.file.*; @@ -60,7 +61,7 @@ private Path getPath(String tenantId, URI uri) { } @Override - public InputStream get(String tenantId, URI uri) throws IOException { + public InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException { return new BufferedInputStream(new FileInputStream(getPath(tenantId, uri) .toAbsolutePath() .toString()) @@ -68,12 +69,12 @@ public InputStream get(String tenantId, URI uri) throws IOException { } @Override - public StorageObject getWithMetadata(String tenantId, URI uri) throws IOException { - return new StorageObject(LocalFileAttributes.getMetadata(this.getPath(tenantId, uri)), this.get(tenantId, uri)); + public StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException { + return new StorageObject(LocalFileAttributes.getMetadata(this.getPath(tenantId, uri)), this.get(tenantId, namespace, uri)); } @Override - public List allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException { + public List allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException { Path fsPath = getPath(tenantId, prefix); List uris = new ArrayList<>(); Files.walkFileTree(fsPath, new SimpleFileVisitor<>() { @@ -115,12 +116,12 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) { } @Override - public boolean exists(String tenantId, URI uri) { + public boolean exists(String tenantId, @Nullable String namespace, URI uri) { return Files.exists(getPath(tenantId, uri)); } @Override - public List list(String tenantId, URI uri) throws IOException { + public List list(String tenantId, @Nullable String namespace, URI uri) throws IOException { try (Stream stream = Files.list(getPath(tenantId, uri))) { return stream .filter(path -> !path.getFileName().toString().endsWith(".metadata")) @@ -130,7 +131,7 @@ public List list(String tenantId, URI uri) throws IOException { Path.of(file.toUri()) ).toString().replace("\\", "/") ); - return getAttributes(tenantId, relative); + return getAttributes(tenantId, namespace, relative); })) .toList(); } catch (NoSuchFileException e) { @@ -139,7 +140,7 @@ public List list(String tenantId, URI uri) throws IOException { } @Override - public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException { + public URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException { File file = getPath(tenantId, uri).toFile(); File parent = file.getParentFile(); if (!parent.exists()) { @@ -165,7 +166,7 @@ public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOE } @Override - public FileAttributes getAttributes(String tenantId, URI uri) throws IOException { + public FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException { Path path = getPath(tenantId, uri); try { return LocalFileAttributes.builder() @@ -178,7 +179,7 @@ public FileAttributes getAttributes(String tenantId, URI uri) throws IOException } @Override - public URI createDirectory(String tenantId, URI uri) { + public URI createDirectory(String tenantId, @Nullable String namespace, URI uri) { if (uri == null || uri.getPath().isEmpty()) { throw new IllegalArgumentException("Unable to create a directory with empty url."); } @@ -190,7 +191,7 @@ public URI createDirectory(String tenantId, URI uri) { } @Override - public URI move(String tenantId, URI from, URI to) throws IOException { + public URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException { try { Files.move( getPath(tenantId, from), @@ -203,7 +204,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException { } @Override - public boolean delete(String tenantId, URI uri) throws IOException { + public boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException { Path path = getPath(tenantId, uri); File file = path.toFile(); @@ -217,7 +218,7 @@ public boolean delete(String tenantId, URI uri) throws IOException { @SuppressWarnings("ResultOfMethodCallIgnored") @Override - public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOException { + public List deleteByPrefix(String tenantId, @Nullable String namespace, URI storagePrefix) throws IOException { Path path = this.getPath(tenantId, storagePrefix); if (!path.toFile().exists()) { diff --git a/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java b/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java index c25454046e..bf4ef4be34 100644 --- a/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java +++ b/tests/src/main/java/io/kestra/core/models/tasks/runners/AbstractTaskRunnerTest.java @@ -129,9 +129,9 @@ public void accept(String log, Boolean isStdErr) { assertThat(logEntries.stream().filter(e -> e.getKey().contains("Hello World")).findFirst().orElseThrow().getValue(), is(false)); // Verify outputFiles - assertThat(IOUtils.toString(storage.get(null, outputFiles.get("output.txt")), StandardCharsets.UTF_8), is("Hello World")); - assertThat(IOUtils.toString(storage.get(null, outputFiles.get("file.txt")), StandardCharsets.UTF_8), is("file from output dir")); - assertThat(IOUtils.toString(storage.get(null, outputFiles.get("nested/file.txt")), StandardCharsets.UTF_8), is("nested file from output dir")); + assertThat(IOUtils.toString(storage.get(null, "unittest", outputFiles.get("output.txt")), StandardCharsets.UTF_8), is("Hello World")); + assertThat(IOUtils.toString(storage.get(null, "unittest", outputFiles.get("file.txt")), StandardCharsets.UTF_8), is("file from output dir")); + assertThat(IOUtils.toString(storage.get(null, "unittest", outputFiles.get("nested/file.txt")), StandardCharsets.UTF_8), is("nested file from output dir")); assertThat(defaultLogConsumer.getOutputs().get("logOutput"), is("Hello World")); } diff --git a/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java b/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java index 5213deb178..606a742eb8 100644 --- a/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java +++ b/tests/src/main/java/io/kestra/core/storage/StorageTestSuite.java @@ -58,16 +58,16 @@ void getNoCrossTenant() throws Exception { putFile(null, nullTenant); URI with = new URI(withTenant); - InputStream get = storageInterface.get(tenantId, with); + InputStream get = storageInterface.get(tenantId, prefix, with); assertThat(CharStreams.toString(new InputStreamReader(get)), is(CONTENT_STRING)); - assertTrue(storageInterface.exists(tenantId, with)); - assertThrows(FileNotFoundException.class, () -> storageInterface.get(null, with)); + assertTrue(storageInterface.exists(tenantId, prefix, with)); + assertThrows(FileNotFoundException.class, () -> storageInterface.get(null, null, with)); URI without = new URI(nullTenant); - get = storageInterface.get(null, without); + get = storageInterface.get(null, prefix, without); assertThat(CharStreams.toString(new InputStreamReader(get)), is(CONTENT_STRING)); - assertTrue(storageInterface.exists(null, without)); - assertThrows(FileNotFoundException.class, () -> storageInterface.get(tenantId, without)); + assertTrue(storageInterface.exists(null, prefix, without)); + assertThrows(FileNotFoundException.class, () -> storageInterface.get(tenantId, null, without)); } @@ -77,7 +77,7 @@ void getWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - InputStream getScheme = storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")); + InputStream getScheme = storageInterface.get(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")); assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(CONTENT_STRING)); } @@ -86,9 +86,9 @@ private void get(String tenantId, String prefix) throws Exception { putFile(tenantId, "/" + prefix + "/storage/level2/2.yml"); URI item = new URI("/" + prefix + "/storage/get.yml"); - InputStream get = storageInterface.get(tenantId, item); + InputStream get = storageInterface.get(tenantId, prefix, item); assertThat(CharStreams.toString(new InputStreamReader(get)), is(CONTENT_STRING)); - assertTrue(storageInterface.exists(tenantId, item)); + assertTrue(storageInterface.exists(tenantId, prefix, item)); } @Test @@ -101,7 +101,7 @@ void getNoTraversal() throws Exception { putFile(tenantId, "/" + prefix + "/storage/level2/2.yml"); // Assert that '..' in path cannot be used as gcs do not use directory listing and traversal. assertThrows(IllegalArgumentException.class, () -> { - storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/level2/../get.yml")); + storageInterface.get(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/level2/../get.yml")); }); } @@ -111,21 +111,21 @@ void getFileNotFound() { String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(tenantId, new URI("/" + prefix + "/storage/missing.yml")); + storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/missing.yml")); }); } //endregion @Test void filesByPrefix() throws IOException { - storageInterface.put(null, URI.create("/namespace/file.txt"), new ByteArrayInputStream(new byte[0])); - storageInterface.put("tenant", URI.create("/namespace/tenant_file.txt"), new ByteArrayInputStream(new byte[0])); - storageInterface.put(null, URI.create("/namespace/another_file.json"), new ByteArrayInputStream(new byte[0])); - storageInterface.put(null, URI.create("/namespace/folder/file.txt"), new ByteArrayInputStream(new byte[0])); - storageInterface.put(null, URI.create("/namespace/folder/some.yaml"), new ByteArrayInputStream(new byte[0])); - storageInterface.put(null, URI.create("/namespace/folder/sub/script.py"), new ByteArrayInputStream(new byte[0])); - - List res = storageInterface.allByPrefix(null, URI.create("kestra:///namespace/"), false); + storageInterface.put(null, "namespace", URI.create("/namespace/file.txt"), new ByteArrayInputStream(new byte[0])); + storageInterface.put("tenant", "namespace", URI.create("/namespace/tenant_file.txt"), new ByteArrayInputStream(new byte[0])); + storageInterface.put(null, "namespace", URI.create("/namespace/another_file.json"), new ByteArrayInputStream(new byte[0])); + storageInterface.put(null, "namespace", URI.create("/namespace/folder/file.txt"), new ByteArrayInputStream(new byte[0])); + storageInterface.put(null, "namespace", URI.create("/namespace/folder/some.yaml"), new ByteArrayInputStream(new byte[0])); + storageInterface.put(null, "namespace", URI.create("/namespace/folder/sub/script.py"), new ByteArrayInputStream(new byte[0])); + + List res = storageInterface.allByPrefix(null, "namespace", URI.create("kestra:///namespace/"), false); assertThat(res, containsInAnyOrder( URI.create("kestra:///namespace/file.txt"), URI.create("kestra:///namespace/another_file.json"), @@ -134,41 +134,41 @@ void filesByPrefix() throws IOException { URI.create("kestra:///namespace/folder/sub/script.py") )); - res = storageInterface.allByPrefix("tenant", URI.create("/namespace"), false); + res = storageInterface.allByPrefix("tenant", "namespace", URI.create("/namespace"), false); assertThat(res, containsInAnyOrder(URI.create("kestra:///namespace/tenant_file.txt"))); - res = storageInterface.allByPrefix(null, URI.create("/namespace/folder"), false); + res = storageInterface.allByPrefix(null, "namespace", URI.create("/namespace/folder"), false); assertThat(res, containsInAnyOrder( URI.create("kestra:///namespace/folder/file.txt"), URI.create("kestra:///namespace/folder/some.yaml"), URI.create("kestra:///namespace/folder/sub/script.py") )); - res = storageInterface.allByPrefix(null, URI.create("/namespace/folder/sub"), false); + res = storageInterface.allByPrefix(null, "namespace", URI.create("/namespace/folder/sub"), false); assertThat(res, containsInAnyOrder(URI.create("kestra:///namespace/folder/sub/script.py"))); - res = storageInterface.allByPrefix(null, URI.create("/namespace/non-existing"), false); + res = storageInterface.allByPrefix(null, "namespace", URI.create("/namespace/non-existing"), false); assertThat(res, empty()); } @Test void objectsByPrefix() throws IOException { - storageInterface.put(null, URI.create("/some_namespace/file.txt"), new ByteArrayInputStream(new byte[0])); - storageInterface.put("tenant", URI.create("/some_namespace/tenant_file.txt"), new ByteArrayInputStream(new byte[0])); - storageInterface.createDirectory(null, URI.create("/some_namespace/folder/sub")); + storageInterface.put(null, "some_namespace", URI.create("/some_namespace/file.txt"), new ByteArrayInputStream(new byte[0])); + storageInterface.put("tenant", "some_namespace", URI.create("/some_namespace/tenant_file.txt"), new ByteArrayInputStream(new byte[0])); + storageInterface.createDirectory(null, "some_namespace", URI.create("/some_namespace/folder/sub")); - List res = storageInterface.allByPrefix(null, URI.create("kestra:///some_namespace/"), true); + List res = storageInterface.allByPrefix(null, "some_namespace", URI.create("kestra:///some_namespace/"), true); assertThat(res, containsInAnyOrder( URI.create("kestra:///some_namespace/file.txt"), URI.create("kestra:///some_namespace/folder/"), URI.create("kestra:///some_namespace/folder/sub/") )); - res = storageInterface.allByPrefix("tenant", URI.create("/some_namespace"), true); + res = storageInterface.allByPrefix("tenant", "some_namespace", URI.create("/some_namespace"), true); assertThat(res, containsInAnyOrder(URI.create("kestra:///some_namespace/tenant_file.txt"))); - res = storageInterface.allByPrefix(null, URI.create("/some_namespace/folder"), true); + res = storageInterface.allByPrefix(null, "some_namespace", URI.create("/some_namespace/folder"), true); assertThat(res, containsInAnyOrder(URI.create("kestra:///some_namespace/folder/sub/"))); } @@ -203,7 +203,7 @@ void listNoTraversal() throws Exception { path.forEach(throwConsumer(s -> putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.list(tenantId, new URI("/" + prefix + "/storage/level2/..")); + storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/storage/level2/..")); }); } @@ -213,7 +213,7 @@ void listNotFound() { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.list(tenantId, new URI("/" + prefix + "/storage/")); + storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/storage/")); }); } @@ -235,16 +235,16 @@ void listNoCrossTenant() throws Exception { ); nullTenant.forEach(throwConsumer(s -> putFile(null, s))); - List with = storageInterface.list(tenantId, new URI("/" + prefix + "/with")); + List with = storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/with")); assertThat(with.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("1.yml", "2.yml", "3.yml")); assertThrows(FileNotFoundException.class, () -> { - storageInterface.list(tenantId, new URI("/" + prefix + "/notenant/")); + storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/notenant/")); }); - List notenant = storageInterface.list(null, new URI("/" + prefix + "/notenant")); + List notenant = storageInterface.list(null, prefix, new URI("/" + prefix + "/notenant")); assertThat(notenant.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("1.yml", "2.yml", "3.yml")); assertThrows(FileNotFoundException.class, () -> { - storageInterface.list(null, new URI("/" + prefix + "/with/")); + storageInterface.list(null, prefix, new URI("/" + prefix + "/with/")); }); } @@ -260,7 +260,7 @@ void listWithScheme() throws Exception { ); path.forEach(throwConsumer(s -> putFile(tenantId, s))); - List list = storageInterface.list(tenantId, new URI("kestra:///" + prefix + "/storage")); + List list = storageInterface.list(tenantId, prefix, new URI("kestra:///" + prefix + "/storage")); assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("root.yml", "level1", "another")); } @@ -274,10 +274,10 @@ private void list(String prefix, String tenantId) throws Exception { ); path.forEach(throwConsumer(s -> putFile(tenantId, s))); - List list = storageInterface.list(tenantId, null); + List list = storageInterface.list(tenantId, prefix, null); assertThat(list.stream().map(FileAttributes::getFileName).toList(), hasItem(prefix)); - list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage")); + list = storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/storage")); assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("root.yml", "level1", "another")); } //endregion @@ -301,8 +301,8 @@ void existsNoTenant() throws Exception { private void exists(String prefix, String tenantId) throws Exception { putFile(tenantId, "/" + prefix + "/storage/put.yml"); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/put.yml")), is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/notfound.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/put.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/notfound.yml")), is(false)); } @Test @@ -319,7 +319,7 @@ void existsNoTraversal() throws Exception { path.forEach(throwConsumer(s -> putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level2/..")); + storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level2/..")); }); } @@ -334,12 +334,12 @@ void existsNoCrossTenant() throws Exception { putFile(null, nullTenant); URI with = new URI(withTenant); - assertTrue(storageInterface.exists(tenantId, with)); - assertFalse(storageInterface.exists(null, with)); + assertTrue(storageInterface.exists(tenantId, prefix, with)); + assertFalse(storageInterface.exists(null, prefix, with)); URI without = new URI(nullTenant); - assertFalse(storageInterface.exists(tenantId, without)); - assertTrue(storageInterface.exists(null, without)); + assertFalse(storageInterface.exists(tenantId, prefix, without)); + assertTrue(storageInterface.exists(null, prefix, without)); } @@ -349,7 +349,7 @@ void existsWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - assertTrue(storageInterface.exists(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml"))); + assertTrue(storageInterface.exists(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml"))); } //endregion @@ -372,7 +372,7 @@ void sizeNoTenant() throws Exception { private void size(String prefix, String tenantId) throws Exception { URI put = putFile(tenantId, "/" + prefix + "/storage/put.yml"); - assertThat(storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/put.yml")).getSize(), is((long) CONTENT_STRING.length())); + assertThat(storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/put.yml")).getSize(), is((long) CONTENT_STRING.length())); } @Test @@ -388,7 +388,7 @@ void sizeNoTraversal() throws Exception { path.forEach(throwConsumer(s -> putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")).getSize(); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml")).getSize(); }); } @@ -397,7 +397,7 @@ void sizeNotFound() { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/")).getSize(); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/")).getSize(); }); } @@ -412,15 +412,15 @@ void sizeNoCrossTenant() throws Exception { putFile(null, nullTenant); URI with = new URI(withTenant); - assertThat(storageInterface.getAttributes(tenantId, with).getSize(), is((long) CONTENT_STRING.length())); + assertThat(storageInterface.getAttributes(tenantId, prefix, with).getSize(), is((long) CONTENT_STRING.length())); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(null, with).getSize(); + storageInterface.getAttributes(null, prefix, with).getSize(); }); URI without = new URI(nullTenant); - assertThat(storageInterface.getAttributes(null, without).getSize(), is((long) CONTENT_STRING.length())); + assertThat(storageInterface.getAttributes(null, prefix, without).getSize(), is((long) CONTENT_STRING.length())); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, without).getSize(); + storageInterface.getAttributes(tenantId, prefix, without).getSize(); }); } @@ -431,7 +431,7 @@ void sizeWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - assertThat(storageInterface.getAttributes(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")).getSize(), is((long) CONTENT_STRING.length())); + assertThat(storageInterface.getAttributes(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")).getSize(), is((long) CONTENT_STRING.length())); } //endregion @@ -454,7 +454,7 @@ void lastModifiedTimeNoTenant() throws Exception { private void lastModifiedTime(String prefix, String tenantId) throws Exception { putFile(tenantId, "/" + prefix + "/storage/put.yml"); - assertThat(storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/put.yml")).getLastModifiedTime(), notNullValue()); + assertThat(storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/put.yml")).getLastModifiedTime(), notNullValue()); } @Test @@ -470,7 +470,7 @@ void lastModifiedTimeNoTraversal() throws Exception { path.forEach(throwConsumer(s -> putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")).getLastModifiedTime(); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml")).getLastModifiedTime(); }); } @@ -479,7 +479,7 @@ void lastModifiedTimeNotFound() { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/")).getLastModifiedTime(); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/")).getLastModifiedTime(); }); } @@ -494,15 +494,15 @@ void lastModifiedTimeNoCrossTenant() throws Exception { putFile(null, nullTenant); URI with = new URI(withTenant); - assertThat(storageInterface.getAttributes(tenantId, with).getLastModifiedTime(), notNullValue()); + assertThat(storageInterface.getAttributes(tenantId, prefix, with).getLastModifiedTime(), notNullValue()); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(null, with).getLastModifiedTime(); + storageInterface.getAttributes(null, prefix, with).getLastModifiedTime(); }); URI without = new URI(nullTenant); - assertThat(storageInterface.getAttributes(null, without).getLastModifiedTime(), notNullValue()); + assertThat(storageInterface.getAttributes(null, prefix, without).getLastModifiedTime(), notNullValue()); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, without).getLastModifiedTime(); + storageInterface.getAttributes(tenantId, prefix, without).getLastModifiedTime(); }); } @@ -513,7 +513,7 @@ void lastModifiedTimeWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - assertThat(storageInterface.getAttributes(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")).getLastModifiedTime(), notNullValue()); + assertThat(storageInterface.getAttributes(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")).getLastModifiedTime(), notNullValue()); } //endregion @@ -541,7 +541,7 @@ private void getAttributes(String prefix, String tenantId) throws Exception { ); path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); - FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/root.yml")); + FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")); assertThat(attr.getFileName(), is("root.yml")); assertThat(attr.getType(), is(FileAttributes.FileType.File)); assertThat(attr.getSize(), is((long) CONTENT_STRING.length())); @@ -552,7 +552,7 @@ private void getAttributes(String prefix, String tenantId) throws Exception { assertThat(creationInstant.isAfter(Instant.now().minus(Duration.ofMinutes(1))), is(true)); assertThat(creationInstant.isBefore(Instant.now()), is(true)); - attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1")); + attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); assertThat(attr.getFileName(), is("level1")); assertThat(attr.getType(), is(FileAttributes.FileType.Directory)); lastModifiedInstant = Instant.ofEpochMilli(attr.getLastModifiedTime()); @@ -576,7 +576,7 @@ void getAttributesNoTraversal() throws Exception { path.forEach(throwConsumer(s -> putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml")); }); } @@ -585,7 +585,7 @@ void getAttributesNotFound() { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/")); + storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/")); }); } @@ -600,17 +600,17 @@ void getAttributesNoCrossTenant() throws Exception { putFile(null, nullTenant); URI with = new URI(withTenant); - FileAttributes attr = storageInterface.getAttributes(tenantId, with); + FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, with); assertThat(attr.getFileName(), is("withtenant.yml")); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(null, with); + storageInterface.getAttributes(null, prefix, with); }); URI without = new URI(nullTenant); - attr = storageInterface.getAttributes(null, without); + attr = storageInterface.getAttributes(null, prefix, without); assertThat(attr.getFileName(), is("nulltenant.yml")); assertThrows(FileNotFoundException.class, () -> { - storageInterface.getAttributes(tenantId, without); + storageInterface.getAttributes(tenantId, prefix, without); }); } @@ -620,7 +620,7 @@ void getAttributesWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")); + FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml")); assertThat(attr.getFileName(), is("get.yml")); } //endregion @@ -643,12 +643,13 @@ void putFromAnotherFile() throws Exception { URI putFromAnother = storageInterface.put( tenantId, + prefix, new URI("/" + prefix + "/storage/put_from_another.yml"), - storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml")) + storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/put.yml")) ); assertThat(putFromAnother.toString(), is(new URI("kestra:///" + prefix + "/storage/put_from_another.yml").toString())); - InputStream get = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put_from_another.yml")); + InputStream get = storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/put_from_another.yml")); assertThat( CharStreams.toString(new InputStreamReader(get)), is(CONTENT_STRING) @@ -671,10 +672,11 @@ void putWithScheme() throws URISyntaxException, IOException { URI uri = new URI("kestra:///" + prefix + "/storage/get.yml"); storageInterface.put( tenantId, + prefix, uri, new ByteArrayInputStream(CONTENT_STRING.getBytes()) ); - InputStream getScheme = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/get.yml")); + InputStream getScheme = storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/get.yml")); assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(CONTENT_STRING)); } @@ -683,11 +685,12 @@ void putNoTraversal() throws URISyntaxException, IOException { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); - storageInterface.createDirectory(tenantId, new URI("/" + prefix + "/storage/level1")); + storageInterface.createDirectory(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); assertThrows(IllegalArgumentException.class, () -> { storageInterface.put( tenantId, + prefix, new URI("kestra:///" + prefix + "/storage/level1/../get2.yml"), new ByteArrayInputStream(CONTENT_STRING.getBytes()) ); @@ -697,7 +700,7 @@ void putNoTraversal() throws URISyntaxException, IOException { private void put(String tenantId, String prefix) throws Exception { URI put = putFile(tenantId, "/" + prefix + "/storage/put.yml"); - InputStream get = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml")); + InputStream get = storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/put.yml")); assertThat(put.toString(), is(new URI("kestra:///" + prefix + "/storage/put.yml").toString())); assertThat( @@ -737,7 +740,7 @@ void deleteNoTraversal() throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml")); + storageInterface.delete(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml")); }); } @@ -745,7 +748,7 @@ void deleteNoTraversal() throws Exception { void deleteNotFound() throws URISyntaxException, IOException { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); - assertThat(storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/")), is(false)); + assertThat(storageInterface.delete(tenantId, prefix, new URI("/" + prefix + "/storage/")), is(false)); } private void delete(String prefix, String tenantId) throws Exception { @@ -760,23 +763,23 @@ private void delete(String prefix, String tenantId) throws Exception { ); path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); - boolean deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/level1")); + boolean deleted = storageInterface.delete(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); assertThat(deleted, is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/another/1.yml")), is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1")), is(false)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level12.yml")), is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/1.yml")), is(false)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/level2/1.yml")), is(false)); - - deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/root.yml")); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/another/1.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level1")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level12.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level1/1.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level1/level2/1.yml")), is(false)); + + deleted = storageInterface.delete(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")); assertThat(deleted, is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")), is(false)); - deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/file")); + deleted = storageInterface.delete(tenantId, prefix, new URI("/" + prefix + "/storage/file")); assertThat(deleted, is(true)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/file")), is(false)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/file.txt")), is(true)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/file")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/file.txt")), is(true)); } @Test @@ -785,8 +788,8 @@ void deleteWithScheme() throws Exception { String tenantId = IdUtils.create(); putFile(tenantId, "/" + prefix + "/storage/get.yml"); - assertTrue(storageInterface.delete(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml"))); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/get.yml")), is(false)); + assertTrue(storageInterface.delete(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/get.yml"))); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/get.yml")), is(false)); } //endregion @@ -820,13 +823,13 @@ void createDirectoryNoTraversal() throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.createDirectory(tenantId, new URI("/" + prefix + "/storage/level2/../newdir")); + storageInterface.createDirectory(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../newdir")); }); } private void createDirectory(String prefix, String tenantId) throws Exception { - storageInterface.createDirectory(tenantId, new URI("/" + prefix + "/storage/level1")); - FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1")); + storageInterface.createDirectory(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); + FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); assertThat(attr.getFileName(), is("level1")); assertThat(attr.getType(), is(FileAttributes.FileType.Directory)); assertThat(attr.getLastModifiedTime(), notNullValue()); @@ -837,8 +840,8 @@ void createDirectoryWithScheme() throws Exception { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); - storageInterface.createDirectory(tenantId, new URI("kestra:///" + prefix + "/storage/level1")); - FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1")); + storageInterface.createDirectory(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/level1")); + FileAttributes attr = storageInterface.getAttributes(tenantId, prefix, new URI("/" + prefix + "/storage/level1")); assertThat(attr.getFileName(), is("level1")); assertThat(attr.getType(), is(FileAttributes.FileType.Directory)); assertThat(attr.getLastModifiedTime(), notNullValue()); @@ -847,9 +850,9 @@ void createDirectoryWithScheme() throws Exception { @Test void createDirectoryShouldBeRecursive() throws IOException { String prefix = IdUtils.create(); - storageInterface.createDirectory(null, URI.create("/" + prefix + "/first/second/third")); + storageInterface.createDirectory(null, prefix, URI.create("/" + prefix + "/first/second/third")); - List list = storageInterface.list(null, URI.create("/" + prefix)); + List list = storageInterface.list(null, prefix, URI.create("/" + prefix)); assertThat(list, contains( hasProperty("fileName", is("first")) )); @@ -878,7 +881,7 @@ void moveNotFound() { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); assertThrows(FileNotFoundException.class, () -> { - storageInterface.move(tenantId, new URI("/" + prefix + "/storage/"), new URI("/" + prefix + "/test/")); + storageInterface.move(tenantId, prefix, new URI("/" + prefix + "/storage/"), new URI("/" + prefix + "/test/")); }); } @@ -895,7 +898,7 @@ void moveNoTraversal() throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.move(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml"), new URI("/" + prefix + "/storage/level2/1.yml")); + storageInterface.move(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml"), new URI("/" + prefix + "/storage/level2/1.yml")); }); } @@ -908,18 +911,18 @@ private void move(String prefix, String tenantId) throws Exception { ); path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); - storageInterface.move(tenantId, new URI("/" + prefix + "/storage/level1"), new URI("/" + prefix + "/storage/moved")); + storageInterface.move(tenantId, prefix, new URI("/" + prefix + "/storage/level1"), new URI("/" + prefix + "/storage/moved")); - List list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage/moved")); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1")), is(false)); + List list = storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/storage/moved")); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/level1")), is(false)); assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("level2", "1.yml")); - list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage/moved/level2")); + list = storageInterface.list(tenantId, prefix, new URI("/" + prefix + "/storage/moved/level2")); assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("2.yml")); - storageInterface.move(tenantId, new URI("/" + prefix + "/storage/root.yml"), new URI("/" + prefix + "/storage/root-moved.yml")); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root-moved.yml")), is(true)); + storageInterface.move(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml"), new URI("/" + prefix + "/storage/root-moved.yml")); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root-moved.yml")), is(true)); } @Test @@ -929,9 +932,9 @@ void moveWithScheme() throws Exception { this.putFile(tenantId, "/" + prefix + "/storage/root.yml"); - storageInterface.move(tenantId, new URI("kestra:///" + prefix + "/storage/root.yml"), new URI("kestra:///" + prefix + "/storage/root-moved.yml")); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false)); - assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root-moved.yml")), is(true)); + storageInterface.move(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/root.yml"), new URI("kestra:///" + prefix + "/storage/root-moved.yml")); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI("/" + prefix + "/storage/root-moved.yml")), is(true)); } //endregion @@ -956,7 +959,7 @@ void deleteByPrefixNoTenant() throws Exception { void deleteByPrefixNotFound() throws URISyntaxException, IOException { String prefix = IdUtils.create(); String tenantId = IdUtils.create(); - assertThat(storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")), containsInAnyOrder()); + assertThat(storageInterface.deleteByPrefix(tenantId, prefix, new URI("/" + prefix + "/storage/")), containsInAnyOrder()); } @Test @@ -972,7 +975,7 @@ void deleteByPrefixNoTraversal() throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); assertThrows(IllegalArgumentException.class, () -> { - storageInterface.move(tenantId, new URI("/" + prefix + "/storage/level2/../1.yml"), new URI("/" + prefix + "/storage/level2/1.yml")); + storageInterface.move(tenantId, prefix, new URI("/" + prefix + "/storage/level2/../1.yml"), new URI("/" + prefix + "/storage/level2/1.yml")); }); } @@ -985,7 +988,7 @@ private void deleteByPrefix(String prefix, String tenantId) throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); - List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); + List deleted = storageInterface.deleteByPrefix(tenantId, prefix, new URI("/" + prefix + "/storage/")); List res = Arrays.asList( "/" + prefix + "/storage", @@ -999,11 +1002,11 @@ private void deleteByPrefix(String prefix, String tenantId) throws Exception { assertThat(deleted, containsInAnyOrder(res.stream().map(s -> URI.create("kestra://" + s)).toArray())); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(tenantId, new URI("/" + prefix + "/storage/")); + storageInterface.get(tenantId, prefix, new URI("/" + prefix + "/storage/")); }); path.forEach(throwConsumer(s -> { - assertThat(storageInterface.exists(tenantId, new URI(s)), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI(s)), is(false)); })); } @@ -1020,7 +1023,7 @@ void deleteByPrefixWithScheme() throws Exception { path.forEach(throwConsumer(s -> this.putFile(tenantId, s))); - List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); + List deleted = storageInterface.deleteByPrefix(tenantId, prefix, new URI("/" + prefix + "/storage/")); List res = Arrays.asList( "/" + prefix + "/storage", @@ -1034,11 +1037,11 @@ void deleteByPrefixWithScheme() throws Exception { assertThat(deleted, containsInAnyOrder(res.stream().map(s -> URI.create("kestra://" + s)).toArray())); assertThrows(FileNotFoundException.class, () -> { - storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/")); + storageInterface.get(tenantId, prefix, new URI("kestra:///" + prefix + "/storage/")); }); path.forEach(throwConsumer(s -> { - assertThat(storageInterface.exists(tenantId, new URI(s)), is(false)); + assertThat(storageInterface.exists(tenantId, prefix, new URI(s)), is(false)); })); } //endregion @@ -1053,7 +1056,7 @@ void metadata() throws Exception { "anotherComplexKey2", "value2" ); putFile(tenantId, "/" + prefix + "/storage/get.yml", expectedMetadata); - StorageObject withMetadata = storageInterface.getWithMetadata(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml")); + StorageObject withMetadata = storageInterface.getWithMetadata(tenantId, null, new URI("kestra:///" + prefix + "/storage/get.yml")); assertThat(CharStreams.toString(new InputStreamReader(withMetadata.inputStream())), is(CONTENT_STRING)); assertThat(withMetadata.metadata(), is(expectedMetadata)); } @@ -1061,6 +1064,7 @@ void metadata() throws Exception { private URI putFile(String tenantId, String path) throws Exception { return storageInterface.put( tenantId, + null, new URI(path), new ByteArrayInputStream(CONTENT_STRING.getBytes()) ); @@ -1069,6 +1073,7 @@ private URI putFile(String tenantId, String path) throws Exception { private URI putFile(String tenantId, String path, Map metadata) throws Exception { return storageInterface.put( tenantId, + null, new URI(path), new StorageObject( metadata, diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java index 4061bb5fb7..d0a78f9451 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java @@ -676,14 +676,9 @@ protected List