Skip to content

Commit

Permalink
Add SSH host cert support
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jul 28, 2024
1 parent dd14ef2 commit 03174d4
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 67 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ on:
jobs:
test:
runs-on: ubuntu-latest

env:
TERM: xterm

steps:
- uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.12'

- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y make openssh-client openssl libpcsclite-dev
sudo apt-get install -y make openssh-client openssl libpcsclite-dev expect
- name: Run tests
run: |
make test
2 changes: 1 addition & 1 deletion hsm_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def cli(ctx: click.Context, config: str|None, quiet: bool, yklabel: str|None, hs

# Get first Yubikey HSM auth key label from device if not specified
yubikey_label = yklabel
if not yubikey_label:
if not yubikey_label and not (auth_default_admin or auth_password_id or mock):
creds = list_yubikey_hsm_creds()
if not creds:
if not (quiet or auth_default_admin or auth_password_id):
Expand Down
112 changes: 88 additions & 24 deletions hsm_secrets/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hsm_secrets.config import HSMAsymmetricKey, HSMConfig
from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_result, cli_warn, open_hsm_session, pass_common_args
from cryptography.hazmat.primitives import _serialization
from cryptography.hazmat.primitives.serialization import ssh

import yubihsm.defs # type: ignore [import]
from yubihsm.objects import AsymmetricKey # type: ignore [import]
Expand Down Expand Up @@ -44,30 +45,87 @@ def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]):
cli_result(f"{pubkey} {key.label}")


@cmd_ssh.command('sign')

@cmd_ssh.command('sign-user')
@click.option('--out', '-o', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), help="Output file (default: deduce from input)", default=None)
@click.option('--ca', '-c', required=False, help="CA key ID (hex) or label to sign with. Default: read from config", default=None)
@click.option('--username', '-u', required=False, help="Key owner's name (for auditing)", default=None)
@click.option('--certid', '-n', required=False, help="Explicit certificate ID (default: auto-generated)", default=None)
@click.option('--validity', '-t', required=False, default=365*24*60*60, help="Validity period in seconds (default: 1 year)")
@click.option('--principals', '-p', required=False, help="Comma-separated list of principals", default='')
@click.option('--extentions', '-e', help="Comma-separated list of SSH extensions", default='permit-X11-forwarding,permit-agent-forwarding,permit-port-forwarding,permit-pty,permit-user-rc')
@click.option('--extensions', '-e', help="Comma-separated list of SSH extensions", default='permit-X11-forwarding,permit-agent-forwarding,permit-port-forwarding,permit-pty,permit-user-rc')
@click.argument('keyfile', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-')
@pass_common_args
def sign_ssh_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extentions: str, keyfile: str):
def sign_ssh_user_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extensions: str, keyfile: str):
"""Make and sign an SSH user certificate
[keyfile]: file containing the public key to sign (default: stdin)
If --ca is not specified, the default CA key is used (as specified in the config file).
Either --username or explicit --certid must be specified. If --certid is not specified,
a certificate ID is auto-generated key owner name, current time and principal list.
a certificate ID is auto-generated using the key owner name, current time, and principal list.
Unique and clear certificate IDs are important for auditing and revocation.
Output file is deduced from input file if not specified with --out (or '-' for stdout).
For example, 'id_rsa.pub' will be signed to 'id_rsa-cert.pub'.
"""
if (not username and not certid) or (username and certid):
raise click.ClickException("Either --username or --certid must be specified, but not both")
timestamp = int(time.time())
certid = certid or (f"{username}-{timestamp}-{'+'.join(principals.split(','))}").strip().lower().replace(' ', '_')
_sign_ssh_key(ctx, out, ca, certid, validity, principals, extensions, keyfile, ssh.SSHCertificateType.USER, timestamp)


