[BUG] Reconcile on disk state with assigned partitions for given resource and delete unused partitions #650

Open
2 of 12 tasks
ZacAttack opened this issue Sep 21, 2023 · 8 comments · Fixed by #1196 · May be fixed by #1332
Labels
bug Something isn't working

Comments

@ZacAttack
Contributor

ZacAttack commented Sep 21, 2023

Willingness to contribute

No. I cannot contribute a bug fix at this time.

Venice version

Observed since March in production

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 20.04): Mariner 5.15.111.1-1.cm2
  • JDK version: 17

Describe the problem

Currently, the on-disk state inside the RocksDB folder of a Venice server for a given resource (store version) can contain folders for partitions that are no longer assigned to the host, due to missed "partition drops".

We need to ensure that only the known (metadata?) or "assigned" partitions are present in the folder for each store_version, and remove the rest very quickly in order to recover disk space.

It is NOT sufficient to clean up this data on start without some introspection. Venice relies on delayed rebalance on its controller to avoid unnecessary bootstraps, so some examination may be needed to determine what the appropriate disk state should be.

Tracking information

No response

Code to reproduce bug

No response

What component(s) does this bug affect?

  • Controller: This is the control-plane for Venice. Used to create/update/query stores and their metadata.
  • Router: This is the stateless query-routing layer for serving read requests.
  • Server: This is the component that persists all the store data.
  • VenicePushJob: This is the component that pushes derived data from Hadoop to Venice backend.
  • VenicePulsarSink: This is a Sink connector for Apache Pulsar that pushes data from Pulsar into Venice.
  • Thin Client: This is a stateless client users use to query Venice Router for reading store data.
  • Fast Client: This is a stateful client users use to query Venice Server for reading store data.
  • Da Vinci Client: This is an embedded, stateful client that materializes store data locally.
  • Alpini: This is the framework that fast-client and routers use to route requests to the storage nodes that have the data.
  • Samza: This is the library users use to make nearline updates to store data.
  • Admin Tool: This is the stand-alone client used for ad-hoc operations on Venice.
  • Scripts: These are the various ops scripts in the repo.
@ZacAttack ZacAttack added the bug Something isn't working label Sep 21, 2023
@ZacAttack
Contributor Author

Ok. So here are my notes on how to make the bug fix.

There are two tasks: reconcile disk state for Da Vinci, and for servers. I'm going to explain them separately.

Servers

Our task will be to consult Helix's IdealState to understand which partitions are assigned to the server. That should be a sufficient signal to tell us whether the server will get state transitions for the data which is on disk.

Within one of the constructors for StorageService.java there is an argument for providing a functional interface called checkWhetherStorageEngineShouldBeKeptOrNot. When the storage service starts up, it checks every folder on its local disk and determines whether or not that folder should be deleted. Today we use this in Da Vinci, but we don't in the server. You can observe this from the fact that when VeniceServer.java calls new StorageService() it uses a constructor which doesn't provide a parameter for checkWhetherStorageEngineShouldBeKeptOrNot.

Our task here then is to have VeniceServer invoke the constructor that requires this parameter, and then to define the function that should be used.

Some Tips:

To get the IdealState, you should be able to use the SafeHelixDataAccessor object and call getProperty. What you should pass here is a PropertyKey which points to the IdealState for the resource we're checking up on. That call should look something like:

PropertyKey.Builder propertyKeyBuilder = new PropertyKey.Builder(clusterConfig.getClusterName());
IdealState idealState = safeHelixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

Once you have the IdealState object, you can interrogate it to find which instance names currently host a given partition. If the current instance is in the list, then you know that we should keep the partition. Yay!
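For example (a minimal sketch; the partition-naming convention and the instanceId variable are assumptions for illustration, not code from the repo):

// Helix partition names follow the "<resourceName>_<partitionId>" convention,
// and instanceId is this server's Helix instance name (typically host_port).
String partitionName = resourceName + "_" + partitionId;
Set<String> hostingInstances = idealState.getInstanceSet(partitionName);
boolean keepPartition = hostingInstances.contains(instanceId);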

Some notes now that I'm looking at this. First of all, the current implementation of checkWhetherStorageEngineShouldBeKeptOrNot seems to work off of store names, not partition names. We will probably have to tweak that somehow; I leave it to you to decide how best to do that.

Next, it would be great if the implementation avoided looking up the IdealState from ZK over and over again. Some caching per store would be ideal here so we only have to look it up once, not repeatedly. So please keep that in mind.
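Something along these lines, perhaps (a sketch only; clusterName and safeHelixDataAccessor are assumed to be fields on the enclosing class):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Fetch each resource's IdealState from ZK at most once per startup.
// A null result (unknown resource) is not cached, so it gets retried.
private final Map<String, IdealState> idealStateCache = new ConcurrentHashMap<>();

private IdealState getIdealState(String resourceName) {
  return idealStateCache.computeIfAbsent(resourceName, name -> {
    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
    return safeHelixDataAccessor.getProperty(keyBuilder.idealStates(name));
  });
}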

@kristyelee
Contributor

Currently have the following code in writing:

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
    return storageEngineName -> {
      String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);

      if (VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
        return true;
      }

      AbstractStorageEngine storageEngine = storageService.getStorageEngine(storageEngineName);

      PropertyKey.Builder propertyKeyBuilder =
          new PropertyKey.Builder(this.veniceConfigLoader.getVeniceClusterConfig().getClusterName());
      IdealState idealState = safeHelixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

      Set<String> idealPartitionIds = idealState.getPartitionSet();
      Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

      for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
        // How to change storageEnginePartitionId into the correct type? [String]
        // if (idealPartitionIds.contains(storageEnginePartitionId)) {
        //   continue;
        // }
        // dropPartition(storageEnginePartitionId);
      }

      return true;
    };
  }

@kvargha
Contributor

kvargha commented Sep 23, 2024

dropPartition takes an integer, so you would want to convert idealPartitionIds to a set of ints.

You can do it like this:

import java.util.Set;
import java.util.stream.Collectors;

Set<Integer> idealPartitionIds = idealState.getPartitionSet().stream().map(Integer::parseInt).collect(Collectors.toSet());

If you want to invoke dropPartition you would need to do it like this:
storageEngine.dropPartition(storageEnginePartitionId);

@kristyelee
Contributor

kristyelee commented Sep 23, 2024

Hi @kvargha, thanks for the pointers.
I have revised the code to make the mapping work.

Set<Integer> idealStatePartitionIds = new HashSet<>();
idealState.getPartitionSet().forEach(partitionId -> {
  idealStatePartitionIds.add(Integer.parseInt(partitionId));
});
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
  if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
    continue;
  }
  storageEngine.dropPartition(storageEnginePartitionId);
}

@kristyelee
Contributor

kristyelee commented Sep 24, 2024

Here are the proposed constructor changes following the addition of the new functionToCheckWhetherStorageEngineShouldBeKeptOrNot.
[Though this function performs its checks at the partition level.]

In createServices():

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
    || veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
    veniceConfigLoader,
    storageEngineStats,
    rocksDBMemoryStats,
    storeVersionStateSerializer,
    partitionStateSerializer,
    metadataRepo,
    whetherToRestoreDataPartitions,
    true,
    functionToCheckWhetherStorageEngineShouldBeKeptOrNot());

...

Then,

  protected final boolean isIsolatedIngestion() {
    return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
  }

[This follows the format in DaVinciBackend.java.]

Since the functional interface should be accepted before it is wired into the initialized StorageService, I have submitted the most recent commit containing only the update to the functional interface that deletes unassigned partitions.
Once this is approved, I will then integrate the argument changes into the initialized StorageService.

@kristyelee
Contributor

kristyelee commented Sep 24, 2024

Here is the code for the function:

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
    return storageEngineName -> {
      String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);

      AbstractStorageEngine storageEngine = storageService.getStorageEngine(storageEngineName);

      PropertyKey.Builder propertyKeyBuilder =
          new PropertyKey.Builder(this.veniceConfigLoader.getVeniceClusterConfig().getClusterName());
      // getProperty is an instance method, so a SafeHelixDataAccessor instance
      // (here, safeHelixDataAccessor) is assumed to be available on the server.
      IdealState idealState = safeHelixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

      Set<Integer> idealStatePartitionIds = new HashSet<>();
      // NOTE: if Helix returns partition names with the "<resource>_<id>" prefix
      // rather than bare numbers, the numeric suffix must be extracted first.
      idealState.getPartitionSet().forEach(partitionId -> {
        idealStatePartitionIds.add(Integer.parseInt(partitionId));
      });
      Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

      for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
        if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
          continue;
        }
        // Drop any on-disk partition that is absent from the ideal state.
        storageEngine.dropPartition(storageEnginePartitionId);
      }

      // Keep the storage engine itself; only stale partitions are dropped.
      return true;
    };
  }

First added in this commit.

@kvargha
Contributor

kvargha commented Nov 19, 2024

Reopening since we still need this fix for Da Vinci Client.

@kvargha kvargha reopened this Nov 19, 2024
@kristyelee kristyelee linked a pull request Nov 20, 2024 that will close this issue
@kristyelee
Contributor

From discussion, it seems the logic in DVC is as follows:
In DVC, the user actively specifies the set of subscribed partitions. [Function: subscribe()]
Another path for partition subscription is bootstrap(). [Function: bootstrap()]
This involves retrieving the storage engine from the data folder upon restart.
For a valid store, it subscribes all partitions found on disk regardless of the requested partitions, producing an over-inclusive subscription set instead of the correctly updated subscription set after restart.
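If that is right, the fix presumably makes bootstrap() reconcile instead of blindly subscribing. A rough sketch (requestedPartitionIds is illustrative, not the actual DVC internals):

// On restart, only keep partitions the user actually requested;
// drop on-disk partitions that fall outside that set.
Set<Integer> onDiskPartitionIds = storageEngine.getPartitionIds();
for (Integer partitionId: onDiskPartitionIds) {
  if (!requestedPartitionIds.contains(partitionId)) {
    storageEngine.dropPartition(partitionId);
  }
}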
