Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kid as param for client constructor, split out auth challenge creation into its own method #201

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
171 changes: 122 additions & 49 deletions sewer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
bits: int = 2048,
digest: str = "sha256",
provider: ProviderBase = None,
kid: str = None,
ACME_REQUEST_TIMEOUT: int = 7,
ACME_AUTH_STATUS_WAIT_PERIOD: int = 8,
ACME_AUTH_STATUS_MAX_CHECKS: int = 3,
Expand Down Expand Up @@ -67,6 +68,13 @@ def __init__(
type(certificate_key)
)
)
elif not isinstance(kid, (type(None), str)):
raise ValueError(
"""kid should be of type:: None or str. You entered {0}.
More specifically, kid should be the "kid" value in the acme header from a previous rergister request""".format(
type(certificate_key)
)
)
elif LOG_LEVEL.upper() not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
raise ValueError(
"""LOG_LEVEL should be one of; 'DEBUG', 'INFO', 'WARNING', 'ERROR' or 'CRITICAL'. not {0}""".format(
Expand Down Expand Up @@ -116,7 +124,7 @@ def __init__(

# unique account identifier
# https://tools.ietf.org/html/draft-ietf-acme-acme#section-6.2
self.kid = None
self.kid = kid

self.certificate_key = certificate_key or self.create_certificate_key()
self.csr = self.create_csr()
Expand Down Expand Up @@ -144,7 +152,9 @@ def __init__(
### FIX ME ### [:100] is bandaid to reduce spew during tests

except Exception as e:
self.logger.error("Unable to intialise Client. error={0}".format(str(e)[:100]))
self.logger.error(
"Unable to intialise Client. error={0}".format(str(e)[:100])
)
raise e

def GET(self, url: str) -> requests.Response:
Expand All @@ -167,7 +177,12 @@ def POST(
return self._request("POST", url, data=data, headers=headers)

def _request(
self, method: str, url: str, *, data: bytes = None, headers: Dict[str, str] = None
self,
method: str,
url: str,
*,
data: bytes = None,
headers: Dict[str, str] = None
) -> requests.Response:
"""
shared implementation for GET, POST and HEAD
Expand All @@ -182,7 +197,9 @@ def _request(
if "UserAgent" not in headers:
headers["UserAgent"] = self.User_Agent

kwargs = {"timeout": self.ACME_REQUEST_TIMEOUT} # type: Dict[str, Union[str, int]]
kwargs = {
"timeout": self.ACME_REQUEST_TIMEOUT
} # type: Dict[str, Union[str, int]]

### FIX ME ### can get current bogus cert from pebble, figure out how to use it here?

Expand Down Expand Up @@ -218,7 +235,9 @@ def get_acme_endpoints(self):
self.logger.debug("get_acme_endpoints")
get_acme_endpoints = self.GET(self.ACME_DIRECTORY_URL)
self.logger.debug(
"get_acme_endpoints_response. status_code={0}".format(get_acme_endpoints.status_code)
"get_acme_endpoints_response. status_code={0}".format(
get_acme_endpoints.status_code
)
)
if get_acme_endpoints.status_code not in [200, 201]:
raise ValueError(
Expand Down Expand Up @@ -274,7 +293,9 @@ def create_csr(self):
X509Req.set_pubkey(pk)
X509Req.set_version(2)
X509Req.sign(pk, self.digest)
return OpenSSL.crypto.dump_certificate_request(OpenSSL.crypto.FILETYPE_ASN1, X509Req)
return OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, X509Req
)

def acme_register(self):
"""
Expand Down Expand Up @@ -377,6 +398,36 @@ def apply_for_cert_issuance(self):
self.logger.info("apply_for_cert_issuance_success")
return authorizations, finalize_url

def get_identifier_auth_challenge(
self, domain, auth_url, wildcard, response_challenges
):
allowed_challenge_types = [c["type"] for c in response_challenges]

# left most chal_type wins
for chal_type in self.provider.chal_types:
try:
i = allowed_challenge_types.index(chal_type)
challenge = response_challenges[i]
challenge_token = challenge["token"]
challenge_url = challenge["url"]

return {
"domain": domain,
"url": auth_url,
"wildcard": wildcard,
"token": challenge_token,
"challenge_url": challenge_url,
}

except ValueError:
self.logger.debug(
"get_identifier_authorization_response. {chal_type} not in response challenges list".format(
chal_type=chal_type
)
)

return None

def get_identifier_authorization(self, auth_url: str) -> Dict[str, str]:
"""
https://tools.ietf.org/html/draft-ietf-acme-acme#section-7.5
Expand Down Expand Up @@ -405,33 +456,36 @@ def get_identifier_authorization(self, auth_url: str) -> Dict[str, str]:
response_json = response.json()
domain = response_json["identifier"]["value"]
wildcard = response_json.get("wildcard")
identifier_auth = self.get_identifier_auth_challenge(
domain, auth_url, wildcard, response_json["challenges"]
)

for i in response_json["challenges"]:
if i["type"] in self.provider.chal_types:
challenge = i
challenge_token = challenge["token"]
challenge_url = challenge["url"]

identifier_auth = {
"domain": domain,
"url": auth_url,
"wildcard": wildcard,
"token": challenge_token,
"challenge_url": challenge_url,
}
if not identifier_auth:
raise ValueError(
"Error getting authorization: provider challenge types {} not in response".format(
self.provider.chal_types
)
)

self.logger.debug(
"get_identifier_authorization_success. identifier_auth={0}".format(identifier_auth)
"get_identifier_authorization_success. identifier_auth={0}".format(
identifier_auth
)
)
self.logger.info(
"get_identifier_authorization got %s, token=%s" % (challenge_url, challenge_token)
"get_identifier_authorization got %s, token=%s"
% (identifier_auth["challenge_url"], identifier_auth["token"])
)
return identifier_auth

def get_keyauthorization(self, token):
self.logger.debug("get_keyauthorization")
acme_header_jwk_json = json.dumps(self.get_jwk(), sort_keys=True, separators=(",", ":"))
acme_thumbprint = safe_base64(sha256(acme_header_jwk_json.encode("utf8")).digest())
acme_header_jwk_json = json.dumps(
self.get_jwk(), sort_keys=True, separators=(",", ":")
)
acme_thumbprint = safe_base64(
sha256(acme_header_jwk_json.encode("utf8")).digest()
)
acme_keyauthorization = "{0}.{1}".format(token, acme_thumbprint)

return acme_keyauthorization
Expand Down Expand Up @@ -500,7 +554,9 @@ def respond_to_challenge(self, acme_keyauthorization, challenge_url):
"respond_to_challenge for %s at %s" % (acme_keyauthorization, challenge_url)
)
payload = json.dumps({"keyAuthorization": "{0}".format(acme_keyauthorization)})
respond_to_challenge_response = self.make_signed_acme_request(challenge_url, payload)
respond_to_challenge_response = self.make_signed_acme_request(
challenge_url, payload
)
self.logger.debug(
"respond_to_challenge_response. status_code={0}. response={1}".format(
respond_to_challenge_response.status_code,
Expand Down Expand Up @@ -569,7 +625,9 @@ def download_certificate(self, certificate_url: str) -> str:

def sign_message(self, message):
self.logger.debug("sign_message")
pk = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, self.account_key.encode())
pk = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.account_key.encode()
)
return OpenSSL.crypto.sign(pk, message.encode("utf8"), self.digest)

def get_nonce(self):
Expand Down Expand Up @@ -630,7 +688,9 @@ def make_signed_acme_request(self, url, payload, needs_jwk=False):
payload64 = safe_base64(payload)
protected = self.get_acme_header(url, needs_jwk)
protected64 = safe_base64(json.dumps(protected))
signature = self.sign_message(message="{0}.{1}".format(protected64, payload64)) # bytes
signature = self.sign_message(
message="{0}.{1}".format(protected64, payload64)
) # bytes
signature64 = safe_base64(signature) # str
data = json.dumps(
{"protected": protected64, "payload": payload64, "signature": signature64}
Expand All @@ -639,34 +699,44 @@ def make_signed_acme_request(self, url, payload, needs_jwk=False):
response = self.POST(url, data=data.encode("utf8"), headers=headers)
return response

def create_authorization_challenges(self):
challenges = []

self.acme_register()
authorizations, finalize_url = self.apply_for_cert_issuance()

for auth_url in authorizations:
identifier_auth = self.get_identifier_authorization(auth_url)
token = identifier_auth["token"]
challenge = {
"ident_value": identifier_auth["domain"],
"token": token,
"key_auth": self.get_keyauthorization(
token
), # responder acme_keyauth..
"wildcard": identifier_auth["wildcard"],
"auth_url": auth_url, # responder auth.._url
"chal_url": identifier_auth["challenge_url"], # responder challenge_url
}
challenges.append(challenge)

# any errors in setup are fatal (here - they are all necessary for same cert)
failures = self.provider.setup(challenges)
if failures:
raise RuntimeError(
"get_certificate: challenge setup failed for %s" % failures
)

return challenges, finalize_url

def get_certificate(self):
self.logger.debug("get_certificate")
challenges = []

try:
self.acme_register()
authorizations, finalize_url = self.apply_for_cert_issuance()

for auth_url in authorizations:
identifier_auth = self.get_identifier_authorization(auth_url)
token = identifier_auth["token"]
challenge = {
"ident_value": identifier_auth["domain"],
"token": token,
"key_auth": self.get_keyauthorization(token), # responder acme_keyauth..
"wildcard": identifier_auth["wildcard"],
"auth_url": auth_url, # responder auth.._url
"chal_url": identifier_auth["challenge_url"], # responder challenge_url
}
challenges.append(challenge)

# any errors in setup are fatal (here - they are all necessary for same cert)
failures = self.provider.setup(challenges)
if failures:
raise RuntimeError("get_certificate: challenge setup failed for %s" % failures)
challenges, finalize_url = self.create_authorization_challenges()

### FIX ME ### should abort cert and try to clear on error

error, errata_list = self.propagation_delay(challenges)

# for a case where you want certificates for *.example.com and example.com
Expand Down Expand Up @@ -695,7 +765,9 @@ def get_certificate(self):
### FIX ME ### [:100] is a bandaid to reduce spew during tests

except Exception as e:
self.logger.error("Error: Unable to issue certificate. error={0}".format(str(e)[:100]))
self.logger.error(
"Error: Unable to issue certificate. error={0}".format(str(e)[:100])
)
raise e
finally:
# best-effort attempt to clear challenges
Expand Down Expand Up @@ -753,7 +825,8 @@ def propagation_delay(self, challenges: ChalListType) -> Tuple[str, ErrataListTy
### FIX ME ### might be good for mock tests, but really should try to clear, eh?
# return ("timeout", unready)
raise RuntimeError(
"propagation_delay: time out after %s probes: %s" % (num_checks, unready)
"propagation_delay: time out after %s probes: %s"
% (num_checks, unready)
)

return ("", [])
Expand Down