@cmd_ssh.command('sign-host')
@click.option('--out', '-o', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), help="Output file (default: deduce from input)", default=None)
@click.option('--ca', '-c', required=False, help="CA key ID (hex) or label to sign with. Default: read from config", default=None)
@click.option('--hostname', '-H', required=True, help="Primary hostname of the server")
@click.option('--validity', '-t', required=False, default=365*24*60*60, help="Validity period in seconds (default: 1 year)")
@click.option('--principals', '-p', required=False, help="Comma-separated list of additional hostnames, IP addresses, or wildcards this certificate is valid for", default=None)
@click.argument('keyfile', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-')
@pass_common_args
def sign_ssh_host_key(ctx: HsmSecretsCtx, out: str, ca: str|None, hostname: str, validity: int, principals: str|None, keyfile: str):
"""Make and sign an SSH host certificate
[keyfile]: file containing the public key to sign (default: stdin)
If --ca is not specified, the default CA key is used (as specified in the config file).
The --hostname is used as the primary principal and is included in the certificate ID.
--principals can be used to specify additional hostnames, IP addresses, or wildcards that this certificate is valid for.
This is useful for servers with multiple names, IP addresses, or for covering entire subdomains or services.
Wildcards are supported in principals and can be used as prefixes or suffixes. For example:
- wiki.* would match any hostname starting with "wiki."
- *.example.com would match any subdomain of example.com
- 192.168.1.* would match any IP in the 192.168.1.0/24 subnet
Output file is deduced from input file if not specified with --out (or '-' for stdout).
For example, 'ssh_host_rsa_key.pub' will be signed to 'ssh_host_rsa_key-cert.pub'.
Example usage:
sign-host --hostname wiki.example.com --principals "wiki.*,*.example.com,10.0.0.*" ssh_host_rsa_key.pub
"""
principal_list = [hostname]
if principals:
principal_list.extend([p.strip() for p in principals.split(',') if p.strip() != hostname])
principals_str = ','.join(principal_list)

timestamp = int(time.time())
certid = f"host-{hostname}-{timestamp}+{len(principal_list)-1}-principals".strip().lower().replace(' ', '_')

cli_code_info(f"Creating host certificate for {hostname} with certid: {certid}")
cli_code_info(f"Principals: {principals_str}")

timestamp = int(time.time())
_sign_ssh_key(ctx, out, ca, certid, validity, principals_str, '', keyfile, ssh.SSHCertificateType.HOST, timestamp)



def _sign_ssh_key(ctx: HsmSecretsCtx, out: str, ca: str|None, certid: str, validity: int, principals: str, extensions: str, keyfile: str, cert_type: ssh.SSHCertificateType, timestamp: int):
from hsm_secrets.ssh.openssh.signing import sign_ssh_cert
from hsm_secrets.ssh.openssh.ssh_certificate import cert_for_ssh_pub_id, str_to_extension
from hsm_secrets.key_adapters import make_private_key_adapter
Expand All @@ -79,11 +137,6 @@ def sign_ssh_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None,
if not ca_def:
raise click.ClickException(f"CA key 0x{ca_key_def.id:04x} not found in config")

if not username and not certid:
raise click.ClickException("Either --username or --certid must be specified")
elif username and certid:
raise click.ClickException("Only one of --username or --certid must be specified")

# Read public key
key_str = ""
if keyfile == '-':
Expand All @@ -100,21 +153,22 @@ def sign_ssh_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None,
if len(parts) >= 3:
key_comment = "__"+parts[2]

timestamp = int(time.time())
princ_list = [s.strip() for s in principals.split(',')] if principals else []
certid = certid or (f"{username}-{timestamp}-{'+'.join(principals.split(','))}").strip().lower().replace(' ', '_')
cli_code_info(f"Signing key with CA `{ca_def[0].label}` as cert ID `{certid}` with principals: `{princ_list}`")

cert_type_str = "user" if cert_type == ssh.SSHCertificateType.USER else "host"
cli_code_info(f"Signing {cert_type_str} key with CA `{ca_def[0].label}` as cert ID `{certid}` with principals: `{princ_list}`")

# Create certificate from public key
cert = cert_for_ssh_pub_id(
key_str,
certid,
valid_seconds = validity,
principals = princ_list,
serial = timestamp,
extensions = {str_to_extension(s.strip()): b'' for s in extentions.split(',')})
cert_type=cert_type,
valid_seconds=validity,
principals=princ_list,
serial=timestamp,
extensions={str_to_extension(s.strip()): b'' for s in extensions.split(',')} if cert_type == ssh.SSHCertificateType.USER else {})

