Skip to content

Commit

Permalink
v1.16: LoadedPrograms: introduce explicit SecondLevel type, fix prune…
Browse files Browse the repository at this point in the history
…_by_deployment_slot bug (#34434)

* Makes sure only LoadedPrograms::remove_programs_with_no_entries() does remove first level entries.

* Adds explicit SecondLevel structure to LoadedPrograms.

---------

Co-authored-by: Alexander Meißner <[email protected]>
  • Loading branch information
alessandrod and Lichtso authored Dec 14, 2023
1 parent 9cde446 commit 6a94de9
Showing 1 changed file with 64 additions and 48 deletions.
112 changes: 64 additions & 48 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,17 @@ impl LoadedProgram {
}
}

#[derive(Debug, Default)]
struct SecondLevel {
fork_versions: Vec<Arc<LoadedProgram>>,
}

#[derive(Debug, Default)]
pub struct LoadedPrograms {
/// A two level index:
///
/// Pubkey is the address of a program, multiple versions can coexists simultaneously under the same address (in different slots).
entries: HashMap<Pubkey, Vec<Arc<LoadedProgram>>>,
entries: HashMap<Pubkey, SecondLevel>,
/// Globally shared RBPF config and syscall registry
pub program_runtime_environment_v1: Arc<BuiltinProgram<InvokeContext<'static>>>,
latest_root: Slot,
Expand Down Expand Up @@ -434,12 +439,16 @@ impl LoadedPrograms {
key: Pubkey,
entry: Arc<LoadedProgram>,
) -> (bool, Arc<LoadedProgram>) {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
let fork_versions = &mut self
.entries
.entry(key)
.or_insert_with(SecondLevel::default)
.fork_versions;
let index = fork_versions
.iter()
.position(|at| at.effective_slot >= entry.effective_slot);
if let Some((existing, entry_index)) =
index.and_then(|index| second_level.get(index).map(|value| (value, index)))
index.and_then(|index| fork_versions.get(index).map(|value| (value, index)))
{
if existing.deployment_slot == entry.deployment_slot
&& existing.effective_slot == entry.effective_slot
Expand All @@ -455,21 +464,21 @@ impl LoadedPrograms {
existing.ix_usage_counter.load(Ordering::Relaxed),
Ordering::Relaxed,
);
second_level.remove(entry_index);
fork_versions.remove(entry_index);
} else if existing.is_tombstone() != entry.is_tombstone() {
// Either the old entry is tombstone and the new one is not.
// (Let's give the new entry a chance).
// Or, the old entry is not a tombstone and the new one is a tombstone.
// (Remove the old entry, as the tombstone makes it obsolete).
second_level.remove(entry_index);
fork_versions.remove(entry_index);
} else {
self.stats.replacements.fetch_add(1, Ordering::Relaxed);
return (true, existing.clone());
}
}
}
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
fork_versions.insert(index.unwrap_or(fork_versions.len()), entry.clone());
(false, entry)
}

