Skip to content

Commit

Permalink
Implement gateway filter for JDBC (H2 & Postgresql) device registry:
Browse files Browse the repository at this point in the history
- add optional parameter "isGateway"
- functionality for listing only devices or gateways
- add documentation

Also-by: Matthias Feurer [email protected]
Signed-off-by: g.dimitropoulos <[email protected]>
  • Loading branch information
gdimitropoulos-sotec committed Oct 30, 2023
1 parent fc10dc9 commit ee04d53
Show file tree
Hide file tree
Showing 17 changed files with 363 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ private Map<String, Object> getAttributes(final PubSubBasedCommand command) {
attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_PROJECT_ID, projectId);
attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, !command.isOneWay());
Optional.ofNullable(command.getGatewayId()).ifPresent(
id -> {
attributes.put(MessageHelper.APP_PROPERTY_GATEWAY_ID, id);
attributes.put(MessageHelper.APP_PROPERTY_CMD_VIA, id);
});
id -> attributes.put(MessageHelper.APP_PROPERTY_GATEWAY_ID, id));
return attributes;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2019, 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -118,6 +118,11 @@ public final class RegistryManagementConstants extends RequestResponseApiConstan
*/
public static final String PARAM_SORT_JSON = "sortJson";

/**
* The name of the boolean filter query parameter for searching gateways or only devices.
*/
public static final String PARAM_IS_GATEWAY = "isGateway";


// DEVICES

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2022, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -83,6 +83,13 @@ public abstract class AbstractHttpEndpoint<T extends ServiceConfigProperties> ex
}
};

/**
* A function that tries to parse a string into an Optional boolean.
*/
protected static final Function<String, Optional<Boolean>> CONVERTER_BOOLEAN = s -> {
return Strings.isNullOrEmpty(s) ? Optional.empty() : Optional.of(Boolean.valueOf(s));
};

/**
* The configuration properties for this endpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,13 @@ public class TableManagementStore extends AbstractDeviceStore {
private final Statement updateDeviceVersionStatement;

private final Statement countDevicesOfTenantStatement;
private final Statement countDevicesWithFilter;
private final Statement countDevicesWithFilterStatement;
private final Statement countGatewaysOfTenantStatement;
private final Statement countOnlyDevicesOfTenantStatement;

private final Statement findDevicesStatement;
private final Statement findDevicesOfTenantStatement;
private final Statement findGatewaysOfTenantStatement;
private final Statement findOnlyDevicesOfTenantStatement;
private final Statement findDevicesOfTenantWithFilterStatement;

/**
Expand Down Expand Up @@ -204,20 +208,48 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
.validateParameters(
TENANT_ID);

this.countDevicesWithFilter = cfg
this.countGatewaysOfTenantStatement = cfg
.getRequiredStatement("countGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("countOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countDevicesWithFilterStatement = cfg
.getRequiredStatement("countDevicesOfTenantWithFilter")
.validateParameters(
TENANT_ID,
FIELD,
VALUE);

this.findDevicesStatement = cfg
this.findDevicesOfTenantStatement = cfg
.getRequiredStatement("findDevicesOfTenant")
.validateParameters(
TENANT_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("findOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findGatewaysOfTenantStatement = cfg
.getRequiredStatement("findGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findDevicesOfTenantWithFilterStatement = cfg
.getRequiredStatement("findDevicesOfTenantWithFilter")
.validateParameters(
Expand All @@ -228,46 +260,6 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
PAGE_OFFSET);
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString(VERSION)).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Read a device and lock it for updates.
* <p>
Expand Down Expand Up @@ -850,6 +842,46 @@ private <T> Future<T> recoverNotFound(final Span span, final Throwable err, fina
}
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString("version")).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Get all credentials for a device.
* <p>
Expand Down Expand Up @@ -912,7 +944,7 @@ private List<CommonCredential> parseCredentials(final ResultSet result) {
final var entries = result.getRows(true);

return entries.stream()
.map(o -> o.getString(DATA))
.map(o -> o.getString("data"))
.map(s -> Json.decodeValue(s, CommonCredential.class))
.collect(Collectors.toList());

Expand All @@ -925,25 +957,41 @@ private List<CommonCredential> parseCredentials(final ResultSet result) {
* @param pageSize The page size.
* @param pageOffset The page offset.
* @param filters The list of filters (currently only the first value of the list is used).
* @param isGateway Optional search gateway or only devices filter.
* @param spanContext The span to contribute to.
* @return A future containing devices.
*/
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters,
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters, final Optional<Boolean> isGateway,
final SpanContext spanContext) {


final var filter = filters.stream().findFirst();
final Statement findDeviceSqlStatement;
final Statement countStatement;
final String field;
final String value;

if (isGateway.isPresent()) {
field = "";
value = "";

final String field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
final var value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");
findDeviceSqlStatement = isGateway.get() ? this.findGatewaysOfTenantStatement : this.findOnlyDevicesOfTenantStatement;
countStatement = isGateway.get() ? this.countGatewaysOfTenantStatement : this.countOnlyDevicesOfTenantStatement;
} else {
final var filter = filters.stream().findFirst();

field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");


findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesOfTenantStatement;
countStatement = (filter.isPresent()) ? countDevicesWithFilterStatement : this.countDevicesOfTenantStatement;
}

final Statement findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesStatement;
final Statement countStatement = (filter.isPresent()) ? countDevicesWithFilter : this.countDevicesOfTenantStatement;

final var expanded = findDeviceSqlStatement.expand(map -> {
map.put(TENANT_ID, tenantId);
Expand Down Expand Up @@ -985,5 +1033,3 @@ public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, fin
.onComplete(x -> span.finish());
}
}


Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@

countDevicesOfTenantWithFilter: |
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
OR
REPLACE(data, '"') LIKE CONCAT('%%', :field, ':', REPLACE(:value, '"'))
countGatewaysOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
countOnlyDevicesOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
findDevicesOfTenantWithFilter: |
SELECT *
FROM %s
WHERE
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
Expand All @@ -21,3 +39,29 @@ findDevicesOfTenantWithFilter: |
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findGatewaysOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REPLACE(REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', ''), '\') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findOnlyDevicesOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
Loading

0 comments on commit ee04d53

Please sign in to comment.