feat: support embed cover to video (#243)

This commit is contained in:
Nyakku Shigure 2024-02-19 23:28:40 +08:00 committed by GitHub
parent 0ebb6291d8
commit bc3f72d662
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 357 additions and 25 deletions

1
.gitignore vendored
View File

@ -130,6 +130,7 @@ dmypy.json
*.ass
*.srt
*.nfo
*.jpg
# test files
*.test.py

View File

@ -432,6 +432,15 @@ cat ~/.yutto_alias | yutto tensura-nikki --batch --alias-file -
- 参数 `--metadata-only`
- 默认值 `False`
#### 不生成视频封面
- 参数 `--no-cover`
- 默认值 `False`
> [!NOTE]
>
> 当前仅支持为包含视频流的视频生成封面。
#### 指定媒体元数据值的格式
当前仅支持 `premiered`

View File

@ -56,6 +56,7 @@ clean:
find . -name "*.nfo" -print0 | xargs -0 rm -f
find . -name "*.pb" -print0 | xargs -0 rm -f
find . -name "*.pyc" -print0 | xargs -0 rm -f
find . -name "*.jpg" -print0 | xargs -0 rm -f
rm -rf .pytest_cache/
rm -rf .mypy_cache/
find . -maxdepth 3 -type d -empty -print0 | xargs -0 -r rm -r

View File

@ -0,0 +1,160 @@
from __future__ import annotations
# import pytest
from yutto.utils.ffmpeg import FFmpegCommandBuilder
def test_video_input_only():
command_builder = FFmpegCommandBuilder()
command_builder.add_video_input("input.m4s")
command_builder.add_output("output.mp4")
excepted_command = ["-i", "input.m4s", "--", "output.mp4"]
assert command_builder.build() == excepted_command
def test_audio_input_only():
command_builder = FFmpegCommandBuilder()
command_builder.add_audio_input("input.aac")
command_builder.add_output("output.mp4")
excepted_command = ["-i", "input.aac", "--", "output.mp4"]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_auto_stream_selection():
command_builder = FFmpegCommandBuilder()
command_builder.add_video_input("input.m4s")
command_builder.add_audio_input("input.aac")
command_builder.add_output("output.mp4")
excepted_command = ["-i", "input.m4s", "-i", "input.aac", "--", "output.mp4"]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_manual_stream_selection_select_all():
command_builder = FFmpegCommandBuilder()
video_input = command_builder.add_video_input("input.m4s")
audio_input = command_builder.add_audio_input("input.aac")
output = command_builder.add_output("output.mp4")
output.use(video_input)
output.use(audio_input)
excepted_command = ["-i", "input.m4s", "-i", "input.aac", "-map", "0", "-map", "1", "--", "output.mp4"]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_manual_stream_selection_select_video_only():
command_builder = FFmpegCommandBuilder()
video_input = command_builder.add_video_input("input.m4s")
command_builder.add_audio_input("input.aac")
output = command_builder.add_output("output.mp4")
output.use(video_input)
excepted_command = ["-i", "input.m4s", "-i", "input.aac", "-map", "0", "--", "output.mp4"]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_cover():
command_builder = FFmpegCommandBuilder()
video_input = command_builder.add_video_input("input.m4s")
audio_input = command_builder.add_audio_input("input.aac")
cover_input = command_builder.add_video_input("cover.jpg")
output = command_builder.add_output("output.mp4")
output.use(video_input)
output.use(audio_input)
output.use(cover_input)
output.set_cover(cover_input)
excepted_command = [
"-i",
"input.m4s",
"-i",
"input.aac",
"-i",
"cover.jpg",
"-map",
"0",
"-map",
"1",
"-map",
"2",
"-c:v:1",
"copy",
"-disposition:v:1",
"attached_pic",
"--",
"output.mp4",
]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_cover_reorder():
command_builder = FFmpegCommandBuilder()
cover_input = command_builder.add_video_input("cover.jpg")
video_input = command_builder.add_video_input("input.m4s")
audio_input = command_builder.add_audio_input("input.aac")
output = command_builder.add_output("output.mp4")
output.use(cover_input)
output.use(audio_input)
output.use(video_input)
output.set_cover(cover_input)
excepted_command = [
"-i",
"cover.jpg",
"-i",
"input.m4s",
"-i",
"input.aac",
"-map",
"0",
"-map",
"2",
"-map",
"1",
"-c:v:0",
"copy",
"-disposition:v:0",
"attached_pic",
"--",
"output.mp4",
]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_codec():
command_builder = FFmpegCommandBuilder()
command_builder.add_video_input("input.m4s")
command_builder.add_audio_input("input.aac")
output = command_builder.add_output("output.mp4")
output.set_vcodec("hevc")
output.set_acodec("copy")
excepted_command = [
"-i",
"input.m4s",
"-i",
"input.aac",
"-vcodec",
"hevc",
"-acodec",
"copy",
"--",
"output.mp4",
]
assert command_builder.build() == excepted_command
def test_merge_video_audio_with_extra_options():
command_builder = FFmpegCommandBuilder()
command_builder.add_video_input("input.m4s")
command_builder.add_audio_input("input.aac")
output = command_builder.add_output("output.mp4")
output.with_extra_options(["-strict", "unofficial"])
command_builder.with_extra_options(["-threads", "8"])
excepted_command = [
"-i",
"input.m4s",
"-i",
"input.aac",
"-threads",
"8",
"-strict",
"unofficial",
"--",
"output.mp4",
]
assert command_builder.build() == excepted_command

View File

@ -45,8 +45,8 @@ from yutto.validator import (
validate_user_info,
)
DownloadResourceType: TypeAlias = Literal["video", "audio", "subtitle", "metadata", "danmaku"]
DOWNLOAD_RESOURCE_TYPES: list[DownloadResourceType] = ["video", "audio", "subtitle", "metadata", "danmaku"]
DownloadResourceType: TypeAlias = Literal["video", "audio", "subtitle", "metadata", "danmaku", "cover"]
DOWNLOAD_RESOURCE_TYPES: list[DownloadResourceType] = ["video", "audio", "subtitle", "metadata", "danmaku", "cover"]
def main():
@ -181,6 +181,12 @@ def cli() -> argparse.ArgumentParser:
action=create_select_required_action(select=["metadata"], deselect=invert_selection(["metadata"])),
help="仅生成元数据文件",
)
group_common.add_argument(
"--no-cover",
dest="require_cover",
action=create_select_required_action(deselect=["cover"]),
help="不生成封面",
)
group_common.set_defaults(
require_video=True,
@ -188,6 +194,7 @@ def cli() -> argparse.ArgumentParser:
require_subtitle=True,
require_metadata=False,
require_danmaku=True,
require_cover=True,
)
group_common.add_argument("--no-color", action="store_true", help="不使用颜色")
group_common.add_argument("--no-progress", action="store_true", help="不显示进度条")

