Feat: add mcp self-host mode (#7157)

### What problem does this PR solve?

Add mcp self-host mode, a complement of #7084.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Yongteng Lei 2025-04-22 10:04:21 +08:00 committed by GitHub
parent 91c5a5c08f
commit ad220a0a3c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 73 additions and 17 deletions

View File

@ -15,6 +15,8 @@ services:
# - --mcp-port=9382 # - --mcp-port=9382
# - --mcp-base-url=http://127.0.0.1:9380 # - --mcp-base-url=http://127.0.0.1:9380
# - --mcp-script-path=/ragflow/mcp/server/server.py # - --mcp-script-path=/ragflow/mcp/server/server.py
# - --mcp-mode=self-host
# - --mcp--host-api-key="ragflow-xxxxxxx"
container_name: ragflow-server container_name: ragflow-server
ports: ports:
- ${SVR_HTTP_PORT}:9380 - ${SVR_HTTP_PORT}:9380

View File

@ -35,6 +35,8 @@ MCP_HOST="127.0.0.1"
MCP_PORT=9382 MCP_PORT=9382
MCP_BASE_URL="http://127.0.0.1:9380" MCP_BASE_URL="http://127.0.0.1:9380"
MCP_SCRIPT_PATH="/ragflow/mcp/server/server.py" MCP_SCRIPT_PATH="/ragflow/mcp/server/server.py"
MCP_MODE="self-host"
MCP_HOST_API_KEY=""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Host ID logic: # Host ID logic:
@ -77,6 +79,14 @@ for arg in "$@"; do
MCP_BASE_URL="${arg#*=}" MCP_BASE_URL="${arg#*=}"
shift shift
;; ;;
--mcp-mode=*)
MCP_MODE="${arg#*=}"
shift
;;
--mcp-host-api-key=*)
MCP_HOST_API_KEY="${arg#*=}"
shift
;;
--mcp-script-path=*) --mcp-script-path=*)
MCP_SCRIPT_PATH="${arg#*=}" MCP_SCRIPT_PATH="${arg#*=}"
shift shift
@ -138,7 +148,9 @@ function start_mcp_server() {
"$PY" "${MCP_SCRIPT_PATH}" \ "$PY" "${MCP_SCRIPT_PATH}" \
--host="${MCP_HOST}" \ --host="${MCP_HOST}" \
--port="${MCP_PORT}" \ --port="${MCP_PORT}" \
--base_url="${MCP_BASE_URL}" & --base_url="${MCP_BASE_URL}" \
--mode="${MCP_MODE}" \
--api_key="${MCP_HOST_API_KEY}" \ &
} }
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@ -21,7 +21,9 @@ from mcp.client.sse import sse_client
async def main(): async def main():
try: try:
async with sse_client("http://localhost:9382/sse", headers={"api_key": "ragflow-IyMGI1ZDhjMTA2ZTExZjBiYTMyMGQ4Zm"}) as streams: # To access RAGFlow server in `host` mode, you need to attach `api_key` for each request to indicate identification.
# async with sse_client("http://localhost:9382/sse", headers={"api_key": "ragflow-IyMGI1ZDhjMTA2ZTExZjBiYTMyMGQ4Zm"}) as streams:
async with sse_client("http://localhost:9382/sse") as streams:
async with ClientSession( async with ClientSession(
streams[0], streams[0],
streams[1], streams[1],
@ -29,7 +31,7 @@ async def main():
await session.initialize() await session.initialize()
tools = await session.list_tools() tools = await session.list_tools()
print(f"{tools.tools=}") print(f"{tools.tools=}")
response = await session.call_tool(name="ragflow_retrival", arguments={"dataset_ids": ["ce3bb17cf27a11efa69751e139332ced"], "document_ids": [], "question": "How to install neovim?"}) response = await session.call_tool(name="ragflow_retrieval", arguments={"dataset_ids": ["ce3bb17cf27a11efa69751e139332ced"], "document_ids": [], "question": "How to install neovim?"})
print(f"Tool response: {response.model_dump()}") print(f"Tool response: {response.model_dump()}")
except Exception as e: except Exception as e:

View File

@ -24,14 +24,23 @@ from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from starlette.routing import Mount, Route from starlette.routing import Mount, Route
from strenum import StrEnum
import mcp.types as types import mcp.types as types
from mcp.server.lowlevel import Server from mcp.server.lowlevel import Server
from mcp.server.sse import SseServerTransport from mcp.server.sse import SseServerTransport
class LaunchMode(StrEnum):
SELF_HOST = "self-host"
HOST = "host"
BASE_URL = "http://127.0.0.1:9380" BASE_URL = "http://127.0.0.1:9380"
HOST = "127.0.0.1" HOST = "127.0.0.1"
PORT = "9382" PORT = "9382"
HOST_API_KEY = ""
MODE = ""
class RAGFlowConnector: class RAGFlowConnector:
@ -68,7 +77,7 @@ class RAGFlowConnector:
return "\n".join(result_list) return "\n".join(result_list)
return "" return ""
def retrival( def retrieval(
self, dataset_ids, document_ids=None, question="", page=1, page_size=30, similarity_threshold=0.2, vector_similarity_weight=0.3, top_k=1024, rerank_id: str | None = None, keyword: bool = False self, dataset_ids, document_ids=None, question="", page=1, page_size=30, similarity_threshold=0.2, vector_similarity_weight=0.3, top_k=1024, rerank_id: str | None = None, keyword: bool = False
): ):
if document_ids is None: if document_ids is None:
@ -126,21 +135,24 @@ async def list_tools() -> list[types.Tool]:
raise ValueError("Get RAGFlow Context failed") raise ValueError("Get RAGFlow Context failed")
connector = ragflow_ctx.conn connector = ragflow_ctx.conn
if MODE == LaunchMode.HOST:
api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"] api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"]
if not api_key: if not api_key:
raise ValueError("RAGFlow API_KEY is required.") raise ValueError("RAGFlow API_KEY is required.")
else:
api_key = HOST_API_KEY
connector.bind_api_key(api_key) connector.bind_api_key(api_key)
dataset_description = connector.list_datasets() dataset_description = connector.list_datasets()
return [ return [
types.Tool( types.Tool(
name="retrival", name="ragflow_retrieval",
description="Retrieve relevant chunks from the RAGFlow retrieve interface based on the question, using the specified dataset_ids and optionally document_ids. Below is the list of all available datasets, including their descriptions and IDs. If you're unsure which datasets are relevant to the question, simply pass all dataset IDs to the function." description="Retrieve relevant chunks from the RAGFlow retrieve interface based on the question, using the specified dataset_ids and optionally document_ids. Below is the list of all available datasets, including their descriptions and IDs. If you're unsure which datasets are relevant to the question, simply pass all dataset IDs to the function."
+ dataset_description, + dataset_description,
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": {"dataset_ids": {"type": "array", "items": {"type": "string"}}, "documents_ids": {"type": "array", "items": {"type": "string"}}, "question": {"type": "string"}}, "properties": {"dataset_ids": {"type": "array", "items": {"type": "string"}}, "document_ids": {"type": "array", "items": {"type": "string"}}, "question": {"type": "string"}},
"required": ["dataset_ids", "question"], "required": ["dataset_ids", "question"],
}, },
), ),
@ -155,13 +167,17 @@ async def call_tool(name: str, arguments: dict) -> list[types.TextContent | type
raise ValueError("Get RAGFlow Context failed") raise ValueError("Get RAGFlow Context failed")
connector = ragflow_ctx.conn connector = ragflow_ctx.conn
if MODE == LaunchMode.HOST:
api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"] api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"]
if not api_key: if not api_key:
raise ValueError("RAGFlow API_KEY is required.") raise ValueError("RAGFlow API_KEY is required.")
else:
api_key = HOST_API_KEY
connector.bind_api_key(api_key) connector.bind_api_key(api_key)
if name == "ragflow_retrival": if name == "ragflow_retrieval":
return connector.retrival(dataset_ids=arguments["dataset_ids"], document_ids=arguments["document_ids"], question=arguments["question"]) document_ids = arguments.get("document_ids", [])
return connector.retrieval(dataset_ids=arguments["dataset_ids"], document_ids=document_ids, question=arguments["question"])
raise ValueError(f"Tool not found: {name}") raise ValueError(f"Tool not found: {name}")
@ -179,20 +195,27 @@ class AuthMiddleware(BaseHTTPMiddleware):
return await call_next(request) return await call_next(request)
middleware = None
if MODE == LaunchMode.HOST:
middleware = [Middleware(AuthMiddleware)]
starlette_app = Starlette( starlette_app = Starlette(
debug=True, debug=True,
routes=[ routes=[
Route("/sse", endpoint=handle_sse), Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message), Mount("/messages/", app=sse.handle_post_message),
], ],
middleware=[Middleware(AuthMiddleware)], middleware=middleware,
) )
if __name__ == "__main__": if __name__ == "__main__":
""" """
Launch example: Launch example:
uv run mcp/server/server.py --host=127.0.0.1 --port=9382 --base_url=http://127.0.0.1:9380 self-host:
uv run mcp/server/server.py --host=127.0.0.1 --port=9382 --base_url=http://127.0.0.1:9380 --mode=self-host --api_key=ragflow-xxxxx
host:
uv run mcp/server/server.py --host=127.0.0.1 --port=9382 --base_url=http://127.0.0.1:9380 --mode=host
""" """
import argparse import argparse
@ -203,15 +226,31 @@ if __name__ == "__main__":
load_dotenv() load_dotenv()
parser = argparse.ArgumentParser(description="RAGFlow MCP Server, `base_url` and `api_key` are needed.") parser = argparse.ArgumentParser(description="RAGFlow MCP Server")
parser.add_argument("--base_url", type=str, default="http://127.0.0.1:9380", help="api_url: http://<host_address>") parser.add_argument("--base_url", type=str, default="http://127.0.0.1:9380", help="api_url: http://<host_address>")
parser.add_argument("--host", type=str, default="127.0.0.1", help="RAGFlow MCP SERVER host") parser.add_argument("--host", type=str, default="127.0.0.1", help="RAGFlow MCP SERVER host")
parser.add_argument("--port", type=str, default="9382", help="RAGFlow MCP SERVER port") parser.add_argument("--port", type=str, default="9382", help="RAGFlow MCP SERVER port")
parser.add_argument(
"--mode",
type=str,
default="self-host",
help="Launch mode options:\n"
" * self-host: Launches an MCP server to access a specific tenant space. The 'api_key' argument is required.\n"
" * host: Launches an MCP server that allows users to access their own spaces. Each request must include a header "
"indicating the user's identification.",
)
parser.add_argument("--api_key", type=str, default="", help="RAGFlow MCP SERVER HOST API KEY")
args = parser.parse_args() args = parser.parse_args()
if args.mode not in ["self-host", "host"]:
parser.error("--mode is only accept 'self-host' or 'host'")
if args.mode == "self-host" and not args.api_key:
parser.error("--api_key is required when --mode is 'self-host'")
BASE_URL = os.environ.get("RAGFLOW_MCP_BASE_URL", args.base_url) BASE_URL = os.environ.get("RAGFLOW_MCP_BASE_URL", args.base_url)
HOST = os.environ.get("RAGFLOW_MCP_HOST", args.host) HOST = os.environ.get("RAGFLOW_MCP_HOST", args.host)
PORT = os.environ.get("RAGFLOW_MCP_PORT", args.port) PORT = os.environ.get("RAGFLOW_MCP_PORT", args.port)
MODE = os.environ.get("RAGFLOW_MCP_LAUNCH_MODE", args.mode)
HOST_API_KEY = os.environ.get("RAGFLOW_MCP_HOST_API_KEY", args.api_key)
print( print(
r""" r"""
@ -223,6 +262,7 @@ __ __ ____ ____ ____ _____ ______ _______ ____
""", """,
flush=True, flush=True,
) )
print(f"MCP launch mode: {MODE}", flush=True)
print(f"MCP host: {HOST}", flush=True) print(f"MCP host: {HOST}", flush=True)
print(f"MCP port: {PORT}", flush=True) print(f"MCP port: {PORT}", flush=True)
print(f"MCP base_url: {BASE_URL}", flush=True) print(f"MCP base_url: {BASE_URL}", flush=True)