♻️ refactor: introduce `ExtractorOptions` for improved argument handling in extractors (#490)

This commit is contained in:
Nyakku Shigure 2025-04-05 22:35:25 +08:00 committed by GitHub
parent 1490c4a7dc
commit 19ff34928d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 125 additions and 92 deletions

View File

@ -12,6 +12,7 @@ from typing import TYPE_CHECKING
import httpx import httpx
from biliass import BlockOptions from biliass import BlockOptions
from yutto._typing import ExtractorOptions
from yutto.cli.cli import cli from yutto.cli.cli import cli
from yutto.exceptions import ErrorCode from yutto.exceptions import ErrorCode
from yutto.extractor import ( from yutto.extractor import (
@ -133,7 +134,23 @@ async def run(ctx: FetcherContext, args_list: list[argparse.Namespace]):
# 提取信息,构造解析任务~ # 提取信息,构造解析任务~
for extractor in extractors: for extractor in extractors:
if extractor.match(url): if extractor.match(url):
download_list = await extractor(ctx, client, args) download_list = await extractor(
ctx,
client,
ExtractorOptions(
episodes=args.episodes,
with_section=args.with_section,
require_video=args.require_video,
require_audio=args.require_audio,
require_danmaku=args.require_danmaku,
require_subtitle=args.require_subtitle,
require_metadata=args.require_metadata,
require_cover=args.require_cover,
require_chapter_info=args.require_chapter_info,
danmaku_format=args.danmaku_format,
subpath_template=args.subpath_template,
),
)
break break
else: else:
if args.batch: if args.batch:

View File

@ -7,7 +7,7 @@ if TYPE_CHECKING:
from yutto.bilibili_typing.codec import AudioCodec, VideoCodec from yutto.bilibili_typing.codec import AudioCodec, VideoCodec
from yutto.bilibili_typing.quality import AudioQuality, VideoQuality from yutto.bilibili_typing.quality import AudioQuality, VideoQuality
from yutto.utils.danmaku import DanmakuData, DanmakuOptions from yutto.utils.danmaku import DanmakuData, DanmakuOptions, DanmakuSaveType
from yutto.utils.metadata import ChapterInfoData, MetaData from yutto.utils.metadata import ChapterInfoData, MetaData
from yutto.utils.subtitle import SubtitleData from yutto.utils.subtitle import SubtitleData
@ -211,6 +211,20 @@ class MultiLangSubtitle(TypedDict):
lines: SubtitleData lines: SubtitleData
class ExtractorOptions(TypedDict):
episodes: str
with_section: bool
require_video: bool
require_audio: bool
require_danmaku: bool
require_subtitle: bool
require_metadata: bool
require_cover: bool
require_chapter_info: bool
danmaku_format: DanmakuSaveType
subpath_template: str
class EpisodeData(TypedDict): class EpisodeData(TypedDict):
"""剧集数据,包含了一个视频资源的基本信息以及相关资源""" """剧集数据,包含了一个视频资源的基本信息以及相关资源"""

View File

@ -4,11 +4,9 @@ from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, TypeVar from typing import TYPE_CHECKING, TypeVar
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import EpisodeData from yutto._typing import EpisodeData, ExtractorOptions
from yutto.utils.asynclib import CoroutineWrapper from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -27,32 +25,32 @@ class Extractor(metaclass=ABCMeta):
@abstractmethod @abstractmethod
async def __call__( async def __call__(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
raise NotImplementedError raise NotImplementedError
class SingleExtractor(Extractor): class SingleExtractor(Extractor):
async def __call__( async def __call__(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
return [await self.extract(ctx, client, args)] return [await self.extract(ctx, client, options)]
@abstractmethod @abstractmethod
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> CoroutineWrapper[EpisodeData | None] | None: ) -> CoroutineWrapper[EpisodeData | None] | None:
raise NotImplementedError raise NotImplementedError
class BatchExtractor(Extractor): class BatchExtractor(Extractor):
async def __call__( async def __call__(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
return await self.extract(ctx, client, args) return await self.extract(ctx, client, options)
@abstractmethod @abstractmethod
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
raise NotImplementedError raise NotImplementedError

View File

@ -19,10 +19,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -51,7 +50,7 @@ class BangumiExtractor(SingleExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> CoroutineWrapper[EpisodeData | None] | None: ) -> CoroutineWrapper[EpisodeData | None] | None:
season_id = await get_season_id_by_episode_id(ctx, client, self.episode_id) season_id = await get_season_id_by_episode_id(ctx, client, self.episode_id)
bangumi_list = await get_bangumi_list(ctx, client, season_id) bangumi_list = await get_bangumi_list(ctx, client, season_id)
@ -70,7 +69,7 @@ class BangumiExtractor(SingleExtractor):
ctx, ctx,
client, client,
bangumi_list_item, bangumi_list_item,
args, options,
{ {
"title": bangumi_list["title"], "title": bangumi_list["title"],
}, },

View File

@ -16,10 +16,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -73,7 +72,7 @@ class BangumiBatchExtractor(BatchExtractor):
self.season_id = await get_season_id_by_media_id(ctx, client, media_id) self.season_id = await get_season_id_by_media_id(ctx, client, media_id)
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
await self._parse_ids(ctx, client) await self._parse_ids(ctx, client)
@ -81,10 +80,10 @@ class BangumiBatchExtractor(BatchExtractor):
Logger.custom(bangumi_list["title"], Badge("番剧", fore="black", back="cyan")) Logger.custom(bangumi_list["title"], Badge("番剧", fore="black", back="cyan"))
# 如果没有 with_section 则不需要专区内容 # 如果没有 with_section 则不需要专区内容
bangumi_list["pages"] = list( bangumi_list["pages"] = list(
filter(lambda item: args.with_section or not item["is_section"], bangumi_list["pages"]) filter(lambda item: options["with_section"] or not item["is_section"], bangumi_list["pages"])
) )
# 选集过滤 # 选集过滤
episodes = parse_episodes_selection(args.episodes, len(bangumi_list["pages"])) episodes = parse_episodes_selection(options["episodes"], len(bangumi_list["pages"]))
bangumi_list["pages"] = list(filter(lambda item: item["id"] in episodes, bangumi_list["pages"])) bangumi_list["pages"] = list(filter(lambda item: item["id"] in episodes, bangumi_list["pages"]))
return [ return [
CoroutineWrapper( CoroutineWrapper(
@ -92,7 +91,7 @@ class BangumiBatchExtractor(BatchExtractor):
ctx, ctx,
client, client,
bangumi_item, bangumi_item,
args, options,
{ {
"title": bangumi_list["title"], "title": bangumi_list["title"],
}, },

View File

@ -19,10 +19,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -52,7 +51,7 @@ class CheeseExtractor(SingleExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> CoroutineWrapper[EpisodeData | None] | None: ) -> CoroutineWrapper[EpisodeData | None] | None:
season_id = await get_season_id_by_episode_id(ctx, client, self.episode_id) season_id = await get_season_id_by_episode_id(ctx, client, self.episode_id)
cheese_list = await get_cheese_list(ctx, client, season_id) cheese_list = await get_cheese_list(ctx, client, season_id)
@ -72,7 +71,7 @@ class CheeseExtractor(SingleExtractor):
client, client,
self.episode_id, self.episode_id,
cheese_list_item, cheese_list_item,
args, options,
{ {
"title": cheese_list["title"], "title": cheese_list["title"],
}, },

View File

@ -12,10 +12,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -58,14 +57,14 @@ class CheeseBatchExtractor(BatchExtractor):
self.season_id = SeasonId(self._match_result.group("season_id")) self.season_id = SeasonId(self._match_result.group("season_id"))
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
await self._parse_ids(ctx, client) await self._parse_ids(ctx, client)
cheese_list = await get_cheese_list(ctx, client, self.season_id) cheese_list = await get_cheese_list(ctx, client, self.season_id)
Logger.custom(cheese_list["title"], Badge("课程", fore="black", back="cyan")) Logger.custom(cheese_list["title"], Badge("课程", fore="black", back="cyan"))
# 选集过滤 # 选集过滤
episodes = parse_episodes_selection(args.episodes, len(cheese_list["pages"])) episodes = parse_episodes_selection(options["episodes"], len(cheese_list["pages"]))
cheese_list["pages"] = list(filter(lambda item: item["id"] in episodes, cheese_list["pages"])) cheese_list["pages"] = list(filter(lambda item: item["id"] in episodes, cheese_list["pages"]))
return [ return [
CoroutineWrapper( CoroutineWrapper(
@ -74,7 +73,7 @@ class CheeseBatchExtractor(BatchExtractor):
client, client,
cheese_item["episode_id"], cheese_item["episode_id"],
cheese_item, cheese_item,
args, options,
{ {
"title": cheese_list["title"], "title": cheese_list["title"],
}, },

View File

@ -18,10 +18,10 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
class CollectionExtractor(BatchExtractor): class CollectionExtractor(BatchExtractor):
"""视频合集""" """视频合集"""
@ -53,7 +53,7 @@ class CollectionExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
username, collection_details = await asyncio.gather( username, collection_details = await asyncio.gather(
get_user_name(ctx, client, self.mid), get_user_name(ctx, client, self.mid),
@ -65,7 +65,7 @@ class CollectionExtractor(BatchExtractor):
ugc_video_info_list: list[tuple[UgcVideoListItem, str, int]] = [] ugc_video_info_list: list[tuple[UgcVideoListItem, str, int]] = []
# 选集过滤 # 选集过滤
episodes = parse_episodes_selection(args.episodes, len(collection_details["pages"])) episodes = parse_episodes_selection(options["episodes"], len(collection_details["pages"]))
collection_details["pages"] = list(filter(lambda item: item["id"] in episodes, collection_details["pages"])) collection_details["pages"] = list(filter(lambda item: item["id"] in episodes, collection_details["pages"]))
for item in collection_details["pages"]: for item in collection_details["pages"]:
@ -97,7 +97,7 @@ class CollectionExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
# TODO: 关于对于 id 的优化 # TODO: 关于对于 id 的优化
# TODO: 关于对于 title 的优化(最好使用合集标题,而不是原来的视频标题) # TODO: 关于对于 title 的优化(最好使用合集标题,而不是原来的视频标题)

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from yutto._typing import AvId, EpisodeData, EpisodeId, format_ids from yutto._typing import AvId, EpisodeData, EpisodeId, ExtractorOptions, format_ids
from yutto.api.bangumi import ( from yutto.api.bangumi import (
BangumiListItem, BangumiListItem,
get_bangumi_playurl, get_bangumi_playurl,
@ -34,8 +34,6 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.metadata import attach_chapter_info from yutto.utils.metadata import attach_chapter_info
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
@ -43,7 +41,7 @@ async def extract_bangumi_data(
ctx: FetcherContext, ctx: FetcherContext,
client: httpx.AsyncClient, client: httpx.AsyncClient,
bangumi_info: BangumiListItem, bangumi_info: BangumiListItem,
args: argparse.Namespace, options: ExtractorOptions,
subpath_variables: PathTemplateVariableDict, subpath_variables: PathTemplateVariableDict,
auto_subpath_template: str = "{name}", auto_subpath_template: str = "{name}",
) -> EpisodeData | None: ) -> EpisodeData | None:
@ -55,15 +53,21 @@ async def extract_bangumi_data(
if bangumi_info["is_preview"]: if bangumi_info["is_preview"]:
Logger.warning(f"视频({format_ids(avid, cid)})是预览视频(疑似未登录或非大会员用户)") Logger.warning(f"视频({format_ids(avid, cid)})是预览视频(疑似未登录或非大会员用户)")
videos, audios = ( videos, audios = (
await get_bangumi_playurl(ctx, client, avid, cid) if args.require_video or args.require_audio else ([], []) await get_bangumi_playurl(ctx, client, avid, cid)
if options["require_video"] or options["require_audio"]
else ([], [])
) )
subtitles = await get_bangumi_subtitles(ctx, client, avid, cid) if args.require_subtitle else [] subtitles = await get_bangumi_subtitles(ctx, client, avid, cid) if options["require_subtitle"] else []
danmaku = ( danmaku = (
await get_danmaku(ctx, client, cid, avid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData await get_danmaku(ctx, client, cid, avid, options["danmaku_format"])
if options["require_danmaku"]
else EmptyDanmakuData
) )
metadata = bangumi_info["metadata"] if args.require_metadata else None metadata = bangumi_info["metadata"] if options["require_metadata"] else None
cover_data = ( cover_data = (
await Fetcher.fetch_bin(ctx, client, bangumi_info["metadata"]["thumb"]) if args.require_cover else None await Fetcher.fetch_bin(ctx, client, bangumi_info["metadata"]["thumb"])
if options["require_cover"]
else None
) )
subpath_variables_base: PathTemplateVariableDict = { subpath_variables_base: PathTemplateVariableDict = {
"id": id, "id": id,
@ -78,7 +82,7 @@ async def extract_bangumi_data(
"owner_uid": UNKNOWN, "owner_uid": UNKNOWN,
} }
subpath_variables_base.update(subpath_variables) subpath_variables_base.update(subpath_variables)
path = resolve_path_template(args.subpath_template, auto_subpath_template, subpath_variables_base) path = resolve_path_template(options["subpath_template"], auto_subpath_template, subpath_variables_base)
return EpisodeData( return EpisodeData(
videos=videos, videos=videos,
audios=audios, audios=audios,
@ -99,7 +103,7 @@ async def extract_cheese_data(
client: httpx.AsyncClient, client: httpx.AsyncClient,
episode_id: EpisodeId, episode_id: EpisodeId,
cheese_info: CheeseListItem, cheese_info: CheeseListItem,
args: argparse.Namespace, options: ExtractorOptions,
subpath_variables: PathTemplateVariableDict, subpath_variables: PathTemplateVariableDict,
auto_subpath_template: str = "{name}", auto_subpath_template: str = "{name}",
) -> EpisodeData | None: ) -> EpisodeData | None:
@ -110,16 +114,18 @@ async def extract_cheese_data(
id = cheese_info["id"] id = cheese_info["id"]
videos, audios = ( videos, audios = (
await get_cheese_playurl(ctx, client, avid, episode_id, cid) await get_cheese_playurl(ctx, client, avid, episode_id, cid)
if args.require_video or args.require_audio if options["require_video"] or options["require_audio"]
else ([], []) else ([], [])
) )
subtitles = await get_cheese_subtitles(ctx, client, avid, cid) if args.require_subtitle else [] subtitles = await get_cheese_subtitles(ctx, client, avid, cid) if options["require_subtitle"] else []
danmaku = ( danmaku = (
await get_danmaku(ctx, client, cid, avid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData await get_danmaku(ctx, client, cid, avid, options["danmaku_format"])
if options["require_danmaku"]
else EmptyDanmakuData
) )
metadata = cheese_info["metadata"] if args.require_metadata else None metadata = cheese_info["metadata"] if options["require_metadata"] else None
cover_data = ( cover_data = (
await Fetcher.fetch_bin(ctx, client, cheese_info["metadata"]["thumb"]) if args.require_cover else None await Fetcher.fetch_bin(ctx, client, cheese_info["metadata"]["thumb"]) if options["require_cover"] else None
) )
subpath_variables_base: PathTemplateVariableDict = { subpath_variables_base: PathTemplateVariableDict = {
"id": id, "id": id,
@ -134,7 +140,7 @@ async def extract_cheese_data(
"owner_uid": UNKNOWN, "owner_uid": UNKNOWN,
} }
subpath_variables_base.update(subpath_variables) subpath_variables_base.update(subpath_variables)
path = resolve_path_template(args.subpath_template, auto_subpath_template, subpath_variables_base) path = resolve_path_template(options["subpath_template"], auto_subpath_template, subpath_variables_base)
return EpisodeData( return EpisodeData(
videos=videos, videos=videos,
audios=audios, audios=audios,
@ -155,7 +161,7 @@ async def extract_ugc_video_data(
client: httpx.AsyncClient, client: httpx.AsyncClient,
avid: AvId, avid: AvId,
ugc_video_info: UgcVideoListItem, ugc_video_info: UgcVideoListItem,
args: argparse.Namespace, options: ExtractorOptions,
subpath_variables: PathTemplateVariableDict, subpath_variables: PathTemplateVariableDict,
auto_subpath_template: str = "{title}", auto_subpath_template: str = "{title}",
) -> EpisodeData | None: ) -> EpisodeData | None:
@ -165,19 +171,25 @@ async def extract_ugc_video_data(
id = ugc_video_info["id"] id = ugc_video_info["id"]
videos, audios = ( videos, audios = (
await get_ugc_video_playurl(ctx, client, avid, cid) await get_ugc_video_playurl(ctx, client, avid, cid)
if args.require_video or args.require_audio if options["require_video"] or options["require_audio"]
else ([], []) else ([], [])
) )
subtitles = await get_ugc_video_subtitles(ctx, client, avid, cid) if args.require_subtitle else [] subtitles = await get_ugc_video_subtitles(ctx, client, avid, cid) if options["require_subtitle"] else []
chapter_info_data = await get_ugc_video_chapters(ctx, client, avid, cid) if args.require_chapter_info else [] chapter_info_data = (
danmaku = ( await get_ugc_video_chapters(ctx, client, avid, cid) if options["require_chapter_info"] else []
await get_danmaku(ctx, client, cid, avid, args.danmaku_format) if args.require_danmaku else EmptyDanmakuData
) )
metadata = ugc_video_info["metadata"] if args.require_metadata else None danmaku = (
await get_danmaku(ctx, client, cid, avid, options["danmaku_format"])
if options["require_danmaku"]
else EmptyDanmakuData
)
metadata = ugc_video_info["metadata"] if options["require_metadata"] else None
if metadata and chapter_info_data: if metadata and chapter_info_data:
attach_chapter_info(metadata, chapter_info_data) attach_chapter_info(metadata, chapter_info_data)
cover_data = ( cover_data = (
await Fetcher.fetch_bin(ctx, client, ugc_video_info["metadata"]["thumb"]) if args.require_cover else None await Fetcher.fetch_bin(ctx, client, ugc_video_info["metadata"]["thumb"])
if options["require_cover"]
else None
) )
owner_uid: str = ( owner_uid: str = (
ugc_video_info["metadata"]["actor"][0]["profile"].split("/")[-1] ugc_video_info["metadata"]["actor"][0]["profile"].split("/")[-1]
@ -200,7 +212,7 @@ async def extract_ugc_video_data(
"owner_uid": owner_uid, "owner_uid": owner_uid,
} }
subpath_variables_base.update(subpath_variables) subpath_variables_base.update(subpath_variables)
path = resolve_path_template(args.subpath_template, auto_subpath_template, subpath_variables_base) path = resolve_path_template(options["subpath_template"], auto_subpath_template, subpath_variables_base)
return EpisodeData( return EpisodeData(
videos=videos, videos=videos,
audios=audios, audios=audios,

View File

@ -16,10 +16,10 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
class FavouritesExtractor(BatchExtractor): class FavouritesExtractor(BatchExtractor):
"""用户单一收藏夹""" """用户单一收藏夹"""
@ -38,7 +38,7 @@ class FavouritesExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
username, favourite_info = await asyncio.gather( username, favourite_info = await asyncio.gather(
get_user_name(ctx, client, self.mid), get_user_name(ctx, client, self.mid),
@ -76,7 +76,7 @@ class FavouritesExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"title": title, "title": title,
"username": username, "username": username,

View File

@ -16,10 +16,10 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
class SeriesExtractor(BatchExtractor): class SeriesExtractor(BatchExtractor):
"""视频列表""" """视频列表"""
@ -46,7 +46,7 @@ class SeriesExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
username, series_title = await asyncio.gather( username, series_title = await asyncio.gather(
get_user_name(ctx, client, self.mid), get_medialist_title(ctx, client, self.series_id) get_user_name(ctx, client, self.mid), get_medialist_title(ctx, client, self.series_id)
@ -80,7 +80,7 @@ class SeriesExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"series_title": series_title, "series_title": series_title,
"username": username, # 虽然默认模板的用不上,但这里可以提供一下 "username": username, # 虽然默认模板的用不上,但这里可以提供一下

View File

@ -17,10 +17,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -73,7 +72,7 @@ class UgcVideoExtractor(SingleExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> CoroutineWrapper[EpisodeData | None] | None: ) -> CoroutineWrapper[EpisodeData | None] | None:
try: try:
ugc_video_list = await get_ugc_video_list(ctx, client, self.avid) ugc_video_list = await get_ugc_video_list(ctx, client, self.avid)
@ -85,7 +84,7 @@ class UgcVideoExtractor(SingleExtractor):
client, client,
self.avid, self.avid,
ugc_video_list["pages"][self.page - 1], ugc_video_list["pages"][self.page - 1],
args, options,
{ {
"title": ugc_video_list["title"], "title": ugc_video_list["title"],
"pubdate": ugc_video_list["pubdate"], "pubdate": ugc_video_list["pubdate"],

View File

@ -13,10 +13,9 @@ from yutto.utils.asynclib import CoroutineWrapper
from yutto.utils.console.logger import Badge, Logger from yutto.utils.console.logger import Badge, Logger
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
from yutto.utils.fetcher import FetcherContext from yutto.utils.fetcher import FetcherContext
@ -65,7 +64,7 @@ class UgcVideoBatchExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
try: try:
ugc_video_list = await get_ugc_video_list(ctx, client, self.avid) ugc_video_list = await get_ugc_video_list(ctx, client, self.avid)
@ -76,7 +75,7 @@ class UgcVideoBatchExtractor(BatchExtractor):
return [] return []
# 选集过滤 # 选集过滤
episodes = parse_episodes_selection(args.episodes, len(ugc_video_list["pages"])) episodes = parse_episodes_selection(options["episodes"], len(ugc_video_list["pages"]))
ugc_video_list["pages"] = list(filter(lambda item: item["id"] in episodes, ugc_video_list["pages"])) ugc_video_list["pages"] = list(filter(lambda item: item["id"] in episodes, ugc_video_list["pages"]))
return [ return [
@ -86,7 +85,7 @@ class UgcVideoBatchExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"title": ugc_video_list["title"], "title": ugc_video_list["title"],
"pubdate": ugc_video_list["pubdate"], "pubdate": ugc_video_list["pubdate"],

View File

@ -15,10 +15,10 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
class UserAllFavouritesExtractor(BatchExtractor): class UserAllFavouritesExtractor(BatchExtractor):
"""用户所有收藏夹""" """用户所有收藏夹"""
@ -35,7 +35,7 @@ class UserAllFavouritesExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
username = await get_user_name(ctx, client, self.mid) username = await get_user_name(ctx, client, self.mid)
Logger.custom(username, Badge("用户收藏夹", fore="black", back="cyan")) Logger.custom(username, Badge("用户收藏夹", fore="black", back="cyan"))
@ -72,7 +72,7 @@ class UserAllFavouritesExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"title": title, "title": title,
"username": username, "username": username,

View File

@ -15,10 +15,10 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import ExtractorOptions
class UserAllUgcVideosExtractor(BatchExtractor): class UserAllUgcVideosExtractor(BatchExtractor):
"""UP 主个人空间全部投稿视频""" """UP 主个人空间全部投稿视频"""
@ -35,7 +35,7 @@ class UserAllUgcVideosExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
username = await get_user_name(ctx, client, self.mid) username = await get_user_name(ctx, client, self.mid)
Logger.custom(username, Badge("UP 主投稿视频", fore="black", back="cyan")) Logger.custom(username, Badge("UP 主投稿视频", fore="black", back="cyan"))
@ -67,7 +67,7 @@ class UserAllUgcVideosExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"title": title, "title": title,
"username": username, "username": username,

View File

@ -14,11 +14,9 @@ from yutto.utils.fetcher import Fetcher, FetcherContext
from yutto.utils.filter import Filter from yutto.utils.filter import Filter
if TYPE_CHECKING: if TYPE_CHECKING:
import argparse
import httpx import httpx
from yutto._typing import EpisodeData from yutto._typing import EpisodeData, ExtractorOptions
class UserWatchLaterExtractor(BatchExtractor): class UserWatchLaterExtractor(BatchExtractor):
@ -34,7 +32,7 @@ class UserWatchLaterExtractor(BatchExtractor):
return False return False
async def extract( async def extract(
self, ctx: FetcherContext, client: httpx.AsyncClient, args: argparse.Namespace self, ctx: FetcherContext, client: httpx.AsyncClient, options: ExtractorOptions
) -> list[CoroutineWrapper[EpisodeData | None] | None]: ) -> list[CoroutineWrapper[EpisodeData | None] | None]:
Logger.custom("当前用户", Badge("稍后再看", fore="black", back="cyan")) Logger.custom("当前用户", Badge("稍后再看", fore="black", back="cyan"))
@ -73,7 +71,7 @@ class UserWatchLaterExtractor(BatchExtractor):
client, client,
ugc_video_item["avid"], ugc_video_item["avid"],
ugc_video_item, ugc_video_item,
args, options,
{ {
"title": title, "title": title,
"username": "", "username": "",