Skip to content

Commit

Permalink
fix: root page was falsely deleted (#1464)
Browse files Browse the repository at this point in the history
* fix: root page was falsely deleted

* feat: cleaner db query
fix: root page was marked as immutable too soon

* fix: broken file name reference
  • Loading branch information
jobulcke authored Dec 19, 2024
1 parent cac7b39 commit 98ceeef
Show file tree
Hide file tree
Showing 20 changed files with 224 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/how-to-run.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Here is an explanation provided for all the possibilities on how to tweak and co
<td>ldes-server.compaction-duration</td>
<td>Defines how long the redundant compacted fragments will remain on the server</td>
<td>No</td>
<td>PD7</td>
<td>P7D</td>
</tr>
<tr><td colspan="4"><b>Maintenance</b></td></tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public RootBucketCreatorImpl(BucketRepository bucketRepository) {
@Override
public void createRootBucketForView(ViewName viewName) {
if (bucketRepository.retrieveRootBucket(viewName).isEmpty()) {
bucketRepository.insertBucket(Bucket.createRootBucketForView(viewName));
bucketRepository.insertRootBucket(Bucket.createRootBucketForView(viewName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
import java.util.Optional;

public interface BucketRepository {
Bucket insertBucket(Bucket bucket);
Bucket insertRootBucket(Bucket bucket);
Optional<Bucket> retrieveRootBucket(ViewName viewName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void when_RootFragmentDoesNotExist_ItIsCreatedAndSaved() {

InOrder inOrder = inOrder(bucketRepository);
inOrder.verify(bucketRepository).retrieveRootBucket(VIEW_NAME);
inOrder.verify(bucketRepository).insertBucket(Bucket.createRootBucketForView(VIEW_NAME));
inOrder.verify(bucketRepository).insertRootBucket(Bucket.createRootBucketForView(VIEW_NAME));
inOrder.verifyNoMoreInteractions();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public BucketPostgresRepository(ViewEntityRepository viewEntityRepository, Bucke

@Override
@Transactional
public Bucket insertBucket(Bucket bucket) {
public Bucket insertRootBucket(Bucket bucket) {
String sql = """
INSERT INTO pages (bucket_id, expiration, partial_url)
VALUES (:bucketId, NULL, :partialUrl)
INSERT INTO pages (bucket_id, expiration, partial_url, is_root)
VALUES (:bucketId, NULL, :partialUrl, true)
ON CONFLICT DO NOTHING
""";
ViewEntity view = viewEntityRepository.findByViewName(bucket.getViewName().getCollectionName(), bucket.getViewName().getViewName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void test_Insertion() {
final Bucket bucketToSave = new Bucket(BucketDescriptor.of(new BucketDescriptorPair("key", "value"), new BucketDescriptorPair("k", "v")), VIEW_NAME);
final Bucket expectedSavedBucket = new Bucket(1L, BucketDescriptor.of(new BucketDescriptorPair("key", "value"), new BucketDescriptorPair("k", "v")), VIEW_NAME, List.of(), 0);

final Bucket result = bucketPostgresRepository.insertBucket(bucketToSave);
final Bucket result = bucketPostgresRepository.insertRootBucket(bucketToSave);

assertThat(result)
.usingRecursiveComparison()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">

<changeSet id="add_is_root_column" author="vsds">
<preConditions onError="MARK_RAN">
<not>
<columnExists tableName="pages" columnName="is_root" />
</not>
</preConditions>
<addColumn tableName="pages">
<column name="is_root" type="BOOLEAN" defaultValueBoolean="false">
<constraints nullable="false" />
</column>
</addColumn>
<sqlFile path="add_is_root_values.sql" relativeToChangelogFile="true" />
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
WITH root_page_partial_urls AS (
SELECT CONCAT('/', c.name, '/', v.name) AS partial_url
FROM collections c
JOIN views v ON c.collection_id = v.collection_id
)
UPDATE pages SET is_root = true, immutable = false
WHERE partial_url IN (SELECT partial_url FROM root_page_partial_urls);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
DROP VIEW IF EXISTS open_pages;

CREATE VIEW open_pages AS
SELECT DISTINCT ON (p.bucket_id) p.page_id,
p.bucket_id,
p.partial_url,
v.page_size,
count(pm.member_id) AS assigned_members
FROM pages p
JOIN buckets b ON b.bucket_id = p.bucket_id
JOIN views v ON v.view_id = b.view_id
LEFT JOIN page_members pm ON pm.page_id = p.page_id
WHERE NOT p.immutable
GROUP BY p.page_id, v.page_size, p.bucket_id, p.is_root
ORDER BY p.bucket_id, p.is_root
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">

<include relativeToChangelogFile="true" file="add_is_root_column.xml"/>
<include relativeToChangelogFile="true" file="alter_open_pages_view.sql"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
<include file="/db/changelog/3_5_0/master.xml" />
<include file="/db/changelog/3_5_1/master.xml" />
<include file="/db/changelog/3_6_0/master.xml" />
<include file="/db/changelog/3_6_1/master.xml" />

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class CompactionPageEntity {
@Column(name = "partial_url", nullable = false, unique = true)
private String partialUrl;

@Column(name = "is_root", nullable = false, columnDefinition = "BOOLEAN")
private boolean isRoot;

public CompactionPageEntity() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SELECT p.page_id as fragmentId, COUNT(*) AS size, r.to_page_id AS toPage, p.buck
WHERE c.name = :collectionName
AND v.name = :viewName
AND p.expiration IS NULL AND p.immutable
AND NOT p.is_root
GROUP BY p.page_id, r.to_page_id
HAVING COUNT(*) < :capacityPerPage
""", nativeQuery = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class PageEntity {
@Column(name = "partial_url", nullable = false, unique = true)
private String partialUrl;

@Column(name = "is_root", nullable = false, columnDefinition = "BOOLEAN")
private boolean isRoot;

@OneToMany(mappedBy = "fromPage")
private List<PageRelationEntity> relations;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public interface PageEntityRepository extends JpaRepository<PageEntity, Long> {
@Modifying
@Query(value = "UPDATE pages SET immutable = true WHERE page_id = ?", nativeQuery = true)
@Query(value = "UPDATE pages SET immutable = true WHERE page_id = ? AND NOT is_root", nativeQuery = true)
void setPageImmutable(long pageId);

@Modifying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@

import be.vlaanderen.informatievlaanderen.ldes.server.pagination.postgres.entity.PageEntity;
import be.vlaanderen.informatievlaanderen.ldes.server.pagination.postgres.entity.PageRelationEntity;
import be.vlaanderen.informatievlaanderen.ldes.server.resultactionsextensions.ResponseToModelConverter;
import io.cucumber.java.After;
import io.cucumber.java.Before;
import io.cucumber.java.en.And;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import org.apache.jena.rdf.model.ResourceFactory;
import org.apache.jena.riot.Lang;
import org.apache.jena.vocabulary.RDF;

import java.io.IOException;
import java.net.URISyntaxException;
Expand All @@ -17,17 +24,34 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SuppressWarnings("java:S3415")
public class CompactionServiceSteps extends LdesServerIntegrationTest {
private int versionIncremeter = 1;
private ScheduledExecutorService executorService;
private Future<?> seedingTask;

@Before
public void setup() {
executorService = Executors.newSingleThreadScheduledExecutor();
}

@After
public void cleanup() {
executorService.shutdown();
}

@And("I ingest {int} members of different versions")
public void ingestDifferentVersions(int amount) throws Exception {
Expand Down Expand Up @@ -144,4 +168,70 @@ private boolean isValidUuid(String pageNumber) {
return false;
}
}

@And("I start seeding {int} members every {int} seconds")
public void iStartSeedingMembersEverySeconds(int numberOfMembers, int seconds) {
seedingTask = executorService.scheduleAtFixedRate(() -> ingestNumberOfVersionObjects(numberOfMembers), 0, seconds, SECONDS);
}

private void ingestNumberOfVersionObjects(int numberOfMembers) {
try {
String memberTemplate = readMemberTemplate("data/input/members/observation.template.json");
for (int i = 0; i < numberOfMembers; i++) {
String memberContent = memberTemplate
.replace("ID", String.valueOf(i))
.replace("DATETIME", getCurrentTimestamp());
mockMvc.perform(post("/observations")
.contentType(Lang.JSONLD.getHeaderString())
.content(memberContent))
.andExpect(status().is2xxSuccessful());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Then("I wait until {int} members are ingested")
public void iWaitUntilMembersAreIngested(int number) {
await().atMost(Duration.ofMinutes(3))
.pollInterval(Duration.ofSeconds(15))
.untilAsserted(() -> assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM members", Long.class)).isEqualTo(number));
}

@Then("I stop seeding members")
public void iStopSeedingMembers() {
seedingTask.cancel(true);
}


@When("I wait until the first page does not exits anymore")
public void iWaitUntilThePageWithPageNumberDoesNotExitsAnymore() {
await()
.atMost(Duration.ofMinutes(2))
.pollInterval(Duration.ofSeconds(5))
.untilAsserted(() -> mockMvc.perform(get("/observations/time-based?pageNumber=1"))
.andExpect(status().isNotFound()));
}

@And("I only have one open page")
public void iOnlyHaveOneOpenPage() {
final Long openPageCount = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM open_pages", Long.class);
assertThat(openPageCount).isEqualTo(1);
}

@Then("the root page points to a compacted page")
public void theRootPagePointsToACompactedPage() throws Exception {
final var response = mockMvc.perform(get("/observations/time-based").accept(Lang.NQ.getHeaderString()))
.andExpect(status().is2xxSuccessful())
.andReturn()
.getResponse();
final String pageNumber = new ResponseToModelConverter(response).convert()
.listSubjectsWithProperty(RDF.type, ResourceFactory.createProperty("https://w3id.org/tree#Relation"))
.nextResource()
.listProperties(ResourceFactory.createProperty("https://w3id.org/tree#node"))
.nextStatement()
.getResource()
.getLocalName();
assertThatNoException().isThrownBy(() -> UUID.fromString(pageNumber));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
ldes-server:
host-name: "http://localhost:8080"
compaction-duration: "*/10 * * * * *"
fragmentation-cron: "*/10 * * * * *"
maintenance-cron: "*/10 * * * * *"
maintenance-cron: "*/5 * * * * *"
compaction-duration: PT2M

springdoc.swaggerui.path: "/swagger"
management:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
@prefix ldes: <https://w3id.org/ldes#> .
@prefix tree: <https://w3id.org/tree#>.
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix ex: <http://example.org#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix dcterms: <http://purl.org/dc/terms/> .

</observations> a ldes:EventStream ;
ldes:timestampPath dcterms:created ;
ldes:versionOfPath dcterms:isVersionOf ;
tree:shape [ a sh:NodeShape ] ;
tree:view </observations/time-based> ;
ldes:eventSource [
a ldes:EventSource ;
ldes:retentionPolicy [
a ldes:DurationAgoPolicy ;
tree:value "PT2M"^^xsd:duration
] ;
] .

</observations/time-based> a tree:Node ;
tree:viewDescription [
a tree:ViewDescription ;
tree:fragmentationStrategy () ;
tree:pageSize "7"^^xsd:integer ;
ldes:retentionPolicy [
a ldes:DurationAgoPolicy ;
tree:value "PT90S"^^xsd:duration
]
]
.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"@context": {
"@base": "http://example.org/id/",
"@vocab": "http://purl.org/dc/terms/",
"isVersionOf": {
"@type": "@id"
}
},
"@id": "ID#DATETIME",
"@type": [
"something"
],
"created": [
{
"@value": "DATETIME",
"@type": "http://www.w3.org/2001/XMLSchema#dateTime"
}
],
"isVersionOf": "http://example.org/id/ID"
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
Feature: LDES Server Compaction

Background:
Scenario: Execution Compaction
Given I create the eventstream "data/input/eventstreams/compaction/mobility-hindrances_paginated_5.ttl"
And I ingest 6 members of different versions
And I ingest 5 members of the same version
And I ingest 5 members of the same version
And I ingest 3 members of different versions

Scenario: Execution Compaction
Then I wait until all members are fragmented
Then wait until no fragments can be compacted
And verify there are 5 pages
Expand All @@ -21,4 +19,14 @@ Feature: LDES Server Compaction
And verify the following pages no longer exist
| 2 |
| 3 |
And the background processes did not fail
And the background processes did not fail

Scenario: Retention Compaction And Deletion works fine together
Given I create the eventstream "data/input/eventstreams/compaction/observations.ttl"
And I start seeding 5 members every 15 seconds
When I wait until the first page does not exits anymore
Then the root page points to a compacted page
And I only have one open page
Then I stop seeding members
And I delete the eventstream "observations"
And the background processes did not fail

0 comments on commit 98ceeef

Please sign in to comment.