Skip to content

Commit

Permalink
Merge pull request #107 from nats-io/nkeys-seed-connect
Browse files Browse the repository at this point in the history
Adds option for nkeys auth
  • Loading branch information
wallyqs authored May 31, 2019
2 parents 1acca35 + f58d33d commit 7ad8e54
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 10 deletions.
60 changes: 55 additions & 5 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from nats.aio.nuid import NUID
from nats.protocol.parser import *

__version__ = '0.9.0'
__version__ = '0.9.2'
__lang__ = 'python3'
PROTOCOL = 1

Expand Down Expand Up @@ -195,6 +195,10 @@ def __init__(self):
# user credentials file can be a tuple or single file.
self._user_credentials = None

# file that contains the nkeys seed and its public key as a string.
self._nkeys_seed = None
self._public_nkey = None

self.options = {}
self.stats = {
'in_msgs': 0,
Expand Down Expand Up @@ -235,6 +239,7 @@ def connect(
signature_cb=None,
user_jwt_cb=None,
user_credentials=None,
nkeys_seed=None,
):
self._setup_server_pool(servers)
self._loop = io_loop or loop or asyncio.get_event_loop()
Expand All @@ -247,6 +252,7 @@ def connect(
self._signature_cb = signature_cb
self._user_jwt_cb = user_jwt_cb
self._user_credentials = user_credentials
self._nkeys_seed = nkeys_seed

# Customizable options
self.options["verbose"] = verbose
Expand All @@ -268,7 +274,7 @@ def connect(
if tls:
self.options['tls'] = tls

if self._user_credentials is not None:
if self._user_credentials is not None or self._nkeys_seed is not None:
self._setup_nkeys_connect()

# Queue used to trigger flushes to the socket
Expand Down Expand Up @@ -305,8 +311,12 @@ def connect(
self._current_server.reconnects += 1

def _setup_nkeys_connect(self):
# NOTE: Should use bytearray throughout as that
# is more secure in handling the secrets.
if self._user_credentials is not None:
self._setup_nkeys_jwt_connect()
else:
self._setup_nkeys_seed_connect()

def _setup_nkeys_jwt_connect(self):
import nkeys
import os

Expand Down Expand Up @@ -386,6 +396,38 @@ def sig_cb(nonce):

self._signature_cb = sig_cb

def _setup_nkeys_seed_connect(self):
import nkeys
import os

seed = None
creds = self._nkeys_seed
with open(creds, 'rb') as f:
seed = bytearray(os.fstat(f.fileno()).st_size)
f.readinto(seed)
kp = nkeys.from_seed(seed)
self._public_nkey = kp.public_key.decode()
kp.wipe()
del kp
del seed

def sig_cb(nonce):
seed = None
with open(creds, 'rb') as f:
seed = bytearray(os.fstat(f.fileno()).st_size)
f.readinto(seed)
kp = nkeys.from_seed(seed)
raw_signed = kp.sign(nonce.encode())
sig = base64.b64encode(raw_signed)

# Best effort attempt to clear from memory.
kp.wipe()
del kp
del seed
return sig

self._signature_cb = sig_cb

@asyncio.coroutine
def close(self):
"""
Expand Down Expand Up @@ -1149,7 +1191,13 @@ def _process_err(self, err_msg):
self._err = ErrAuthorization
else:
m = b'nats: ' + err_msg[0]
self._err = NatsError(m.decode())
err = NatsError(m.decode())
self._err = err

if PERMISSIONS_ERR in m:
if self._error_cb is not None:
yield from self._error_cb(err)
return

do_cbs = False
if not self.is_connecting:
Expand Down Expand Up @@ -1298,6 +1346,8 @@ def _connect_command(self):
if self._user_jwt_cb is not None:
jwt = self._user_jwt_cb()
options["jwt"] = jwt.decode()
elif self._public_nkey is not None:
options["nkey"] = self._public_nkey
# In case there is no password, then consider handle
# sending a token instead.
elif self.options["user"] is not None and self.options[
Expand Down
1 change: 1 addition & 0 deletions nats/aio/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

STALE_CONNECTION = b"'Stale Connection'"
AUTHORIZATION_VIOLATION = b"'Authorization Violation'"
PERMISSIONS_ERR = b"Permissions Violation"


class NatsError(Exception):
Expand Down
6 changes: 3 additions & 3 deletions script/install_gnatsd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

set -e

export DEFAULT_NATS_SERVER_VERSION=v2.0.0-RC8
export DEFAULT_NATS_SERVER_VERSION=v2.0.0-RC14

export NATS_SERVER_VERSION="${NATS_SERVER_VERSION:=$DEFAULT_NATS_SERVER_VERSION}"

Expand All @@ -11,9 +11,9 @@ if [ ! "$(ls -A $HOME/nats-server)" ]; then
(
mkdir -p $HOME/nats-server
cd $HOME/nats-server
wget https://github.com/nats-io/gnatsd/releases/download/$NATS_SERVER_VERSION/gnatsd-$NATS_SERVER_VERSION-linux-amd64.zip -O nats-server.zip
wget https://github.com/nats-io/nats-server/releases/download/$NATS_SERVER_VERSION/nats-server-$NATS_SERVER_VERSION-linux-amd64.zip -O nats-server.zip
unzip nats-server.zip
cp gnatsd-$NATS_SERVER_VERSION-linux-amd64/gnatsd $HOME/nats-server/gnatsd
cp nats-server-$NATS_SERVER_VERSION-linux-amd64/nats-server $HOME/nats-server/gnatsd
)
else
echo 'Using cached directory.';
Expand Down
45 changes: 43 additions & 2 deletions tests/client_nkeys_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,51 @@

from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout, ErrInvalidUserCredentials
from tests.utils import (async_test, TrustedServerTestCase)
from tests.utils import (
async_test, TrustedServerTestCase, NkeysServerTestCase
)


class ClientNkeysTest(TrustedServerTestCase):
class ClientNkeysAuthTest(NkeysServerTestCase):
@async_test
async def test_nkeys_connect(self):
nc = NATS()

future = asyncio.Future(loop=self.loop)

async def error_cb(e):
nonlocal future
future.set_result(True)

await nc.connect(
"tls://127.0.0.1:4222",
loop=self.loop,
error_cb=error_cb,
connect_timeout=10,
nkeys_seed="./tests/nkeys/foo-user.nk",
allow_reconnect=False,
)

async def help_handler(msg):
await nc.publish(msg.reply, b'OK!')

await nc.subscribe("help", cb=help_handler)
await nc.flush()
msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')

await nc.subscribe("bar", cb=help_handler)
await nc.flush()

await asyncio.wait_for(future, 1, loop=self.loop)

msg = await nc.request("help", b'I need help')
self.assertEqual(msg.data, b'OK!')

await nc.close()


class ClientJWTAuthTest(TrustedServerTestCase):
@async_test
async def test_nkeys_jwt_creds_user_connect(self):
nc = NATS()
Expand Down
20 changes: 20 additions & 0 deletions tests/nkeys/nkeys_server.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

accounts {
acme {
users [
{
nkey = "UCK5N7N66OBOINFXAYC2ACJQYFSOD4VYNU6APEJTAVFZB2SVHLKGEW7L",
permissions = {
subscribe = {
allow = ["help", "_INBOX.>"]
deny = ["foo"]
}
publish = {
allow = ["help", "_INBOX.>"]
deny = ["foo"]
}
}
}
]
}
}
22 changes: 22 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,28 @@ def tearDown(self):
self.loop.close()


class NkeysServerTestCase(NatsTestCase):
def setUp(self):
super(NkeysServerTestCase, self).setUp()
self.server_pool = []
self.loop = asyncio.new_event_loop()

# Make sure that we are setting which loop we are using explicitly.
asyncio.set_event_loop(None)

server = Gnatsd(
port=4222, config_file="./tests/nkeys/nkeys_server.conf"
)
self.server_pool.append(server)
for gnatsd in self.server_pool:
start_gnatsd(gnatsd)

def tearDown(self):
for gnatsd in self.server_pool:
gnatsd.stop()
self.loop.close()


class TrustedServerTestCase(NatsTestCase):
def setUp(self):
super(TrustedServerTestCase, self).setUp()
Expand Down

0 comments on commit 7ad8e54

Please sign in to comment.