From f9ff2bf730a843707c5d8dd42fe71203081dccb0 Mon Sep 17 00:00:00 2001 From: pengshiyu <1940607002@qq.com> Date: Tue, 30 Jul 2024 22:34:57 +0800 Subject: [PATCH] add zerossl --- domain_admin/main.py | 2 +- domain_admin/router/permission.py | 29 +++---- domain_admin/service/auth_service.py | 2 +- .../service/issue_certificate_service.py | 21 +++-- domain_admin/service/token_service.py | 4 +- domain_admin/utils/acme_util/acme_v2_api.py | 76 ++++++++++++++++--- domain_admin/utils/acme_util/key_type_enum.py | 24 +++--- 7 files changed, 111 insertions(+), 47 deletions(-) diff --git a/domain_admin/main.py b/domain_admin/main.py index 3a67401da3..51e91a938a 100644 --- a/domain_admin/main.py +++ b/domain_admin/main.py @@ -36,7 +36,7 @@ def before_request(): response.headers.set('Access-Control-Max-Age', 60 * 30) return response - permission.check_permission() + permission.parse_token() db.connect(reuse_if_open=True) diff --git a/domain_admin/router/permission.py b/domain_admin/router/permission.py index 05823fefaa..9c913ad1b1 100644 --- a/domain_admin/router/permission.py +++ b/domain_admin/router/permission.py @@ -3,6 +3,7 @@ from flask import request, g from domain_admin.config import ADMIN_USERNAME, TOKEN_KEY +from domain_admin.log import logger from domain_admin.model.user_model import UserModel from domain_admin.service import token_service from domain_admin.utils.flask_ext.app_exception import UnauthorizedAppException, ForbiddenAppException @@ -35,28 +36,28 @@ API_PREFIX = '/api' -def check_permission(): +def parse_token(): + logger.info('check_permission: %s', request.path) + # 仅校验api if not request.path.startswith(API_PREFIX): return # 白名单直接通过 - if request.path in WHITE_LIST: - return + # if request.path in WHITE_LIST: + # return # 获取token token = request.headers.get(TOKEN_KEY) - if not token: - raise UnauthorizedAppException() - - # 解析token,并全局挂载 - payload = token_service.decode_token(token) - g.user_id = payload['user_id'] + if token: + # 解析token,并全局挂载 + payload = token_service.decode_token(token) + g.user_id = payload['user_id'] # root 权限 api - if request.path in ADMIN_API_LIST: - user_row = UserModel.get_by_id(g.user_id) - - if user_row.username != ADMIN_USERNAME: - raise ForbiddenAppException() + # if request.path in ADMIN_API_LIST: + # user_row = UserModel.get_by_id(g.user_id) + # + # if user_row.username != ADMIN_USERNAME: + # raise ForbiddenAppException() diff --git a/domain_admin/service/auth_service.py b/domain_admin/service/auth_service.py index 84e5701cfa..676290b835 100644 --- a/domain_admin/service/auth_service.py +++ b/domain_admin/service/auth_service.py @@ -91,7 +91,7 @@ def wrapper(*args, **kwargs): if user_row.status != StatusEnum.Enabled: raise AppException('用户已禁用') - if has_role_permission(current_role=user_row.role, need_permission=role): + if not has_role_permission(current_role=user_row.role, need_permission=role): raise AppException('暂无权限') # 当前用户数据全局可用 diff --git a/domain_admin/service/issue_certificate_service.py b/domain_admin/service/issue_certificate_service.py index 5ab82d4df6..89f5bf032f 100644 --- a/domain_admin/service/issue_certificate_service.py +++ b/domain_admin/service/issue_certificate_service.py @@ -47,7 +47,7 @@ def issue_certificate( # Issue certificate # Create domain private key and CSR - pkey_pem, csr_pem = acme_v2_api.new_csr_comp(domains=domains, key_type=key_type) + pkey_pem, csr_pem = acme_v2_api.new_csr_comp(domains=domains) issue_certificate_row = IssueCertificateModel.create( user_id=user_id, @@ -76,10 +76,13 @@ def get_certificate_challenges(issue_certificate_id): pkey_pem, csr_pem = acme_v2_api.new_csr_comp( domains=domains, pkey_pem=pkey_pem, - key_type=issue_certificate_row.key_type ) + print('directory_type', issue_certificate_row.directory_type) - acme_client = acme_v2_api.get_acme_client(directory_type=issue_certificate_row.directory_type) + acme_client = acme_v2_api.get_acme_client( + directory_type=issue_certificate_row.directory_type, + key_type=issue_certificate_row.key_type + ) orderr = acme_client.new_order(csr_pem) # Select HTTP-01 within offered challenges by the CA server @@ -113,7 +116,10 @@ def verify_certificate(issue_certificate_id, challenge_type): issue_certificate_row = IssueCertificateModel.get_by_id(issue_certificate_id) items = get_certificate_challenges(issue_certificate_id) - acme_client = acme_v2_api.get_acme_client(directory_type=issue_certificate_row.directory_type) + acme_client = acme_v2_api.get_acme_client( + directory_type=issue_certificate_row.directory_type, + key_type=issue_certificate_row.key_type + ) verify_count = 0 for item in items: @@ -176,13 +182,15 @@ def renew_certificate(row_id): pkey_pem = issue_certificate_row.ssl_certificate_key domains = issue_certificate_row.domains - acme_client = acme_v2_api.get_acme_client(directory_type=issue_certificate_row.directory_type) + acme_client = acme_v2_api.get_acme_client( + directory_type=issue_certificate_row.directory_type, + key_type=issue_certificate_row.key_type + ) # Create domain private key and CSR pkey_pem, csr_pem = acme_v2_api.new_csr_comp( domains=domains, pkey_pem=pkey_pem, - key_type=issue_certificate_row.key_type ) orderr = acme_client.new_order(csr_pem) @@ -269,7 +277,6 @@ def renew_certificate_row(row): # 重新申请 pkey_pem, csr_pem = acme_v2_api.new_csr_comp( domains=row.domains, - key_type=row.key_type ) IssueCertificateModel.update( diff --git a/domain_admin/service/token_service.py b/domain_admin/service/token_service.py index 7b36e0565c..ebc1484a97 100644 --- a/domain_admin/service/token_service.py +++ b/domain_admin/service/token_service.py @@ -10,7 +10,7 @@ from domain_admin.config import SECRET_KEY, TOKEN_EXPIRE_DAYS from domain_admin.enums.config_key_enum import ConfigKeyEnum -from domain_admin.utils.flask_ext.app_exception import ForbiddenAppException +from domain_admin.utils.flask_ext.app_exception import ForbiddenAppException, AppException def encode_token(payload): @@ -48,7 +48,7 @@ def decode_token(token): try: return jwt.decode(jwt=token, key=secret_key, algorithms=['HS256']) except Exception: - raise ForbiddenAppException() + raise AppException('token无效') if __name__ == '__main__': diff --git a/domain_admin/utils/acme_util/acme_v2_api.py b/domain_admin/utils/acme_util/acme_v2_api.py index e64e2e959b..28467baea2 100644 --- a/domain_admin/utils/acme_util/acme_v2_api.py +++ b/domain_admin/utils/acme_util/acme_v2_api.py @@ -60,6 +60,7 @@ import OpenSSL import josepy as jose +import requests from acme import challenges, errors from acme import client from acme import crypto_util @@ -115,15 +116,13 @@ def get_account_data_filename(directory_type=DirectoryTypeEnum.LETS_ENCRYPT): # Useful methods and classes: -def new_csr_comp(domains, pkey_pem=None, key_type=KeyTypeEnum.RSA): +def new_csr_comp(domains, pkey_pem=None): """Create certificate signing request.""" if pkey_pem is None: # fix: type must be an integer - pkey_type = key_type_enum.get_key_type(key_type) or OpenSSL.crypto.TYPE_RSA - # Create private key. pkey = OpenSSL.crypto.PKey() - pkey.generate_key(type=pkey_type, bits=CERT_PKEY_BITS) + pkey.generate_key(type=OpenSSL.crypto.TYPE_RSA, bits=CERT_PKEY_BITS) pkey_pem = OpenSSL.crypto.dump_privatekey( OpenSSL.crypto.FILETYPE_PEM, pkey) @@ -253,7 +252,7 @@ def get_account_key(directory_type=DirectoryTypeEnum.LETS_ENCRYPT): with open(account_key_filename, 'wb') as f: f.write(pem) - return jose.JWKRSA(key=private_key) + return private_key def ensure_account_exists(client_acme, directory_type=DirectoryTypeEnum.LETS_ENCRYPT): @@ -282,18 +281,54 @@ def ensure_account_exists(client_acme, directory_type=DirectoryTypeEnum.LETS_ENC create_account(client_acme, directory_type) +def get_zerossl_eab(): + """ + :return: + eg: + { + "success": true, + "eab_kid": "xxx", + "eab_hmac_key": "yyy" + } + """ + url = 'https://api.zerossl.com/acme/eab-credentials-email' + res = requests.post( + url=url, + data={'email': "admin@domain-admin.com"} + ) + + return res.json() + + def create_account(client_acme, directory_type=DirectoryTypeEnum.LETS_ENCRYPT): account_data_filename = get_account_data_filename(directory_type) - register = client_acme.new_account(messages.NewRegistration.from_data( - terms_of_service_agreed=True) + # 参考 certbot + if client_acme.external_account_required(): + config = get_zerossl_eab() + print('config', config) + + eab = messages.ExternalAccountBinding.from_data( + account_public_key=client_acme.net.key.public_key(), + kid=config['eab_kid'], + hmac_key=config['eab_hmac_key'], + directory=client_acme.directory + ) + else: + eab = None + + new_account = messages.NewRegistration.from_data( + terms_of_service_agreed=True, + external_account_binding=eab ) + register = client_acme.new_account(new_account) + with open(account_data_filename, 'w') as f: f.write(json.dumps(register.to_json(), indent=2)) -def get_acme_client(directory_type=DirectoryTypeEnum.LETS_ENCRYPT): +def get_acme_client(directory_type=DirectoryTypeEnum.LETS_ENCRYPT, key_type=KeyTypeEnum.RSA): # default use letsencrypt directory_url if not directory_type: directory_type = DirectoryTypeEnum.LETS_ENCRYPT @@ -303,9 +338,26 @@ def get_acme_client(directory_type=DirectoryTypeEnum.LETS_ENCRYPT): raise AppException("not found directory_url") # Register account and accept TOS - account_key = get_account_key(directory_type) + private_key = get_account_key(directory_type) + + if key_type == KeyTypeEnum.EC: + account_key = jose.JWKEC(key=private_key) + public_key = account_key.key + if public_key.key_size == 256: + alg = jose.ES256 + elif public_key.key_size == 384: + alg = jose.ES384 + elif public_key.key_size == 521: + alg = jose.ES512 + else: + raise errors.NotSupportedError( + "No matching signing algorithm can be found for the key" + ) + else: + alg = jose.RS256 + account_key = jose.JWKRSA(key=private_key) - net = client.ClientNetwork(account_key, user_agent=USER_AGENT) + net = client.ClientNetwork(account_key, alg=alg, user_agent=USER_AGENT) directory = client.ClientV2.get_directory(url=directory_url, net=net) client_acme = client.ClientV2(directory=directory, net=net) @@ -313,3 +365,7 @@ def get_acme_client(directory_type=DirectoryTypeEnum.LETS_ENCRYPT): ensure_account_exists(client_acme, directory_type) return client_acme + + +if __name__ == '__main__': + print(get_zerossl_eab()) diff --git a/domain_admin/utils/acme_util/key_type_enum.py b/domain_admin/utils/acme_util/key_type_enum.py index 2ee942eb5b..f1576c202b 100644 --- a/domain_admin/utils/acme_util/key_type_enum.py +++ b/domain_admin/utils/acme_util/key_type_enum.py @@ -11,9 +11,9 @@ class KeyTypeEnum(object): 加密方式 """ RSA = "RSA" - DSA = "DSA" + # DSA = "DSA" EC = "EC" - DH = "DH" + # DH = "DH" KEY_TYPE_OPTIONS = [ @@ -22,21 +22,21 @@ class KeyTypeEnum(object): 'value': KeyTypeEnum.RSA, 'type': OpenSSL.crypto.TYPE_RSA }, - { - 'label': "DSA", - 'value': KeyTypeEnum.DSA, - 'type': OpenSSL.crypto.TYPE_DSA - }, + # { + # 'label': "DSA", + # 'value': KeyTypeEnum.DSA, + # 'type': OpenSSL.crypto.TYPE_DSA + # }, { 'label': "EC", 'value': KeyTypeEnum.EC, 'type': OpenSSL.crypto.TYPE_EC }, - { - 'label': "DH", - 'value': KeyTypeEnum.DH, - 'type': OpenSSL.crypto.TYPE_DH - }, + # { + # 'label': "DH", + # 'value': KeyTypeEnum.DH, + # 'type': OpenSSL.crypto.TYPE_DH + # }, ]