pcm-coordinator/api/internal/handler/image/chunkhandler.go

209 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package image
import (
"bufio"
"context"
"encoding/base64"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
types2 "github.com/docker/docker/api/types"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils/fileutils"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
)
var (
	// dir is the process working directory captured once at package
	// initialization. The error from os.Getwd is discarded, so dir is the
	// empty string if the working directory cannot be determined.
	dir, _ = os.Getwd()

	// uploadPath is the "uploads" directory under dir; merged upload files
	// are written there.
	uploadPath = filepath.Join(dir, "uploads")
)
// chunkMergeMu serializes the on-disk merge step of ChunkHandler. net/http
// serves requests concurrently, and two merge requests that target the same
// name would otherwise interleave their writes into one destination file or
// delete a chunk directory that the other request is still reading. The lock
// covers only the merge and chunk cleanup, not the later upload.
var chunkMergeMu sync.Mutex

// ChunkHandler returns the HTTP handler that finalizes a chunked upload.
//
// The handler reads the form fields "size", "hash", "name", "dataType" and
// "kind", then:
//  1. verifies that the chunk directory uploadTempPath/hash holds exactly
//     size bytes,
//  2. concatenates the chunks in index order into uploadPath/name and removes
//     the chunk directory,
//  3. records the file in the database with status "local",
//  4. pushes the file as an image (kind "image") or uploads it to object
//     storage (kind "dataSet" or "algorithm"),
//  5. deletes the merged local file when the request finishes.
//
// Every failure is logged and reported through result.HttpResult, and the
// handler stops at the first failure.
func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// fail logs err and reports it to the client.
		fail := func(err error) {
			logx.Error(err)
			result.HttpResult(r, w, nil, err)
		}

		size, err := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
		if err != nil {
			fail(fmt.Errorf("parsing size: %w", err))
			return
		}
		hash := r.PostFormValue("hash")
		name := r.PostFormValue("name")
		dataType := r.PostFormValue("dataType")
		kind := r.PostFormValue("kind")

		// hash and name come straight from the client and are joined into
		// filesystem paths below, so anything that is not a single path
		// element (empty, ".", "..", or containing a separator) is rejected.
		if !isSinglePathElement(hash) || !isSinglePathElement(name) {
			fail(fmt.Errorf("invalid hash %q or name %q", hash, name))
			return
		}

		// Compare the size announced by the merge request with the size of
		// the chunks that were actually uploaded.
		chunksPath := filepath.Join(uploadTempPath, hash)
		toSize, err := fileutils.GetDirSize(chunksPath)
		if err != nil {
			fail(err)
			return
		}
		if size != toSize {
			fail(fmt.Errorf("文件上传错误"))
			return
		}

		saveFile := filepath.Join(uploadPath, name)
		chunkMergeMu.Lock()
		err = mergeUploadChunks(chunksPath, saveFile)
		if err == nil {
			// The chunks are redundant once merged. A failed cleanup only
			// wastes disk space, so it is logged rather than reported.
			if rmErr := os.RemoveAll(chunksPath); rmErr != nil {
				logx.Error(rmErr)
			}
		}
		chunkMergeMu.Unlock()
		if err != nil {
			fail(err)
			return
		}
		// Delete the merged local file when the request is done so it does
		// not keep occupying local storage.
		defer os.Remove(saveFile)

		// Record the file in the database.
		err = svcCtx.DbEngin.Create(&models.File{
			Name:     name,
			Hash:     hash,
			DataType: dataType,
			Status:   "local",
			Kind:     kind,
			Bucket:   "pcm"}).Error
		if err != nil {
			fail(err)
			return
		}

		// Forward the merged file according to its kind: images are pushed
		// to the image registry, data sets and algorithms go to object
		// storage. Any other kind stays recorded as "local".
		switch kind {
		case "image":
			err = pushImage(svcCtx, hash, name)
		case "dataSet", "algorithm":
			err = uploadStorage(svcCtx, hash, name)
		}
		result.HttpResult(r, w, nil, err)
	}
}

// isSinglePathElement reports whether s can be used as exactly one path
// element: it is non-empty, is not "." or "..", and contains no separator.
func isSinglePathElement(s string) bool {
	return s != "" && s != "." && s != ".." && s == filepath.Base(s)
}

