Skip to content

Commit

Permalink
feat(plc4j/iec-60870): Finished a first rough version of the code to …
Browse files Browse the repository at this point in the history
…publish incoming IEC 60870-5-104 events to PLC4X Subscription Handlers (Still query expressions and filtering isn't implemented)
  • Loading branch information
chrisdutz committed Aug 29, 2023
1 parent 9cafc2f commit 77de2a1
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.iec608705104.readwrite.model;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.plc4x.java.iec608705104.readwrite.tag.Iec608705104Tag;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;

public class Iec608705104SubscriptionHandle extends DefaultPlcSubscriptionHandle {

private final Iec608705104Tag tag;

public Iec608705104SubscriptionHandle(PlcSubscriber plcSubscriber, Iec608705104Tag tag) {
super(plcSubscriber);
this.tag = tag;
}

public Iec608705104Tag getTag() {
return tag;
}

/*public boolean matches(Iec608705104Tag address) {
return tag.matchesGroupAddress(address);
}*/

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof Iec608705104SubscriptionHandle)) {
return false;
}

Iec608705104SubscriptionHandle that = (Iec608705104SubscriptionHandle) o;

return new EqualsBuilder()
.append(getTag(), that.getTag())
.isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.appendSuper(super.hashCode())
.append(getTag())
.toHashCode();
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("tag", tag)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,44 @@
package org.apache.plc4x.java.iec608705104.readwrite.protocol;

import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.iec608705104.readwrite.*;
import org.apache.plc4x.java.iec608705104.readwrite.configuration.Iec608705014Configuration;
import org.apache.plc4x.java.iec608705104.readwrite.model.Iec608705104SubscriptionHandle;
import org.apache.plc4x.java.iec608705104.readwrite.tag.Iec608705104Tag;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
import org.apache.plc4x.java.spi.messages.PlcBrowser;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAmount;
import java.util.Calendar;
import java.util.Collection;
import java.util.TimeZone;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import static java.time.temporal.ChronoField.OFFSET_SECONDS;

public class Iec608705104Protocol extends Plc4xProtocolBase<APDU> implements HasConfiguration<Iec608705014Configuration>, PlcSubscriber, PlcBrowser {

private Iec608705014Configuration configuration;
private final RequestTransactionManager tm;

private int unconfirmedPackets;

private final Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();

public Iec608705104Protocol() {
// We're starting with allowing only one message in-flight.
this.tm = new RequestTransactionManager(1);
Expand Down Expand Up @@ -129,13 +131,33 @@ else if (msg instanceof APDUIFormat){
}

@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
return null;
public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
Map<String, ResponseItem<PlcSubscriptionHandle>> values = new HashMap<>();
for (String tagName : subscriptionRequest.getTagNames()) {
final DefaultPlcSubscriptionTag tag = (DefaultPlcSubscriptionTag) subscriptionRequest.getTag(tagName);
if (!(tag.getTag() instanceof Iec608705104Tag)) {
values.put(tagName, new ResponseItem<>(PlcResponseCode.INVALID_ADDRESS, null));
} else {
values.put(tagName, new ResponseItem<>(PlcResponseCode.OK,
new Iec608705104SubscriptionHandle(this, (Iec608705104Tag) tag.getTag())));
}
}
return CompletableFuture.completedFuture(
new DefaultPlcSubscriptionResponse(subscriptionRequest, values));
}

@Override
public void unregister(PlcConsumerRegistration registration) {
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> collection) {
final DefaultPlcConsumerRegistration consumerRegistration =
new DefaultPlcConsumerRegistration(this, consumer, collection.toArray(new PlcSubscriptionHandle[0]));
consumers.put(consumerRegistration, consumer);
return consumerRegistration;
}

@Override
public void unregister(PlcConsumerRegistration plcConsumerRegistration) {
DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) plcConsumerRegistration;
consumers.remove(consumerRegistration);
}