# Figre out output file
# Figure out output file
out_fp = None
path = None
if out == '-' or (keyfile == '-' and not out):
Expand All @@ -138,10 +192,20 @@ def sign_ssh_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None,
out_fp.close()
if str(path) != '-':
ca_pub_key = ca_priv_key.public_key().public_bytes(encoding=_serialization.Encoding.OpenSSH, format=_serialization.PublicFormat.OpenSSH)
cli_code_info(dedent(f"""
Certificate written to: {path}
- Send it to the user and ask them to put it in `~/.ssh/` along with the private key
- To view it, run: `ssh-keygen -L -f {path}`
- To allow access (adapt principals as neede), add this to your server authorized_keys file(s):
`cert-authority,principals="{','.join(cert.valid_principals)}" {ca_pub_key.decode()} HSM_{ca_def[0].label}`
""").strip())
if cert_type == ssh.SSHCertificateType.USER:
cli_code_info(dedent(f"""
User certificate written to: {path}
- Send it to the user and ask them to put it in `~/.ssh/` along with the private key
- To view it, run: `ssh-keygen -L -f {path}`
- To allow access (adapt principals as needed), add this to your server authorized_keys file(s):
`cert-authority,principals="{','.join(cert.valid_principals)}" {ca_pub_key.decode()} HSM_{ca_def[0].label}`
""").strip())
else:
cli_code_info(dedent(f"""
Host certificate written to: {path}
- Install it on the host machine, typically in `/etc/ssh/`
- Update the SSH server config to use this certificate (e.g., `HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub`)
- To view it, run: `ssh-keygen -L -f {path}`
- To trust this CA for host certificates, add this to your client's `~/.ssh/known_hosts` file:
`@cert-authority * {ca_pub_key.decode()} HSM_{ca_def[0].label}`
""").strip())
27 changes: 20 additions & 7 deletions hsm_secrets/ssh/openssh/ssh_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,27 +366,27 @@ def str_to_extension(label: str) -> ExtensionLabelType:

