Skip to content

Commit

Permalink
feat(*): add namespace as a parameter of the internal storage
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Nov 21, 2024
1 parent 62fd559 commit 78f5ff2
Show file tree
Hide file tree
Showing 49 changed files with 347 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public Integer call() throws Exception {
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), uri, false).stream();
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
Expand All @@ -59,9 +59,9 @@ public Integer call() throws Exception {
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);

try (InputStream is = storageInterface.get(flow.getTenantId(), stateStoreFileUri)) {
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), stateStoreFileUri);
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ void runMigration() throws IOException, ResourceExpiredException {
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(
storage.exists(tenantId, oldStateStoreUri),
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(true)
);

Expand All @@ -73,7 +74,7 @@ void runMigration() throws IOException, ResourceExpiredException {

assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes()), is("my-value"));
assertThat(
storage.exists(tenantId, oldStateStoreUri),
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(false)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public String tenantId() {
public FlowInfo flowInfo() {
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow == null ? null : new FlowInfo(
return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
(String) flow.get("tenantId"),
(String) flow.get("namespace"),
(String) flow.get("id"),
Expand Down Expand Up @@ -531,7 +531,7 @@ public String version() {

@Override
public KVStore namespaceKv(String namespace) {
return kvStoreService.get(tenantId(), namespace, this.flowInfo().namespace());
return kvStoreService.get(this.flowInfo().tenantId(), namespace, this.flowInfo().namespace());
}

/**
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public abstract class RunContext {
*/
public abstract void cleanup();

/**
* @deprecated use flowInfo().tenantId() instead
*/
@Deprecated(forRemoval = true)
public abstract String tenantId();

public abstract FlowInfo flowInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private long getFileSizeFromInternalStorageUri(EvaluationContext context, URI pa
checkIfFileFromParentExecution(context, path);
}

FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), path);
FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), flow.get("namespace"), path);
return fileAttributes.getSize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private boolean calledOnWorker() {
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), namespaceFile)) {
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), namespaceFile)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
Expand All @@ -107,7 +107,7 @@ private String readFromInternalStorageUri(EvaluationContext context, URI path) t
}
}

try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), path)) {
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), path)) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public PurgeResult purge(

if (purgeStorage) {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), uri).size());
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
}