View File

@ -171,6 +171,7 @@ class EpisodeData(TypedDict):
subtitles: list[MultiLangSubtitle]
metadata: MetaData | None
danmaku: DanmakuData
cover_data: bytes | None
output_dir: str
tmp_dir: str
filename: str

View File

@ -155,7 +155,7 @@ async def get_ugc_video_list(client: AsyncClient, avid: AvId) -> UgcVideoList:
"name": item["part"],
"avid": avid,
"cid": CId(str(item["cid"])),
"metadata": _parse_ugc_video_metadata(video_info, page_info),
"metadata": _parse_ugc_video_metadata(video_info, page_info, is_first_page=i == 0),
}
for i, (item, page_info) in enumerate(zip(res_json["data"], video_info["pages"]))
]
@ -268,12 +268,17 @@ async def get_ugc_video_subtitles(client: AsyncClient, avid: AvId, cid: CId) ->
def _parse_ugc_video_metadata(
video_info: _UgcVideoInfo,
page_info: _UgcVideoPageInfo,
is_first_page: bool = False,
) -> MetaData:
thumb = page_info["first_frame"] if page_info["first_frame"] is not None else video_info["picture"]
# Only the non-first page use the first frame as the thumbnail
if is_first_page:
thumb = video_info["picture"]
return MetaData(
title=page_info["part"],
show_title=page_info["part"],
plot=video_info["description"],
thumb=page_info["first_frame"] if page_info["first_frame"] is not None else video_info["picture"],
thumb=thumb,
premiered=video_info["pubdate"],
dateadded=get_time_stamp_by_now(),
actor=video_info["actor"],

View File

@ -31,6 +31,7 @@ from yutto.processor.path_resolver import (
)
from yutto.utils.console.logger import Logger
from yutto.utils.danmaku import EmptyDanmakuData
from yutto.utils.fetcher import Fetcher
async def extract_bangumi_data(
@ -53,6 +54,7 @@ async def extract_bangumi_data(
subtitles = await get_bangumi_subtitles(client, avid, cid) if args.require_subtitle else []
danmaku = await get_danmaku(client, cid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData
metadata = bangumi_info["metadata"] if args.require_metadata else None
cover_data = await Fetcher.fetch_bin(client, bangumi_info["metadata"]["thumb"]) if args.require_cover else None
subpath_variables_base: PathTemplateVariableDict = {
"id": id,
"name": name,
@ -71,8 +73,9 @@ async def extract_bangumi_data(
videos=videos,
audios=audios,
subtitles=subtitles,
danmaku=danmaku,
metadata=metadata,
danmaku=danmaku,
cover_data=cover_data,
output_dir=output_dir,
tmp_dir=args.tmp_dir or output_dir,
filename=filename,
@ -103,6 +106,7 @@ async def extract_cheese_data(
subtitles = await get_cheese_subtitles(client, avid, cid) if args.require_subtitle else []
danmaku = await get_danmaku(client, cid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData
metadata = cheese_info["metadata"] if args.require_metadata else None
cover_data = await Fetcher.fetch_bin(client, cheese_info["metadata"]["thumb"]) if args.require_cover else None
subpath_variables_base: PathTemplateVariableDict = {
"id": id,
"name": name,
@ -121,8 +125,9 @@ async def extract_cheese_data(
videos=videos,
audios=audios,
subtitles=subtitles,
danmaku=danmaku,
metadata=metadata,
danmaku=danmaku,
cover_data=cover_data,
output_dir=output_dir,
tmp_dir=args.tmp_dir or output_dir,
filename=filename,
@ -150,6 +155,9 @@ async def extract_ugc_video_data(
subtitles = await get_ugc_video_subtitles(client, avid, cid) if args.require_subtitle else []
danmaku = await get_danmaku(client, cid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData
metadata = ugc_video_info["metadata"] if args.require_metadata else None
cover_data = (
await Fetcher.fetch_bin(client, ugc_video_info["metadata"]["thumb"]) if args.require_cover else None
)
owner_uid: str = (
ugc_video_info["metadata"]["actor"][0]["profile"].split("/")[-1]
if ugc_video_info["metadata"]["actor"]
@ -173,8 +181,9 @@ async def extract_ugc_video_data(
videos=videos,
audios=audios,
subtitles=subtitles,
danmaku=danmaku,
metadata=metadata,
danmaku=danmaku,
cover_data=cover_data,
output_dir=output_dir,
tmp_dir=args.tmp_dir or output_dir,
filename=filename,

View File

@ -1,7 +1,6 @@
from __future__ import annotations
import asyncio
import functools
import os
from pathlib import Path
@ -16,7 +15,7 @@ from yutto.utils.console.colorful import colored_string
from yutto.utils.console.logger import Badge, Logger
from yutto.utils.danmaku import write_danmaku
from yutto.utils.fetcher import Fetcher
from yutto.utils.ffmpeg import FFmpeg
from yutto.utils.ffmpeg import FFmpeg, FFmpegCommandBuilder
from yutto.utils.file_buffer import AsyncFileBuffer
from yutto.utils.funcutils import filter_none_value, xmerge
from yutto.utils.metadata import write_metadata
@ -143,12 +142,15 @@ def merge_video_and_audio(
video_path: Path,
audio: AudioUrlMeta | None,
audio_path: Path,
cover_data: bytes | None,
cover_path: Path,
output_path: Path,
options: DownloaderOptions,
):
"""合并音视频"""
ffmpeg = FFmpeg()
command_builder = FFmpegCommandBuilder()
Logger.info("开始合并……")
# Using FFmpeg to Create HEVC Videos That Work on Apple Devices
@ -165,25 +167,35 @@ def merge_video_and_audio(
if audio is not None and audio["codec"] == options["audio_save_codec"]:
options["audio_save_codec"] = "copy"
args_list: list[list[str]] = [
["-i", str(video_path)] if video is not None else [],
["-i", str(audio_path)] if audio is not None else [],
["-vcodec", options["video_save_codec"]] if video is not None else [],
["-acodec", options["audio_save_codec"]] if audio is not None else [],
# see also: https://www.reddit.com/r/ffmpeg/comments/qe7oq1/comment/hi0bmic/?utm_source=share&utm_medium=web2x&context=3
["-strict", "unofficial"],
["-tag:v", vtag] if vtag is not None else [],
["-threads", str(os.cpu_count())],
# Using double dash to make sure that the output file name is not parsed as an option
# if the output file name starts with a dash
["-y", "--", str(output_path)],
]
output = command_builder.add_output(output_path)
if video is not None:
video_input = command_builder.add_video_input(video_path)
output.use(video_input)
output.set_vcodec(options["video_save_codec"])
if vtag is not None:
output.with_extra_options([f"-tag:v:{video_input.stream_id}", vtag])
if audio is not None:
audio_input = command_builder.add_audio_input(audio_path)
output.use(audio_input)
output.set_acodec(options["audio_save_codec"])
if video is not None and cover_data is not None:
cover_input = command_builder.add_video_input(cover_path)
output.use(cover_input)
output.set_cover(cover_input)
result = ffmpeg.exec(functools.reduce(lambda prev, cur: prev + cur, args_list))
# see also: https://www.reddit.com/r/ffmpeg/comments/qe7oq1/comment/hi0bmic/?utm_source=share&utm_medium=web2x&context=3
output.with_extra_options(["-strict", "unofficial"])
command_builder.with_extra_options(["-threads", str(os.cpu_count())])
command_builder.with_extra_options(["-y"])
result = ffmpeg.exec(command_builder.build())
if result.returncode != 0:
Logger.error("合并失败!")
Logger.error(result.stderr.decode())
return
else:
Logger.debug(result.stderr.decode())
Logger.info("合并完成!")
@ -191,6 +203,8 @@ def merge_video_and_audio(
video_path.unlink()
if audio is not None:
audio_path.unlink()
if cover_data is not None:
cover_path.unlink()
async def start_downloader(
@ -205,6 +219,7 @@ async def start_downloader(
subtitles = episode_data["subtitles"]
danmaku = episode_data["danmaku"]
metadata = episode_data["metadata"]
cover_data = episode_data["cover_data"]
output_dir = Path(episode_data["output_dir"])
tmp_dir = Path(episode_data["tmp_dir"])
filename = episode_data["filename"]
@ -216,6 +231,7 @@ async def start_downloader(
tmp_dir.mkdir(parents=True, exist_ok=True)
video_path = tmp_dir.joinpath(filename + "_video.m4s")
audio_path = tmp_dir.joinpath(filename + "_audio.m4s")
cover_path = tmp_dir.joinpath(filename + "_cover.jpg")
video = select_video(
videos, options["video_quality"], options["video_download_codec"], options["video_download_codec_priority"]
@ -292,8 +308,11 @@ async def start_downloader(
video = video if will_download_video else None
audio = audio if will_download_audio else None
if cover_data is not None:
cover_path.write_bytes(cover_data)
# 下载视频 / 音频
await download_video_and_audio(client, video, video_path, audio, audio_path, options)
# 合并视频 / 音频
merge_video_and_audio(video, video_path, audio, audio_path, output_path, options)
merge_video_and_audio(video, video_path, audio, audio_path, cover_data, cover_path, output_path, options)

View File

@ -1,9 +1,11 @@
from __future__ import annotations
import operator
import os
import re
import subprocess
from functools import cached_property
from functools import cached_property, reduce
from pathlib import Path
from yutto.utils.console.logger import Logger
from yutto.utils.funcutils import Singleton
@ -64,3 +66,121 @@ class FFmpeg(metaclass=Singleton):
if match_obj := re.match(r"^\s*A[F\.][S\.][X\.][B\.][D\.] (?P<encoder>\S+)", line):
results.append(match_obj.group("encoder"))
return results
def concat_commands(commands: list[list[str]]) -> list[str]:
return reduce(operator.add, commands, [])
class FFmpegInput:
def __init__(self, path: Path | str, input_id: int, stream_id: int):
self.path = Path(path)
self.input_id = input_id
self.stream_id = stream_id
def build(self) -> list[str]:
return ["-i", str(self.path)]
def __repr__(self):
return f"FFmpegInput({self.path})"
class FFmpegVideoInput(FFmpegInput):
...
class FFmpegAudioInput(FFmpegInput):
...
class FFmpegOutput:
def __init__(self, path: Path | str):
self.path = path
self.used_inputs: list[FFmpegInput] = []
self.vcodec: str | None = None
self.acodec: str | None = None
self.cover_input: FFmpegVideoInput | None = None
self.extra_commands: list[str] = []
def use(self, input: FFmpegInput):
self.used_inputs.append(input)
return self
def set_vcodec(self, codec: str):
self.vcodec = codec
return self
def set_acodec(self, codec: str):
self.acodec = codec
return self
def set_cover(self, cover: FFmpegVideoInput):
self.cover_input = cover
return self
def with_extra_options(self, command: list[str]):
self.extra_commands.extend(command)
return self
def build(self) -> list[str]:
selected_inputs = concat_commands([["-map", str(input.input_id)] for input in self.used_inputs])
vcodec = ["-vcodec", self.vcodec] if self.vcodec else []
acodec = ["-acodec", self.acodec] if self.acodec else []
# Refer to `-disposition` opiton in https://www.ffmpeg.org/ffmpeg.html#toc-Main-options
cover_options = (
[
f"-c:v:{self.cover_input.stream_id}",
"copy",
f"-disposition:v:{self.cover_input.stream_id}",
"attached_pic",
]
if self.cover_input
else []
)
# Using double dash to make sure that the output file name is not parsed as an option
# if the output file name starts with a dash
return selected_inputs + vcodec + acodec + cover_options + self.extra_commands + ["--"] + [str(self.path)]
def __repr__(self):
return f"FFmpegOutput({self.path})"
class FFmpegCommandBuilder:
def __init__(self):
self.num_inputs = 0
self.num_video_stream = 0
self.num_audio_stream = 0
self.inputs: list[FFmpegInput] = []
self.outputs: list[FFmpegOutput] = []
self.extra_commands: list[str] = []
def add_video_input(self, path: Path | str):
input = FFmpegVideoInput(path, self.num_inputs, self.num_video_stream)
self.num_inputs += 1
self.num_video_stream += 1
self.inputs.append(input)
return input
def add_audio_input(self, path: Path | str):
input = FFmpegAudioInput(path, self.num_inputs, self.num_audio_stream)
self.num_inputs += 1
self.num_audio_stream += 1
self.inputs.append(input)
return input
def with_extra_options(self, command: list[str]):
self.extra_commands.extend(command)
return self
def add_output(self, path: Path | str):
output = FFmpegOutput(path)
self.outputs.append(output)
return output
def build(self):
input_commands = concat_commands([input.build() for input in self.inputs])
output_commands = concat_commands([output.build() for output in self.outputs])
return input_commands + self.extra_commands + output_commands
def __repr__(self):
return "FFmpegCommandBuilder()"