Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tckt33: support multipart/form-data (i.e. file upload) #70

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 186 additions & 2 deletions MicroWebSrv2/httpRequest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

"""
The MIT License (MIT)
Copyright © 2019 Jean-Christophe Bos & HC² (www.hc2.fr)
Expand Down Expand Up @@ -177,7 +176,7 @@ def _routeRequest(self) :

# ------------------------------------------------------------------------

def GetPostedURLEncodedForm(self) :
def GetPostedForm(self) :
res = { }
if self.ContentType.lower() == 'application/x-www-form-urlencoded' :
try :
Expand All @@ -189,6 +188,26 @@ def GetPostedURLEncodedForm(self) :
res[UrlUtils.UnquotePlus(p[0])] = v
except :
pass
elif self.ContentType.lower() == 'multipart/form-data':
if "boundary=" in self.ContentSubtype.lower():
msg = "Received multipart form -- processing...."
self._mws2.Log(msg, self._mws2.DEBUG)
bound = self.ContentSubtype.split("boundary=")[1].strip()

msg = "boundary for multipart parts: {}".format(bound)
self._mws2.Log(msg, self._mws2.DEBUG)

form_parts = self.__split_parts_at(bound)
if form_parts:
for part in form_parts:
res.update(self.__parse_multi_part(part))
else:
self._mws2.Log('Could not split at boundaries', self._mws2.ERROR)
else:
self._mws2.Log('Could not find content boundary string', self._mws2.ERROR)
else:
self._mws2.Log('GetPostedForm does not support %s' % self.ContentType,
self._mws2.ERROR)
return res

# ------------------------------------------------------------------------
Expand Down Expand Up @@ -245,6 +264,166 @@ def CheckBearerAuth(self, token) :
pass
return False


# ------------------------------------------------------------------------

def __split_parts_at(self, boundary):
""" Split multipart/form-data into list with data b/w boundaries. """
content_bytes = bytes(self._content)
form_parts = []

bound_length = len(boundary)
content_end = len(self._content)
ind_start = 2
ind_stop = bound_length + 2

while ind_stop < content_end:
# Should always be starting w/ boundary string
content_chunk = content_bytes[ind_start:ind_stop]
try:
content_str = content_chunk.decode('utf-8')
if content_str != boundary:
print("boundary: >>{}<<\ntest_str: >>{}<<".format(
boundary, content_str))
self._mws2.Log('Ill-formed part of multipart data',
self._mws2.ERROR)
except Exception as ex:
self._mws2.Log('Ill-formed part of multipart data: %s' % ex,
self._mws2.ERROR)

# Strip initial \r\n from multipart part
ind_start = ind_stop + 2

if ind_start + 2 == content_end:
# Double-check last four bytes are '--\r\n'
last_four = content_bytes[ind_start-2:ind_start+2]
try:
end_str = last_four.decode('utf-8')
except:
pass
if end_str != "--\r\n":
self._mws2.Log(
'Ill-formed end of multipart msg: %s' % end_str,
self._mws2.ERROR)
break

# Should have 'Content-Disposition: form-data; name=' next
ind_stop = ind_start + 38
test_bytes = content_bytes[ind_stop:ind_stop + bound_length]
test_str = ""
is_str = False
try:
test_str = test_bytes.decode('utf-8')
is_str = True
except:
pass
while test_str != boundary:
ind_stop = ind_stop + 1
if ind_stop + bound_length > content_end:
self._mws2.Log('Cannot find next boundary', self._mws2.ERROR)
break
test_bytes = content_bytes[ind_stop:ind_stop + bound_length]
try:
test_str = test_bytes.decode('utf-8')
is_str = True
except:
is_str = False
pass
stop = ind_stop - 4
form_parts.insert(len(form_parts), content_bytes[ind_start:stop])
ind_start = ind_stop
ind_stop = ind_stop + bound_length
return form_parts

# ------------------------------------------------------------------------

def __parse_multi_part(self, form_data):
"""
Extract content from multipart/form-data.
Argument form_data is content between boundaries (i.e. an item from
list returned by __extract_form_parts() above).
"""
all_text = False
try:
form_str = form_data.decode('utf-8')
all_text = True
except:
pass
if all_text:
return self.__parse_multi_text(form_str)

ret_dict = {}

# Separate header text from binary file data.
disp_str = "Content-Disposition: form-data; "
str_start = 0
str_end = len(disp_str)
header_str = form_data[str_start:str_end].decode('utf-8')
while not header_str.endswith("\r\n\r\n"):
str_end = str_end + 1
header_str = form_data[str_start:str_end].decode('utf-8')
header_str = header_str[len(disp_str):].strip()
tmp_dict = self.__parse_upload_header(header_str)

# Save to drive.
# TODO: Should sanitize the filename in case of malicious user.
if "name" in tmp_dict and "filename" in tmp_dict:
ret_dict[tmp_dict["name"]] = tmp_dict["filename"]
upload_dir = self._mws2._uploadPath
ret_dict["saved_as"] = "{}/{}".format(upload_dir, tmp_dict["filename"])
with open(ret_dict["saved_as"], "wb") as bin_fh:
bin_fh.write(form_data[str_end:])
self._mws2.Log('ret_dict: %s' % ret_dict, self._mws2.DEBUG)
else:
self._mws2.Log('Cannot parse: %s' % header_str, self._mws2.ERROR)

return ret_dict

# ------------------------------------------------------------------------

def __parse_multi_text(self, form_str):
"""
Extract content from multipart/form-data.
Argument form_str is decoded (utf-8) content between boundaries.
"""
# First line should at least have content-disposition and name.
ret_dict = {}
header_content = form_str.split("\r\n\r\n")
disp_str = "Content-Disposition: form-data; "
header = header_content[0].split(disp_str)[1]

if "filename" in header:
tmp_dict = self.__parse_upload_header(header)
if "name" in tmp_dict and "filename" in tmp_dict:
ret_dict[tmp_dict["name"]] = tmp_dict["filename"]
ret_dict[tmp_dict["filename"]] = header_content[1]
else:
self._mws2.Log('Cannot parse: %s' % header,
self._mws2.ERROR)
else:
header_parts = header.split("=")
if len(header_parts) > 2:
self._mws2.Log('Ill-formed multipart part header: %s' % header,
self._mws2.ERROR)
else:
dict_key = header_parts[1].replace('"', '')
ret_dict[dict_key] = header_content[1]

return ret_dict

# ------------------------------------------------------------------------

def __parse_upload_header(self, header):
""" Parse header from multipart/form-data into dictionary. """
header_lines = header.split("\r\n")
header_parts = header_lines[0].split("; ")
tmp_dict = {}
for part in header_parts:
print("\t{}".format(part))
part_items = part.split("=")
tmp_dict[part_items[0]] = part_items[1].replace('"', '')
return tmp_dict

# ------------------------------------------------------------------------

@property
Expand Down Expand Up @@ -346,6 +525,11 @@ def Referer(self) :
@property
def ContentType(self) :
return self._headers.get('content-type', '').split(';', 1)[0].strip()
# ------------------------------------------------------------------------

@property
def ContentSubtype(self):
return self._headers.get('content-type', '').split(';', 1)[1].strip()

# ------------------------------------------------------------------------

Expand Down
19 changes: 16 additions & 3 deletions MicroWebSrv2/microWebSrv2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

"""
The MIT License (MIT)
Copyright © 2019 Jean-Christophe Bos & HC² (www.hc2.fr)
Expand Down Expand Up @@ -89,6 +88,7 @@ def __init__(self) :
self._bindAddr = ('0.0.0.0', 80)
self._sslContext = None
self._rootPath = 'www'
self._uploadPath = '/tmp/mws2'
self._timeoutSec = 2
self._notFoundURL = None
self._allowAllOrigins = False
Expand Down Expand Up @@ -295,7 +295,7 @@ def _onSrvClosed(self, xAsyncTCPServer, closedReason) :
def _validateChangeConf(self, name='Configuration') :
if self._xasSrv :
raise MicroWebSrv2Exception('%s cannot be changed while the server is running.' % name)

