first commit

This commit is contained in:
2022-03-17 15:59:24 +08:00
commit 2b0debb847
592 changed files with 73946 additions and 0 deletions

323
pkg/oss/aliyun/aliyun.go Normal file
View File

@@ -0,0 +1,323 @@
package aliyun
import (
"encoding/json"
"fmt"
"io"
"sort"
"time"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
"github.com/aliyun/alibaba-cloud-sdk-go/services/sts"
alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"gitlab.33.cn/chat/dtalk/pkg/oss"
)
var _ oss.Oss = (*Aliyun)(nil)
type Role struct {
RoleSessionName string `json:"roleSessionName"`
RoleArn string `json:"roleArn"`
}
type Aliyun struct {
cfg *oss.Config
client *alioss.Client
}
func New(cfg *oss.Config) *Aliyun {
ali := &Aliyun{
cfg: cfg,
client: nil,
}
// 创建并维护客户端
//go ali.watchClient()
err := ali.createClient()
if err != nil {
panic(err)
}
return ali
}
// Config 返回配置信息
func (ali *Aliyun) Config() *oss.Config {
return ali.cfg
}
// AssumeRole 返回临时授权角色
func (ali *Aliyun) AssumeRole() (*oss.AssumeRoleResp, error) {
//构建一个阿里云客户端, 用于发起请求。
//构建阿里云客户端时需要设置AccessKey ID和AccessKey Secret。
/*
regionId官方示例就是写死的
https://help.aliyun.com/document_detail/184381.htm?spm=a2c4g.11186623.2.19.25eb4bceQK29Sb#concept-1955433
*/
client, err := sts.NewClientWithAccessKey(ali.cfg.RegionId, ali.cfg.AccessKeyId, ali.cfg.AccessKeySecret)
if err != nil {
return nil, errors.WithMessage(err, "sts.NewClientWithAccessKey()")
}
//构建请求对象。
request := sts.CreateAssumeRoleRequest()
request.Scheme = "https"
var role Role
err = json.Unmarshal([]byte(ali.cfg.Role), &role)
if err != nil {
return nil, errors.WithMessagef(err, "json.Unmarshal role=%+v", role)
}
//设置参数。关于参数含义和设置方法请参见API参考。
request.RoleArn = role.RoleArn
request.RoleSessionName = role.RoleSessionName
// 设置过期时间
request.DurationSeconds = requests.NewInteger(ali.cfg.DurationSeconds)
//发起请求,并得到响应。
response, err := client.AssumeRole(request)
if err != nil {
return nil, errors.WithMessage(err, "client.AssumeRole()")
}
return &oss.AssumeRoleResp{
RequestId: response.RequestId,
Credentials: oss.Credentials{
AccessKeySecret: response.Credentials.AccessKeySecret,
Expiration: response.Credentials.Expiration,
AccessKeyId: response.Credentials.AccessKeyId,
SecurityToken: response.Credentials.SecurityToken,
},
AssumedRoleUser: oss.AssumedRoleUser{
AssumedRoleId: response.AssumedRoleUser.AssumedRoleId,
Arn: response.AssumedRoleUser.Arn,
},
}, nil
}
// Upload 上传文件
// key string 文件名, 包括路径 "dtalk/2021-07-01/1.jpg"
// body io.Reader 文件内容.
// url string 文件资源链接.
// error it's nil if no error, otherwise it's an error object.
func (ali *Aliyun) Upload(key string, body io.Reader, size int64) (url, uri string, err error) {
bucket, err := ali.client.Bucket(ali.cfg.Bucket)
if err != nil {
return "", "", errors.WithMessage(err, "client.Bucket()")
}
// 指定存储类型为标准存储,缺省也为标准存储。
storageType := alioss.ObjectStorageClass(alioss.StorageStandard)
// 指定存储类型为归档存储。
// storageType := alioss.ObjectStorageClass(alioss.StorageArchive)
// 指定访问权限为公共读缺省为继承bucket的权限。
objectAcl := alioss.ObjectACL(alioss.ACLPublicRead)
// 上传字符串。
err = bucket.PutObject(key, body, storageType, objectAcl)
if err != nil {
return "", "", errors.WithMessage(err, "bucket.PutObject()")
}
return ali.parseUrl(key), ali.parseUri(key), nil
}
func (ali *Aliyun) createClient() error {
client, err := alioss.New(
ali.cfg.EndPoint,
ali.cfg.AccessKeyId,
ali.cfg.AccessKeySecret,
)
if err != nil {
return errors.WithMessage(err, "alioss.New")
}
ali.client = client
return nil
}
// watchClient 定时更新临时客户端
func (ali *Aliyun) watchClient() {
// 创建一个计时器
durationTime := time.Second * time.Duration(ali.cfg.DurationSeconds)
timeTickerChan := time.Tick(durationTime)
// 更新客户端
for {
if err := ali.createTemporaryClient(); err != nil {
log.Error().Err(err).Msg("create ali temporary Client err")
} else {
log.Debug().Msg("create ali temporary Client success")
}
<-timeTickerChan
}
}
// createTemporaryClient 创建临时客户端
func (ali *Aliyun) createTemporaryClient() (err error) {
// 获得临时角色
assumeRoleResp, err := ali.AssumeRole()
if err != nil {
return errors.WithMessage(err, "ali.AssumeRole()")
}
// 创建临时客户端
ali.client, err = alioss.New(ali.cfg.EndPoint,
assumeRoleResp.Credentials.AccessKeyId,
assumeRoleResp.Credentials.AccessKeySecret,
alioss.SecurityToken(assumeRoleResp.Credentials.SecurityToken))
if err != nil {
return errors.WithMessage(err, "alioss.New()")
}
return nil
}
// parseUrl 获取对象在阿里云OSS上的完整访问URL
func (ali *Aliyun) parseUrl(key string) string {
return fmt.Sprintf("https://%s.%s/%s", ali.cfg.Bucket, ali.cfg.EndPoint, key)
}
// parseUri ...
func (ali *Aliyun) parseUri(key string) string {
return fmt.Sprintf("/%s", key)
}
// GetHost ...
func (ali *Aliyun) GetHost() string {
return fmt.Sprintf("https://%s.%s", ali.cfg.Bucket, ali.cfg.EndPoint)
}
// InitiateMultipartUpload 初始化分段上传任务
// 使用分段上传方式传输数据前必须先通知OBS初始化一个分段上传任务。
// 该操作会返回一个OBS服务端创建的全局唯一标识Upload ID用于标识本次分段上传任务。
// 您可以根据这个唯一标识来发起相关的操作,如取消分段上传任务、列举分段上传任务、列举已上传的段等。
//
// key string
//
// uploadId string
// err error
func (ali *Aliyun) InitiateMultipartUpload(key string) (uploadId string, err error) {
bucket, err := ali.client.Bucket(ali.cfg.Bucket)
if err != nil {
return "", errors.WithMessage(err, "client.Bucket()")
}
imur, err := bucket.InitiateMultipartUpload(key)
if err != nil {
return "", errors.WithMessage(err, "bucket.InitiateMultipartUpload")
}
return imur.UploadID, nil
}
// UploadPart 上传段
// 初始化一个分段上传任务之后可以根据指定的对象名和Upload ID来分段上传数据。
// 每一个上传的段都有一个标识它的号码——分段号Part Number范围是1~10000
// 对于同一个Upload ID该分段号不但唯一标识这一段数据也标识了这段数据在整个对象内的相对位置。
// 如果您用同一个分段号上传了新的数据那么OBS上已有的这个段号的数据将被覆盖。
// 除了最后一段以外其他段的大小范围是100KB~5GB最后段大小范围是0~5GB。
// 每个段不需要按顺序上传甚至可以在不同进程、不同机器上上传OBS会按照分段号排序组成最终对象。
//
// key string
// uploadId string
// body io.Reader
// partNumber int32
// offset int64
// partSize int64
//
// ETag string
// err error
func (ali *Aliyun) UploadPart(key, uploadId string, body io.Reader, partNumber int32, offset,
partSize int64) (ETag string, err error) {
// TODO offset, partSize 好像是不需要的参数, 如果 body 是 io.Reader 的话
bucket, err := ali.client.Bucket(ali.cfg.Bucket)
if err != nil {
return "", errors.WithMessage(err, "client.Bucket")
}
imur := alioss.InitiateMultipartUploadResult{
Key: key,
UploadID: uploadId,
Bucket: ali.cfg.Bucket,
}
part, err := bucket.UploadPart(imur, body, partSize, int(partNumber))
if err != nil {
return "", errors.WithMessage(err, "bucket.UploadPart")
}
return part.ETag, nil
}
// CompleteMultipartUpload 合并段
// 所有分段上传完成后需要调用合并段接口来在OBS服务端生成最终对象。
// 在执行该操作时需要提供所有有效的分段列表包括分段号和分段ETag值
// OBS收到提交的分段列表后会逐一验证每个段的有效性。当所有段验证通过后OBS将把这些分段组合成最终的对象。
//
// key string
// uploadId string
// parts []model.Part
//
// url string
// err error
func (ali *Aliyun) CompleteMultipartUpload(key, uploadId string, parts []oss.Part) (url, uri string, err error) {
bucket, err := ali.client.Bucket(ali.cfg.Bucket)
if err != nil {
return "", "", errors.WithMessage(err, "client.Bucket")
}
imur := alioss.InitiateMultipartUploadResult{
Key: key,
UploadID: uploadId,
Bucket: ali.cfg.Bucket,
}
ossParts := make([]alioss.UploadPart, len(parts), len(parts))
sort.Sort(oss.Parts(parts))
for i := range parts {
ossParts[i] = alioss.UploadPart{
PartNumber: int(parts[i].PartNumber),
ETag: parts[i].ETag,
}
}
_, err = bucket.CompleteMultipartUpload(imur, ossParts)
if err != nil {
return "", "", errors.WithMessage(err, "bucket.CompleteMultipartUpload")
}
return ali.parseUrl(key), ali.parseUri(key), nil
}
// AbortMultipartUpload 取消分段上传任务
// 分段上传任务可以被取消当一个分段上传任务被取消后就不能再使用其Upload ID做任何操作已经上传段也会被OBS删除。
// 采用分段上传方式上传对象过程中或上传对象失败后会在桶内产生段,这些段会占用您的存储空间,您可以通过取消该分段上传任务来清理掉不需要的段,节约存储空间。
//
// key string
// uploadId string
//
// err error
func (ali *Aliyun) AbortMultipartUpload(key, uploadId string) error {
bucket, err := ali.client.Bucket(ali.cfg.Bucket)
if err != nil {
return errors.WithMessage(err, "client.Bucket")
}
imur := alioss.InitiateMultipartUploadResult{
Key: key,
UploadID: uploadId,
Bucket: ali.cfg.Bucket,
}
err = bucket.AbortMultipartUpload(imur)
if err != nil {
return errors.WithMessage(err, "bucket.AbortMultipartUpload")
}
return nil
}
func (ali *Aliyun) test() {
//ali.client.
}