Skip to content

Commit

Permalink
monitoring the availability of websockets through handshake. (#1413)
Browse files Browse the repository at this point in the history
Co-authored-by: 东风 <[email protected]>
  • Loading branch information
ZY945 and 东风 authored Dec 10, 2023
1 parent 3204ad6 commit c840aeb
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package org.dromara.hertzbeat.collector.collect.websocket;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.hertzbeat.collector.collect.AbstractCollect;
import org.dromara.hertzbeat.collector.dispatch.DispatchConstants;
import org.dromara.hertzbeat.common.constants.CollectorConstants;
import org.dromara.hertzbeat.common.constants.CommonConstants;
import org.dromara.hertzbeat.common.entity.job.Metrics;
import org.dromara.hertzbeat.common.entity.job.protocol.WebsocketProtocol;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.util.CommonUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* @author dongfeng
*/
@Slf4j
public class WebsocketCollectImpl extends AbstractCollect {
public WebsocketCollectImpl() {
}

@Override
public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) {
long startTime = System.currentTimeMillis();
if (metrics == null || metrics.getWebsocket() == null) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Websocket collect must has Websocket params");
return;
}
WebsocketProtocol WebsocketProtocol = metrics.getWebsocket();
String host = WebsocketProtocol.getHost();
String port = WebsocketProtocol.getPort();
Socket socket = null;
try {
socket = new Socket();
SocketAddress socketAddress = new InetSocketAddress(host, Integer.parseInt(port));
socket.connect(socketAddress);

if (socket.isConnected()) {
long responseTime = System.currentTimeMillis() - startTime;
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();


send(out);
Map<String, String> resultMap = readHeaders(in);
resultMap.put(CollectorConstants.RESPONSE_TIME, Long.toString(responseTime));

// 关闭输出流和Socket连接
in.close();
out.close();
socket.close();
List<String> aliasFields = metrics.getAliasFields();
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String field : aliasFields) {
String fieldValue = resultMap.get(field);
valueRowBuilder.addColumns(Objects.requireNonNullElse(fieldValue, CommonConstants.NULL_VALUE));
}
builder.addValues(valueRowBuilder.build());
} else {
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("Peer connect failed:");
}
} catch (UnknownHostException unknownHostException) {
String errorMsg = CommonUtil.getMessageFromThrowable(unknownHostException);
log.info(errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("UnknownHost:" + errorMsg);
} catch (SocketTimeoutException socketTimeoutException) {
String errorMsg = CommonUtil.getMessageFromThrowable(socketTimeoutException);
log.info(errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("Socket connect timeout: " + errorMsg);
} catch (IOException ioException) {
String errorMsg = CommonUtil.getMessageFromThrowable(ioException);
log.info(errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("Connect may fail:" + errorMsg);
}
}

@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_WEBSOCKET;
}

private static void send(OutputStream out) throws IOException {
byte[] key = generateRandomKey();
String base64Key = base64Encode(key);
String requestLine = "GET / HTTP/1.1\r\n";
out.write(requestLine.getBytes());
String hostName = InetAddress.getLocalHost().getHostAddress();
out.write(("Host:" + hostName + "\r\n").getBytes());
out.write("Upgrade: websocket\r\n".getBytes());
out.write("Connection: Upgrade\r\n".getBytes());
out.write("Sec-WebSocket-Version: 13\r\n".getBytes());
out.write("Sec-WebSocket-Extensions: chat, superchat\r\n".getBytes());
out.write(("Sec-WebSocket-Key: " + base64Key + "\r\n").getBytes());
out.write("Content-Length: 0\r\n".getBytes());
out.write("\r\n".getBytes());
out.flush();
}

// 读取响应头
private static Map<String, String> readHeaders(InputStream in) throws IOException {

Map<String, String> map = new HashMap<>(8);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));

String line;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
int separatorIndex = line.indexOf(':');
if (separatorIndex != -1) {
String key = line.substring(0, separatorIndex).trim();
String value = line.substring(separatorIndex + 1).trim();
// 首字母小写化
map.put(StringUtils.uncapitalize(key), value);
} else {
// 切割HTTP/1.1, 101, Switching Protocols
String[] parts = line.split("\\s+", 3);
if (parts.length == 3) {
for (int i = 0; i < parts.length; i++) {
if (parts[i].startsWith("HTTP")) {
map.put("httpVersion", parts[i]);
} else if (Character.isDigit(parts[i].charAt(0))) {
map.put("responseCode", parts[i]);
} else {
map.put("statusMessage", parts[i]);
}
}
}
}
}
return map;
}

private static byte[] generateRandomKey() {
SecureRandom secureRandom = new SecureRandom();
byte[] key = new byte[16];
secureRandom.nextBytes(key);
return key;
}