def cert_for_ssh_pub_id(
encoded_public_key: str, # in the format "<type> <base64_data> <name>"
cert_id: str, # E.g. "user.name-1234567-principal1+principal2"
cert_type: ssh.SSHCertificateType = ssh.SSHCertificateType.USER,
cert_id: str, # E.g. "user.name-1234567-principal1+principal2" or "host.example.com"
cert_type: ssh.SSHCertificateType,
nonce: Optional[bytes] = None, # Random nonce (32 bytes), if not provided, it is generated
serial: Optional[int] = None, # Serial number of the certificate. If None, current timestamp is used.
principals: List[str] = [],
valid_seconds: int = 60*60*24*31, # 31 days
critical_options: Dict[str, bytes] = {},
extensions: Dict[ExtensionLabelType, bytes] = {'permit-X11-forwarding': b'', 'permit-agent-forwarding': b'', 'permit-port-forwarding': b'', 'permit-pty': b'', 'permit-user-rc': b''},
extensions: Dict[ExtensionLabelType, bytes]|None = None,
) -> Union[RSACertificate, ECDSACertificate, ED25519Certificate, SKECDSACertificate, SKEd25519Certificate]:
"""
Build an SSH certificate object fom an encoded public key / ID (in the format "<type> <base64_data> <name>").
Build an SSH certificate object from an encoded public key / ID (in the format "<type> <base64_data> <name>").
:param encoded_public_key: Encoded public key data.
:param cert_type: The certificate type (USER or HOST).
:param cert_id: The certificate ID (e.g. "user.name-1234567-principal1+principal2").
:param cert_id: The certificate ID (e.g. "user.name-1234567-principal1+principal2" or "host.example.com").
:param nonce: Random nonce (32 bytes). Generated if not provided.
:param serial: Serial number of the certificate. If None, current timestamp is used.
:param principals: List of principals that the certificate is valid for.
:param valid_seconds: Number of seconds the certificate is valid for (starting from now -1 min).
:param critical_options: Dictionary of critical options.
:param extensions: Dictionary of extensions.
:param extensions: Dictionary of extensions. Use None for default extensions.
:return: The SSH certificate object.
"""
parts = encoded_public_key.strip().split(' ')
Expand Down Expand Up @@ -416,7 +416,20 @@ def cert_for_ssh_pub_id(
cert.valid_after = int(datetime.datetime.now().timestamp() - 60) # 1 minute ago, to allow for clock skew
cert.valid_before = cert.valid_after + valid_seconds
cert.critical_options = critical_options
cert.extensions = {str(k): v for k, v in extensions.items()}

# Set default extensions based on certificate type
if cert_type == ssh.SSHCertificateType.USER:
default_extensions = {
'permit-X11-forwarding': b'',
'permit-agent-forwarding': b'',
'permit-port-forwarding': b'',
'permit-pty': b'',
'permit-user-rc': b''
}
else: # HOST certificate
default_extensions = {}

cert.extensions = {k: v for k, v in (extensions.items() if extensions is not None else default_extensions.items())}

cert.decode_public_key(key_data)
assert isinstance(cert, (RSACertificate, ECDSACertificate, ED25519Certificate, SKECDSACertificate, SKEd25519Certificate))
Expand Down
44 changes: 32 additions & 12 deletions hsm_secrets/ssh/openssh/ssh_certificate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,27 @@ def parsecert(args: argparse.Namespace) -> None:
def parsepub(args: argparse.Namespace) -> None:
file_contents = read_file_str(args.pub_file)

cert = cert_for_ssh_pub_id(
# Create a user certificate
user_cert = cert_for_ssh_pub_id(
file_contents,
cert_id = args.pub_file,
cert_id = "[email protected]",
cert_type = ssh.SSHCertificateType.USER,
principals=["basic_users", "admins"])

print("Parsed public key into a certificate:")
print_certificate_details(cert)
print("Parsed public key into a user certificate:")
print_certificate_details(user_cert)

# Create a host certificate
host_cert = cert_for_ssh_pub_id(
file_contents,
cert_id = "host.example.com",
cert_type = ssh.SSHCertificateType.HOST,
principals=["host.example.com", "*.example.com"])

print("\nParsed public key into a host certificate:")
print_certificate_details(host_cert)

print("")
print("Testing signing & verification with different issuers:")
print("\nTesting signing & verification with different issuers:")

issuers = [
ed25519.Ed25519PrivateKey.generate(),
Expand All @@ -98,13 +108,23 @@ def parsepub(args: argparse.Namespace) -> None:
]
for ca in issuers:
assert isinstance(ca, (rsa.RSAPrivateKey, ed25519.Ed25519PrivateKey, ec.EllipticCurvePrivateKey))
sign_ssh_cert(cert, ca)
print(f" - Signed ok with {ca.__class__.__name__}")
#print_certificate_details(cert)
if verify_ssh_cert(cert):
print(f" - Verified OK")

# Sign and verify user certificate
sign_ssh_cert(user_cert, ca)
print(f" - Signed user certificate ok with {ca.__class__.__name__}")
if verify_ssh_cert(user_cert):
print(f" - User certificate verified OK")
else:
print(f" - User certificate verification FAILED!")

# Sign and verify host certificate
sign_ssh_cert(host_cert, ca)
print(f" - Signed host certificate ok with {ca.__class__.__name__}")
if verify_ssh_cert(host_cert):
print(f" - Host certificate verified OK")
else:
print(f" - Verification FAILED!")
print(f" - Host certificate verification FAILED!")


def checksig(args: argparse.Namespace) -> None:
cert_contents = read_file_str(args.cert_file)
Expand Down
Loading

0 comments on commit 03174d4

Please sign in to comment.