Source code for biothings.hub.api.handlers.upload

import cgi
import logging
import os

from tornado import gen
from tornado.web import stream_request_body

from biothings.utils.common import sizeof_fmt

from .base import GenericHandler


[docs] @stream_request_body class UploadHandler(GenericHandler):
[docs] def initialize(self, upload_root, **kwargs): self.upload_root = upload_root
[docs] def prepare(self): # sanity check + extract boundary ct = self.request.headers.get("Content-Type") if not ct: self.write_error(400, exc_info=[None, "No Content-Type header found", None]) return try: ct, boundary = ct.split(";") except Exception as e: self.write_error(400, exc_info=[None, str(e), None]) return ct = ct.strip() if ct != "multipart/form-data": self.write_error(400, exc_info=[None, "Excepting 'Content-Type: multipart/form-data', got %s" % ct, None]) return boundname, boundary = boundary.strip().split("=") if boundname != "boundary": self.write_error(400, exc_info=[None, "No boundary field found in headers", None]) self.boundary = boundary cl = self.request.headers.get("Content-Length") if cl: self.contentlength = sizeof_fmt(int(cl)) else: self.contentlength = "unknown size" self.begin = False self.src_name = self.request.uri.split("/")[-1] self.fp = None self.head = ""
[docs] def parse_head(self): data = self.head.splitlines() assert "--" + self.boundary == data[0] for dat in data[1:]: if "Content-Disposition:".lower() in dat.lower(): cd, cdopts = cgi.parse_header(dat) try: self.filename = cdopts["filename"] except KeyError: self.write_error(400, exc_info=[None, "No 'filename' found in Content-Disposition", None]) if "Content-Type:".lower() in dat.lower(): _, ct = map(str.strip, dat.split(":")) if not ct == "application/octet-stream": self.write_error(400, exc_info=[None, f"Expected 'application/octet-stream', got: {ct}", None]) logging.info("Upload file '%s' (%s) for source '%s'", self.filename, self.contentlength, self.src_name) folder = os.path.join(self.upload_root, self.src_name) os.makedirs(folder, exist_ok=True) self.fp = open(os.path.join(folder, self.filename), "wb")
[docs] @gen.coroutine def data_received(self, chunk): while True: if self.begin: endbound = b"\r\n--" + self.boundary.encode() + b"--\r\n" if endbound in chunk: chunk = chunk.replace(endbound, b"") self.fp.write(chunk) break else: # we're in headers, assuming str conversion data = chunk.split(b"\r\n\r\n") assert len(data) >= 1 self.head += data[0].decode() self.begin = True chunk = b"\r\n\r\n".join(data[1:]) self.parse_head()
[docs] def post(self, src_name): assert src_name == self.src_name self.write("ok")