// mergeUploadChunks concatenates the chunk files found in chunksPath into
// saveFile. The second "-"-separated field of each chunk file name is its
// index; chunks are written in the order 0, 1, 2, ... because any other order
// corrupts the result. An existing saveFile is replaced, and a partially
// written saveFile is removed when the merge fails.
func mergeUploadChunks(chunksPath, saveFile string) (err error) {
	entries, err := ioutil.ReadDir(chunksPath)
	if err != nil {
		return err
	}

	// Map each chunk index to its file name.
	byIndex := make(map[string]string, len(entries))
	for _, entry := range entries {
		parts := strings.Split(entry.Name(), "-")
		if len(parts) < 2 {
			return fmt.Errorf("chunk file %q carries no index", entry.Name())
		}
		byIndex[parts[1]] = entry.Name()
	}
	// Fewer indexes than files means two chunks claimed the same index.
	if len(byIndex) != len(entries) {
		return fmt.Errorf("文件上传错误2")
	}

	// Resolve the full sequence before touching the destination so that an
	// incomplete upload never leaves a half-written file behind.
	ordered := make([]string, len(entries))
	for i := range ordered {
		chunkName, ok := byIndex[strconv.Itoa(i)]
		if !ok {
			return fmt.Errorf("chunk %d is missing", i)
		}
		ordered[i] = filepath.Join(chunksPath, chunkName)
	}

	if rmErr := os.Remove(saveFile); rmErr != nil && !os.IsNotExist(rmErr) {
		return rmErr
	}
	out, err := os.OpenFile(saveFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := out.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			os.Remove(saveFile)
		}
	}()

	for _, chunkFile := range ordered {
		data, readErr := ioutil.ReadFile(chunkFile)
		if readErr != nil {
			return readErr
		}
		if _, writeErr := out.Write(data); writeErr != nil {
			return writeErr
		}
	}
	return nil
}
// syncDataSet is meant to sync data sets to ModelArts. It is currently an
// empty placeholder and performs no work.
func syncDataSet() {
}
// uploadStorage uploads the merged file uploadPath/name to the "pcm" bucket
// through svcCtx.Uploader, using name as the object key. On success it sets
// the status of the models.File rows matching hash to "cloud".
//
// It returns the first error from opening the file, uploading it, or updating
// the database; each error is logged before it is returned.
func uploadStorage(svcCtx *svc.ServiceContext, hash string, name string) error {
	file, err := os.Open(filepath.Join(uploadPath, name))
	if err != nil {
		logx.Error(err)
		return err
	}
	// Release the file handle once the upload has finished or failed.
	defer file.Close()

	_, err = svcCtx.Uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("pcm"),
		Key:    aws.String(name),
		Body:   file,
	})
	if err != nil {
		logx.Error(err)
		return err
	}

	// Mark the record as stored in the cloud.
	err = svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud").Error
	if err != nil {
		logx.Error(err)
		return err
	}
	return nil
}
// pushImage loads the image archive uploadPath/name into the local Docker
// daemon, tags it as registry.cn-hangzhou.aliyuncs.com/jointcloud/pcm:<name>,
// pushes that tag with the credentials from svcCtx.Config.RegistryConf, and
// removes both local image references so they do not waste local storage.
// On success it sets the status of the models.File rows matching hash to
// "cloud".
//
// It returns the first error encountered, including errors that the daemon
// reports inside its response streams; each error is logged before it is
// returned.
func pushImage(svcCtx *svc.ServiceContext, hash string, name string) error {
	ctx := context.Background()

	// Load the image archive into Docker.
	file, err := os.Open(filepath.Join(uploadPath, name))
	if err != nil {
		logx.Error(err)
		return err
	}
	defer file.Close()

	loadResp, err := svcCtx.DockerClient.ImageLoad(ctx, bufio.NewReader(file), false)
	if err != nil {
		logx.Error(err)
		return err
	}
	defer loadResp.Body.Close()
	loadOutput, err := ioutil.ReadAll(loadResp.Body)
	if err != nil {
		logx.Error(err)
		return err
	}
	logx.Infof("%s", loadOutput)
	if err = dockerStreamError(loadOutput); err != nil {
		logx.Error(err)
		return err
	}

	// Tag the image for the private registry.
	privateImageName := "registry.cn-hangzhou.aliyuncs.com/jointcloud/pcm:" + name
	err = svcCtx.DockerClient.ImageTag(ctx, name, privateImageName)
	if err != nil {
		logx.Error(err)
		return err
	}
	// Remove the original reference; the private tag still points at the image.
	_, err = svcCtx.DockerClient.ImageRemove(ctx, name, types2.ImageRemoveOptions{})
	if err != nil {
		logx.Error(err)
		return err
	}

	// Push the image to the registry.
	authConfig := types2.AuthConfig{
		Username: svcCtx.Config.RegistryConf.Username,
		Password: svcCtx.Config.RegistryConf.Password,
	}
	authConfigBytes, err := json.Marshal(authConfig)
	if err != nil {
		logx.Error(err)
		return err
	}
	authStr := base64.URLEncoding.EncodeToString(authConfigBytes)

	logx.Infof("传输开始 %v", time.Now())
	pushBody, err := svcCtx.DockerClient.ImagePush(ctx, privateImageName, types2.ImagePushOptions{RegistryAuth: authStr})
	// Check the error before touching pushBody: it is nil when the call fails.
	if err != nil {
		logx.Error(err)
		return err
	}
	defer pushBody.Close()
	// The push only completes once the response stream has been drained.
	pushOutput, err := ioutil.ReadAll(pushBody)
	if err != nil {
		logx.Error(err)
		return err
	}
	logx.Infof("%s", pushOutput)
	if err = dockerStreamError(pushOutput); err != nil {
		logx.Error(err)
		return err
	}
	logx.Infof("传输完成 %v", time.Now())

	// Remove the local image to avoid wasting storage.
	_, err = svcCtx.DockerClient.ImageRemove(ctx, privateImageName, types2.ImageRemoveOptions{})
	if err != nil {
		logx.Error(err)
		return err
	}

	// Mark the record as stored in the cloud.
	err = svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud").Error
	if err != nil {
		logx.Error(err)
		return err
	}
	return nil
}

// dockerStreamError scans the newline-delimited JSON messages of a Docker
// daemon response stream and returns the first non-empty "error" field as an
// error. The daemon reports failures that occur during a load or push inside
// this stream rather than through the API call's returned error. Lines that
// are not JSON objects are skipped.
func dockerStreamError(output []byte) error {
	for _, line := range strings.Split(string(output), "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		var msg struct {
			Error string `json:"error"`
		}
		if err := json.Unmarshal([]byte(line), &msg); err != nil {
			continue
		}
		if msg.Error != "" {
			return fmt.Errorf("%s", msg.Error)
		}
	}
	return nil
}