# ------------------------------------------------------------------------

def EnableSSL(self, certFile, keyFile, caFile=None) :
Expand Down Expand Up @@ -330,7 +330,7 @@ def DisableSSL(self) :
if self._bindAddr[1] == 443 :
self._bindAddr = (self._bindAddr[0], 80)

# ------------------------------------------------------------------------
# ------------------------------------------------------------------------

def SetEmbeddedConfig(self) :
self._validateChangeConf()
Expand Down Expand Up @@ -480,6 +480,19 @@ def RootPath(self, value) :

# ------------------------------------------------------------------------

@property
def UploadPath(self):
return self._uploadPath

@UploadPath.setter
def UploadPath(self, value):
if not isinstance(value, str) or len(value) == 0:
raise ValueError('"UploadPath" must be a not empty string.')
self._validateChangeConf('"UploadPath"')
self._uploadPath = (value[:-1] if value.endswith('/') else value)

# ------------------------------------------------------------------------

@property
def RequestsTimeoutSec(self) :
return self._timeoutSec
Expand Down
72 changes: 69 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


from MicroWebSrv2 import *
from time import sleep
from _thread import allocate_lock
Expand Down Expand Up @@ -41,7 +39,7 @@ def RequestTestPost(microWebSrv2, request) :

@WebRoute(POST, '/test-post', name='TestPost2/2')
def RequestTestPost(microWebSrv2, request) :
data = request.GetPostedURLEncodedForm()
data = request.GetPostedForm()
try :
firstname = data['firstname']
lastname = data['lastname']
Expand All @@ -63,6 +61,67 @@ def RequestTestPost(microWebSrv2, request) :
MicroWebSrv2.HTMLEscape(lastname) )
request.Response.ReturnOk(content)

