feat: setup the basic pipeline

SigureMo 2021-05-02 23:58:26 +08:00
parent 13cbfd3081
commit b8af6fa76e
30 changed files with 1176 additions and 187 deletions

.gitignore

@@ -119,3 +119,12 @@ dmypy.json
# Editor/IDE Configures
.vscode
# Media files
*.aac
*.mp3
*.mp4
*.m4s
# test files
*.test.py


@@ -1,64 +1,67 @@
import asyncio
import json
from typing import Any, Optional
import argparse
import aiofiles
import aiohttp
from yutto.api.acg_video import (
AudioUrlMeta,
VideoUrlMeta,
get_acg_video_list,
get_acg_video_playurl,
get_acg_video_subtitile,
get_video_info,
)
from yutto.api.types import AId, BvId, CId
from yutto.filter import select_audio, select_video
from yutto.media.codec import AudioCodec, VideoCodec, gen_acodec_priority, gen_vcodec_priority
from yutto.media.quality import AudioQuality, VideoQuality, gen_audio_quality_priority, gen_video_quality_priority
from yutto.utils.asynclib import LimitParallelsPool, run_with_n_workers
from yutto.utils.fetcher import Fetcher
from yutto.utils.file_buffer import AsyncFileBuffer, BufferChunk
from yutto.utils.logger import logger
from yutto.cli import get, info, check_options
from yutto.__version__ import __version__
from yutto.utils.ffmpeg import FFmpeg
from yutto.utils.console.colorful import colored_string
from yutto.utils.console.logger import Logger
from yutto.media.quality import video_quality_priority_default, audio_quality_priority_default
def gen_headers():
return {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36",
"Referer": "https://www.bilibili.com",
}
def main():
parser = argparse.ArgumentParser(description="yutto 一个任性的 B 站视频下载器", prog="yutto")
parser.add_argument("-v", "--version", action="version", version="%(prog)s {}".format(__version__))
parser.add_argument("-n", "--num-workers", type=int, default=8, help="同时下载的 Worker 个数")
parser.add_argument(
"-q",
"--video-quality",
default=125,
choices=video_quality_priority_default,
type=int,
help="视频清晰度等级125:HDR, 120:4K, 116:1080P60, 112:1080P+, 80:1080P, 74:720P60, 64:720P, 32:480P, 16:360P",
)
parser.add_argument(
"--audio-quality",
default=30280,
choices=audio_quality_priority_default,
type=int,
help="音频码率等级30280:320kbps, 30232:128kbps, 30216:64kbps",
)
parser.add_argument("--vcodec", default="avc:copy", help="视频编码格式(<下载格式>:<生成格式>")
parser.add_argument("--acodec", default="mp4a:copy", help="音频编码格式(<下载格式>:<生成格式>")
parser.add_argument("--only-video", dest="require_audio", action="store_false", help="只下载视频")
parser.add_argument("--only-audio", dest="require_video", action="store_false", help="只下载音频")
parser.add_argument("--danmaku", default="xml", choices=["xml", "ass", "no"], help="视频主页xxx")
parser.add_argument("-b", "--block-size", default=1.0, type=float, help="分块下载时各块大小,单位为 MiB默认为 1MiB")
parser.add_argument("-w", "--overwrite", action="store_true", help="强制覆盖已下载内容")
parser.add_argument("-x", "--proxy", default="auto", help="设置代理auto 为系统代理、no 为不使用代理、当然也可以设置代理值)")
parser.add_argument("-d", "--dir", default="", help="下载目录")
parser.add_argument("-c", "--sessdata", default="", help="Cookies 中的 SESSDATA 字段")
parser.add_argument("--path-pattern", default="{auto}", help="多级目录的存储路径 Pattern")
parser.add_argument("--no-color", action="store_true", help="不使用颜色")
parser.add_argument("--debug", action="store_true", help="启用 debug 模式")
parser.set_defaults(action=run)
subparsers = parser.add_subparsers()
# 子命令 get
parser_get = subparsers.add_parser("get", help="获取单个视频")
get.add_get_arguments(parser_get)
# 子命令 info
# TODO
# 子命令 batch
# TODO
# 执行各自的 action
args = parser.parse_args()
check_options.check_basic_options(args)
args.action(args)
async def main():
async with aiohttp.ClientSession(headers=gen_headers(), timeout=aiohttp.ClientTimeout(total=5)) as sess:
res = await get_video_info(sess, BvId("BV1864y1m7Yj"))
print(res)
print(json.dumps(str(res)))
res = await get_video_info(sess, AId("887650906"))
print(res)
res = await get_acg_video_list(sess, AId("887650906"))
print(res)
res = await get_acg_video_subtitile(sess, BvId("BV1C4411J7cR"), CId("92109804"))
print(res)
videos, audios = await get_acg_video_playurl(sess, BvId("BV1C4411J7cR"), CId("92109804"))
print(videos, audios)
await Fetcher.get_size(sess, videos[0]["url"])
video = select_video(videos)
audio = select_audio(audios)
print(video)
print(audio)
def run(args: argparse.Namespace):
Logger.error("未指定子命令 (get, info, batch)")
Logger.info("yutto version: {}".format(colored_string(__version__, fore="green")))
Logger.info("FFmpeg version: {}".format(colored_string(FFmpeg().version, fore="blue")))
# async def main():
# buf = await AsyncFileBuffer.create('tt.txt')
# await buf.write(b'12345', 25)
# await buf.write(b'34567', 20)
# await buf.write(b'00000', 30)
# await buf.write(b'99999', 35)
# await buf.close()
asyncio.run(main())
if __name__ == "__main__":
main()


@@ -4,34 +4,22 @@ from typing import Any, TypedDict, Literal
from aiohttp import ClientSession
from yutto.api.types import AId, AvId, BvId, CId, EpisodeId
from yutto.urlparser import regexp_bangumi_ep
from yutto.api.types import (
AId,
AvId,
BvId,
CId,
EpisodeId,
HttpStatusError,
NoAccessError,
UnSupportedTypeError,
VideoUrlMeta,
AudioUrlMeta,
)
from yutto.utils.fetcher import Fetcher
from yutto.media.codec import VideoCodec, AudioCodec
from yutto.media.quality import VideoQuality, AudioQuality
class HttpStatusError(Exception):
pass
class NoAccessError(Exception):
pass
class UnSupportedTypeError(Exception):
pass
class VideoInfo(TypedDict):
avid: AvId
aid: AId
bvid: BvId
episode_id: EpisodeId
is_bangumi: bool
cid: CId
picture: str
title: str
from yutto.api.info import get_video_info
class AcgVideoListItem(TypedDict):
@@ -40,44 +28,6 @@ class AcgVideoListItem(TypedDict):
cid: CId
class VideoUrlMeta(TypedDict):
url: str
mirrors: list[str]
codec: VideoCodec
width: int
height: int
quality: VideoQuality
class AudioUrlMeta(TypedDict):
url: str
mirrors: list[str]
codec: AudioCodec
width: int
height: int
quality: AudioQuality
async def get_video_info(session: ClientSession, avid: AvId) -> VideoInfo:
info_api = "http://api.bilibili.com/x/web-interface/view?aid={aid}&bvid={bvid}"
res_json = await Fetcher.fetch_json(session, info_api.format(**avid.to_dict()))
res_json_data = res_json.get("data")
assert res_json_data is not None, "响应数据无 data 域"
episode_id = EpisodeId("")
if res_json_data.get("redirect_url") and (ep_match := regexp_bangumi_ep.match(res_json_data["redirect_url"])):
episode_id = EpisodeId(ep_match.group("episode_id"))
return {
"avid": BvId(res_json_data["bvid"]),
"aid": AId(str(res_json_data["aid"])),
"bvid": BvId(res_json_data["bvid"]),
"episode_id": episode_id,
"is_bangumi": bool(episode_id),
"cid": CId(str(res_json_data["cid"])),
"picture": res_json_data["pic"],
"title": res_json_data["title"],
}
async def get_acg_video_title(session: ClientSession, avid: AvId) -> str:
return (await get_video_info(session, avid))["title"]
@@ -92,6 +42,7 @@ async def get_acg_video_list(session: ClientSession, avid: AvId) -> list[AcgVide
"name": item["part"],
"cid": CId(str(item["cid"]))
}
# fmt: on
for i, item in enumerate(res_json["data"])
]
@@ -147,6 +98,7 @@ async def get_acg_video_subtitile(session: ClientSession, avid: AvId, cid: CId)
"lines": (await Fetcher.fetch_json(session, "https:" + sub_info["subtitle_url"]))["body"]
}
for sub_info in subtitle_json["subtitles"]
# fmt: on
]
else:
return []

yutto/api/bangumi.py (new file)

@@ -0,0 +1,141 @@
import json
import re
from typing import Any, TypedDict, Literal
from aiohttp import ClientSession
from yutto.api.types import (
AId,
AvId,
BvId,
CId,
EpisodeId,
MediaId,
SeasonId,
HttpStatusError,
NoAccessError,
UnSupportedTypeError,
VideoUrlMeta,
AudioUrlMeta,
)
from yutto.utils.fetcher import Fetcher
from yutto.media.codec import VideoCodec, AudioCodec
from yutto.media.quality import VideoQuality, AudioQuality
from yutto.utils.console.logger import Logger
class BangumiListItem(TypedDict):
id: int
name: str
cid: CId
episode_id: EpisodeId
avid: AvId
is_section: bool # 是否属于专区
async def get_season_id_by_media_id(session: ClientSession, media_id: MediaId) -> SeasonId:
home_url = "https://www.bilibili.com/bangumi/media/md{media_id}".format(media_id=media_id)
season_id = SeasonId("")
regex_season_id = re.compile(r'"param":{"season_id":(\d+),"season_type":\d+}')
if match_obj := regex_season_id.search(await Fetcher.fetch_text(session, home_url)):
season_id = match_obj.group(1)
return SeasonId(str(season_id))
async def get_season_id_by_episode_id(session: ClientSession, episode_id: EpisodeId) -> SeasonId:
home_url = "https://www.bilibili.com/bangumi/play/ep{episode_id}".format(episode_id=episode_id)
season_id = SeasonId("")
regex_season_id = re.compile(r'"id":\d+,"ssId":(\d+)')
if match_obj := regex_season_id.search(await Fetcher.fetch_text(session, home_url)):
season_id = match_obj.group(1)
return SeasonId(str(season_id))
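# Illustrative example for the regex above: the episode page embeds player state such as
# '"id":425578,"ssId":33802' (values hypothetical), from which "33802" would be captured as the season_id.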
async def get_bangumi_title(session: ClientSession, season_id: SeasonId) -> str:
play_url = "https://www.bilibili.com/bangumi/play/ss{season_id}".format(season_id=season_id)
regex_title = re.compile(r'<a href=".+" target="_blank" title="(.*?)" class="media-title">(?P<title>.*?)</a>')
if match_obj := regex_title.search(await Fetcher.fetch_text(session, play_url)):
title = match_obj.group("title")
else:
title = "呐,我也不知道是什么标题呢~"
return title
async def get_bangumi_list(session: ClientSession, season_id: SeasonId) -> list[BangumiListItem]:
list_api = "http://api.bilibili.com/pgc/view/web/season?season_id={season_id}"
resp_json = await Fetcher.fetch_json(session, list_api.format(season_id=season_id))
result = resp_json["result"]
section_episodes = []
for section in result.get("section", []):
section_episodes += section["episodes"]
return [
{
"id": i + 1,
"name": " ".join(
[
"{}".format(item["title"]) if re.match(r"^\d*\.?\d*$", item["title"]) else item["title"],
item["long_title"],
]
),
"cid": CId(str(item["cid"])),
"episode_id": EpisodeId(str(item["id"])),
"avid": BvId(item["bvid"]),
"is_section": i >= len(result["episodes"]),
}
for i, item in enumerate(result["episodes"] + section_episodes)
]
async def get_bangumi_playurl(
session: ClientSession, avid: AvId, episode_id: EpisodeId, cid: CId
) -> tuple[list[VideoUrlMeta], list[AudioUrlMeta]]:
play_api = "https://api.bilibili.com/pgc/player/web/playurl?avid={aid}&bvid={bvid}&ep_id={episode_id}&cid={cid}&qn=125&fnver=0&fnval=16&fourk=1"
codecid_map: dict[Literal[7, 12], VideoCodec] = {7: "avc", 12: "hevc"}
async with session.get(play_api.format(**avid.to_dict(), cid=cid, episode_id=episode_id)) as resp:
if not resp.ok:
raise NoAccessError("无法下载该视频(cid: {cid})".format(cid=cid))
resp_json = await resp.json()
if resp_json["result"].get("dash") is None:
raise UnSupportedTypeError("该视频(cid: {cid})尚不支持 DASH 格式".format(cid=cid))
if resp_json["result"]["is_preview"] == 1:
Logger.warning("视频cid: {cid})是预览视频".format(cid=cid))
return (
[
{
"url": video["base_url"],
"mirrors": video["backup_url"],
"codec": codecid_map[video["codecid"]],
"width": video["width"],
"height": video["height"],
"quality": video["id"],
}
for video in resp_json["result"]["dash"]["video"]
],
[
{
"url": audio["base_url"],
"mirrors": audio["backup_url"],
"codec": "mp4a",
"width": 0,
"height": 0,
"quality": audio["id"],
}
for audio in resp_json["result"]["dash"]["audio"]
],
)
async def get_bangumi_subtitile(session: ClientSession, avid: AvId, cid: CId) -> list[dict[str, str]]:
subtitile_api = "https://api.bilibili.com/x/player/v2?cid={cid}&aid={aid}&bvid={bvid}"
subtitile_url = subtitile_api.format(**avid.to_dict(), cid=cid)
subtitles_info = (await Fetcher.fetch_json(session, subtitile_url))["data"]["subtitle"]
return [
# fmt: off
{
"lang": sub_info["lan_doc"],
"lines": (await Fetcher.fetch_json(session, "https:" + sub_info["subtitle_url"]))["body"]
}
for sub_info in subtitles_info["subtitles"]
# fmt: on
]

yutto/api/info.py (new file)

@@ -0,0 +1,59 @@
from aiohttp import ClientSession
from yutto.processor.urlparser import regexp_bangumi_ep
from yutto.utils.fetcher import Fetcher
from yutto.media.codec import VideoCodec, AudioCodec
from typing import TypedDict
from yutto.api.types import (
AId,
AvId,
BvId,
CId,
EpisodeId,
HttpStatusError,
NoAccessError,
UnSupportedTypeError,
VideoUrlMeta,
AudioUrlMeta,
)
class VideoInfo(TypedDict):
avid: AvId
aid: AId
bvid: BvId
episode_id: EpisodeId
is_bangumi: bool
cid: CId
picture: str
title: str
async def get_video_info(session: ClientSession, avid: AvId) -> VideoInfo:
info_api = "http://api.bilibili.com/x/web-interface/view?aid={aid}&bvid={bvid}"
res_json = await Fetcher.fetch_json(session, info_api.format(**avid.to_dict()))
res_json_data = res_json.get("data")
assert res_json_data is not None, "响应数据无 data 域"
episode_id = EpisodeId("")
if res_json_data.get("redirect_url") and (ep_match := regexp_bangumi_ep.match(res_json_data["redirect_url"])):
episode_id = EpisodeId(ep_match.group("episode_id"))
return {
"avid": BvId(res_json_data["bvid"]),
"aid": AId(str(res_json_data["aid"])),
"bvid": BvId(res_json_data["bvid"]),
"episode_id": episode_id,
"is_bangumi": bool(episode_id),
"cid": CId(str(res_json_data["cid"])),
"picture": res_json_data["pic"],
"title": res_json_data["title"],
}
async def is_vip(session: ClientSession) -> bool:
info_api = "https://api.bilibili.com/x/web-interface/nav"
res_json = await Fetcher.fetch_json(session, info_api)
res_json_data = res_json.get("data")
if res_json_data.get("vipStatus") == 1:
return True
return False


@@ -1,4 +1,6 @@
from typing import NamedTuple
from typing import NamedTuple, TypedDict
from yutto.media.codec import VideoCodec, AudioCodec
from yutto.media.quality import VideoQuality, AudioQuality
class BilibiliId(NamedTuple):
@@ -10,6 +12,9 @@ class BilibiliId(NamedTuple):
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, other: "BilibiliId") -> bool:
return self.value == other.value
class AvId(BilibiliId):
def to_dict(self) -> dict[str, str]:
@@ -44,6 +49,41 @@ class MediaId(BilibiliId):
return {"media_id": self.value}
class SeasonId(BilibiliId):
def to_dict(self):
return {"season_id": self.value}
class HttpStatusError(Exception):
pass
class NoAccessError(Exception):
pass
class UnSupportedTypeError(Exception):
pass
class VideoUrlMeta(TypedDict):
url: str
mirrors: list[str]
codec: VideoCodec
width: int
height: int
quality: VideoQuality
class AudioUrlMeta(TypedDict):
url: str
mirrors: list[str]
codec: AudioCodec
width: int
height: int
quality: AudioQuality
if __name__ == "__main__":
aid = AId("add")
cid = CId("xxx")

yutto/cli/__init__.py (new file)


@@ -0,0 +1,87 @@
import argparse
import os
import sys
import aiohttp
import asyncio
from yutto.utils.console.colorful import set_no_color
from yutto.utils.console.logger import set_logger_debug, Logger, Badge
from yutto.media.codec import video_codec_priority_default, audio_codec_priority_default
from yutto.utils.ffmpeg import FFmpeg
from yutto.processor.crawler import gen_cookies, gen_headers
from yutto.api.info import is_vip
def check_basic_options(args: argparse.Namespace):
""" 检查 argparse 无法检查的选项,并设置某些全局变量 """
ffmpeg = FFmpeg()
# 在使用 --no-color 或者环境变量 NO_COLOR 非空时都应该不显示颜色
# Also see: https://no-color.org/
if args.no_color or os.environ.get("NO_COLOR"):
set_no_color()
# debug 设置
if args.debug:
set_logger_debug()
# vcodec 检查
vcodec_splited = args.vcodec.split(":")
if len(vcodec_splited) != 2:
Logger.error("vcodec 参数值({})不满足要求(并非使用 : 分隔的值)".format(args.vcodec))
sys.exit(1)
video_download_codec, video_save_codec = vcodec_splited
if video_download_codec not in video_codec_priority_default:
Logger.error(
"download_vcodec 参数值({})不满足要求(允许值:{{{}}}".format(
video_download_codec, ", ".join(video_codec_priority_default)
)
)
sys.exit(1)
if video_save_codec not in ffmpeg.video_encodecs + ["copy"]:
Logger.error(
"save_vcodec 参数值({})不满足要求(允许值:{{{}}}".format(video_save_codec, ", ".join(ffmpeg.video_encodecs + ["copy"]))
)
sys.exit(1)
# acodec 检查
acodec_splited = args.acodec.split(":")
if len(acodec_splited) != 2:
Logger.error("acodec 参数值({})不满足要求(并非使用 : 分隔的值)".format(args.acodec))
sys.exit(1)
audio_download_codec, audio_save_codec = acodec_splited
if audio_download_codec not in audio_codec_priority_default:
Logger.error(
"download_acodec 参数值({})不满足要求(允许值:{{{}}}".format(
audio_download_codec, ", ".join(audio_codec_priority_default)
)
)
sys.exit(1)
if audio_save_codec not in ffmpeg.audio_encodecs + ["copy"]:
Logger.error(
"save_acodec 参数值({})不满足要求(允许值:{{{}}}".format(audio_save_codec, ", ".join(ffmpeg.audio_encodecs + ["copy"]))
)
sys.exit(1)
# only_video 和 only_audio 不能同时设置
if not args.require_video and not args.require_audio:
Logger.error("only_video 和 only_audio 不能同时设置")
sys.exit(1)
# TODO: proxy 检验
# 大会员身份校验
if not args.sessdata:
Logger.warning("未提供 SESSDATA无法下载会员专属剧集")
elif asyncio.run(check_is_vip(args.sessdata)):
Logger.custom("成功以大会员身份登录~", badge=Badge("大会员", fore="white", back="magenta"))
else:
Logger.warning("以非大会员身份登录,无法下载会员专属剧集")
async def check_is_vip(sessdata: str = "") -> bool:
async with aiohttp.ClientSession(
headers=gen_headers(), cookies=gen_cookies(sessdata), timeout=aiohttp.ClientTimeout(total=5)
) as session:
return await is_vip(session)

yutto/cli/get.py (new file)

@@ -0,0 +1,63 @@
import argparse
import aiohttp
from yutto.processor.crawler import gen_cookies, gen_headers
from yutto.utils.functiontools.sync import sync
import sys
from yutto.processor.downloader import download_video
from yutto.api.bangumi import get_bangumi_playurl, get_bangumi_title, get_season_id_by_episode_id, get_bangumi_list
from yutto.api.acg_video import get_acg_video_title, get_acg_video_playurl
from yutto.api.types import AvId, AId, BvId, EpisodeId, MediaId, SeasonId, CId
from yutto.processor.urlparser import regexp_bangumi_ep
from yutto.utils.console.logger import Logger
from yutto.utils.console.formatter import repair_filename
def add_get_arguments(parser: argparse.ArgumentParser):
parser.add_argument("url", help="视频主页 url")
parser.set_defaults(action=run)
@sync
async def run(args: argparse.Namespace):
async with aiohttp.ClientSession(
headers=gen_headers(), cookies=gen_cookies(args.sessdata), timeout=aiohttp.ClientTimeout(total=5)
) as session:
if match_obj := regexp_bangumi_ep.match(args.url):
episode_id = EpisodeId(match_obj.group("episode_id"))
season_id = await get_season_id_by_episode_id(session, episode_id)
bangumi_list = await get_bangumi_list(session, season_id)
for bangumi_item in bangumi_list:
if bangumi_item["episode_id"] == episode_id:
avid = bangumi_item["avid"]
cid = bangumi_item["cid"]
filename = bangumi_item["name"]
break
else:
Logger.error("在列表中未找到该剧集")
sys.exit(1)
videos, audios = await get_bangumi_playurl(session, avid, episode_id, cid)
# title = await get_bangumi_title(session, season_id)
else:
Logger.error("url 不正确~")
sys.exit(1)
await download_video(
session,
videos,
audios,
args.dir,
repair_filename(filename),
{
"require_video": args.require_video,
"video_quality": args.video_quality,
"video_download_codec": args.vcodec.split(":")[0],
"video_save_codec": args.vcodec.split(":")[1],
"require_audio": args.require_audio,
"audio_quality": args.audio_quality,
"audio_download_codec": args.acodec.split(":")[0],
"audio_save_codec": args.acodec.split(":")[1],
"overwrite": args.overwrite,
"block_size": int(args.block_size * 1024 * 1024),
"num_workers": args.num_workers,
},
)

yutto/cli/info.py (new file)


@@ -3,6 +3,9 @@ from typing import Any, Literal
VideoCodec = Literal["hevc", "avc"]
AudioCodec = Literal["mp4a"]
video_codec_priority_default: list[VideoCodec] = ["hevc", "avc"]
audio_codec_priority_default: list[AudioCodec] = ["mp4a"]
def gen_vcodec_priority(video_codec: VideoCodec) -> list[VideoCodec]:
""" 生成视频编码优先级序列 """
@@ -11,6 +14,6 @@ def gen_vcodec_priority(video_codec: VideoCodec) -> list[VideoCodec]:
def gen_acodec_priority(audio_codec: AudioCodec) -> list[AudioCodec]:
""" 生成频编码优先级序列 """
""" 生成频编码优先级序列 """
return ["mp4a"]


@@ -59,21 +59,6 @@ video_quality_map = {
"width": 720,
"height": 360,
},
6: {
"description": "240P 极速",
"width": 320,
"height": 240,
},
208: {
"description": "1080P 高清",
"width": 1920,
"height": 1080,
},
192: {
"description": "720P 高清",
"width": 1280,
"height": 720,
},
}
audio_quality_map = {



@@ -0,0 +1,14 @@
from urllib.parse import quote, unquote
def gen_headers():
return {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36",
"Referer": "https://www.bilibili.com",
}
def gen_cookies(sessdata: str):
# 先解码后编码是防止获取到的 SESSDATA 是已经解码后的(包含「,」)
# 而番剧无法使用解码后的 SESSDATA
return {"SESSDATA": quote(unquote(sessdata))}


@@ -0,0 +1,159 @@
import asyncio
import os
import time
from typing import Any, Optional
import aiohttp
from aiofiles import os as aioos
from yutto.api.types import AudioUrlMeta, VideoUrlMeta
from yutto.processor.filter import filter_none_value, select_audio, select_video
from yutto.utils.asynclib import CoroutineTask, parallel_with_limit
from yutto.utils.console.logger import Logger
from yutto.utils.fetcher import Fetcher
from yutto.utils.ffmpeg import FFmpeg
from yutto.utils.file_buffer import AsyncFileBuffer
from yutto.processor.progressor import show_progress
def slice(start: int, total_size: Optional[int], block_size: Optional[int] = None) -> list[tuple[int, Optional[int]]]:
"""生成分块后的 (start, size) 序列
Args:
start (int): 总起始位置
total_size (Optional[int]): 需要分块的总大小
block_size (Optional[int], optional): 每块的大小. Defaults to None.
Returns:
list[tuple[int, Optional[int]]]: 分块大小序列,使用元组组织,格式为 (start, size)
"""
if total_size is None:
return [(0, None)]
if block_size is None:
return [(0, total_size - 1)]
assert start <= total_size, "起始地址({})大于总地址({})".format(start, total_size)
offset_list: list[tuple[int, Optional[int]]] = [(i, block_size) for i in range(start, total_size, block_size)]
if (total_size - start) % block_size != 0:
offset_list[-1] = (
start + (total_size - start) // block_size * block_size,
total_size - start - (total_size - start) // block_size * block_size,
)
return offset_list
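# Illustrative example for slice(): slice(0, 5_000_000, 2_000_000)
# -> [(0, 2_000_000), (2_000_000, 2_000_000), (4_000_000, 1_000_000)]
# i.e. evenly sized blocks with the trailing remainder shrunk to fit.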
def combine(*l_list: list[Any]) -> list[Any]:
results: list[Any] = []
for i in range(max([len(l) for l in l_list])):
for l in l_list:
if i < len(l):
results.append(l[i])
return results
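# Illustrative example for combine(): combine([v1, v2, v3], [a1, a2]) -> [v1, a1, v2, a2, v3],
# interleaving the video and audio download tasks.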
async def download_video(
session: aiohttp.ClientSession,
videos: list[VideoUrlMeta],
audios: list[AudioUrlMeta],
output_dir: str,
file_name: str,
# TODO: options 使用 TypedDict
options: Any,
):
video_path = os.path.join(output_dir, file_name + "_video.m4s")
audio_path = os.path.join(output_dir, file_name + "_audio.m4s")
output_path = os.path.join(output_dir, file_name + "{output_format}")
ffmpeg = FFmpeg()
# TODO: 显示全部 Videos、Audios 信息
video = select_video(videos, options["require_video"], options["video_quality"], options["video_download_codec"])
audio = select_audio(audios, options["require_audio"], options["audio_quality"], options["audio_download_codec"])
# TODO: 显示被选中的 Video、Audio 信息
# idx_video = -1
# if video is not None:
# idx_video = videos.index(video)
# Logger.info(f"视频 {file_name} 共包含以下 {len(videos)} 个视频流:")
# videos_log = [
# "{:02} [{:>4}] [{:>4}x{:>4}] <{:>10}>".format(
# i,
# video["codec"].upper(),
# video["width"],
# video["height"],
# video_quality_map[video["quality"]]["description"],
# )
# for i, video in enumerate(videos)
# ]
# for video_log in videos_log:
# Logger.info(video_log)
if video is None and audio is None:
return
buffers: list[Optional[AsyncFileBuffer]] = [None, None]
sizes: list[Optional[int]] = [None, None]
task_funcs: list[list[CoroutineTask]] = []
if video is not None:
vbuf = await AsyncFileBuffer.create(video_path, overwrite=options["overwrite"])
vsize = await Fetcher.get_size(session, video["url"])
vtask_funcs = [
Fetcher.download_file_with_offset(session, video["url"], video["mirrors"], vbuf, offset, block_size)
for offset, block_size in slice(vbuf.written_size, vsize, options["block_size"])
]
task_funcs.append(vtask_funcs)
buffers[0], sizes[0] = vbuf, vsize
if audio is not None:
abuf = await AsyncFileBuffer.create(audio_path, overwrite=options["overwrite"])
asize = await Fetcher.get_size(session, audio["url"])
atask_funcs = [
Fetcher.download_file_with_offset(session, audio["url"], audio["mirrors"], abuf, offset, block_size)
for offset, block_size in slice(abuf.written_size, asize, options["block_size"])
]
task_funcs.append(atask_funcs)
buffers[1], sizes[1] = abuf, asize
tasks = parallel_with_limit(combine(*task_funcs), num_workers=options["num_workers"])
tasks.append(asyncio.create_task(show_progress(filter_none_value(buffers), sum(filter_none_value(sizes)))))
Logger.info(f"开始下载 {file_name}……")
for task in tasks:
await task
Logger.info("下载完成!")
if video is not None:
await vbuf.close()
if audio is not None:
await abuf.close()
# TODO: 将 merge 分离出去?
Logger.info(f"开始合并 {file_name}……")
# fmt: off
args: list[str] = []
if video is not None:
args.extend([
"-i", video_path,
])
if audio is not None:
args.extend([
"-i", audio_path,
])
if video is not None:
args.extend([
"-vcodec", options["video_save_codec"],
])
if audio is not None:
args.extend([
"-acodec", options["audio_save_codec"],
])
args.extend(["-y"])
output_format = ".mp4" if video is not None else ".aac"
args.append(output_path.format(output_format=output_format))
Logger.debug("FFmpeg > ffmpeg {}".format(" ".join(args)))
ffmpeg.exec(args)
# fmt: on
Logger.info("合并完成!")
if video is not None:
await aioos.remove(video_path)
if audio is not None:
await aioos.remove(audio_path)


@@ -1,18 +1,19 @@
from typing import Optional
from typing import Optional, TypeVar
from yutto.api.acg_video import AudioUrlMeta, VideoUrlMeta
from yutto.media.codec import (AudioCodec, VideoCodec, gen_acodec_priority,
gen_vcodec_priority)
from yutto.media.quality import (AudioQuality, VideoQuality,
gen_audio_quality_priority,
gen_video_quality_priority)
from yutto.media.codec import AudioCodec, VideoCodec, gen_acodec_priority, gen_vcodec_priority
from yutto.media.quality import AudioQuality, VideoQuality, gen_audio_quality_priority, gen_video_quality_priority
def select_video(
videos: list[VideoUrlMeta],
require_video: bool = True,
video_quality: VideoQuality = 125,
video_codec: VideoCodec = "hevc"
video_codec: VideoCodec = "hevc",
) -> Optional[VideoUrlMeta]:
if not require_video:
return None
video_quality_priority = gen_video_quality_priority(video_quality)
video_codec_priority = gen_vcodec_priority(video_codec)
@@ -22,6 +23,7 @@ def select_video(
for vqn in video_quality_priority
for vcodec in video_codec_priority
]
# fmt: on
for vqn, vcodec in video_combined_priority:
for video in videos:
@@ -29,11 +31,16 @@ def select_video(
return video
return None
def select_audio(
audios: list[AudioUrlMeta],
require_audio: bool = True,
audio_quality: AudioQuality = 30280,
audio_codec: AudioCodec = "mp4a",
) -> Optional[AudioUrlMeta]:
if not require_audio:
return None
audio_quality_priority = gen_audio_quality_priority(audio_quality)
audio_codec_priority = gen_acodec_priority(audio_codec)
@@ -43,9 +50,23 @@ def select_audio(
for aqn in audio_quality_priority
for acodec in audio_codec_priority
]
# fmt: on
for aqn, acodec in audio_combined_priority:
for audio in audios:
if audio["quality"] == aqn and audio["codec"] == acodec:
return audio
return None
T = TypeVar("T")
def filter_none_value(l: list[Optional[T]]) -> list[T]:
result: list[T] = []
for item in l:
if item is not None:
result.append(item)
return result
# ? 不清楚直接这么写为什么类型不匹配
# return list(filter(lambda x: x is not None, l))


@@ -0,0 +1,34 @@
import asyncio
import time
from yutto.utils.console.formatter import size_format
from yutto.utils.file_buffer import AsyncFileBuffer
async def show_progress(file_buffers: list[AsyncFileBuffer], total_size: int):
file_buffers = list(filter(lambda x: x is not None, file_buffers))
t = time.time()
size = sum([file_buffer.written_size for file_buffer in file_buffers])
while True:
size_in_buffer: int = sum(
[sum([len(chunk.data) for chunk in file_buffer.buffer]) for file_buffer in file_buffers]
)
size_written: int = sum([file_buffer.written_size for file_buffer in file_buffers])
t_now = time.time()
size_now = size_written + size_in_buffer
speed = (size_now - size) / (t_now - t + 10 ** -6)
print(
"{} {}({} 块) {} {}/s".format(
size_format(size_written),
size_format(size_in_buffer),
sum([len(file_buffer.buffer) for file_buffer in file_buffers]),
size_format(total_size),
size_format(speed),
),
)
t, size = t_now, size_now
await asyncio.sleep(0.5)
if total_size == size:
break


@@ -1,14 +1,15 @@
import asyncio
from typing import Any, Coroutine, Iterable
from yutto.utils.logger import logger
from yutto.utils.console.logger import Logger
try:
import uvloop
except ImportError:
logger.warning("no install uvloop package")
Logger.warning("no install uvloop package")
else:
uvloop.install()
# uvloop.install()
pass
CoroutineTask = Coroutine[Any, Any, Any]
@@ -63,3 +64,23 @@ def run_with_n_workers(tasks: Iterable[CoroutineTask], num_workers: int = 4):
pool = LimitParallelsPool(num_workers=4)
pool.add_list(tasks)
asyncio.run(pool.run())
def parallel(funcs: Iterable[CoroutineTask]):
return [asyncio.create_task(func) for func in funcs]
def parallel_with_limit(funcs: Iterable[CoroutineTask], num_workers: int = 4):
tasks = asyncio.Queue[CoroutineTask]()
for func in funcs:
tasks.put_nowait(func)
async def worker():
while True:
if not tasks.empty():
task = await tasks.get()
await task
else:
break
return [asyncio.create_task(worker()) for _ in range(num_workers)]
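# Rough usage sketch (assuming it is called from inside a running event loop, as download_video does):
# workers = parallel_with_limit(coroutines, num_workers=8)
# for w in workers:
#     await w
# Each worker task drains the shared queue, so at most num_workers coroutines run at the same time.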



@@ -0,0 +1,68 @@
from typing import Literal, Optional, TypedDict
Fore = Literal["black", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
Back = Literal["black", "red", "green", "yellow", "blue", "magenta", "cyan", "white"]
Style = Literal["reset", "bold", "italic", "underline", "defaultfg", "defaultbg"]
_no_color = False
class CodeMap(TypedDict):
fore: dict[Fore, int]
back: dict[Back, int]
style: dict[Style, int]
code_map: CodeMap = {
"fore": {
"black": 30,
"red": 31,
"green": 32,
"yellow": 33,
"blue": 34,
"magenta": 35,
"cyan": 36,
"white": 37,
},
"back": {
"black": 40,
"red": 41,
"green": 42,
"yellow": 43,
"blue": 44,
"magenta": 45,
"cyan": 46,
"white": 47,
},
"style": {
"reset": 0,
"bold": 1,
"italic": 3,
"underline": 4,
"defaultfg": 39,
"defaultbg": 49,
},
}
def colored_string(
string: str, fore: Optional[Fore] = None, back: Optional[Back] = None, style: Optional[Style] = None
) -> str:
if _no_color:
return string
template = "\033[{code}m"
result = ""
if fore is not None:
result += template.format(code=code_map["fore"][fore])
if back is not None:
result += template.format(code=code_map["back"][back])
if style is not None:
result += template.format(code=code_map["style"][style])
result += string
result += template.format(code=code_map["style"]["reset"])
return result
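# Illustrative example: colored_string("ok", fore="green") -> "\033[32mok\033[0m";
# when _no_color is set the string is returned unchanged.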
def set_no_color():
global _no_color
_no_color = True


@@ -0,0 +1,96 @@
import re
from typing import Literal
from urllib.parse import unquote
_count: int = 0
def size_format(size: float, ndigits: int = 2, baseUnitSize: Literal[1024, 1000] = 1024) -> str:
""" 输入数据字节数,与保留小数位数,返回数据量字符串 """
sign = "-" if size < 0 else ""
size = abs(size)
unit_list = (
["Bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB"]
if baseUnitSize == 1024
else ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB", "BB"]
)
index = 0
while index < len(unit_list) - 1:
if size >= baseUnitSize ** (index + 1):
index += 1
else:
break
return "{}{:.{}f} {}".format(sign, size / baseUnitSize ** index, ndigits, unit_list[index])
def get_char_width(char: str) -> int:
""" 计算单个字符的宽度 """
# fmt: off
widths = [
(126, 1), (159, 0), (687, 1), (710, 0), (711, 1),
(727, 0), (733, 1), (879, 0), (1154, 1), (1161, 0),
(4347, 1), (4447, 2), (7467, 1), (7521, 0), (8369, 1),
(8426, 0), (9000, 1), (9002, 2), (11021, 1), (12350, 2),
(12351, 1), (12438, 2), (12442, 0), (19893, 2), (19967, 1),
(55203, 2), (63743, 1), (64106, 2), (65039, 1), (65059, 0),
(65131, 2), (65279, 1), (65376, 2), (65500, 1), (65510, 2),
(120831, 1), (262141, 2), (1114109, 1),
]
# fmt: on
o = ord(char)
if o == 0xE or o == 0xF:
return 0
for num, wid in widths:
if o <= num:
return wid
return 1
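# Illustrative examples: get_char_width("a") -> 1, get_char_width("中") -> 2,
# so fullwidth CJK characters count as two columns when aligning console output.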
def get_string_width(string: str) -> int:
""" 计算包含中文的字符串宽度 """
# 去除颜色码
string = no_color_string(string)
try:
length = sum([get_char_width(c) for c in string])
except:
length = len(string)
return length
def no_color_string(string: str) -> str:
""" 去除字符串中的颜色码 """
regex_color = re.compile(r"\033\[\d+m")
string = regex_color.sub("", string)
return string
def repair_filename(filename: str) -> str:
""" 修复不合法的文件名 """
def to_full_width_chr(matchobj: "re.Match[str]") -> str:
char = matchobj.group(0)
full_width_char = chr(ord(char) + ord("？") - ord("?"))
return full_width_char
# 路径非法字符,转全角
regex_path = re.compile(r'[\\/:*?"<>|]')
# 空格类字符,转空格
regex_spaces = re.compile(r"\s+")
# 不可打印字符,移除
regex_non_printable = re.compile(
r"[\001\002\003\004\005\006\007\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
r"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a]"
)
# url decode
filename = unquote(filename)
filename = regex_path.sub(to_full_width_chr, filename)
filename = regex_spaces.sub(" ", filename)
filename = regex_non_printable.sub("", filename)
filename = filename.strip()
if not filename:
filename = "未命名文件_{:04}".format(_count)
_count += 1
return filename
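# Illustrative example: repair_filename('a/b:c?') -> 'a／b：c？', i.e. characters that are
# illegal in paths are mapped to their fullwidth counterparts instead of being dropped.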


@@ -0,0 +1,100 @@
from typing import Any, Optional
from yutto.utils.functiontools.singleton import Singleton
from yutto.utils.console.colorful import colored_string, Fore, Back, Style
from yutto.utils.console.formatter import get_string_width
_logger_debug = False
def set_logger_debug():
global _logger_debug
_logger_debug = True
class Badge:
def __init__(
self,
text: str = "CUSTOM",
fore: Optional[Fore] = None,
back: Optional[Back] = None,
style: Optional[Style] = None,
):
self.text: str = text
self.fore: Optional[Fore] = fore
self.back: Optional[Back] = back
self.style: Optional[Style] = style
def __str__(self):
return colored_string(" {} ".format(self.text), fore=self.fore, back=self.back, style=self.style)
def __repr__(self):
return str(self)
def __len__(self):
return get_string_width(str(self))
def __add__(self, other: str) -> str:
return str(self) + other
WARNING_BADGE = Badge("WARN", fore="black", back="yellow")
ERROR_BADGE = Badge("ERROR", fore="white", back="red")
INFO_BADGE = Badge("INFO", fore="black", back="green")
DEBUG_BADGE = Badge("DEBUG", fore="black", back="blue")
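# Illustrative example: str(Badge("WARN", fore="black", back="yellow")) -> "\033[30m\033[43m WARN \033[0m",
# which Logger prefixes to each message below.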
class Logger(metaclass=Singleton):
@classmethod
def custom(cls, string: Any, badge: Badge, *print_args: Any, **print_kwargs: Any):
prefix = badge + " "
print(prefix + str(string), *print_args, **print_kwargs)
@classmethod
def warning(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom(string, WARNING_BADGE, *print_args, **print_kwargs)
@classmethod
def error(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom(string, ERROR_BADGE, *print_args, **print_kwargs)
@classmethod
def info(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom(string, INFO_BADGE, *print_args, **print_kwargs)
@classmethod
def debug(cls, string: Any, *print_args: Any, **print_kwargs: Any):
if not _logger_debug:
return
Logger.custom(string, DEBUG_BADGE, *print_args, **print_kwargs)
@classmethod
def custom_multiline(cls, string: Any, badge: Badge, *print_args: Any, **print_kwargs: Any):
prefix = badge + " "
lines = string.split("\n")
multiline_string = prefix + "\n".join(
[((" " * get_string_width(prefix)) if i != 0 else "") + line for i, line in enumerate(lines)]
)
print(multiline_string, *print_args, **print_kwargs)
@classmethod
def warning_multiline(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom_multiline(string, WARNING_BADGE, *print_args, **print_kwargs)
@classmethod
def error_multiline(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom_multiline(string, ERROR_BADGE, *print_args, **print_kwargs)
@classmethod
def info_multiline(cls, string: Any, *print_args: Any, **print_kwargs: Any):
Logger.custom_multiline(string, INFO_BADGE, *print_args, **print_kwargs)
@classmethod
def debug_multiline(cls, string: Any, *print_args: Any, **print_kwargs: Any):
if not _logger_debug:
return
Logger.custom_multiline(string, DEBUG_BADGE, *print_args, **print_kwargs)
@classmethod
def print(cls, string: Any, *print_args: Any, **print_kwargs: Any):
print(string, *print_args, **print_kwargs)


@@ -1,9 +1,12 @@
import asyncio
import aiohttp
import random
from typing import Any, Optional
from aiohttp import ClientSession
from yutto.utils.logger import logger
from yutto.utils.file_buffer import AsyncFileBuffer
from yutto.utils.console.logger import Logger
class MaxRetryError(Exception):
@@ -19,20 +22,20 @@ class Fetcher:
async with session.get(url) as resp:
return await resp.text()
except asyncio.TimeoutError as e:
logger.warning("url: {url} 抓取超时".format(url=url))
Logger.warning("url: {url} 抓取超时".format(url=url))
finally:
retry -= 1
raise MaxRetryError()
@classmethod
async def fetch_json(cls, session: ClientSession, url: str, max_retry: int = 2) -> dict[str, Any]:
async def fetch_json(cls, session: ClientSession, url: str, max_retry: int = 2) -> Any:
retry = max_retry + 1
while retry:
try:
async with session.get(url) as resp:
return await resp.json()
except asyncio.TimeoutError as e:
logger.warning("url: {url} 抓取超时".format(url=url))
Logger.warning("url: {url} 抓取超时".format(url=url))
finally:
retry -= 1
raise MaxRetryError()
@@ -42,7 +45,53 @@ class Fetcher:
headers = session.headers.copy()
headers["Range"] = "bytes=0-1"
async with session.get(url, headers=headers) as resp:
if resp.headers.get("Content-Length"):
if resp.status == 206:
return int(resp.headers["Content-Range"].split("/")[-1])
else:
return None
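# Illustrative example: a 206 response carrying "Content-Range: bytes 0-1/52428800"
# yields 52428800 (the total size); any other response is treated as "size unknown".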
@classmethod
async def download_file_with_offset(
cls,
session: ClientSession,
url: str,
mirrors: list[str],
file_buffer: AsyncFileBuffer,
offset: int,
size: Optional[int],
stream: bool = True,
) -> None:
done = False
headers = session.headers.copy()
url_pool = [url] + mirrors
block_offset = 0
while not done:
try:
url = random.choice(url_pool)
headers["Range"] = "bytes={}-{}".format(
offset + block_offset, offset + size - 1 if size is not None else ""
)
async with session.get(
url, headers=headers, timeout=aiohttp.ClientTimeout(connect=5, sock_read=10)
) as resp:
if stream:
while True:
# 如果直接用 1KiB 的话,会产生大量的块,消耗大量的 CPU 资源,
# 反而使得协程的优势不明显
# 而使用 1MiB 以上或者不使用流式下载方式时,由于分块太大,
# 导致进度条显示的实时速度并不准,波动太大,用户体验不佳,
# 因此取两者折中
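# 2 ** 15 bytes = 32 KiB per read, a middle ground between the two extremes described above.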
chunk = await resp.content.read(2 ** 15)
if not chunk:
break
await file_buffer.write(chunk, offset + block_offset)
block_offset += len(chunk)
else:
chunk = await resp.read()
await file_buffer.write(chunk, offset + block_offset)
block_offset += len(chunk)
# TODO: 是否需要校验总大小
done = True
except asyncio.TimeoutError as e:
Logger.warning("文件 {} 下载超时,尝试重新连接...".format(file_buffer.file_path))

yutto/utils/ffmpeg.py (new file)

@@ -0,0 +1,53 @@
import os
import re
import subprocess
from functools import cached_property
from yutto.utils.functiontools.singleton import Singleton
class FFmpegNotFoundError(Exception):
def __init__(self):
super().__init__("请配置正确的 FFmpeg 路径")
class FFmpeg(object, metaclass=Singleton):
def __init__(self, ffmpeg_path: str = "ffmpeg"):
try:
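# ffmpeg invoked with no arguments prints its usage text and exits with status 1,
# so any other return code is taken to mean the binary is not a usable ffmpeg.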
if subprocess.run([ffmpeg_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE).returncode != 1:
raise FFmpegNotFoundError()
except FileNotFoundError:
raise FFmpegNotFoundError()
self.path = os.path.normpath(ffmpeg_path)
def exec(self, args: list[str]):
cmd = [self.path]
cmd.extend(args)
return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@cached_property
def version(self) -> str:
output = self.exec(["-version"]).stdout.decode()
if match_obj := re.match(r"ffmpeg version (?P<version>(\S+)) Copyright", output):
return match_obj.group("version")
return "Unknown version"
@cached_property
def video_encodecs(self) -> list[str]:
output = self.exec(["-codecs"]).stdout.decode()
results: list[str] = []
for line in output.split("\n"):
if match_obj := re.match(r"^\s*[D\.]EV[I\.][L\.][S\.] (?P<vcodec>\S+)", line):
results.append(match_obj.group("vcodec"))
return results
@cached_property
def audio_encodecs(self) -> list[str]:
output = self.exec(["-codecs"]).stdout.decode()
results: list[str] = []
for line in output.split("\n"):
if match_obj := re.match(r"^\s*[D\.]EA[I\.][L\.][S\.] (?P<vcodec>\S+)", line):
results.append(match_obj.group("vcodec"))
return results
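# Illustrative example: an `ffmpeg -codecs` output line such as " DEA.L. aac  AAC (Advanced Audio Coding)"
# would match the regex above and contribute "aac" to the encoder list.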


@@ -1,20 +1,23 @@
import bisect
import heapq
import os
from typing import NamedTuple, Optional
from dataclasses import dataclass, field
from typing import Optional
import aiofiles
from aiofiles import os as aioos
from yutto.utils.logger import logger
from yutto.utils.console.logger import Logger
class BufferChunk(NamedTuple):
chunk: Optional[bytes]
@dataclass(order=True)
class BufferChunk:
offset: int
data: bytes = field(compare=False)
class AsyncFileBuffer:
def __init__(self):
self.file_path = ""
self.file_obj: Optional[aiofiles.threadpool.binary.AsyncBufferedIOBase] = None
self.buffer = list[BufferChunk]()
self.written_size = 0
@@ -22,33 +25,30 @@ class AsyncFileBuffer:
@classmethod
async def create(cls, file_path: str, overwrite: bool = False):
self = cls()
self.file_path = file_path
if overwrite and os.path.exists(file_path):
await aioos.remove(file_path)
self.written_size = os.path.getsize(file_path) if os.path.exists(file_path) and not overwrite else 0
self.file_obj = await aiofiles.open(file_path, "r+b")
await self._seek(self.written_size)
self.written_size = os.path.getsize(file_path) if not overwrite and os.path.exists(file_path) else 0
self.file_obj = await aiofiles.open(file_path, "ab")
return self
async def write(self, chunk: bytes, offset: int):
buffer_chunk = BufferChunk(chunk, offset)
index = bisect.bisect([offset for (_, offset) in self.buffer], buffer_chunk.offset)
self.buffer.insert(index, buffer_chunk)
buffer_chunk = BufferChunk(offset, chunk)
# 使用堆结构,保证第一个元素始终最小
heapq.heappush(self.buffer, buffer_chunk)
while self.buffer and self.buffer[0].offset <= self.written_size:
assert self.file_obj is not None
ready_to_write_chunk = self.buffer.pop(0)
assert ready_to_write_chunk.chunk is not None
ready_to_write_chunk = heapq.heappop(self.buffer)
if ready_to_write_chunk.offset < self.written_size:
await self._seek(ready_to_write_chunk.offset)
logger.warning("[WARNING] 文件指针回溯!")
await self.file_obj.write(ready_to_write_chunk.chunk)
self.written_size += len(ready_to_write_chunk.chunk)
Logger.error("交叠的块范围 {} < {},舍弃!".format(ready_to_write_chunk.offset, self.written_size))
continue
await self.file_obj.write(ready_to_write_chunk.data)
self.written_size += len(ready_to_write_chunk.data)
async def close(self):
assert self.file_obj is not None, "无法关闭未创建的文件对象"
await self.file_obj.close()
async def _seek(self, offset: int):
assert self.file_obj is not None
await self.file_obj.seek(offset)
self.written_size = offset
if self.buffer:
Logger.error("buffer 尚未清空")
if self.file_obj is not None:
await self.file_obj.close()
else:
Logger.error("未预期的结果:未曾创建文件对象")



@@ -0,0 +1,19 @@
# type: ignore
class Singleton(type):
"""单例模式元类
@refs: https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python
# Usage
```
class MyClass(BaseClass, metaclass=Singleton):
pass
```
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
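# Illustrative usage: FFmpeg and Logger above use this metaclass, so e.g. FFmpeg() always
# returns the same instance no matter how many times it is constructed.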


@@ -0,0 +1,23 @@
import asyncio
from typing import Coroutine, Any, Callable, TypeVar
from functools import wraps
T = TypeVar("T")
def sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
@wraps(async_func)
def sync_func(*args: Any, **kwargs: Any):
return asyncio.run(async_func(*args, **kwargs))
return sync_func
if __name__ == "__main__":
@sync
async def run(a: int) -> int:
return a
print(run(1))


@@ -1,10 +0,0 @@
import logging
import coloredlogs
logger = logging.getLogger()
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(levelname)s %(message)s',
logger=logger,
datefmt='%H:%M:%S'
)