打造数据上传“随意门”-Multipart Upload的presign方案与用例

打造数据上传“随意门”-Multipart Upload的presign方案与用例

技术杂谈小彩虹2021-07-14 16:09:37130A+A-

需求背景

有项目需要在客户端上传数据,但是出于安全考虑不能在客户端本地任何位置存储secret-key的信息,同时普通的presign上传请求也有局限性,特别是当文件很大的时候整个上传效率非常低。因此需要调研Multipart Upload是否支持presign机制。(Multipart Upload是否支持presign,AWS及ceph官方都没有明确说明)

Multipart Upload基本流程

图片

整个Multipart Upload大致分为3个阶段
**1. initiate_multipart_upload:**使用HTTP POST请求构建初始化数据,object的metadata、content-type这些都是在这个阶段设置,请求成功以后的response里面回返回uploadID,后续使用这个uploadID进行数据上传操作。
**2. upload_part:**使用HTTP PUT请求构建数据上传操作,客户端本地将大文件拆分成多个part文件,每个part使用相同的uploadID,按uploadNumber顺序依次上传数据(可以并行)。每个part文件上传成功会返回对应的etag来完成每个part文件的一致性校验。
**3. compelte_upload:**使用HTTP PUT请求构建最后的合并请求操作,完成各个分块的逻辑合并,注意返回的etag不具备一致性校验功能。

Multipart Upload特性总结:
1.拆分大文件到多个part文件,实现超大文件的上传。
2.拆分的part文件可以并行上传,极大地提高了上传效率。
3.Multipart Upload实现的是过程一致性校验,而非最终一致性校验。
4.metadata和content-type一类的信息必须在initiate阶段提前设置。

在upload_part阶段实现presign

图片

**流程介绍:**和普通Multipart Upload一样先在client端实现文件的拆分,之后对每个part文件生成单独的PreSignURL,最后使用生成的URL完成各个part文件的上传(不再需要secret-key)。

使用python实现upload_part阶段的presign

分为两部分,server_demo.py主要实现最关键的S3Sign,代码如下

# -*- coding: utf-8 -*-
import time
import hmac
from hashlib import sha1 as sha

py3k = False
try:
    from urlparse import urlparse, unquote
    from base64 import encodestring
except:
    py3k = True
    from urllib.parse import urlparse, unquote
    from base64 import encodebytes as encodestring


class S3PreSign():
    def __init__(self, access_key, secret_key, service_url, bucket_name, object_name, upload_ID, expires):
        self.service_url = str(service_url)
        self.access_key = str(access_key)
        self.secret_key = str(secret_key)
        self.bucket_name = str(bucket_name)
        self.object_name = str(object_name)
        self.upload_ID = str(upload_ID)
        self.Expires = int(time.time()) + int(expires)

    def get_signature_str(self, sign_str):
        if py3k:
            key = self.secret_key.encode('utf-8')
            msg = sign_str.encode('utf-8')
        else:
            key = self.secret_key
            msg = sign_str
        h = hmac.new(key, msg, digestmod=sha)
        return (encodestring(h.digest()).strip()).replace('+', '%2b')

    def build_url(self, partNumber, Signature):
        url_ = "http://{bucket_name}.{service_url}/{object_name}?uploadId={uploadId}&partNumber={partNumber}&Expires={Expires}&AWSAccessKeyId={AWSAccessKeyId}&Signature={Signature}".format(
            bucket_name=self.bucket_name,
            service_url=self.service_url,
            object_name=self.object_name,
            uploadId=self.upload_ID,
            partNumber=partNumber,
            Expires=self.Expires,
            AWSAccessKeyId=self.access_key,
            Signature=Signature
        )
        return url_

    def build_url_with_partid(self, partMd5, partNumber):
        sign_str = "PUT\n{partMd5}\n\n{Expires}\n/{bucket_name}/{object_name}?partNumber={partNumber}&uploadId={uploadId}".format(
            partMd5=partMd5,
            Expires=self.Expires,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            partNumber=partNumber,
            uploadId=self.upload_ID)
        Signature_ = self.get_signature_str(sign_str)
        return self.build_url(partNumber, Signature_)

最后在client_demo.py中实现对应的并行上传用例,这里只是简单的说明过程,如果需要隐藏secret-key这些信息,需要将这个生成presignURL的过程封装成一个独立的服务接口,客户端通过请求去获取这些生成的presignURL。

# -*- coding: utf-8 -*-
from server_demo import S3PreSign
import requests
from base64 import encodestring
from hashlib import md5
import os
from multiprocessing import Pool