protected void processData(ASDU asdu) {
Expand All @@ -159,7 +181,9 @@ protected void processData(ASDU asdu) {
} else {
eventTime = LocalDateTime.now();
}
System.out.println(DateTimeFormatter.ISO_DATE_TIME.format(eventTime) + ": " + tag + ": " + plcValue.toString());

// Send the event out to all subscribed listeners.
publishEvent(eventTime, tag, plcValue);
}
}

Expand All @@ -180,4 +204,28 @@ protected LocalDateTime convertCp56Time2aToCalendar(SevenOctetBinaryTime cp56Tim
.minus(localTimeZoneOffsetFromUTC);
}

protected void publishEvent(LocalDateTime timeStamp, Iec608705104Tag tag, PlcValue plcValue) {
// Create a subscription event from the input.
final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(
timeStamp.atZone(ZoneId.systemDefault()).toInstant(),
Collections.singletonMap(tag.toString(),
new ResponseItem<>(PlcResponseCode.OK, plcValue)));

// Try sending the subscription event to all listeners.
for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
final DefaultPlcConsumerRegistration registration = entry.getKey();
final Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
// Only if the current data point matches the subscription, publish the event to it.
for (PlcSubscriptionHandle handle : registration.getSubscriptionHandles()) {
if (handle instanceof Iec608705104SubscriptionHandle) {
Iec608705104SubscriptionHandle subscriptionHandle = (Iec608705104SubscriptionHandle) handle;
// Check if the subscription matches this current event.
if (/*subscriptionHandle.getTag().matchesGroupAddress(groupAddress)*/true) {
consumer.accept(event);
}
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package org.apache.plc4x.java.iec608705104.readwrite.tag;

import org.apache.plc4x.java.api.exceptions.PlcInvalidTagException;
import org.apache.plc4x.java.api.model.ArrayInfo;
import org.apache.plc4x.java.api.model.PlcTag;
import org.apache.plc4x.java.api.types.PlcValueType;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.List;

public class Iec608705104Tag {
public class Iec608705104Tag implements PlcTag {

/*private static final Pattern IEC_60870_5_104_ADDRESS =
Pattern.compile("^(?<adsuAddrSingleValueGroup>(\\d|\\*))|((?<adsuAddrDoubleValueGroup1>(\\d|\\*))/(?<adsuAddrDoubleValueGroup2>(\\d|\\*)))/((?<objectAddressSingleValueGroup(\\d|\\*))|((?<objectAddressTrippleValueGroup1>(\\d|\\*))\\.(?<objectAddressTrippleValueGroup2>(\\d|\\*))\\.(?<objectAddressTrippleValueGroup3>(\\d|\\*))))");
Expand All @@ -50,6 +51,21 @@ public int getObjectAddress() {
return objectAddress;
}

@Override
public String getAddressString() {
return null;
}

@Override
public PlcValueType getPlcValueType() {
return PlcValueType.NULL;
}

@Override
public List<ArrayInfo> getArrayInfo() {
return PlcTag.super.getArrayInfo();
}

@Override
public String toString() {
return "Iec608705104Tag{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Iec608705104TagHandler implements PlcTagHandler {

@Override
public PlcTag parseTag(String tagAddress) {
return null;
return new Iec608705104Tag(0, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ public static void main(String[] args) throws Exception {
shutdown.complete(null);
}));
try (PlcConnection plcConnection = PlcDriverManager.getDefault().getConnectionManager().getConnection("iec-60870-5-104://192.168.23.10")) {
if(!plcConnection.getMetadata().canSubscribe()) {
throw new RuntimeException("Subscription not supported");
}

plcConnection.subscriptionRequestBuilder().addChangeOfStateTagAddress("all", "*").addPreRegisteredConsumer("all", plcSubscriptionEvent -> {
for (String tagName : plcSubscriptionEvent.getTagNames()) {
System.out.println(String.format("TS: %s, Addr: %s, Value; %s", plcSubscriptionEvent.getTimestamp().toString(), tagName, plcSubscriptionEvent.getPlcValue(tagName).toString()));
}
}).build().execute();

// Wait till shutdown.
shutdown.get();
}
Expand Down

0 comments on commit 77de2a1

Please sign in to comment.