import logging import random from typing import cast from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity from core.entities import DEFAULT_PLUGIN_ID from core.model_runtime.entities.model_entities import ModelType from core.model_runtime.errors.invoke import InvokeBadRequestError from core.model_runtime.model_providers.__base.moderation_model import ModerationModel from core.model_runtime.model_providers.model_provider_factory import ModelProviderFactory from extensions.ext_hosting_provider import hosting_configuration from models.provider import ProviderType logger = logging.getLogger(__name__) def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEntity, text: str) -> bool: moderation_config = hosting_configuration.moderation_config openai_provider_name = f"{DEFAULT_PLUGIN_ID}/openai/openai" if ( moderation_config and moderation_config.enabled is True and openai_provider_name in hosting_configuration.provider_map and hosting_configuration.provider_map[openai_provider_name].enabled is True ): using_provider_type = model_config.provider_model_bundle.configuration.using_provider_type provider_name = model_config.provider if using_provider_type == ProviderType.SYSTEM and provider_name in moderation_config.providers: hosting_openai_config = hosting_configuration.provider_map[openai_provider_name] if hosting_openai_config.credentials is None: return False # 2000 text per chunk length = 2000 text_chunks = [text[i : i + length] for i in range(0, len(text), length)] if len(text_chunks) == 0: return True text_chunk = random.choice(text_chunks) try: model_provider_factory = ModelProviderFactory(tenant_id) # Get model instance of LLM model_type_instance = model_provider_factory.get_model_type_instance( provider=openai_provider_name, model_type=ModelType.MODERATION ) model_type_instance = cast(ModerationModel, model_type_instance) moderation_result = model_type_instance.invoke( model="omni-moderation-latest", credentials=hosting_openai_config.credentials, text=text_chunk ) if moderation_result is True: return True except Exception: logger.exception(f"Fails to check moderation, provider_name: {provider_name}") raise InvokeBadRequestError("Rate limit exceeded, please try again later.") return False