diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d0e494e5e..1f4d00afe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 更新日志 -- v1.4.6(2023-06-23) +- v1.4.7(2023-06-23) + - 优化 获取证书支持:IP证书、多域名证书、通配符证书 + - 优化 移除未使用的域名信息缓存表`cache_domain_info` + +- v1.4.6(2023-06-22) - 新增 通知方式增加备注字段 - 新增 通知方式新增钉钉、飞书渠道 diff --git a/domain_admin/api/domain_api.py b/domain_admin/api/domain_api.py index 988df678f7..1bd37864b7 100644 --- a/domain_admin/api/domain_api.py +++ b/domain_admin/api/domain_api.py @@ -50,20 +50,22 @@ def add_domain(): domain_service.update_domain_row(row) # 顺带添加到域名监测列表 - first_domain_info_row = DomainInfoModel.select( - DomainInfoModel.id - ).where( - DomainInfoModel.domain == data['root_domain'], - DomainInfoModel.user_id == current_user_id - ).first() - - if not first_domain_info_row: - domain_info_service.add_domain_info( - domain=domain_util.get_root_domain(domain), - comment=alias, - group_id=group_id, - user_id=current_user_id, - ) + if not domain_util.is_ipv4(domain): + + first_domain_info_row = DomainInfoModel.select( + DomainInfoModel.id + ).where( + DomainInfoModel.domain == data['root_domain'], + DomainInfoModel.user_id == current_user_id + ).first() + + if not first_domain_info_row: + domain_info_service.add_domain_info( + domain=domain_util.get_root_domain(domain), + comment=alias, + group_id=group_id, + user_id=current_user_id, + ) return {'id': row.id} diff --git a/domain_admin/log.py b/domain_admin/log.py index 7d7bb93328..c2dcc09d8d 100644 --- a/domain_admin/log.py +++ b/domain_admin/log.py @@ -7,7 +7,11 @@ logger = logging.getLogger('domain-admin') # 单个日志文件最大为1M -handler = RotatingFileHandler(resolve_log_file("domain-admin.log"), maxBytes=1024 * 1024 * 1, encoding='utf-8') +handler = RotatingFileHandler( + filename=resolve_log_file("domain-admin.log"), + maxBytes=1024 * 1024 * 1, + encoding='utf-8' +) # 设置日志格式 formatter = logging.Formatter( diff --git a/domain_admin/model/cache_domain_info_model.py b/domain_admin/model/cache_domain_info_model.py deleted file mode 100644 index 44fb174916..0000000000 --- a/domain_admin/model/cache_domain_info_model.py +++ /dev/null @@ -1,53 +0,0 @@ -# -*- coding: utf-8 -*- -from datetime import datetime - -from peewee import CharField, IntegerField, DateTimeField, AutoField - -from domain_admin.model.base_model import BaseModel -from domain_admin.utils import time_util - - -class CacheDomainInfoModel(BaseModel): - """ - 域名信息缓存表 - @since 1.2.12 - @Deprecated @since 1.4.0 - """ - id = AutoField(primary_key=True) - - # 域名 - domain = CharField(unique=True) - - # 域名注册时间 - domain_start_time = DateTimeField(default=None, null=True) - - # 域名过期时间 - domain_expire_time = DateTimeField(default=None, null=True) - - # 缓存过期时间 - expire_time = DateTimeField(default=None, null=True) - - # 创建时间 - create_time = DateTimeField(default=datetime.now) - - # 更新时间 - update_time = DateTimeField(default=datetime.now) - - class Meta: - table_name = 'cache_domain_info' - - @property - def is_expired(self) -> [bool, None]: - """ - 过期时间 - :return: - """ - if self.expire_time: - return (self.expire_time - datetime.now()).seconds <= 0 - else: - return None - - @property - def domain_expire_days(self) -> int: - """域名过期天数""" - return time_util.get_diff_days(datetime.now(), self.domain_expire_time) diff --git a/domain_admin/model/database.py b/domain_admin/model/database.py index a5a8204f66..a74d218730 100644 --- a/domain_admin/model/database.py +++ b/domain_admin/model/database.py @@ -26,7 +26,7 @@ (group_model.GroupModel, None), (domain_model.DomainModel, None), (notify_model.NotifyModel, None), - (cache_domain_info_model.CacheDomainInfoModel, None), + # (cache_domain_info_model.CacheDomainInfoModel, None), (address_model.AddressModel, None), (domain_info_model.DomainInfoModel, None), ] diff --git a/domain_admin/service/cache_domain_info_service.py b/domain_admin/service/cache_domain_info_service.py deleted file mode 100644 index 59f00c7fdb..0000000000 --- a/domain_admin/service/cache_domain_info_service.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -""" -@File : cache_domain_info_service.py -@Date : 2023-04-22 -""" -from datetime import datetime, timedelta - -from domain_admin.model.cache_domain_info_model import CacheDomainInfoModel -from domain_admin.utils import domain_util -from domain_admin.utils.whois_util import whois_util - - -def get_domain_info(domain: str) -> CacheDomainInfoModel: - """ - 加一个缓存获取域名信息 - :param domain: - :return: - """ - root_domain = domain_util.get_root_domain(domain) - - row = CacheDomainInfoModel.select().where( - CacheDomainInfoModel.domain == root_domain - ).get_or_none() - - # 不存在或者已过期,重新获取 - if not row or row.is_expired is True: - domain_whois = whois_util.get_domain_info(root_domain) - - if domain_whois is None: - raise Exception("域名信息获取失败") - - data = { - "domain": root_domain, - "domain_start_time": domain_whois['start_time'], - "domain_expire_time": domain_whois['expire_time'], - "expire_time": datetime.now() - timedelta(minutes=3) - } - - if not row: - row = CacheDomainInfoModel.create(**data) - else: - CacheDomainInfoModel.update(data).where( - CacheDomainInfoModel.id == row.id - ).execute() - - return row diff --git a/domain_admin/service/domain_info_service.py b/domain_admin/service/domain_info_service.py index 908ea5301c..a73f3d1bbc 100644 --- a/domain_admin/service/domain_info_service.py +++ b/domain_admin/service/domain_info_service.py @@ -2,6 +2,8 @@ """ domain_info_service.py """ +import random +import time import traceback from datetime import datetime from typing import List @@ -62,7 +64,12 @@ def update_domain_info_row(row: DomainInfoModel) -> [str, None]: try: domain_whois = whois_util.get_domain_info(row.domain) except Exception as e: - pass + # 增加容错 + try: + time.sleep(3) + domain_whois = whois_util.get_domain_info(row.domain) + except Exception as e: + pass update_row = DomainInfoModel() diff --git a/domain_admin/service/domain_service.py b/domain_admin/service/domain_service.py index 96159422f5..71d88dedf7 100644 --- a/domain_admin/service/domain_service.py +++ b/domain_admin/service/domain_service.py @@ -2,7 +2,6 @@ """ domain_service.py """ -import time import traceback import warnings from datetime import datetime @@ -15,84 +14,17 @@ from domain_admin.model.address_model import AddressModel from domain_admin.model.domain_model import DomainModel from domain_admin.model.group_model import GroupModel -from domain_admin.model.log_scheduler_model import LogSchedulerModel from domain_admin.model.user_model import UserModel -from domain_admin.service import email_service, render_service, global_data_service, cache_domain_info_service +from domain_admin.service import email_service, render_service from domain_admin.service import file_service from domain_admin.service import notify_service from domain_admin.service import system_service -from domain_admin.utils import datetime_util, cert_util, whois_util, file_util, time_util +from domain_admin.utils import datetime_util, cert_util, whois_util from domain_admin.utils import domain_util -from domain_admin.utils.cert_util import cert_common, cert_socket_v2 +from domain_admin.utils.cert_util import cert_common, cert_socket_v2, cert_openssl_v2 from domain_admin.utils.flask_ext.app_exception import AppException, ForbiddenAppException -def update_domain_info(domain_row: DomainModel): - """ - 更新域名信息 - :param row: - :return: - """ - # logger.info("%s", model_to_dict(domain_row)) - - # 获取域名信息 - domain_info = None - - err = '' - - try: - domain_info = cache_domain_info_service.get_domain_info(domain_row.domain) - except Exception as e: - err = e.__str__() - pass - - update_data = { - 'domain_start_time': None, - "domain_expire_time": None, - 'domain_expire_days': 0, - } - - if domain_info: - update_data = { - 'domain_start_time': domain_info.domain_start_time, - "domain_expire_time": domain_info.domain_expire_time, - 'domain_expire_days': domain_info.domain_expire_days, - } - - DomainModel.update( - **update_data, - domain_check_time=datetime_util.get_datetime(), - update_time=datetime_util.get_datetime(), - ).where( - DomainModel.id == domain_row.id - ).execute() - - return err - - -def update_ip_info(row: DomainModel): - """ - 更新ip信息 - :param row: - :return: - """ - # 获取ip地址 - domain_ip = '' - - try: - domain_ip = cert_common.get_domain_ip(row.domain) - except Exception as e: - pass - - DomainModel.update( - ip=domain_ip, - ip_check_time=datetime_util.get_datetime(), - update_time=datetime_util.get_datetime(), - ).where( - DomainModel.id == row.id - ).execute() - - def update_domain_host_list(domain_row: DomainModel): """ 更新ip信息 @@ -153,7 +85,7 @@ def update_address_row_info(address_row, domain_row): err = '' try: - cert_info = cert_socket_v2.get_ssl_cert_info( + cert_info = cert_openssl_v2.get_ssl_cert_by_openssl( domain=domain_row.domain, host=address_row.host, port=domain_row.port @@ -543,6 +475,8 @@ def send_domain_list_email(user_id, rows: List[DomainModel]): to_addresses=email_list, content_type='html' ) + + def check_permission_and_get_row(domain_id, user_id): """ 权限检查 diff --git a/domain_admin/utils/cert_util/cert_openssl_v2.py b/domain_admin/utils/cert_util/cert_openssl_v2.py index 8660700728..d11a24c88c 100644 --- a/domain_admin/utils/cert_util/cert_openssl_v2.py +++ b/domain_admin/utils/cert_util/cert_openssl_v2.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -@File : demo.py +@File : cert_openssl_v2.py @Date : 2023-06-19 """ import socket @@ -8,15 +8,81 @@ import OpenSSL -from domain_admin.utils.cert_util import cert_common +from domain_admin.utils import domain_util, time_util -def get_ssl_cert_by_openssl(domain: str, host: str = None, port: int = 443, timeout: int = 3): +def get_certificate_san(x509cert): + """ + 获取SAN域名列表 + ref: https://cloud.tencent.com/developer/ask/sof/141600 + :param x509cert: + :return: + """ + dns_names = [] + + ext_count = x509cert.get_extension_count() + + for i in range(0, ext_count): + ext = x509cert.get_extension(i) + if 'subjectAltName' in str(ext.get_short_name()): + for item in str(ext).split(', '): + + if item.startswith('DNS:'): + key, value = item.split(':') + dns_names.append(value.strip()) + + return dns_names + + +def verify_cert(cert, domain): + """ + 验证证书和域名是否匹配 + :param cert: + :param domain: + :return: + """ + # 检查 颁发对象 域名(CN) 备用域名(SAN) + common_name = cert.get_subject().commonName + + dns_names = get_certificate_san(cert) + + if common_name not in dns_names: + dns_names.insert(0, common_name) + + print(domain) + print(dns_names) + + for dns_name in dns_names: + domain_checked = domain_util.verify_cert_common_name(dns_name, domain) + if domain_checked: + return True + + return False + + +def get_ssl_cert_by_openssl( + domain: str, + host: str = None, + port: int = 443, + timeout: int = 3): + """ + 不验证证书,仅验证域名 + 支持通配符 + :param domain: + :param host: + :param port: + :param timeout: + :return: + """ + # socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) sock.connect((host, port)) + # ssl ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_NONE + ssl_context.check_hostname = False with ssl_context.wrap_socket(sock, server_hostname=domain) as wrap_socket: dercert = wrap_socket.getpeercert(True) @@ -24,13 +90,13 @@ def get_ssl_cert_by_openssl(domain: str, host: str = None, port: int = 443, time server_cert = ssl.DER_cert_to_PEM_cert(dercert) cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, server_cert.encode()) - return { - 'start_date': cert_common.parse_time(cert.get_notBefore().decode()), - 'expire_date': cert_common.parse_time(cert.get_notAfter().decode()), - } + # verify + domain_checked = verify_cert(cert, domain) + if not domain_checked: + raise Exception("domain not verified") -if __name__ == '__main__': - # get_ssl_cert_by_openssl('pgmanage.qnvip.com', '121.196.205.251') - # get_ssl_cert_by_openssl('cdn-image-01.kaishuleyuan.com', '101.96.145.100') - get_ssl_cert_by_openssl('dev.csdn.net', '120.46.209.149') + return { + 'start_date': time_util.parse_time(cert.get_notBefore().decode()), + 'expire_date': time_util.parse_time(cert.get_notAfter().decode()), + } \ No newline at end of file diff --git a/domain_admin/utils/cert_util/cert_socket_v2.py b/domain_admin/utils/cert_util/cert_socket_v2.py index 6a50095854..3fbf83cec0 100644 --- a/domain_admin/utils/cert_util/cert_socket_v2.py +++ b/domain_admin/utils/cert_util/cert_socket_v2.py @@ -13,7 +13,6 @@ from domain_admin.log import logger from domain_admin.utils import time_util -from domain_admin.utils.cert_util import cert_common def get_domain_host_list(domain: str, port: int = 80) -> typing.List[str]: @@ -71,7 +70,10 @@ def get_ssl_cert_info(domain: str, host: str = None, port: int = 443, timeout: i :param timeout: :return: """ - return resolve_cert(get_ssl_cert(domain, host, port, timeout)) + cert = get_ssl_cert(domain, host, port, timeout) + print(cert) + + return resolve_cert(cert) def resolve_cert(cert: typing.Dict): @@ -89,4 +91,7 @@ def resolve_cert(cert: typing.Dict): if __name__ == '__main__': - print(get_ssl_cert_info('dev.csdn.net', '120.46.209.149')) + # print(get_ssl_cert_info('www.taobao.com', '111.62.93.139')) + print(get_ssl_cert_info('38.60.47.102', '38.60.47.102')) + # print('www.baidu.com'.encode('idna')) # b'www.baidu.com' + # print('www.baidu.com'.encode('punycode')) # b'www.baidu.com-' diff --git a/domain_admin/utils/domain_util.py b/domain_admin/utils/domain_util.py index 8faaee2684..53137563d8 100644 --- a/domain_admin/utils/domain_util.py +++ b/domain_admin/utils/domain_util.py @@ -8,8 +8,10 @@ from typing import NamedTuple import tldextract +from tldextract.remote import looks_like_ip from tldextract.tldextract import ExtractResult +from domain_admin.log import logger from domain_admin.utils import file_util from domain_admin.utils.cert_util import cert_consts @@ -132,4 +134,51 @@ def get_root_domain(domain: str) -> str: :return: """ extract_result = extract_domain(domain) - return '.'.join([extract_result.domain, extract_result.suffix]) + return extract_result.registered_domain + # return '.'.join([extract_result.domain, extract_result.suffix]) + + +def is_ipv4(ip) -> bool: + """ + 检测一个字符串是否是ipv4地址 + :param ip: + :return: + """ + return looks_like_ip(ip) + + # if re.match("(\d+\.){3}\d+", ip): + # return True + # else: + # return False + + +def encode_hostname(hostname: str) -> str: + """ + 编码中文域名,英文域名原样返回 + :param hostname: 中文域名 + :return: + """ + return hostname.encode('idna').decode('ascii') + + +def verify_cert_common_name(common_name, domain): + """ + 验证证书 + :param common_name: + :param domain: + :return: + """ + logger.info("%s <=> %s", common_name, domain) + + if '*' in common_name: + # 通配符 SSL 证书 + common_name_root_domain = get_root_domain(common_name) + root_domain = get_root_domain(domain) + return common_name_root_domain == root_domain + else: + # 普通证书 + return common_name == domain + + +if __name__ == '__main__': + print(get_root_domain("*.juejin.cn")) diff --git a/domain_admin/utils/whois_util/whois_util.py b/domain_admin/utils/whois_util/whois_util.py index 32b92be6c0..c3b3d6dbcb 100644 --- a/domain_admin/utils/whois_util/whois_util.py +++ b/domain_admin/utils/whois_util/whois_util.py @@ -30,23 +30,29 @@ def resolve_domain(domain: str) -> str: :return: """ # 解析出域名和顶级后缀 - extract_result = domain_util.extract_domain(domain) - - root_domain = extract_result.domain - suffix = extract_result.suffix - - # 处理包含中文的域名 - if text_util.has_chinese(suffix): - pass - - elif text_util.has_chinese(root_domain): - chinese = text_util.extract_chinese(root_domain) - punycode = chinese.encode('punycode').decode() - root_domain = f"xn--{punycode}" - - domain_and_suffix = '.'.join([root_domain, suffix]) - - return domain_and_suffix + if domain_util.is_ipv4(domain): + return domain + else: + root_domain = domain_util.get_root_domain(domain) + return domain_util.encode_hostname(root_domain) + + # extract_result = domain_util.extract_domain(domain) + # + # root_domain = extract_result.domain + # suffix = extract_result.suffix + # + # # 处理包含中文的域名 + # if text_util.has_chinese(suffix): + # pass + # + # elif text_util.has_chinese(root_domain): + # chinese = text_util.extract_chinese(root_domain) + # punycode = chinese.encode('punycode').decode() + # root_domain = f"xn--{punycode}" + # + # domain_and_suffix = '.'.join([root_domain, suffix]) + # + # return domain_and_suffix def parse_time(time_str, time_format=None): @@ -73,14 +79,18 @@ def load_whois_servers_config(): config = {} + # 通用配置 for root, server in whois_servers.items(): - # 通用配置 server_config = deepcopy(DEFAULT_WHOIS_CONFIG) server_config['whois_server'] = server - config[root] = server_config + config[domain_util.encode_hostname(root)] = server_config + + # 自定义配置优先 + for key, value in CUSTOM_WHOIS_CONFIGS.items(): + config[domain_util.encode_hostname(key)] = value - # 合并配置自定义配置优先 - return {**config, **CUSTOM_WHOIS_CONFIGS} + # 合并配置 + return config def get_whois_config(domain: str) -> [str, None]: diff --git a/tests/utils/test_cert_util.py b/tests/utils/test_cert_util.py index 05bf009071..18109d42f5 100644 --- a/tests/utils/test_cert_util.py +++ b/tests/utils/test_cert_util.py @@ -7,7 +7,7 @@ import socket from domain_admin.utils import cert_util -from domain_admin.utils.cert_util import cert_socket_v2 +from domain_admin.utils.cert_util import cert_socket_v2, cert_openssl_v2 def test_get_cert_info(): @@ -19,8 +19,9 @@ def test_cert_socket_v2(): ret = cert_socket_v2.get_ssl_cert_info( # 'www.csdn.net', # '123.129.227.79', - 'cdn-image-01.kaishuleyuan.com', - '101.96.145.100' + # '38.60.47.102', + # '38.60.47.102' + 'juejin.cn', '223.111.193.232' ) print(ret) @@ -35,3 +36,22 @@ def test_getaddrinfo(): ret = socket.getaddrinfo('www.baidu.com', 443, proto=socket.IPPROTO_TCP) for item in ret: print(item) + + +def test_get_default_verify_paths(): + import ssl + print(ssl.get_default_verify_paths()) + + +def test_get_ssl_cert_by_openssl(): + lst = [ + # ('cdn-image-01.kaishuleyuan.com', '101.96.145.100'), + # ('www.tmall.com', '111.13.104.112'), + # ('juejin.cn', '223.111.193.232'), + # ('dev.csdn.net', '120.46.209.149'), + # ('38.60.47.102', '38.60.47.102'), + ('pgmanage.qnvip.com', '121.196.205.251'), + ] + + for domain, host in lst: + cert_openssl_v2.get_ssl_cert_by_openssl(domain, host) diff --git a/tests/utils/test_domain_util.py b/tests/utils/test_domain_util.py index 9fc17b5783..15101bf261 100644 --- a/tests/utils/test_domain_util.py +++ b/tests/utils/test_domain_util.py @@ -43,3 +43,23 @@ def test_parse_domain_from_txt_file(): lst = domain_util.parse_domain_from_txt_file(domain_filename) for index, row in enumerate(lst): assert row['domain'] == expect_domains[index] + + +def test_is_ipv4(): + assert domain_util.is_ipv4('38.60.47.102') == True + assert domain_util.is_ipv4('www.baidu.com') == False + + +def test_get_root_domain(): + print(domain_util.get_root_domain('38.60.47.102')) + print(domain_util.get_root_domain('www.baidu.com')) + print(domain_util.get_root_domain('www.baidu.com.cn')) + # assert domain_util.get_root_domain('www.baidu.com') == True + + +def test_encode_hostname(): + assert domain_util.encode_hostname('www.baidu.com') == 'www.baidu.com' + + assert domain_util.encode_hostname('baidu.中国') == 'baidu.xn--fiqs8s' + + assert domain_util.encode_hostname('百度.中国') == 'xn--wxtr44c.xn--fiqs8s' diff --git a/tests/utils/test_whois_util.py b/tests/utils/test_whois_util.py index e9624a55d0..6a136ef060 100644 --- a/tests/utils/test_whois_util.py +++ b/tests/utils/test_whois_util.py @@ -43,3 +43,9 @@ def test_get_domain_info(): for domain in domain_list: # print(parse_whois_raw(get_whois_raw(domain, ROOT_SERVER))) print(whois_util.get_domain_info(domain)) + + +def test_resolve_domain(): + assert whois_util.resolve_domain('www.baidu.com') == 'baidu.com' + assert whois_util.resolve_domain('www.baidu.中国') == 'baidu.xn--fiqs8s' + assert whois_util.resolve_domain('192.168.0.1') == '192.168.0.1'