From 8c93bd1e582a05bd29b96aa45525895f79b65d9d Mon Sep 17 00:00:00 2001 From: Jochen Meierhofer Date: Thu, 7 Nov 2024 17:45:50 +0100 Subject: [PATCH 1/4] fix(plc4j/drivers/s7): fix NoSuchElementException when watchdog ChannelHandler was not added yet & fix unnecessary warning in logfile/console due to exception "MessageToMessageCodec$1 must produce at least one message." --- .../java/s7/readwrite/protocol/S7HMuxImpl.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java index d58b8e32c40..d59eb17048c 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HMuxImpl.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; @@ -130,9 +131,8 @@ protected void encode(ChannelHandlerContext ctx, ByteBuf outBB, List lis if ((embedCtx == null) && (ctx.channel() instanceof EmbeddedChannel)) embedCtx = ctx; if ((tcpChannel != null) && (embedCtx == ctx)) { tcpChannel.writeAndFlush(outBB.copy()); - } else { - list.add(outBB.copy()); } + list.add(outBB.copy()); } /* @@ -175,9 +175,14 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); logger.info(LocalTime.now() + " userEventTriggered: " + ctx.name() + " Event: " + evt); + if (evt instanceof ConnectedEvent) { try { - tcpChannel.pipeline().remove("watchdog"); + ChannelHandler watchdog = tcpChannel.pipeline().get("watchdog"); + if (watchdog != null) { + tcpChannel.pipeline().remove(watchdog); + } + } catch (Exception ex) { logger.info(ex.toString()); } @@ -199,7 +204,6 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (evt instanceof DisconnectEvent) { logger.info("DisconnectEvent"); } - } @Override @@ -250,7 +254,6 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); } } - ; if ((tcpChannel == secondaryChannel) && @@ -265,10 +268,8 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); } } - } - @Override public void setEmbededhannel(Channel embeded_channel, PlcConnectionConfiguration configuration) { final S7Configuration conf = (S7Configuration) configuration; @@ -351,5 +352,4 @@ public Channel getTCPChannel() { return tcpChannel; } - } From cd032383a4d094437924a1a140cdfeb4d3aeffdc Mon Sep 17 00:00:00 2001 From: Jochen Meierhofer Date: Thu, 7 Nov 2024 17:46:13 +0100 Subject: [PATCH 2/4] fix(plc4j/drivers/s7): fix some typos --- .../java/s7/readwrite/protocol/S7HPlcConnection.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java index ea0a1e258b6..597dfdb8b75 100644 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/protocol/S7HPlcConnection.java @@ -301,7 +301,7 @@ public void doSecondaryTcpConnections() { * The user application must take the measures to make the connection again. */ protected void sendChannelDisconectEvent() { - logger.trace("Channels was not created, firing DisconnectEvent Event"); + logger.trace("Channel was not created, firing DisconnectEvent Event"); // Send an event to the pipeline telling the Protocol filters what's going on. channel.pipeline().fireUserEventTriggered(new DisconnectEvent()); } @@ -351,7 +351,7 @@ public void run() { if (primaryChannel != null) { if (!primaryChannel.isActive()) { - logger.info("Creating prymary connection."); + logger.info("Creating primary connection."); primaryChannel.eventLoop().shutdownGracefully(); doPrimaryTcpConnections(); } else if (null == secondaryChannel) { @@ -364,7 +364,7 @@ public void run() { } } } else { - logger.info("Creating firts prymary connection."); + logger.info("Creating first primary connection."); doPrimaryTcpConnections(); } @@ -384,7 +384,7 @@ public void run() { } } else { if (secondaryChannelFactory != null) { - logger.info("Creating firts secondary connection."); + logger.info("Creating first secondary connection."); doSecondaryTcpConnections(); } } From 5b1cc09e642745a7f7324be38ad6268233efb5ff Mon Sep 17 00:00:00 2001 From: Jochen Meierhofer Date: Fri, 8 Nov 2024 09:29:01 +0100 Subject: [PATCH 3/4] fix(plc4j/drivers/s7): delete 2 duplicate classes, the used ones are located in org.apache.plc4x.java.s7.readwrite.protocol --- .../java/s7/readwrite/connection/S7HMux.java | 29 -- .../s7/readwrite/connection/S7HMuxImpl.java | 311 ------------------ 2 files changed, 340 deletions(-) delete mode 100644 plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java delete mode 100644 plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java deleted file mode 100644 index 53ad40b951a..00000000000 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.s7.readwrite.connection; - -import io.netty.channel.Channel; - -public interface S7HMux { - void setEmbeddedChannel(Channel embeded_channel); - - void setPrimaryChannel(Channel primary_channel); - - void setSecondaryChannel(Channel secondary_channel); -} diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java deleted file mode 100644 index 83703f2ab21..00000000000 --- a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.plc4x.java.s7.readwrite.connection; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.MessageToMessageCodec; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.util.AttributeKey; -import org.apache.plc4x.java.spi.events.ConnectEvent; -import org.apache.plc4x.java.spi.events.DisconnectEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.LocalTime; -import java.util.List; - -/** - * Implementation of a multiplexing channel, from an embedded channel to two - * possible TCP connections, primary and secondary. - * The objective is to allow connections to individual systems - * with a two CP (PN CPUs, CP343-1, CP443-1 or similar), or H-type systems - * (S7-400H or S7-1500H). - *

- * The user App must be in charge of restoring the requests or - * subscriptions that it is requesting. - */ -@Sharable -public class S7HMuxImpl extends MessageToMessageCodec implements S7HMux { - - private static final Logger logger = LoggerFactory.getLogger(S7HMuxImpl.class); - - /* - * This attribute indicates to the other handlers that the channel is connected - * or disconnected because a switch is being made between TCP channels or - * both TCP channels are disconnected. - * Default value: false - */ - public final static AttributeKey IS_CONNECTED = AttributeKey.valueOf("IS_CONNECTED"); - - /* - * This attribute indicates to the other handlers which channel is being used, - * this allows the request to be properly prepared. - * For example, in the case of a CPU with two CPs, you should change - * the "slot", in the case of H systems, you should change the "rack", - * the correct values will be defined in the connection URL. - * Default value: true - */ - public final static AttributeKey IS_PRIMARY = AttributeKey.valueOf("IS_PRIMARY"); - - /* - * This is the maximum waiting time for reading on the TCP channel. - * As there is no traffic, it must be assumed that the connection with the - * interlocutor was lost and it must be restarted. - * When the channel is closed, the "fail over" is carried out - * in case of having the secondary channel, or it is expected that it - * will be restored automatically, which is done every 4 seconds. - * Default value: 8 sec. - */ - public final static AttributeKey READ_TIME_OUT = AttributeKey.valueOf("READ_TIME_OUT"); - - /* - * If your application requires sampling times greater than the - * set "watchdog" time, it is important that the PING option is activated, - * this will prevent the TCP channel from being closed unnecessarily. - * Default value: false - */ - public final static AttributeKey IS_PING_ACTIVE = AttributeKey.valueOf("IS_PIN_ACTIVE"); - - /* - * Time value in seconds at which the execution of the PING will be scheduled. - * Generally set by developer experience, but generally should be the same - * as READ_TIME_OUT / 2. - * Default value: -1 - */ - public final static AttributeKey PING_TIME = AttributeKey.valueOf("PING_TIME"); - - /* - * Time for supervision of TCP channels. If the channel is not active, - * a safe stop of the EventLoop must be performed, to ensure that - * no additional tasks are created. - * Default value: 4 - */ - public final static AttributeKey RETRY_TIME = AttributeKey.valueOf("RETRY_TIME"); - - ChannelHandlerContext embedCtx = null; - protected Channel embededChannel = null; - protected Channel tcpChannel = null; - protected Channel primaryChannel = null; - protected Channel secondaryChannel = null; - - /* - * From S7ProtocolLogic - * TODO: Evaluate if the "embed_ctx" is really required since we set - * the Embedded channel when we created it. - */ - @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf outBB, List list) { - logger.debug("ENCODE: {}", outBB.toString()); - if ((embedCtx == null) && (ctx.channel() instanceof EmbeddedChannel)) { - embedCtx = ctx; - } - if ((tcpChannel != null) && (embedCtx == ctx)) { - tcpChannel.writeAndFlush(outBB.copy()); - } else { - list.add(outBB.copy()); - } - } - - /* - * To S7ProtocolLogic - * The information received here from the channel "tcp_channel" is sent to - * the pipeline of the channel "embedded_channel" - */ - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf inbb, List list) throws Exception { - embedCtx.fireChannelRead(inbb.copy()); - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - super.channelRegistered(ctx); - logger.debug("channelRegistered: {}", ctx.name()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); - logger.debug("exceptionCaught: {}", ctx.name()); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - super.channelWritabilityChanged(ctx); - logger.debug("channelWritabilityChanged: {}", ctx.name()); - } - - /* - * The events detected here flow from the S7ProtocolLogic object. - * Upon receiving the "ConnectEvent" event, we must safely add the watchdog - * to the pipeline of the "tcp_channel" connection. - * The supervision time can be defined in the connection URL, - * the default value being 8 secs, this value being defined experimentally. - */ - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - super.userEventTriggered(ctx, evt); - logger.info("{} userEventTriggered: {} Event: {}", LocalTime.now(), ctx.name(), evt); - if (evt instanceof ConnectEvent) { - try { - tcpChannel.pipeline().remove("watchdog"); - } catch (Exception ex) { - logger.info(ex.toString()); - } - try { - tcpChannel.pipeline().addFirst("watchdog", new ReadTimeoutHandler(30)); - if (tcpChannel.isActive()) { - embededChannel.attr(IS_CONNECTED).set(true); - } else { - embededChannel.attr(IS_CONNECTED).set(false); - } - } catch (Exception ex) { - logger.info(ex.toString()); - } - } - - if (evt instanceof DisconnectEvent) { - logger.info("DisconnectEvent"); - } - - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - super.channelReadComplete(ctx); - logger.debug("{} channelReadComplete: {}", LocalTime.now(), ctx.name()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - logger.debug("channelInactive: {}", ctx.name()); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - logger.debug("channelActive: {}", ctx.name()); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - super.channelUnregistered(ctx); - logger.debug("{} channelUnregistered: {}", LocalTime.now(), ctx.name()); - String strCanal = (tcpChannel == primaryChannel) ? "PRIMARY" : "SECONDARY"; - logger.debug("Unregistered of channel: {}", strCanal); - //TODO: If embedded channel is closed, we need close all channels - if (ctx == embedCtx) return; - - if (tcpChannel == ctx.channel()) - embededChannel.attr(IS_CONNECTED).set(false); - - logger.info(embedCtx.executor().toString()); - - if ((tcpChannel == primaryChannel) && - (primaryChannel == ctx.channel())) - if ((!primaryChannel.isActive()) && - (secondaryChannel != null)) { - if (secondaryChannel.isActive()) - synchronized (tcpChannel) { - logger.info("Using secondary TCP channel."); - tcpChannel = secondaryChannel; - embededChannel.attr(IS_PRIMARY).set(false); - embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); - } - } - - - if ((tcpChannel == secondaryChannel) && - (secondaryChannel == ctx.channel())) - if ((!secondaryChannel.isActive() && - (primaryChannel.isActive()))) { - synchronized (tcpChannel) { - logger.info("Using primary TCP channel."); - tcpChannel = primaryChannel; - embededChannel.attr(IS_PRIMARY).set(true); - embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); - } - } - } - - - @Override - public void setEmbeddedChannel(Channel embeded_channel) { - this.embededChannel = embeded_channel; - this.embededChannel.attr(IS_CONNECTED).set(false); - this.embededChannel.attr(IS_PRIMARY).set(true); - this.embededChannel.attr(READ_TIME_OUT).set(8); - this.embededChannel.attr(IS_PING_ACTIVE).set(false); - this.embededChannel.attr(PING_TIME).set(-1); - this.embededChannel.attr(RETRY_TIME).set(8); - } - - public void setPrimaryChannel(Channel primary_channel) { - if ((this.primaryChannel == null) && (tcpChannel == null)) { - if (primary_channel != null) { - this.primaryChannel = primary_channel; - tcpChannel = primary_channel; - embededChannel.attr(IS_PRIMARY).set(true); - } - } else if ((!this.primaryChannel.isActive()) && (tcpChannel == secondaryChannel)) { - this.primaryChannel = primary_channel; - } else if ((!this.primaryChannel.isActive()) && (tcpChannel == this.primaryChannel)) { - synchronized (tcpChannel) { - tcpChannel.close(); - this.primaryChannel = primary_channel; - tcpChannel = primary_channel; - embededChannel.attr(IS_PRIMARY).set(true); - if (tcpChannel.isActive()) { - embedCtx.fireUserEventTriggered(new ConnectEvent()); - } - } - } - } - - @Override - public void setSecondaryChannel(Channel secondary_channel) { - if ((this.primaryChannel == null) && (tcpChannel == null)) { - if (secondary_channel != null) { - this.secondaryChannel = secondary_channel; - tcpChannel = secondary_channel; - embededChannel.attr(IS_PRIMARY).set(false); - } - } else if ((this.secondaryChannel == null) || (tcpChannel == primaryChannel)) { - this.secondaryChannel = secondary_channel; - } else if ((!this.secondaryChannel.isActive()) && (tcpChannel == primaryChannel)) { - this.secondaryChannel = secondary_channel; - } else if ((!this.secondaryChannel.isActive()) && (tcpChannel == this.secondaryChannel)) { - synchronized (tcpChannel) { - tcpChannel.close(); - this.secondaryChannel = secondary_channel; - tcpChannel = secondary_channel; - embededChannel.attr(IS_PRIMARY).set(false); - } - if (tcpChannel.isActive()) { - embedCtx.fireUserEventTriggered(new ConnectEvent()); - } - } - } - - -} From 1dae268b94ddbc288472e7f15d9ef79b539eb047 Mon Sep 17 00:00:00 2001 From: Jochen Meierhofer Date: Fri, 8 Nov 2024 09:41:08 +0100 Subject: [PATCH 4/4] Revert "fix(plc4j/drivers/s7): delete 2 duplicate classes, the used ones are located in org.apache.plc4x.java.s7.readwrite.protocol" This reverts commit 5b1cc09e642745a7f7324be38ad6268233efb5ff. --- .../java/s7/readwrite/connection/S7HMux.java | 29 ++ .../s7/readwrite/connection/S7HMuxImpl.java | 311 ++++++++++++++++++ 2 files changed, 340 insertions(+) create mode 100644 plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java create mode 100644 plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java new file mode 100644 index 00000000000..53ad40b951a --- /dev/null +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMux.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.plc4x.java.s7.readwrite.connection; + +import io.netty.channel.Channel; + +public interface S7HMux { + void setEmbeddedChannel(Channel embeded_channel); + + void setPrimaryChannel(Channel primary_channel); + + void setSecondaryChannel(Channel secondary_channel); +} diff --git a/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java new file mode 100644 index 00000000000..83703f2ab21 --- /dev/null +++ b/plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/readwrite/connection/S7HMuxImpl.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.plc4x.java.s7.readwrite.connection; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.MessageToMessageCodec; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; +import org.apache.plc4x.java.spi.events.ConnectEvent; +import org.apache.plc4x.java.spi.events.DisconnectEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalTime; +import java.util.List; + +/** + * Implementation of a multiplexing channel, from an embedded channel to two + * possible TCP connections, primary and secondary. + * The objective is to allow connections to individual systems + * with a two CP (PN CPUs, CP343-1, CP443-1 or similar), or H-type systems + * (S7-400H or S7-1500H). + *

+ * The user App must be in charge of restoring the requests or + * subscriptions that it is requesting. + */ +@Sharable +public class S7HMuxImpl extends MessageToMessageCodec implements S7HMux { + + private static final Logger logger = LoggerFactory.getLogger(S7HMuxImpl.class); + + /* + * This attribute indicates to the other handlers that the channel is connected + * or disconnected because a switch is being made between TCP channels or + * both TCP channels are disconnected. + * Default value: false + */ + public final static AttributeKey IS_CONNECTED = AttributeKey.valueOf("IS_CONNECTED"); + + /* + * This attribute indicates to the other handlers which channel is being used, + * this allows the request to be properly prepared. + * For example, in the case of a CPU with two CPs, you should change + * the "slot", in the case of H systems, you should change the "rack", + * the correct values will be defined in the connection URL. + * Default value: true + */ + public final static AttributeKey IS_PRIMARY = AttributeKey.valueOf("IS_PRIMARY"); + + /* + * This is the maximum waiting time for reading on the TCP channel. + * As there is no traffic, it must be assumed that the connection with the + * interlocutor was lost and it must be restarted. + * When the channel is closed, the "fail over" is carried out + * in case of having the secondary channel, or it is expected that it + * will be restored automatically, which is done every 4 seconds. + * Default value: 8 sec. + */ + public final static AttributeKey READ_TIME_OUT = AttributeKey.valueOf("READ_TIME_OUT"); + + /* + * If your application requires sampling times greater than the + * set "watchdog" time, it is important that the PING option is activated, + * this will prevent the TCP channel from being closed unnecessarily. + * Default value: false + */ + public final static AttributeKey IS_PING_ACTIVE = AttributeKey.valueOf("IS_PIN_ACTIVE"); + + /* + * Time value in seconds at which the execution of the PING will be scheduled. + * Generally set by developer experience, but generally should be the same + * as READ_TIME_OUT / 2. + * Default value: -1 + */ + public final static AttributeKey PING_TIME = AttributeKey.valueOf("PING_TIME"); + + /* + * Time for supervision of TCP channels. If the channel is not active, + * a safe stop of the EventLoop must be performed, to ensure that + * no additional tasks are created. + * Default value: 4 + */ + public final static AttributeKey RETRY_TIME = AttributeKey.valueOf("RETRY_TIME"); + + ChannelHandlerContext embedCtx = null; + protected Channel embededChannel = null; + protected Channel tcpChannel = null; + protected Channel primaryChannel = null; + protected Channel secondaryChannel = null; + + /* + * From S7ProtocolLogic + * TODO: Evaluate if the "embed_ctx" is really required since we set + * the Embedded channel when we created it. + */ + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf outBB, List list) { + logger.debug("ENCODE: {}", outBB.toString()); + if ((embedCtx == null) && (ctx.channel() instanceof EmbeddedChannel)) { + embedCtx = ctx; + } + if ((tcpChannel != null) && (embedCtx == ctx)) { + tcpChannel.writeAndFlush(outBB.copy()); + } else { + list.add(outBB.copy()); + } + } + + /* + * To S7ProtocolLogic + * The information received here from the channel "tcp_channel" is sent to + * the pipeline of the channel "embedded_channel" + */ + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf inbb, List list) throws Exception { + embedCtx.fireChannelRead(inbb.copy()); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + logger.debug("channelRegistered: {}", ctx.name()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + logger.debug("exceptionCaught: {}", ctx.name()); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + logger.debug("channelWritabilityChanged: {}", ctx.name()); + } + + /* + * The events detected here flow from the S7ProtocolLogic object. + * Upon receiving the "ConnectEvent" event, we must safely add the watchdog + * to the pipeline of the "tcp_channel" connection. + * The supervision time can be defined in the connection URL, + * the default value being 8 secs, this value being defined experimentally. + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + logger.info("{} userEventTriggered: {} Event: {}", LocalTime.now(), ctx.name(), evt); + if (evt instanceof ConnectEvent) { + try { + tcpChannel.pipeline().remove("watchdog"); + } catch (Exception ex) { + logger.info(ex.toString()); + } + try { + tcpChannel.pipeline().addFirst("watchdog", new ReadTimeoutHandler(30)); + if (tcpChannel.isActive()) { + embededChannel.attr(IS_CONNECTED).set(true); + } else { + embededChannel.attr(IS_CONNECTED).set(false); + } + } catch (Exception ex) { + logger.info(ex.toString()); + } + } + + if (evt instanceof DisconnectEvent) { + logger.info("DisconnectEvent"); + } + + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + logger.debug("{} channelReadComplete: {}", LocalTime.now(), ctx.name()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + logger.debug("channelInactive: {}", ctx.name()); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + logger.debug("channelActive: {}", ctx.name()); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + super.channelUnregistered(ctx); + logger.debug("{} channelUnregistered: {}", LocalTime.now(), ctx.name()); + String strCanal = (tcpChannel == primaryChannel) ? "PRIMARY" : "SECONDARY"; + logger.debug("Unregistered of channel: {}", strCanal); + //TODO: If embedded channel is closed, we need close all channels + if (ctx == embedCtx) return; + + if (tcpChannel == ctx.channel()) + embededChannel.attr(IS_CONNECTED).set(false); + + logger.info(embedCtx.executor().toString()); + + if ((tcpChannel == primaryChannel) && + (primaryChannel == ctx.channel())) + if ((!primaryChannel.isActive()) && + (secondaryChannel != null)) { + if (secondaryChannel.isActive()) + synchronized (tcpChannel) { + logger.info("Using secondary TCP channel."); + tcpChannel = secondaryChannel; + embededChannel.attr(IS_PRIMARY).set(false); + embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); + } + } + + + if ((tcpChannel == secondaryChannel) && + (secondaryChannel == ctx.channel())) + if ((!secondaryChannel.isActive() && + (primaryChannel.isActive()))) { + synchronized (tcpChannel) { + logger.info("Using primary TCP channel."); + tcpChannel = primaryChannel; + embededChannel.attr(IS_PRIMARY).set(true); + embededChannel.pipeline().fireUserEventTriggered(new ConnectEvent()); + } + } + } + + + @Override + public void setEmbeddedChannel(Channel embeded_channel) { + this.embededChannel = embeded_channel; + this.embededChannel.attr(IS_CONNECTED).set(false); + this.embededChannel.attr(IS_PRIMARY).set(true); + this.embededChannel.attr(READ_TIME_OUT).set(8); + this.embededChannel.attr(IS_PING_ACTIVE).set(false); + this.embededChannel.attr(PING_TIME).set(-1); + this.embededChannel.attr(RETRY_TIME).set(8); + } + + public void setPrimaryChannel(Channel primary_channel) { + if ((this.primaryChannel == null) && (tcpChannel == null)) { + if (primary_channel != null) { + this.primaryChannel = primary_channel; + tcpChannel = primary_channel; + embededChannel.attr(IS_PRIMARY).set(true); + } + } else if ((!this.primaryChannel.isActive()) && (tcpChannel == secondaryChannel)) { + this.primaryChannel = primary_channel; + } else if ((!this.primaryChannel.isActive()) && (tcpChannel == this.primaryChannel)) { + synchronized (tcpChannel) { + tcpChannel.close(); + this.primaryChannel = primary_channel; + tcpChannel = primary_channel; + embededChannel.attr(IS_PRIMARY).set(true); + if (tcpChannel.isActive()) { + embedCtx.fireUserEventTriggered(new ConnectEvent()); + } + } + } + } + + @Override + public void setSecondaryChannel(Channel secondary_channel) { + if ((this.primaryChannel == null) && (tcpChannel == null)) { + if (secondary_channel != null) { + this.secondaryChannel = secondary_channel; + tcpChannel = secondary_channel; + embededChannel.attr(IS_PRIMARY).set(false); + } + } else if ((this.secondaryChannel == null) || (tcpChannel == primaryChannel)) { + this.secondaryChannel = secondary_channel; + } else if ((!this.secondaryChannel.isActive()) && (tcpChannel == primaryChannel)) { + this.secondaryChannel = secondary_channel; + } else if ((!this.secondaryChannel.isActive()) && (tcpChannel == this.secondaryChannel)) { + synchronized (tcpChannel) { + tcpChannel.close(); + this.secondaryChannel = secondary_channel; + tcpChannel = secondary_channel; + embededChannel.attr(IS_PRIMARY).set(false); + } + if (tcpChannel.isActive()) { + embedCtx.fireUserEventTriggered(new ConnectEvent()); + } + } + } + + +}