class S3client():
    def __init__(self, part_num, uploadfile_path):
        self.part_num = part_num
        self.uploadfile_path = uploadfile_path

    def split_file(self):
        filelist = []
        statinfo = os.stat(self.uploadfile_path)
        chunksize = statinfo.st_size / self.part_num
        print "File size: %d(MB)" % (statinfo.st_size / (1024 * 1024))
        print self.uploadfile_path,chunksize
        with open(self.uploadfile_path, "rb") as f:
            index = 1
            while True:
                chunk = f.read(chunksize)
                if (chunk):
                    fn = "%s.part.%d" % (self.uploadfile_path, index)
                    print "creating", fn
                    with open(fn, "wb") as fw:
                        fw.write(chunk)
                    partMD5 = self.compute_hash(fn)
                    tmp_ = {}
                    tmp_[fn] = str(partMD5)
                    filelist.append(tmp_)
                    index = index + 1
                else:
                    break
        return filelist

    def compute_hash(self, filepath, buf_size=8192, size=None, hash_algorithm=md5):
        hash_obj = hash_algorithm()
        with open(filepath) as fp:
            spos = fp.tell()
            if size and size < buf_size:
                s = fp.read(size)
            else:
                s = fp.read(buf_size)
            while s:
                if not isinstance(s, bytes):
                    s = s.encode('utf-8')
                hash_obj.update(s)
                if size:
                    size -= len(s)
                    if size <= 0:
                        break
                if size and size < buf_size:
                    s = fp.read(size)
                else:
                    s = fp.read(buf_size)
            base64_digest = encodestring(hash_obj.digest()).decode('utf-8')
            if base64_digest[-1] == '\n':
                base64_digest = base64_digest[0:-1]
            return base64_digest

    def make_upload_list(self,S3Sign):
        upload_file_list = self.split_file()
        for f in upload_file_list:
            part_path = f.keys()[0]
            partMD5 = f.values()[0]
            partnum_ = f.keys()[0].split(".")[-1]
            url_ = S3Sign.build_url_with_partid(partMD5, partnum_)
            # print "Ready to upload {part_path} with URL={url_} MD5={partMD5}".format(part_path=part_path,url_=url_,partMD5=partMD5)
            yield (url_, part_path, partMD5)



def multipart_upload_with_part(url_, part_file_path, partMD5):
    headers = {}
    headers["Content-MD5"] = partMD5
    with open(part_file_path,'r') as fh:
        response = requests.put(url_, headers=headers, data=fh.read())
        if response.status_code == 200:
            print "{} upload Sucessful !".format(part_file_path)


if __name__ == "__main__":
    endpoint = 's3.ceph.work'
    access_key = ''
    secret_key = ''
    key_name = 'cosben-0.4.2.c4.zip'
    part_num = 6 #大文件切分数量
    expires = 300 #生成的presignURL有效时长
    bucket_name = 'multi-upload'
    file_path = '/tmp/cosbench-0.4.2.c4.zip'
    upload_ID = '2~EAEhzt0luJqhV4KkZDGQH3CmegO00FX' #uploadID使用boto或者其他方法生成
    processes_num = 2 #并行上传进程数

    s3sign = S3PreSign(access_key=access_key, secret_key=secret_key, service_url=endpoint, bucket_name=bucket_name,
                       object_name=key_name, upload_ID=upload_ID, expires=expires)
    s3client = S3client(part_num,file_path)
    # s3client.split_file()
    upload_file_list = s3client.make_upload_list(s3sign)
    p = Pool(processes=processes_num)
    for i in upload_file_list:
        # print i
        p.apply_async(multipart_upload_with_part, (i[0], i[1], i[2],))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

总结

目前测试ceph.0.94.5可以实现upload part的阶段presign,后续有时间再试下其他阶段的实现情况。注意与普通的presign方式不同的是使用upload part的阶段生成的presignURL上传数据,每个HTTP请求的header都要带上Content-MD5,否则会报403错误。

参考资料:
docs.aws.amazon.com/AmazonS3/la…
docs.aws.amazon.com/AmazonS3/la…
docs.aws.amazon.com/zh\_cn/Amaz…

点击这里复制本文地址 以上内容由权冠洲的博客整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

支持Ctrl+Enter提交

联系我们| 本站介绍| 留言建议 | 交换友链 | 域名展示
本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除

权冠洲的博客 © All Rights Reserved.  Copyright quanguanzhou.top All Rights Reserved
苏公网安备 32030302000848号   苏ICP备20033101号-1

联系我们