From c840aebd0a4cc4cc40ec5321538f4415328cd901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=9C=E9=A3=8E?= <74083801+ZY945@users.noreply.github.com> Date: Sun, 10 Dec 2023 08:48:06 +0800 Subject: [PATCH] monitoring the availability of websockets through handshake. (#1413) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 东风 <1335799468@qq.com> --- .../websocket/WebsocketCollectImpl.java | 165 ++++++++++++++++++ .../collector/dispatch/DispatchConstants.java | 4 + ...ertzbeat.collector.collect.AbstractCollect | 1 + .../hertzbeat/common/entity/job/Metrics.java | 5 + .../job/protocol/WebsocketProtocol.java | 25 +++ .../main/resources/define/app-websocket.yml | 125 +++++++++++++ 6 files changed, 325 insertions(+) create mode 100644 collector/src/main/java/org/dromara/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java create mode 100644 common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/WebsocketProtocol.java create mode 100644 manager/src/main/resources/define/app-websocket.yml diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java new file mode 100644 index 00000000000..10458e2f6fa --- /dev/null +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java @@ -0,0 +1,165 @@ +package org.dromara.hertzbeat.collector.collect.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.dromara.hertzbeat.collector.collect.AbstractCollect; +import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; +import org.dromara.hertzbeat.common.constants.CollectorConstants; +import org.dromara.hertzbeat.common.constants.CommonConstants; +import org.dromara.hertzbeat.common.entity.job.Metrics; +import org.dromara.hertzbeat.common.entity.job.protocol.WebsocketProtocol; +import org.dromara.hertzbeat.common.entity.message.CollectRep; +import org.dromara.hertzbeat.common.util.CommonUtil; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.security.SecureRandom; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * @author dongfeng + */ +@Slf4j +public class WebsocketCollectImpl extends AbstractCollect { + public WebsocketCollectImpl() { + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) { + long startTime = System.currentTimeMillis(); + if (metrics == null || metrics.getWebsocket() == null) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg("Websocket collect must has Websocket params"); + return; + } + WebsocketProtocol WebsocketProtocol = metrics.getWebsocket(); + String host = WebsocketProtocol.getHost(); + String port = WebsocketProtocol.getPort(); + Socket socket = null; + try { + socket = new Socket(); + SocketAddress socketAddress = new InetSocketAddress(host, Integer.parseInt(port)); + socket.connect(socketAddress); + + if (socket.isConnected()) { + long responseTime = System.currentTimeMillis() - startTime; + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + + + send(out); + Map resultMap = readHeaders(in); + resultMap.put(CollectorConstants.RESPONSE_TIME, Long.toString(responseTime)); + + // 关闭输出流和Socket连接 + in.close(); + out.close(); + socket.close(); + List aliasFields = metrics.getAliasFields(); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String field : aliasFields) { + String fieldValue = resultMap.get(field); + valueRowBuilder.addColumns(Objects.requireNonNullElse(fieldValue, CommonConstants.NULL_VALUE)); + } + builder.addValues(valueRowBuilder.build()); + } else { + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("Peer connect failed:"); + } + } catch (UnknownHostException unknownHostException) { + String errorMsg = CommonUtil.getMessageFromThrowable(unknownHostException); + log.info(errorMsg); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("UnknownHost:" + errorMsg); + } catch (SocketTimeoutException socketTimeoutException) { + String errorMsg = CommonUtil.getMessageFromThrowable(socketTimeoutException); + log.info(errorMsg); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("Socket connect timeout: " + errorMsg); + } catch (IOException ioException) { + String errorMsg = CommonUtil.getMessageFromThrowable(ioException); + log.info(errorMsg); + builder.setCode(CollectRep.Code.UN_CONNECTABLE); + builder.setMsg("Connect may fail:" + errorMsg); + } + } + + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_WEBSOCKET; + } + + private static void send(OutputStream out) throws IOException { + byte[] key = generateRandomKey(); + String base64Key = base64Encode(key); + String requestLine = "GET / HTTP/1.1\r\n"; + out.write(requestLine.getBytes()); + String hostName = InetAddress.getLocalHost().getHostAddress(); + out.write(("Host:" + hostName + "\r\n").getBytes()); + out.write("Upgrade: websocket\r\n".getBytes()); + out.write("Connection: Upgrade\r\n".getBytes()); + out.write("Sec-WebSocket-Version: 13\r\n".getBytes()); + out.write("Sec-WebSocket-Extensions: chat, superchat\r\n".getBytes()); + out.write(("Sec-WebSocket-Key: " + base64Key + "\r\n").getBytes()); + out.write("Content-Length: 0\r\n".getBytes()); + out.write("\r\n".getBytes()); + out.flush(); + } + + // 读取响应头 + private static Map readHeaders(InputStream in) throws IOException { + + Map map = new HashMap<>(8); + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + String line; + while ((line = reader.readLine()) != null && !line.isEmpty()) { + int separatorIndex = line.indexOf(':'); + if (separatorIndex != -1) { + String key = line.substring(0, separatorIndex).trim(); + String value = line.substring(separatorIndex + 1).trim(); + // 首字母小写化 + map.put(StringUtils.uncapitalize(key), value); + } else { + // 切割HTTP/1.1, 101, Switching Protocols + String[] parts = line.split("\\s+", 3); + if (parts.length == 3) { + for (int i = 0; i < parts.length; i++) { + if (parts[i].startsWith("HTTP")) { + map.put("httpVersion", parts[i]); + } else if (Character.isDigit(parts[i].charAt(0))) { + map.put("responseCode", parts[i]); + } else { + map.put("statusMessage", parts[i]); + } + } + } + } + } + return map; + } + + private static byte[] generateRandomKey() { + SecureRandom secureRandom = new SecureRandom(); + byte[] key = new byte[16]; + secureRandom.nextBytes(key); + return key; + } + + private static String base64Encode(byte[] data) { + return Base64.getEncoder().encodeToString(data); + } +} diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java index b67ea1506b3..daeb52114c4 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java @@ -43,6 +43,10 @@ public interface DispatchConstants { * protocol ntp */ String PROTOCOL_NTP = "ntp"; + /** + * protocol websocket + */ + String PROTOCOL_WEBSOCKET = "websocket"; /** * protocol udp */ diff --git a/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect b/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect index bce577cc0b9..0ec7b3a9232 100644 --- a/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect +++ b/collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect @@ -10,6 +10,7 @@ org.dromara.hertzbeat.collector.collect.ssh.SshCollectImpl org.dromara.hertzbeat.collector.collect.telnet.TelnetCollectImpl org.dromara.hertzbeat.collector.collect.smtp.SmtpCollectImpl org.dromara.hertzbeat.collector.collect.ntp.NtpCollectImpl +org.dromara.hertzbeat.collector.collect.websocket.WebsocketCollectImpl org.dromara.hertzbeat.collector.collect.ftp.FtpCollectImpl org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl org.dromara.hertzbeat.collector.collect.udp.UdpCollectImpl diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java index b3950f2174a..c1e78c9a830 100644 --- a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java @@ -125,6 +125,11 @@ public class Metrics { * 使用ntp协议的监控配置信息 */ private NtpProtocol ntp; + /** + * Monitoring configuration information using the websocket protocol + * 使用websocket的监控配置信息 + */ + private WebsocketProtocol websocket; /** * Use udp implemented by socket for service port detection configuration information * 使用socket实现的udp进行服务端口探测配置信息 diff --git a/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/WebsocketProtocol.java b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/WebsocketProtocol.java new file mode 100644 index 00000000000..cd390e1aa26 --- /dev/null +++ b/common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/WebsocketProtocol.java @@ -0,0 +1,25 @@ +package org.dromara.hertzbeat.common.entity.job.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author dongfeng + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class WebsocketProtocol { + /** + * Websocket 主机ip或域名 + */ + private String host; + + /** + * Websocket 主机端口 + */ + private String port; +} diff --git a/manager/src/main/resources/define/app-websocket.yml b/manager/src/main/resources/define/app-websocket.yml new file mode 100644 index 00000000000..bf89443fb42 --- /dev/null +++ b/manager/src/main/resources/define/app-websocket.yml @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The monitoring type category:service-application service monitoring db-database monitoring mid-middleware custom-custom monitoring os-operating system monitoring +# 监控类型所属类别:service-应用服务 program-应用程序 db-数据库 custom-自定义 os-操作系统 bigdata-大数据 mid-中间件 webserver-web服务器 cache-缓存 cn-云原生 network-网络监控等等 +category: service +# The monitoring type eg: linux windows tomcat mysql aws... +# 监控类型 eg: linux windows tomcat mysql aws... +app: websocket +# 监控类型国际化名称 +name: + zh-CN: WebSocket监控 + en-US: WebSocket MONITORS +# The description and help of this monitoring type +# 监控类型的帮助描述信息 +help: + zh-CN: HertzBeat 对 WebSocket 服务的首次握手的响应等相关指标进行监测。 + en-US: HertzBeat monitors metrics such as the response of the WebSocket service's first handshake. + zh-TW: HertzBeat對WebSocket服務的首次握手的響應等相關名額進行監測。 +# 监控所需输入参数定义(根据定义渲染页面UI) +# Input params define for monitoring(render web ui by the definition) +params: + # field-param field key + # field-字段名称标识符 + - field: host + # name-param field display i18n name + # name-参数字段显示名称 + name: + zh-CN: WebSocket服务的Host + en-US: Host of WebSocket service + # type-param field type(most mapping the html input type) + # type-字段类型,样式(大部分映射input标签type属性) + type: host + # required-true or false + # 是否是必输项 true-必填 false-可选 + required: true + # field-param field key + # field-字段名称标识符 + - field: port + # name-param field display i18n name + # name-参数字段显示名称 + name: + zh-CN: 端口 + en-US: Port + # type-param field type(most mapping the html input type) + # type-字段类型,样式(大部分映射input标签type属性) + type: number + # when type is number, range is required + # 当type为number时,用range表示范围 + range: '[0,65535]' + # required-true or false + # 是否是必输项 true-必填 false-可选 + required: true +# collect metrics config list +# 采集指标配置列表 +metrics: + # metrics - summary + # 监控指标 - summary + - name: summary + i18n: + zh-CN: 概要 + en-US: Summary + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + # 指标采集调度优先级(0->127)->(优先级高->低) 优先级低的指标会等优先级高的指标采集完成后才会被调度, 相同优先级的指标会并行调度采集 + # 优先级为0的指标为可用性指标,即它会被首先调度,采集成功才会继续调度其它指标,采集失败则中断调度 + priority: 0 + # 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 label-是否是指标标签字段 unit:指标单位 + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + # field-指标名称, type-指标类型(0-number数字,1-string字符串), unit-指标单位('%','ms','MB'), label-是否是指标标签字段 + fields: + - field: responseTime + type: 0 + unit: ms + i18n: + zh-CN: 响应时间 + en-US: ResponseTime + - field: httpVersion + type: 1 + i18n: + zh-CN: http版本 + en-US: httpVersion + - field: responseCode + type: 1 + i18n: + zh-CN: 响应状态码 + en-US: responseCode + - field: statusMessage + type: 1 + i18n: + zh-CN: 状态消息 + en-US: statusMessage + - field: connection + type: 1 + i18n: + zh-CN: 是否支持升级(不存在表示是普通http请求,存在表示支持升级) + en-US: connection + - field: upgrade + type: 1 + i18n: + zh-CN: 升级(协议) + en-US: upgrade + + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + # 采集协议, 目前支持sql, ssh, http, telnet, wmi, snmp, sdk + protocol: websocket + # Specific collection configuration when protocol is telnet protocol + # 当protocol为telnet协议时具体的采集配置 + websocket: + # telnet host + # 远程登录主机 + host: ^_^host^_^ + port: ^_^port^_^