Expand All @@ -486,7 +495,7 @@ impl LoadedPrograms {
/// On the epoch boundary this removes all programs of the outdated feature set
pub fn prune_feature_set_transition(&mut self) {
for second_level in self.entries.values_mut() {
second_level.retain(|entry| {
second_level.fork_versions.retain(|entry| {
let retain = match &entry.program {
LoadedProgramType::Builtin(_)
| LoadedProgramType::DelayVisibility
Expand Down Expand Up @@ -517,23 +526,21 @@ impl LoadedPrograms {
}

pub fn prune_by_deployment_slot(&mut self, slot: Slot) {
self.entries.retain(|_key, second_level| {
*second_level = second_level
.iter()
.filter(|entry| entry.deployment_slot != slot)
.cloned()
.collect();
!second_level.is_empty()
});
for second_level in self.entries.values_mut() {
second_level
.fork_versions
.retain(|entry| entry.deployment_slot != slot);
}
self.remove_programs_with_no_entries();
}

/// Before rerooting the blockstore this removes all programs of orphan forks
pub fn prune<F: ForkGraph>(&mut self, fork_graph: &F, new_root: Slot) {
let previous_root = self.latest_root;
self.entries.retain(|_key, second_level| {
for second_level in self.entries.values_mut() {
let mut first_ancestor_found = false;
*second_level = second_level
second_level.fork_versions = second_level
.fork_versions
.iter()
.rev()
.filter(|entry| {
Expand All @@ -553,9 +560,8 @@ impl LoadedPrograms {
})
.cloned()
.collect();
second_level.reverse();
!second_level.is_empty()
});
second_level.fork_versions.reverse();
}

self.remove_expired_entries(new_root);
self.remove_programs_with_no_entries();
Expand Down Expand Up @@ -608,7 +614,7 @@ impl LoadedPrograms {
let found = keys
.filter_map(|(key, (match_criteria, count))| {
if let Some(second_level) = self.entries.get(&key) {
for entry in second_level.iter().rev() {
for entry in second_level.fork_versions.iter().rev() {
let current_slot = working_slot.current_slot();
if entry.deployment_slot <= self.latest_root
|| entry.deployment_slot == current_slot
Expand Down Expand Up @@ -669,8 +675,10 @@ impl LoadedPrograms {
let sorted_candidates: Vec<(Pubkey, Arc<LoadedProgram>)> = self
.entries
.iter()
.flat_map(|(id, list)| {
list.iter()
.flat_map(|(id, second_level)| {
second_level
.fork_versions
.iter()
.filter_map(move |program| match program.program {
LoadedProgramType::LegacyV0(_)
| LoadedProgramType::LegacyV1(_)
Expand Down Expand Up @@ -702,8 +710,8 @@ impl LoadedPrograms {
}

fn remove_expired_entries(&mut self, current_slot: Slot) {
for entry in self.entries.values_mut() {
entry.retain(|program| {
for second_level in self.entries.values_mut() {
second_level.fork_versions.retain(|program| {
program
.maybe_expiration_slot
.map(|expiration| {
Expand All @@ -720,8 +728,8 @@ impl LoadedPrograms {
}

fn unload_program(&mut self, id: &Pubkey) {
if let Some(entries) = self.entries.get_mut(id) {
entries.iter_mut().for_each(|entry| {
if let Some(second_level) = self.entries.get_mut(id) {
second_level.fork_versions.iter_mut().for_each(|entry| {
if let Some(unloaded) = entry.to_unloaded() {
*entry = Arc::new(unloaded);
self.stats
Expand All @@ -744,8 +752,12 @@ impl LoadedPrograms {
remove: impl Iterator<Item = &'a (Pubkey, Arc<LoadedProgram>)>,
) {
for (id, program) in remove {
if let Some(entries) = self.entries.get_mut(id) {
if let Some(candidate) = entries.iter_mut().find(|entry| entry == &program) {
if let Some(second_level) = self.entries.get_mut(id) {
if let Some(candidate) = second_level
.fork_versions
.iter_mut()
.find(|entry| entry == &program)
{
if let Some(unloaded) = candidate.to_unloaded() {
if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 {
self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed);
Expand All @@ -764,7 +776,8 @@ impl LoadedPrograms {

fn remove_programs_with_no_entries(&mut self) {
let num_programs_before_removal = self.entries.len();
self.entries.retain(|_, programs| !programs.is_empty());
self.entries
.retain(|_, second_level| !second_level.fork_versions.is_empty());
if self.entries.len() < num_programs_before_removal {
self.stats.empty_entries.fetch_add(
num_programs_before_removal.saturating_sub(self.entries.len()) as u64,
Expand Down Expand Up @@ -850,8 +863,9 @@ mod tests {
cache
.entries
.values()
.map(|programs| {
programs
.map(|second_level| {
second_level
.fork_versions
.iter()
.filter(|program| predicate(&program.program))
.count()
Expand Down Expand Up @@ -995,8 +1009,8 @@ mod tests {
let unloaded = cache
.entries
.iter()
.flat_map(|(id, cached_programs)| {
cached_programs.iter().filter_map(|program| {
.flat_map(|(id, second_level)| {
second_level.fork_versions.iter().filter_map(|program| {
matches!(program.program, LoadedProgramType::Unloaded(_))
.then_some((*id, program.tx_usage_counter.load(Ordering::Relaxed)))
})
Expand Down Expand Up @@ -1049,8 +1063,8 @@ mod tests {
});
assert_eq!(num_unloaded, 1);

cache.entries.values().for_each(|programs| {
programs.iter().for_each(|program| {
cache.entries.values().for_each(|second_level| {
second_level.fork_versions.iter().for_each(|program| {
if matches!(program.program, LoadedProgramType::Unloaded(_)) {
// Test that the usage counter is retained for the unloaded program
assert_eq!(program.tx_usage_counter.load(Ordering::Relaxed), 10);
Expand All @@ -1067,8 +1081,8 @@ mod tests {
new_test_loaded_program_with_usage(0, 2, AtomicU64::new(0)),
);

cache.entries.values().for_each(|programs| {
programs.iter().for_each(|program| {
cache.entries.values().for_each(|second_level| {
second_level.fork_versions.iter().for_each(|program| {
if matches!(program.program, LoadedProgramType::Unloaded(_))
&& program.deployment_slot == 0
&& program.effective_slot == 2
Expand Down Expand Up @@ -1129,8 +1143,8 @@ mod tests {
.entries
.get(&program1)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 1);
assert!(second_level.get(0).unwrap().is_tombstone());
assert_eq!(second_level.fork_versions.len(), 1);
assert!(second_level.fork_versions.get(0).unwrap().is_tombstone());
assert_eq!(tombstone.deployment_slot, 10);
assert_eq!(tombstone.effective_slot, 10);

Expand All @@ -1145,8 +1159,8 @@ mod tests {
.entries
.get(&program2)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 1);
assert!(!second_level.get(0).unwrap().is_tombstone());
assert_eq!(second_level.fork_versions.len(), 1);
assert!(!second_level.fork_versions.get(0).unwrap().is_tombstone());

let tombstone = set_tombstone(
&mut cache,
Expand All @@ -1158,9 +1172,9 @@ mod tests {
.entries
.get(&program2)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 2);
assert!(!second_level.get(0).unwrap().is_tombstone());
assert!(second_level.get(1).unwrap().is_tombstone());
assert_eq!(second_level.fork_versions.len(), 2);
assert!(!second_level.fork_versions.get(0).unwrap().is_tombstone());
assert!(second_level.fork_versions.get(1).unwrap().is_tombstone());
assert!(tombstone.is_tombstone());
assert_eq!(tombstone.deployment_slot, 60);
assert_eq!(tombstone.effective_slot, 60);
Expand Down Expand Up @@ -1593,8 +1607,8 @@ mod tests {
assert!(missing.contains(&(program4, 1)));

// Remove the expired entry to let the rest of the test continue
if let Some(programs) = cache.entries.get_mut(&program4) {
programs.pop();
if let Some(second_level) = cache.entries.get_mut(&program4) {
second_level.fork_versions.pop();
}

cache.prune(&fork_graph, 5);
Expand Down Expand Up @@ -1854,6 +1868,7 @@ mod tests {
.entries
.get(&program1)
.expect("Didn't find program1")
.fork_versions
.len(),
3
);
Expand All @@ -1865,6 +1880,7 @@ mod tests {
.entries
.get(&program1)
.expect("Didn't find program1")
.fork_versions
.len(),
1
);
Expand Down

0 comments on commit 6a94de9

Please sign in to comment.