-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #8 from husseinfakharany/issue_4
Add Support for Signature Validation via HTTP
- Loading branch information
Showing
25 changed files
with
641 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
class AnnotatorException(Exception): | ||
"""A general exception type to be used by the annotators""" | ||
"""A general exception type to be used by the annotators""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
|
||
|
||
class DerivedComponent(Enum): | ||
Method = "@method" | ||
TargetURI = "@target-uri" | ||
Authority = "@authority" | ||
Scheme = "@scheme" | ||
Path = "@path" | ||
Query = "@query" | ||
QueryParams = "@query-params" | ||
|
||
def __str__(self) -> str: | ||
return f'{self.value}' | ||
|
||
class HttpConstants: | ||
|
||
@property | ||
def http_request_key(self): | ||
return "HttpRequestKey" | ||
|
||
@property | ||
def content_type(self): | ||
return "Content-Type" | ||
|
||
@property | ||
def content_length(self): | ||
return "Content-Length" | ||
|
||
@dataclass | ||
class ParseResult: | ||
"""A data class that holds the parsed data""" | ||
|
||
seed: str | ||
signature: str | ||
keyid: str | ||
algorithm: str | ||
|
||
def __eq__(self, __o: object) -> bool: | ||
if not isinstance(__o, ParseResult): | ||
return NotImplemented | ||
return self.seed == __o.seed and self.signature == __o.signature and self.keyid == __o.keyid and self.algorithm == __o.algorithm |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import datetime | ||
from typing import List | ||
|
||
from requests import Request | ||
from alvarium.annotators.handler.interfaces import RequestHandler | ||
|
||
from alvarium.sign.contracts import SignInfo, SignType | ||
from io import StringIO | ||
|
||
from alvarium.sign.factories import SignProviderFactory | ||
from .utils import parseSignature | ||
|
||
|
||
class Ed25519RequestHandler(RequestHandler): | ||
|
||
def __init__(self, request: Request) -> None: | ||
self.request = request | ||
|
||
def AddSignatureHeaders(self, ticks: datetime, fields: List[str], keys: SignInfo) -> None: | ||
headerValue = StringIO() | ||
|
||
for i in range(len(fields)): | ||
headerValue.write(f'"{str(fields[i])}"') | ||
if i < len(fields) - 1: | ||
headerValue.write(f' ') | ||
|
||
headerValue.write(f';created={str(int(ticks.timestamp()))};keyid="{str(keys.public.path)}";alg="{str(keys.public.type)}";') | ||
|
||
self.request.headers['Signature-Input'] = headerValue.getvalue() | ||
|
||
parsed = parseSignature(r=self.request) | ||
inputValue = bytes(parsed.seed, 'utf-8') | ||
p = SignProviderFactory().get_provider(sign_type=SignType.ED25519) | ||
|
||
with open(keys.private.path, 'r') as file: | ||
prv_hex = file.read() | ||
prv = bytes.fromhex(prv_hex) | ||
|
||
signature = p.sign(key=prv, content=inputValue) | ||
|
||
self.request.headers['Signature'] = str(signature) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
class ParserException(Exception): | ||
"""A general exception type to be used by the parser""" | ||
|
||
class RequestHandlerException(Exception): | ||
"""A general exception type to be used by the request handler""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from requests import Request | ||
from alvarium.annotators.handler.ed25519 import Ed25519RequestHandler | ||
from alvarium.annotators.handler.mock import NoneRequestHandler | ||
from alvarium.sign.contracts import SignInfo, SignType | ||
from .interfaces import RequestHandler | ||
from .exceptions import RequestHandlerException | ||
|
||
class RequestHandlerFactory(): | ||
|
||
def getRequestHandler(self, request: Request, keys: SignInfo) -> RequestHandler: | ||
if keys.private.type == SignType.NONE: | ||
return NoneRequestHandler(request=request) | ||
if keys.private.type == SignType.ED25519: | ||
return Ed25519RequestHandler(request=request) | ||
else: | ||
raise RequestHandlerException("Key type is not supported") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from abc import ABC, abstractmethod | ||
import datetime | ||
from typing import List | ||
from alvarium.sign.contracts import SignInfo | ||
|
||
class RequestHandler(ABC): | ||
|
||
@abstractmethod | ||
def AddSignatureHeaders(self, ticks: datetime, fields: List[str], keys: SignInfo) -> None: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import datetime | ||
from typing import List | ||
|
||
from requests import Request | ||
from alvarium.annotators.handler.interfaces import RequestHandler | ||
from alvarium.sign.contracts import SignType | ||
from alvarium.sign.factories import SignProviderFactory | ||
|
||
class NoneRequestHandler(RequestHandler): | ||
|
||
def __init__(self, request: Request) -> None: | ||
self.request = request | ||
|
||
def AddSignatureHeaders(self) -> None: | ||
|
||
#Adding the Signature-Input header | ||
self.request.headers['Signature-Input'] = "" | ||
|
||
#Adding the Signature header using the NoneSignProvider | ||
p = SignProviderFactory().get_provider(sign_type=SignType.NONE) | ||
|
||
inputValue = bytes("", 'utf-8') | ||
signature = p.sign(content=inputValue) | ||
|
||
self.request.headers['Signature'] = signature | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
from requests import Request, structures | ||
from .exceptions import ParserException | ||
from urllib.parse import urlparse | ||
from .contracts import ParseResult, DerivedComponent | ||
from io import StringIO | ||
|
||
def parseSignature(r: Request) -> ParseResult: | ||
|
||
# Making the request headers case insensitive | ||
headers = structures.CaseInsensitiveDict(r.headers) | ||
|
||
# Signature Inputs Extraction | ||
signatureInput = headers.get("Signature-Input") | ||
try: | ||
signature = headers.get("Signature") | ||
if signature == None: | ||
signature = "" | ||
except KeyError: | ||
signature = "" | ||
|
||
signatureInputList = signatureInput.split(";",1) | ||
signatureInputHeader = signatureInputList[0].split(" ") | ||
signatureInputTail = signatureInputList[1] | ||
|
||
signatureInputParsedTail = signatureInputTail.split(";") | ||
|
||
algorithm = "" | ||
keyid = "" | ||
for s in signatureInputParsedTail: | ||
if "alg" in s: | ||
raw = s.split("=")[1] | ||
algorithm = raw[1:len(raw)-1] | ||
if "keyid" in s: | ||
raw = s.split("=")[1] | ||
keyid = raw[1:len(raw)-1] | ||
|
||
parsed_url = urlparse(r.url) | ||
|
||
signatureInputFields = {} | ||
signatureInputBody = StringIO() | ||
|
||
for field in signatureInputHeader: | ||
# Remove double quotes from the field to access it directly in the header map | ||
key = field[1 : len(field)-1] | ||
if key[0] == "@": | ||
if DerivedComponent(key) == DerivedComponent.Method: | ||
signatureInputFields[key] = [r.method] | ||
elif DerivedComponent(key) == DerivedComponent.TargetURI: | ||
signatureInputFields[key] = [r.url] | ||
elif DerivedComponent(key) == DerivedComponent.Authority: | ||
signatureInputFields[key] = [parsed_url.netloc] | ||
elif DerivedComponent(key) == DerivedComponent.Scheme: | ||
signatureInputFields[key] = [parsed_url.scheme] | ||
elif DerivedComponent(key) == DerivedComponent.Path: | ||
signatureInputFields[key] = [parsed_url.path] | ||
elif DerivedComponent(key) == DerivedComponent.Query: | ||
signatureInputFields[key] = ["?"+parsed_url.query] | ||
elif DerivedComponent(key) == DerivedComponent.QueryParams: | ||
queryParams = [] | ||
rawQueryParams = parsed_url.query.split("&") | ||
for rawQueryParam in rawQueryParams: | ||
if rawQueryParam != "": | ||
parameter = rawQueryParam.split("=") | ||
name = parameter[0] | ||
value = parameter[1] | ||
queryParam = f';name="{name}": {value}' | ||
queryParams.append(queryParam) | ||
signatureInputFields[key] = queryParams | ||
else: | ||
raise ParserException(f"Unhandled Derived Component {key}") | ||
else: | ||
try: | ||
# Multi-value headers are not permitted in Python | ||
fieldValues = headers.get(key) | ||
# Removing leading and trailing whitespaces | ||
signatureInputFields[key] = [fieldValues.strip()] | ||
except KeyError: | ||
raise ParserException(f"Header field not found {key}") | ||
|
||
# Construct final output string | ||
keyValues = signatureInputFields[key] | ||
if len(keyValues) == 1: | ||
signatureInputBody.write(f'"{key}" {keyValues[0]}\n') | ||
else: | ||
for value in keyValues: | ||
signatureInputBody.write(f'"{key}"{value}\n') | ||
|
||
parsedSignatureInput = f"{signatureInputBody.getvalue()};{signatureInputTail}" | ||
s = ParseResult(seed=parsedSignatureInput, signature=signature, keyid=keyid, algorithm=algorithm) | ||
|
||
return s | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,30 @@ | ||
import socket | ||
|
||
from alvarium.sign.exceptions import SignException | ||
from alvarium.sign.factories import SignProviderFactory | ||
from alvarium.contracts.annotation import Annotation, AnnotationType | ||
from alvarium.hash.contracts import HashType | ||
from alvarium.sign.contracts import KeyInfo, SignInfo | ||
from alvarium.sign.contracts import SignInfo | ||
from alvarium.utils import PropertyBag | ||
from .contracts import Signable | ||
from .utils import derive_hash, sign_annotation | ||
from .utils import derive_hash, sign_annotation, verify_signature | ||
from .interfaces import Annotator | ||
from .exceptions import AnnotatorException | ||
|
||
class PkiAnnotator(Annotator): | ||
|
||
def __init__(self, hash: HashType, sign_info: SignInfo) -> None: | ||
self.hash = hash | ||
self.sign_info = sign_info | ||
self.kind = AnnotationType.PKI | ||
|
||
def _verify_signature(self, key: KeyInfo, signable: Signable) -> bool: | ||
""" Responsible for verifying the signature, returns true if the verification passed | ||
, false otherwise.""" | ||
try: | ||
sign_provider = SignProviderFactory().get_provider(sign_type=key.type) | ||
except SignException as e: | ||
raise AnnotatorException("cannot get sign provider.", e) | ||
|
||
with open(key.path, 'r') as file: | ||
pub_key = file.read() | ||
return sign_provider.verify(key=bytes.fromhex(pub_key), | ||
content=bytes(signable.seed, 'utf-8'), | ||
signed=bytes.fromhex(signable.signature)) | ||
|
||
|
||
def execute(self, data: bytes, ctx: PropertyBag = None) -> Annotation: | ||
key = derive_hash(hash=self.hash, data=data) | ||
host: str = socket.gethostname() | ||
|
||
# create Signable object | ||
signable = Signable.from_json(data.decode('utf-8')) | ||
is_satisfied: bool = self._verify_signature(key=self.sign_info.public, signable=signable) | ||
is_satisfied: bool = verify_signature(key=self.sign_info.public, signable=signable) | ||
|
||
annotation = Annotation(key=key, host=host, hash=self.hash, kind=self.kind, is_satisfied=is_satisfied) | ||
|
||
signature: str = sign_annotation(key_info=self.sign_info.private, annotation=annotation) | ||
annotation.signature = signature | ||
return annotation | ||
return annotation |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import socket | ||
|
||
from alvarium.annotators.handler.contracts import HttpConstants | ||
from alvarium.annotators.handler.exceptions import ParserException | ||
|
||
from alvarium.contracts.annotation import Annotation, AnnotationType | ||
from alvarium.hash.contracts import HashType | ||
from alvarium.sign.contracts import KeyInfo, SignInfo, KeyInfo, SignType | ||
from alvarium.utils import PropertyBag | ||
from .contracts import Signable | ||
from .utils import derive_hash, sign_annotation, verify_signature | ||
from .interfaces import Annotator | ||
from .exceptions import AnnotatorException | ||
from alvarium.annotators.handler.utils import parseSignature | ||
|
||
class HttpPkiAnnotator(Annotator): | ||
def __init__(self, hash: HashType, sign_info: SignInfo) -> None: | ||
self.hash = hash | ||
self.sign_info = sign_info | ||
self.kind = AnnotationType.PKI_HTTP | ||
|
||
def execute(self, data: bytes, ctx: PropertyBag = None) -> Annotation: | ||
key = derive_hash(hash=self.hash, data=data) | ||
host: str = socket.gethostname() | ||
|
||
# Call parser on request | ||
req = ctx.get_property(key=HttpConstants().http_request_key) | ||
|
||
try: | ||
parsed_data = parseSignature(r=req) | ||
except ParserException as e: | ||
raise AnnotatorException("Cannot parse the HTTP request.", e) | ||
|
||
signable = Signable(seed=parsed_data.seed, signature=parsed_data.signature) | ||
|
||
try: | ||
signType = SignType(parsed_data.algorithm) | ||
except Exception as e: | ||
raise AnnotatorException("Invalid key type specified" + str(parsed_data.algorithm),e) | ||
|
||
k = KeyInfo(signType, parsed_data.keyid) | ||
|
||
try: | ||
is_satisfied = verify_signature(key=k, signable=signable) | ||
except Exception as e: | ||
raise AnnotatorException(str(e),e) | ||
|
||
annotation = Annotation(key=key, host=host, hash=self.hash, kind=self.kind, is_satisfied=is_satisfied) | ||
|
||
signature: str = sign_annotation(key_info=self.sign_info.private, annotation=annotation) | ||
annotation.signature = signature | ||
return annotation |
Oops, something went wrong.