private static String base64Encode(byte[] data) {
return Base64.getEncoder().encodeToString(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public interface DispatchConstants {
* protocol ntp
*/
String PROTOCOL_NTP = "ntp";
/**
* protocol websocket
*/
String PROTOCOL_WEBSOCKET = "websocket";
/**
* protocol udp
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ org.dromara.hertzbeat.collector.collect.ssh.SshCollectImpl
org.dromara.hertzbeat.collector.collect.telnet.TelnetCollectImpl
org.dromara.hertzbeat.collector.collect.smtp.SmtpCollectImpl
org.dromara.hertzbeat.collector.collect.ntp.NtpCollectImpl
org.dromara.hertzbeat.collector.collect.websocket.WebsocketCollectImpl
org.dromara.hertzbeat.collector.collect.ftp.FtpCollectImpl
org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl
org.dromara.hertzbeat.collector.collect.udp.UdpCollectImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public class Metrics {
* 使用ntp协议的监控配置信息
*/
private NtpProtocol ntp;
/**
* Monitoring configuration information using the websocket protocol
* 使用websocket的监控配置信息
*/
private WebsocketProtocol websocket;
/**
* Use udp implemented by socket for service port detection configuration information
* 使用socket实现的udp进行服务端口探测配置信息
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.dromara.hertzbeat.common.entity.job.protocol;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author dongfeng
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WebsocketProtocol {
/**
* Websocket 主机ip或域名
*/
private String host;

/**
* Websocket 主机端口
*/
private String port;
}
125 changes: 125 additions & 0 deletions manager/src/main/resources/define/app-websocket.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The monitoring type category:service-application service monitoring db-database monitoring mid-middleware custom-custom monitoring os-operating system monitoring
# 监控类型所属类别:service-应用服务 program-应用程序 db-数据库 custom-自定义 os-操作系统 bigdata-大数据 mid-中间件 webserver-web服务器 cache-缓存 cn-云原生 network-网络监控等等
category: service
# The monitoring type eg: linux windows tomcat mysql aws...
# 监控类型 eg: linux windows tomcat mysql aws...
app: websocket
# 监控类型国际化名称
name:
zh-CN: WebSocket监控
en-US: WebSocket MONITORS
# The description and help of this monitoring type
# 监控类型的帮助描述信息
help:
zh-CN: HertzBeat 对 WebSocket 服务的首次握手的响应等相关指标进行监测。
en-US: HertzBeat monitors metrics such as the response of the WebSocket service's first handshake.
zh-TW: HertzBeat對WebSocket服務的首次握手的響應等相關名額進行監測。
# 监控所需输入参数定义(根据定义渲染页面UI)
# Input params define for monitoring(render web ui by the definition)
params:
# field-param field key
# field-字段名称标识符
- field: host
# name-param field display i18n name
# name-参数字段显示名称
name:
zh-CN: WebSocket服务的Host
en-US: Host of WebSocket service
# type-param field type(most mapping the html input type)
# type-字段类型,样式(大部分映射input标签type属性)
type: host
# required-true or false
# 是否是必输项 true-必填 false-可选
required: true
# field-param field key
# field-字段名称标识符
- field: port
# name-param field display i18n name
# name-参数字段显示名称
name:
zh-CN: 端口
en-US: Port
# type-param field type(most mapping the html input type)
# type-字段类型,样式(大部分映射input标签type属性)
type: number
# when type is number, range is required
# 当type为number时,用range表示范围
range: '[0,65535]'
# required-true or false
# 是否是必输项 true-必填 false-可选
required: true
# collect metrics config list
# 采集指标配置列表
metrics:
# metrics - summary
# 监控指标 - summary
- name: summary
i18n:
zh-CN: 概要
en-US: Summary
# metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel
# priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue
# 指标采集调度优先级(0->127)->(优先级高->低) 优先级低的指标会等优先级高的指标采集完成后才会被调度, 相同优先级的指标会并行调度采集
# 优先级为0的指标为可用性指标,即它会被首先调度,采集成功才会继续调度其它指标,采集失败则中断调度
priority: 0
# 指标信息 包括 field名称 type字段类型:0-number数字,1-string字符串 label-是否是指标标签字段 unit:指标单位
# field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
# field-指标名称, type-指标类型(0-number数字,1-string字符串), unit-指标单位('%','ms','MB'), label-是否是指标标签字段
fields:
- field: responseTime
type: 0
unit: ms
i18n:
zh-CN: 响应时间
en-US: ResponseTime
- field: httpVersion
type: 1
i18n:
zh-CN: http版本
en-US: httpVersion
- field: responseCode
type: 1
i18n:
zh-CN: 响应状态码
en-US: responseCode
- field: statusMessage
type: 1
i18n:
zh-CN: 状态消息
en-US: statusMessage
- field: connection
type: 1
i18n:
zh-CN: 是否支持升级(不存在表示是普通http请求,存在表示支持升级)
en-US: connection
- field: upgrade
type: 1
i18n:
zh-CN: 升级(协议)
en-US: upgrade

# the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk
# 采集协议, 目前支持sql, ssh, http, telnet, wmi, snmp, sdk
protocol: websocket
# Specific collection configuration when protocol is telnet protocol
# 当protocol为telnet协议时具体的采集配置
websocket:
# telnet host
# 远程登录主机
host: ^_^host^_^
port: ^_^port^_^

0 comments on commit c840aeb

Please sign in to comment.