# ------------------------------------------------------------------------

@WebRoute(GET, '/test-upload', name='TestUpload1/2')
def RequestTestPost(microWebSrv2, request):
content = """\
<!DOCTYPE html>
<html>

<head>
<title>File Upload Test</title>
</head>

<body>
<h2>MicroWebSrv2 - File Upload Test</h2>
<form action="/test-upload"
method="post"
enctype="multipart/form-data"
>
First name: <input type="text" name="firstname"><br />
Last name: <input type="text" name="lastname"><br />
File: <INPUT TYPE=FILE ID=UPLOAD_IMG_ID NAME=UPLOAD_FILE />
<input type="submit" value="OK">
</form>
</body>

</html>
"""
request.Response.ReturnOk(content)

# ------------------------------------------------------------------------

@WebRoute(POST, '/test-upload', name='TestUpload2/2')
def RequestTestPost(microWebSrv2, request) :
data = request.GetPostedForm()
try:
firstname = data['firstname']
lastname = data['lastname']
saved_as = data['saved_as']
filename = data['UPLOAD_FILE']
except:
firstname = ""
lastname = ""
filename = ""
content = """\
<!DOCTYPE html>
<html>
<head>
<title>File upload result</title>
</head>
<body>
<h2>File upload result</h2>
Hello %s %s :) -- you uploaded %s (server saved to %s)<br />
</body>
</html>
""" % (MicroWebSrv2.HTMLEscape(firstname),
MicroWebSrv2.HTMLEscape(lastname),
MicroWebSrv2.HTMLEscape(filename),
MicroWebSrv2.HTMLEscape(saved_as)
)
request.Response.ReturnOk(content)

# ============================================================================
# ============================================================================
# ============================================================================
Expand Down Expand Up @@ -163,8 +222,15 @@ def OnWSChatClosed(webSocket) :
# For embedded MicroPython, use a very light configuration,
mws2.SetEmbeddedConfig()

# Allow up to 32 MB upload
mws2.MaxRequestContentLength = 32 * 1024 * 1024

# On a slower network, upload might take a while
mws2.RequestsTimeoutSec = 60

# All pages not found will be redirected to the home '/',
mws2.NotFoundURL = '/'
# mws2.BindAddress = ("0.0.0.0", 8765)

# Starts the server as easily as possible in managed mode,
mws2.StartManaged()
Expand Down
1 change: 1 addition & 0 deletions www/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ <h2>About</h2>
<ul>
<li><a href="test-redir">Test a redirection</a></li>
<li><a href="test-post">Test a POST form</a></li>
<li><a href="test-upload">Test a file upload form</a></li>
<li><a href="wstest.html">Test the WebSockets page</a></li>
<li><a href="wschat.html">Test the multi-users chat page</a></li>
<li><a href="test.pyhtml">Test a pyhtml template page</a></li>
Expand Down