return (PurgeResult) builder.build();
Expand Down Expand Up @@ -441,7 +441,7 @@ public void delete(

if (deleteStorage) {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
storageInterface.deleteByPrefix(execution.getTenantId(), uri);
storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public List<NamespaceFile> all(final boolean includeDirectories) throws IOExcept
@Override
public List<NamespaceFile> all(final String prefix, final boolean includeDirectories) throws IOException {
URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\","/") + "/");
return storage.allByPrefix(tenant, namespacePrefix, includeDirectories)
return storage.allByPrefix(tenant, namespace, namespacePrefix, includeDirectories)
.stream()
.map(uri -> new NamespaceFile(relativize(uri), uri, namespace))
.toList();
Expand Down Expand Up @@ -119,7 +119,7 @@ public List<NamespaceFile> findAllFilesMatching(final Predicate<Path> predicate)
@Override
public InputStream getFileContent(final Path path) throws IOException {
Path namespaceFilePath = NamespaceFile.of(namespace, path).storagePath();
return storage.get(tenant, namespaceFilePath.toUri());
return storage.get(tenant, namespace, namespaceFilePath.toUri());
}

/**
Expand All @@ -130,11 +130,11 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C
Path namespaceFilesPrefix = NamespaceFile.of(namespace, path).storagePath();
// Remove Windows letter
URI cleanUri = new URI(namespaceFilesPrefix.toUri().toString().replaceFirst("^file:///[a-zA-Z]:", ""));
final boolean exists = storage.exists(tenant, cleanUri);
final boolean exists = storage.exists(tenant, namespace, cleanUri);

return switch (onAlreadyExist) {
case OVERWRITE -> {
URI uri = storage.put(tenant, cleanUri, content);
URI uri = storage.put(tenant, namespace, cleanUri, content);
NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
if (exists) {
logger.debug(String.format(
Expand All @@ -153,7 +153,7 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C
}
case ERROR -> {
if (!exists) {
URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content);
URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
yield new NamespaceFile(relativize(uri), uri, namespace);
} else {
throw new IOException(String.format(
Expand All @@ -166,7 +166,7 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C
}
case SKIP -> {
if (!exists) {
URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content);
URI uri = storage.put(tenant, namespace, namespaceFilesPrefix.toUri(), content);
NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace);
logger.debug(String.format(
"File '%s' added to namespace '%s'.",
Expand All @@ -193,14 +193,14 @@ public NamespaceFile putFile(final Path path, final InputStream content, final C
**/
@Override
public URI createDirectory(Path path) throws IOException {
return storage.createDirectory(tenant, NamespaceFile.of(namespace, path).storagePath().toUri());
return storage.createDirectory(tenant, namespace, NamespaceFile.of(namespace, path).storagePath().toUri());
}

/**
* {@inheritDoc}
**/
@Override
public boolean delete(Path path) throws IOException {
return storage.delete(tenant, URI.create(path.toString().replace("\\","/")));
return storage.delete(tenant, namespace, URI.create(path.toString().replace("\\","/")));
}
}
20 changes: 10 additions & 10 deletions core/src/main/java/io/kestra/core/storages/InternalStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Namespace namespace(String namespace) {
**/
@Override
public boolean isFileExist(URI uri) {
return this.storage.exists(context.getTenantId(), uri);
return this.storage.exists(context.getTenantId(), context.getNamespace(), uri);
}

/**
Expand All @@ -98,7 +98,7 @@ public boolean isFileExist(URI uri) {
public InputStream getFile(final URI uri) throws IOException {
uriGuard(uri);

return this.storage.get(context.getTenantId(), uri);
return this.storage.get(context.getTenantId(), context.getNamespace(), uri);

}

Expand All @@ -109,7 +109,7 @@ public InputStream getFile(final URI uri) throws IOException {
public boolean deleteFile(final URI uri) throws IOException {
uriGuard(uri);

return this.storage.delete(context.getTenantId(), uri);
return this.storage.delete(context.getTenantId(), context.getNamespace(), uri);

}

Expand All @@ -133,7 +133,7 @@ private static void uriGuard(URI uri) {
**/
@Override
public List<URI> deleteExecutionFiles() throws IOException {
return this.storage.deleteByPrefix(context.getTenantId(), context.getExecutionStorageURI());
return this.storage.deleteByPrefix(context.getTenantId(), context.getNamespace(), context.getExecutionStorageURI());
}

/**
Expand All @@ -151,15 +151,15 @@ public URI getContextBaseURI() {
public URI putFile(InputStream inputStream, String name) throws IOException {
URI uri = context.getContextStorageURI();
URI resolved = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.put(context.getTenantId(), resolved, new BufferedInputStream(inputStream));
return this.storage.put(context.getTenantId(), context.getNamespace(), resolved, new BufferedInputStream(inputStream));
}

/**
* {@inheritDoc}
**/
@Override
public URI putFile(InputStream inputStream, URI uri) throws IOException {
return this.storage.put(context.getTenantId(), uri, new BufferedInputStream(inputStream));
return this.storage.put(context.getTenantId(), context.getNamespace(), uri, new BufferedInputStream(inputStream));
}

/**
Expand Down Expand Up @@ -211,14 +211,14 @@ public Optional<InputStream> getCacheFile(final String cacheId,
}
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(storage.get(context.getTenantId(), uri)) :
Optional.of(storage.get(context.getTenantId(), context.getNamespace(), uri)) :
Optional.empty();
}

private Optional<Long> getCacheFileLastModifiedTime(String cacheId, @Nullable String objectId) throws IOException {
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(this.storage.getAttributes(context.getTenantId(), uri).getLastModifiedTime()) :
Optional.of(this.storage.getAttributes(context.getTenantId(), context.getNamespace(), uri).getLastModifiedTime()) :
Optional.empty();
}

Expand All @@ -238,7 +238,7 @@ public URI putCacheFile(File file, String cacheId, @Nullable String objectId) th
public Optional<Boolean> deleteCacheFile(String cacheId, @Nullable String objectId) throws IOException {
URI uri = context.getCacheURI(cacheId, objectId);
return isFileExist(uri) ?
Optional.of(this.storage.delete(context.getTenantId(), uri)) :
Optional.of(this.storage.delete(context.getTenantId(), context.getNamespace(), uri)) :
Optional.empty();
}

Expand All @@ -263,7 +263,7 @@ private URI putFileAndDelete(File file, String prefix, String name) throws IOExc
private URI putFile(InputStream inputStream, String prefix, String name) throws IOException {
URI uri = URI.create(prefix);
URI resolve = uri.resolve(uri.getPath() + PATH_SEPARATOR + name);
return this.storage.put(context.getTenantId(), resolve, new BufferedInputStream(inputStream));
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
}

public Optional<StorageContext.Task> getTaskStorageContext() {
Expand Down
37 changes: 21 additions & 16 deletions core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.Plugin;

import javax.annotation.Nullable;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -12,8 +13,12 @@
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;

/**
* @implNote Most methods (except lifecycle on) took a namespace as parameter, this namespace parameter MUST NOT BE USED to denote the path of the storage URI in any sort,
* the URI must never be modified by a storage implementation.
* This is only used by storage implementation that must enforce namespace isolation.
*/
public interface StorageInterface extends AutoCloseable, Plugin {

/**
Expand All @@ -34,10 +39,10 @@ default void close() {
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, URI uri) throws IOException;
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
StorageObject getWithMetadata(String tenantId, URI uri) throws IOException;
StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException;

/**
* Returns all objects that start with the given prefix
Expand All @@ -46,10 +51,10 @@ default void close() {
* @return Kestra's internal storage uris of the found objects
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<URI> allByPrefix(String tenantId, URI prefix, boolean includeDirectories) throws IOException;
List<URI> allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException;

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, URI uri) throws IOException;
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;

/**
* Whether the uri points to a file/object that exist in the internal storage.
Expand All @@ -59,40 +64,40 @@ default void close() {
* @return true if the uri points to a file/object that exist in the internal storage.
*/
@SuppressWarnings("try")
default boolean exists(String tenantId, URI uri) {
try (InputStream ignored = get(tenantId, uri)) {
default boolean exists(String tenantId, @Nullable String namespace, URI uri) {
try (InputStream ignored = get(tenantId, namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, URI uri) throws IOException;
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;

@Retryable(includes = {IOException.class})
default URI put(String tenantId, URI uri, InputStream data) throws IOException {
return this.put(tenantId, uri, new StorageObject(null, data));
default URI put(String tenantId, @Nullable String namespace, URI uri, InputStream data) throws IOException {
return this.put(tenantId, namespace, uri, new StorageObject(null, data));
}

@Retryable(includes = {IOException.class})
URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException;
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;

@Retryable(includes = {IOException.class})
boolean delete(String tenantId, URI uri) throws IOException;
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;

@Retryable(includes = {IOException.class})
URI createDirectory(String tenantId, URI uri) throws IOException;
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;

@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(String tenantId, URI from, URI to) throws IOException;
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;

@Retryable(includes = {IOException.class})
List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException;
List<URI> deleteByPrefix(String tenantId, @Nullable String namespace, URI storagePrefix) throws IOException;

@Retryable(includes = {IOException.class})
default URI from(Execution execution, String input, File file) throws IOException {
URI uri = StorageContext.forInput(execution, input, file.getName()).getContextStorageURI();
return this.put(execution.getTenantId(), uri, new BufferedInputStream(new FileInputStream(file)));
return this.put(execution.getTenantId(), execution.getNamespace(), uri, new BufferedInputStream(new FileInputStream(file)));
}
}
Loading

0 comments on commit 78f5ff2

Please sign in to comment.