From cb09dbef66471b98cc9ed9de2ecd07f182acca9e Mon Sep 17 00:00:00 2001 From: Charles Zhou Date: Mon, 1 Jul 2024 01:21:17 -0500 Subject: [PATCH] feat: correctly delete applications using Celery workers (#5787) --- .devcontainer/post_create_command.sh | 2 +- .vscode/launch.json | 2 +- api/README.md | 2 +- api/docker/entrypoint.sh | 2 +- api/events/app_event.py | 3 - api/events/event_handlers/__init__.py | 3 - .../delete_installed_app_when_app_deleted.py | 12 -- .../delete_site_record_when_app_deleted.py | 11 -- ...elete_workflow_as_tool_when_app_deleted.py | 14 -- api/services/app_service.py | 17 +- api/tasks/remove_app_and_related_data_task.py | 150 ++++++++++++++++++ 11 files changed, 158 insertions(+), 60 deletions(-) delete mode 100644 api/events/event_handlers/delete_installed_app_when_app_deleted.py delete mode 100644 api/events/event_handlers/delete_site_record_when_app_deleted.py delete mode 100644 api/events/event_handlers/delete_workflow_as_tool_when_app_deleted.py create mode 100644 api/tasks/remove_app_and_related_data_task.py diff --git a/.devcontainer/post_create_command.sh b/.devcontainer/post_create_command.sh index 3ebc06e605..34bdf041d9 100755 --- a/.devcontainer/post_create_command.sh +++ b/.devcontainer/post_create_command.sh @@ -3,7 +3,7 @@ cd web && npm install echo 'alias start-api="cd /workspaces/dify/api && flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc -echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace"' >> ~/.bashrc +echo 'alias start-worker="cd /workspaces/dify/api && celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc echo 'alias start-web="cd /workspaces/dify/web && npm run dev"' >> ~/.bashrc echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify up -d"' >> ~/.bashrc diff --git a/.vscode/launch.json b/.vscode/launch.json index 03b15e7f27..1b1c05281b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -48,7 +48,7 @@ "--loglevel", "info", "-Q", - "dataset,generation,mail,ops_trace" + "dataset,generation,mail,ops_trace,app_deletion" ] }, ] diff --git a/api/README.md b/api/README.md index 9e3c7c446b..d89a645c4e 100644 --- a/api/README.md +++ b/api/README.md @@ -66,7 +66,7 @@ 10. If you need to debug local async processing, please start the worker service. ```bash - poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace + poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion ``` The started celery app handles the async tasks, e.g. dataset importing and documents indexing. diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index 0bb494abd7..e74c6c2406 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -9,7 +9,7 @@ fi if [[ "${MODE}" == "worker" ]]; then celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \ - -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace} + -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace,app_deletion} elif [[ "${MODE}" == "beat" ]]; then celery -A app.celery beat --loglevel INFO else diff --git a/api/events/app_event.py b/api/events/app_event.py index 3a975958fc..67a5982527 100644 --- a/api/events/app_event.py +++ b/api/events/app_event.py @@ -3,9 +3,6 @@ from blinker import signal # sender: app app_was_created = signal('app-was-created') -# sender: app -app_was_deleted = signal('app-was-deleted') - # sender: app, kwargs: app_model_config app_model_config_was_updated = signal('app-model-config-was-updated') diff --git a/api/events/event_handlers/__init__.py b/api/events/event_handlers/__init__.py index c82b8a92d9..7ee7146d09 100644 --- a/api/events/event_handlers/__init__.py +++ b/api/events/event_handlers/__init__.py @@ -4,10 +4,7 @@ from .create_document_index import handle from .create_installed_app_when_app_created import handle from .create_site_record_when_app_created import handle from .deduct_quota_when_messaeg_created import handle -from .delete_installed_app_when_app_deleted import handle -from .delete_site_record_when_app_deleted import handle from .delete_tool_parameters_cache_when_sync_draft_workflow import handle -from .delete_workflow_as_tool_when_app_deleted import handle from .update_app_dataset_join_when_app_model_config_updated import handle from .update_app_dataset_join_when_app_published_workflow_updated import handle from .update_provider_last_used_at_when_messaeg_created import handle diff --git a/api/events/event_handlers/delete_installed_app_when_app_deleted.py b/api/events/event_handlers/delete_installed_app_when_app_deleted.py deleted file mode 100644 index 1d6271a466..0000000000 --- a/api/events/event_handlers/delete_installed_app_when_app_deleted.py +++ /dev/null @@ -1,12 +0,0 @@ -from events.app_event import app_was_deleted -from extensions.ext_database import db -from models.model import InstalledApp - - -@app_was_deleted.connect -def handle(sender, **kwargs): - app = sender - installed_apps = db.session.query(InstalledApp).filter(InstalledApp.app_id == app.id).all() - for installed_app in installed_apps: - db.session.delete(installed_app) - db.session.commit() diff --git a/api/events/event_handlers/delete_site_record_when_app_deleted.py b/api/events/event_handlers/delete_site_record_when_app_deleted.py deleted file mode 100644 index 2e476d3d53..0000000000 --- a/api/events/event_handlers/delete_site_record_when_app_deleted.py +++ /dev/null @@ -1,11 +0,0 @@ -from events.app_event import app_was_deleted -from extensions.ext_database import db -from models.model import Site - - -@app_was_deleted.connect -def handle(sender, **kwargs): - app = sender - site = db.session.query(Site).filter(Site.app_id == app.id).first() - db.session.delete(site) - db.session.commit() diff --git a/api/events/event_handlers/delete_workflow_as_tool_when_app_deleted.py b/api/events/event_handlers/delete_workflow_as_tool_when_app_deleted.py deleted file mode 100644 index 0c56688ff6..0000000000 --- a/api/events/event_handlers/delete_workflow_as_tool_when_app_deleted.py +++ /dev/null @@ -1,14 +0,0 @@ -from events.app_event import app_was_deleted -from extensions.ext_database import db -from models.tools import WorkflowToolProvider - - -@app_was_deleted.connect -def handle(sender, **kwargs): - app = sender - workflow_tools = db.session.query(WorkflowToolProvider).filter( - WorkflowToolProvider.app_id == app.id - ).all() - for workflow_tool in workflow_tools: - db.session.delete(workflow_tool) - db.session.commit() diff --git a/api/services/app_service.py b/api/services/app_service.py index 23c00740c8..1de0bce1eb 100644 --- a/api/services/app_service.py +++ b/api/services/app_service.py @@ -16,13 +16,14 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel from core.tools.tool_manager import ToolManager from core.tools.utils.configuration import ToolParameterConfigurationManager -from events.app_event import app_model_config_was_updated, app_was_created, app_was_deleted +from events.app_event import app_model_config_was_updated, app_was_created from extensions.ext_database import db from models.account import Account from models.model import App, AppMode, AppModelConfig from models.tools import ApiToolProvider from services.tag_service import TagService from services.workflow_service import WorkflowService +from tasks.remove_app_and_related_data_task import remove_app_and_related_data_task class AppService: @@ -393,18 +394,8 @@ class AppService: Delete app :param app: App instance """ - db.session.delete(app) - db.session.commit() - - app_was_deleted.send(app) - - # todo async delete related data by event - # app_model_configs, site, api_tokens, installed_apps, recommended_apps BY app - # app_annotation_hit_histories, app_annotation_settings, app_dataset_joins BY app - # workflows, workflow_runs, workflow_node_executions, workflow_app_logs BY app - # conversations, pinned_conversations, messages BY app - # message_feedbacks, message_annotations, message_chains BY message - # message_agent_thoughts, message_files, saved_messages BY message + # Trigger asynchronous deletion of app and related data + remove_app_and_related_data_task.delay(app.id) def get_app_meta(self, app_model: App) -> dict: """ diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py new file mode 100644 index 0000000000..0a6bfb1520 --- /dev/null +++ b/api/tasks/remove_app_and_related_data_task.py @@ -0,0 +1,150 @@ +import logging +import time + +import click +from celery import shared_task +from sqlalchemy import select +from sqlalchemy.exc import SQLAlchemyError + +from extensions.ext_database import db +from models.dataset import AppDatasetJoin +from models.model import ( + ApiToken, + App, + AppAnnotationHitHistory, + AppAnnotationSetting, + AppModelConfig, + Conversation, + EndUser, + InstalledApp, + Message, + MessageAgentThought, + MessageAnnotation, + MessageChain, + MessageFeedback, + MessageFile, + RecommendedApp, + Site, + TagBinding, +) +from models.tools import WorkflowToolProvider +from models.web import PinnedConversation, SavedMessage +from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun + + +@shared_task(queue='app_deletion', bind=True, max_retries=3) +def remove_app_and_related_data_task(self, app_id: str): + logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green')) + start_at = time.perf_counter() + + deletion_cache_key = f'app_{app_id}_deletion' + + try: + # Use a transaction to ensure all deletions succeed or none do + with db.session.begin_nested(): + app = db.session.query(App).filter(App.id == app_id).first() + if not app: + logging.warning(click.style(f"App {app_id} not found", fg='yellow')) + return + + # Delete related data + _delete_app_model_configs(app_id) + _delete_app_site(app_id) + _delete_app_api_tokens(app_id) + _delete_installed_apps(app_id) + _delete_recommended_apps(app_id) + _delete_app_annotation_data(app_id) + _delete_app_dataset_joins(app_id) + _delete_app_workflows(app_id) + _delete_app_conversations(app_id) + _delete_app_messages(app_id) + _delete_workflow_tool_providers(app_id) + _delete_app_tag_bindings(app_id) + _delete_end_users(app_id) + + # Delete the app itself + db.session.delete(app) + + + # If we reach here, the transaction was successful + db.session.commit() + + end_at = time.perf_counter() + logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green')) + + except SQLAlchemyError as e: + db.session.rollback() + logging.exception(click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red')) + raise self.retry(exc=e, countdown=60) # Retry after 60 seconds + + except Exception as e: + logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red')) + raise self.retry(exc=e, countdown=60) # Retry after 60 seconds + + +def _delete_app_model_configs(app_id: str): + db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete() + +def _delete_app_site(app_id: str): + db.session.query(Site).filter(Site.app_id == app_id).delete() + +def _delete_app_api_tokens(app_id: str): + db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete() + +def _delete_installed_apps(app_id: str): + db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete() + +def _delete_recommended_apps(app_id: str): + db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete() + +def _delete_app_annotation_data(app_id: str): + db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete() + db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete() + +def _delete_app_dataset_joins(app_id: str): + db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete() + +def _delete_app_workflows(app_id: str): + db.session.query(WorkflowRun).filter( + WorkflowRun.workflow_id.in_( + db.session.query(Workflow.id).filter(Workflow.app_id == app_id) + ) + ).delete(synchronize_session=False) + db.session.query(WorkflowNodeExecution).filter( + WorkflowNodeExecution.workflow_id.in_( + db.session.query(Workflow.id).filter(Workflow.app_id == app_id) + ) + ).delete(synchronize_session=False) + db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False) + db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False) + +def _delete_app_conversations(app_id: str): + db.session.query(PinnedConversation).filter( + PinnedConversation.conversation_id.in_( + db.session.query(Conversation.id).filter(Conversation.app_id == app_id) + ) + ).delete(synchronize_session=False) + db.session.query(Conversation).filter(Conversation.app_id == app_id).delete() + +def _delete_app_messages(app_id: str): + message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery() + db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False) + db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False) + +def _delete_workflow_tool_providers(app_id: str): + db.session.query(WorkflowToolProvider).filter( + WorkflowToolProvider.app_id == app_id + ).delete(synchronize_session=False) + +def _delete_app_tag_bindings(app_id: str): + db.session.query(TagBinding).filter( + TagBinding.target_id == app_id + ).delete(synchronize_session=False) + +def _delete_end_users(app_id: str): + db.session.query(EndUser).filter(EndUser.app_id == app_id).delete()