-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Drop deprecated features * Improve typing * Bump version * Edit README * Make API thiner * Add test for length validation * Simplify using custom host
- Loading branch information
1 parent
0e84366
commit 8b6057c
Showing
9 changed files
with
166 additions
and
158 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
__version__ = "3.0.4" | ||
__version__ = "3.0.5" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,39 @@ | ||
import functools | ||
import re | ||
from typing import Mapping | ||
from http import HTTPStatus | ||
from typing import NoReturn | ||
|
||
from flask import (Blueprint, | ||
Request, | ||
Response, | ||
current_app as app, | ||
make_response) | ||
from flask import Blueprint, Response, make_response | ||
from flask.views import MethodView | ||
from marshmallow import Schema, ValidationError, fields, pre_load, validate | ||
from marshmallow import ValidationError | ||
from webargs.flaskparser import abort, parser, use_kwargs | ||
|
||
from shhh.api import handlers | ||
from shhh.constants import (DEFAULT_EXPIRATION_TIME_VALUE, | ||
DEFAULT_READ_TRIES_VALUE, | ||
EXPIRATION_TIME_VALUES, | ||
READ_TRIES_VALUES) | ||
from shhh.api.schemas import CallableResponse, ReadRequest, WriteRequest | ||
|
||
api = Blueprint("api", __name__, url_prefix="/api") | ||
|
||
json = functools.partial(use_kwargs, location="json") | ||
query = functools.partial(use_kwargs, location="query") | ||
|
||
|
||
class _ReadSchema(Schema): | ||
external_id = fields.Str(required=True) | ||
passphrase = fields.Str(required=True) | ||
|
||
|
||
def _passphrase_validator(passphrase: str) -> None: | ||
regex = re.compile(r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9]).{8,}$") | ||
if regex.search(passphrase): | ||
return | ||
raise ValidationError( | ||
"Sorry, your passphrase is too weak. It needs minimum 8 characters, " | ||
"with 1 number and 1 uppercase.") | ||
|
||
def _handle(response: CallableResponse, code: HTTPStatus) -> Response: | ||
return make_response(response(), code) | ||
|
||
def _secret_validator(secret: str) -> None: | ||
max_length = app.config["SHHH_SECRET_MAX_LENGTH"] | ||
if len(secret) <= max_length: | ||
return | ||
raise ValidationError( | ||
f"The secret should not exceed {max_length} characters") | ||
|
||
|
||
class _CreateSchema(Schema): | ||
passphrase = fields.Str(required=True, validate=_passphrase_validator) | ||
secret = fields.Str(required=True, validate=_secret_validator) | ||
expire = fields.Str(dump_default=DEFAULT_EXPIRATION_TIME_VALUE, | ||
validate=validate.OneOf( | ||
EXPIRATION_TIME_VALUES.values())) | ||
tries = fields.Int(dump_default=DEFAULT_READ_TRIES_VALUE, | ||
validate=validate.OneOf(READ_TRIES_VALUES)) | ||
|
||
@pre_load | ||
def secret_sanitise_newline(self, data, **kwargs): | ||
if isinstance(data.get("secret"), str): | ||
data["secret"] = "\n".join(data["secret"].splitlines()) | ||
return data | ||
@parser.error_handler | ||
def handle_parsing_error(err: ValidationError, *args, **kwargs) -> NoReturn: | ||
abort(_handle(*handlers.parse_error(err))) | ||
|
||
|
||
@parser.error_handler | ||
def handle_parsing_error(err: ValidationError, | ||
req: Request, | ||
schema: Schema, | ||
*, | ||
error_status_code: int, | ||
error_headers: Mapping[str, str]): | ||
response, code = handlers.parse_error(err) | ||
return abort(make_response(response.make(), code)) | ||
body = functools.partial(use_kwargs, location="json") | ||
query = functools.partial(use_kwargs, location="query") | ||
|
||
|
||
class Api(MethodView): | ||
|
||
@query(_ReadSchema()) | ||
def get(self, external_id: str, passphrase: str) -> Response: | ||
response, code = handlers.read_secret(external_id, passphrase) | ||
return make_response(response.make(), code) | ||
@query(ReadRequest()) | ||
def get(self, *args, **kwargs) -> Response: | ||
return _handle(*handlers.read(*args, **kwargs)) | ||
|
||
@json(_CreateSchema()) | ||
def post(self, | ||
passphrase: str, | ||
secret: str, | ||
expire: str = DEFAULT_EXPIRATION_TIME_VALUE, | ||
tries: int = DEFAULT_READ_TRIES_VALUE) -> Response: | ||
response, code = handlers.write_secret(passphrase, secret, | ||
expire, tries) | ||
return make_response(response.make(), code) | ||
@body(WriteRequest()) | ||
def post(self, *args, **kwargs) -> Response: | ||
return _handle(*handlers.write(*args, **kwargs)) | ||
|
||
|
||
api = Blueprint("api", __name__, url_prefix="/api") | ||
api.add_url_rule("/secret", view_func=Api.as_view("secret")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import re | ||
from dataclasses import dataclass, field, fields as dfields | ||
from urllib.parse import urljoin | ||
|
||
from flask import Response, current_app as app, jsonify, request, url_for | ||
from marshmallow import Schema, ValidationError, fields, pre_load, validate | ||
|
||
from shhh.constants import (DEFAULT_EXPIRATION_TIME_VALUE, | ||
DEFAULT_READ_TRIES_VALUE, | ||
EXPIRATION_TIME_VALUES, | ||
READ_TRIES_VALUES, | ||
Message, | ||
Status) | ||
|
||
|
||
class ReadRequest(Schema): | ||
"""Schema for inbound read requests.""" | ||
external_id = fields.Str(required=True) | ||
passphrase = fields.Str(required=True) | ||
|
||
|
||
def _passphrase_validator(passphrase: str) -> None: | ||
regex = re.compile(r"^(?=.*?[A-Z])(?=.*?[a-z])(?=.*?[0-9]).{8,}$") | ||
if not regex.search(passphrase): | ||
raise ValidationError("Sorry, your passphrase is too weak. It needs " | ||
"minimum 8 characters, with 1 number and 1 " | ||
"uppercase.") | ||
|
||
|
||
def _secret_validator(secret: str) -> None: | ||
max_length = app.config["SHHH_SECRET_MAX_LENGTH"] | ||
if len(secret) > max_length: | ||
raise ValidationError(f"The secret should not exceed {max_length} " | ||
"characters.") | ||
|
||
|
||
class WriteRequest(Schema): | ||
"""Schema for inbound write requests.""" | ||
passphrase = fields.Str(required=True, validate=_passphrase_validator) | ||
secret = fields.Str(required=True, validate=_secret_validator) | ||
expire = fields.Str(load_default=DEFAULT_EXPIRATION_TIME_VALUE, | ||
validate=validate.OneOf( | ||
EXPIRATION_TIME_VALUES.values())) | ||
tries = fields.Int(load_default=DEFAULT_READ_TRIES_VALUE, | ||
validate=validate.OneOf(READ_TRIES_VALUES)) | ||
|
||
@pre_load | ||
def secret_sanitise_newline(self, data, **kwargs): | ||
if isinstance(data.get("secret"), str): | ||
data["secret"] = "\n".join(data["secret"].splitlines()) | ||
return data | ||
|
||
|
||
@dataclass | ||
class CallableResponse: | ||
|
||
def __call__(self) -> Response: | ||
return jsonify({ | ||
"response": { | ||
f.name: getattr(self, f.name) | ||
for f in dfields(self) | ||
} | ||
}) | ||
|
||
|
||
@dataclass | ||
class ReadResponse(CallableResponse): | ||
"""Schema for outbound read responses.""" | ||
status: Status | ||
msg: str | ||
|
||
|
||
def _build_link_url(external_id: str) -> str: | ||
root_host = app.config.get("SHHH_HOST") or request.url_root | ||
return urljoin(root_host, url_for("web.read", external_id=external_id)) | ||
|
||
|
||
@dataclass | ||
class WriteResponse(CallableResponse): | ||
"""Schema for outbound write responses.""" | ||
external_id: str | ||
expires_on: str | ||
link: str = field(init=False) | ||
status: Status = Status.CREATED | ||
details: Message = Message.CREATED | ||
|
||
def __post_init__(self): | ||
self.link = _build_link_url(self.external_id) | ||
|
||
|
||
@dataclass | ||
class ErrorResponse(CallableResponse): | ||
"""Schema for outbound error responses.""" | ||
details: str | ||
status: Status = Status.ERROR |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.