diff --git a/Dockerfile b/Dockerfile index d102c1c57..d01049bb6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -198,6 +198,7 @@ COPY agent agent COPY graphrag graphrag COPY agentic_reasoning agentic_reasoning COPY pyproject.toml uv.lock ./ +COPY mcp mcp COPY docker/service_conf.yaml.template ./conf/service_conf.yaml.template COPY docker/entrypoint.sh ./ diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6457eec76..182f3baab 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -8,6 +8,13 @@ services: mysql: condition: service_healthy image: ${RAGFLOW_IMAGE} + # example to setup MCP server + # command: + # - --enable-mcpserver + # - --mcp-host=0.0.0.0 + # - --mcp-port=9382 + # - --mcp-base-url=http://127.0.0.1:9380 + # - --mcp-script-path=/ragflow/mcp/server/server.py container_name: ragflow-server ports: - ${SVR_HTTP_PORT}:9380 @@ -15,6 +22,7 @@ services: - 443:443 - 5678:5678 - 5679:5679 + - 9382:9382 # entry for MCP (host_port:docker_port). The docker_port should match with the value you set for `mcp-port` above volumes: - ./ragflow-logs:/ragflow/logs - ./nginx/ragflow.conf:/etc/nginx/conf.d/ragflow.conf diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 1f582aaac..6b394916d 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -10,6 +10,7 @@ function usage() { echo echo " --disable-webserver Disables the web server (nginx + ragflow_server)." echo " --disable-taskexecutor Disables task executor workers." + echo " --enable-mcpserver Enables the MCP server." echo " --consumer-no-beg= Start range for consumers (if using range-based)." echo " --consumer-no-end= End range for consumers (if using range-based)." echo " --workers= Number of task executors to run (if range is not used)." @@ -19,15 +20,22 @@ function usage() { echo " $0 --disable-taskexecutor" echo " $0 --disable-webserver --consumer-no-beg=0 --consumer-no-end=5" echo " $0 --disable-webserver --workers=2 --host-id=myhost123" + echo " $0 --enable-mcpserver" exit 1 } ENABLE_WEBSERVER=1 # Default to enable web server ENABLE_TASKEXECUTOR=1 # Default to enable task executor +ENABLE_MCP_SERVER=0 CONSUMER_NO_BEG=0 CONSUMER_NO_END=0 WORKERS=1 +MCP_HOST="127.0.0.1" +MCP_PORT=9382 +MCP_BASE_URL="http://127.0.0.1:9380" +MCP_SCRIPT_PATH="/ragflow/mcp/server/server.py" + # ----------------------------------------------------------------------------- # Host ID logic: # 1. By default, use the system hostname if length <= 32 @@ -53,6 +61,26 @@ for arg in "$@"; do ENABLE_TASKEXECUTOR=0 shift ;; + --enable-mcpserver) + ENABLE_MCP_SERVER=1 + shift + ;; + --mcp-host=*) + MCP_HOST="${arg#*=}" + shift + ;; + --mcp-port=*) + MCP_PORT="${arg#*=}" + shift + ;; + --mcp-base-url=*) + MCP_BASE_URL="${arg#*=}" + shift + ;; + --mcp-script-path=*) + MCP_SCRIPT_PATH="${arg#*=}" + shift + ;; --consumer-no-beg=*) CONSUMER_NO_BEG="${arg#*=}" shift @@ -105,6 +133,14 @@ function task_exe() { done } +function start_mcp_server() { + echo "Starting MCP Server on ${MCP_HOST}:${MCP_PORT} with base URL ${MCP_BASE_URL}..." + "$PY" "${MCP_SCRIPT_PATH}" \ + --host="${MCP_HOST}" \ + --port="${MCP_PORT}" \ + --base_url="${MCP_BASE_URL}" & +} + # ----------------------------------------------------------------------------- # Start components based on flags # ----------------------------------------------------------------------------- @@ -119,6 +155,11 @@ if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then done & fi + +if [[ "${ENABLE_MCP_SERVER}" -eq 1 ]]; then + start_mcp_server +fi + if [[ "${ENABLE_TASKEXECUTOR}" -eq 1 ]]; then if [[ "${CONSUMER_NO_END}" -gt "${CONSUMER_NO_BEG}" ]]; then echo "Starting task executors on host '${HOST_ID}' for IDs in [${CONSUMER_NO_BEG}, ${CONSUMER_NO_END})..." diff --git a/mcp/client/client.py b/mcp/client/client.py new file mode 100644 index 000000000..4ee8de71c --- /dev/null +++ b/mcp/client/client.py @@ -0,0 +1,42 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +from mcp.client.session import ClientSession +from mcp.client.sse import sse_client + + +async def main(): + try: + async with sse_client("http://localhost:9382/sse", headers={"api_key": "ragflow-IyMGI1ZDhjMTA2ZTExZjBiYTMyMGQ4Zm"}) as streams: + async with ClientSession( + streams[0], + streams[1], + ) as session: + await session.initialize() + tools = await session.list_tools() + print(f"{tools.tools=}") + response = await session.call_tool(name="ragflow_retrival", arguments={"dataset_ids": ["ce3bb17cf27a11efa69751e139332ced"], "document_ids": [], "question": "How to install neovim?"}) + print(f"Tool response: {response.model_dump()}") + + except Exception as e: + print(e) + + +if __name__ == "__main__": + from anyio import run + + run(main) diff --git a/mcp/server/server.py b/mcp/server/server.py new file mode 100644 index 000000000..b4dec43e1 --- /dev/null +++ b/mcp/server/server.py @@ -0,0 +1,234 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +import requests +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import JSONResponse +from starlette.routing import Mount, Route + +import mcp.types as types +from mcp.server.lowlevel import Server +from mcp.server.sse import SseServerTransport + +BASE_URL = "http://127.0.0.1:9380" +HOST = "127.0.0.1" +PORT = "9382" + + +class RAGFlowConnector: + def __init__(self, base_url: str, version="v1"): + self.base_url = base_url + self.version = version + self.api_url = f"{self.base_url}/api/{self.version}" + + def bind_api_key(self, api_key: str): + self.api_key = api_key + self.authorization_header = {"Authorization": "{} {}".format("Bearer", self.api_key)} + + def _post(self, path, json=None, stream=False, files=None): + if not self.api_key: + return None + res = requests.post(url=self.api_url + path, json=json, headers=self.authorization_header, stream=stream, files=files) + return res + + def _get(self, path, params=None, json=None): + res = requests.get(url=self.api_url + path, params=params, headers=self.authorization_header, json=json) + return res + + def list_datasets(self, page: int = 1, page_size: int = 1000, orderby: str = "create_time", desc: bool = True, id: str | None = None, name: str | None = None): + res = self._get("/datasets", {"page": page, "page_size": page_size, "orderby": orderby, "desc": desc, "id": id, "name": name}) + if not res: + raise Exception([types.TextContent(type="text", text=res.get("Cannot process this operation."))]) + + res = res.json() + if res.get("code") == 0: + result_list = [] + for data in res["data"]: + d = {"description": data["description"], "id": data["id"]} + result_list.append(json.dumps(d, ensure_ascii=False)) + return "\n".join(result_list) + return "" + + def retrival( + self, dataset_ids, document_ids=None, question="", page=1, page_size=30, similarity_threshold=0.2, vector_similarity_weight=0.3, top_k=1024, rerank_id: str | None = None, keyword: bool = False + ): + if document_ids is None: + document_ids = [] + data_json = { + "page": page, + "page_size": page_size, + "similarity_threshold": similarity_threshold, + "vector_similarity_weight": vector_similarity_weight, + "top_k": top_k, + "rerank_id": rerank_id, + "keyword": keyword, + "question": question, + "dataset_ids": dataset_ids, + "document_ids": document_ids, + } + # Send a POST request to the backend service (using requests library as an example, actual implementation may vary) + res = self._post("/retrieval", json=data_json) + if not res: + raise Exception([types.TextContent(type="text", text=res.get("Cannot process this operation."))]) + + res = res.json() + if res.get("code") == 0: + chunks = [] + for chunk_data in res["data"].get("chunks"): + chunks.append(json.dumps(chunk_data, ensure_ascii=False)) + return [types.TextContent(type="text", text="\n".join(chunks))] + raise Exception([types.TextContent(type="text", text=res.get("message"))]) + + +class RAGFlowCtx: + def __init__(self, connector: RAGFlowConnector): + self.conn = connector + + +@asynccontextmanager +async def server_lifespan(server: Server) -> AsyncIterator[dict]: + ctx = RAGFlowCtx(RAGFlowConnector(base_url=BASE_URL)) + + try: + yield {"ragflow_ctx": ctx} + finally: + pass + + +app = Server("ragflow-server", lifespan=server_lifespan) +sse = SseServerTransport("/messages/") + + +@app.list_tools() +async def list_tools() -> list[types.Tool]: + ctx = app.request_context + ragflow_ctx = ctx.lifespan_context["ragflow_ctx"] + if not ragflow_ctx: + raise ValueError("Get RAGFlow Context failed") + connector = ragflow_ctx.conn + + api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"] + if not api_key: + raise ValueError("RAGFlow API_KEY is required.") + connector.bind_api_key(api_key) + + dataset_description = connector.list_datasets() + + return [ + types.Tool( + name="retrival", + description="Retrieve relevant chunks from the RAGFlow retrieve interface based on the question, using the specified dataset_ids and optionally document_ids. Below is the list of all available datasets, including their descriptions and IDs. If you're unsure which datasets are relevant to the question, simply pass all dataset IDs to the function." + + dataset_description, + inputSchema={ + "type": "object", + "properties": {"dataset_ids": {"type": "array", "items": {"type": "string"}}, "documents_ids": {"type": "array", "items": {"type": "string"}}, "question": {"type": "string"}}, + "required": ["dataset_ids", "question"], + }, + ), + ] + + +@app.call_tool() +async def call_tool(name: str, arguments: dict) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: + ctx = app.request_context + ragflow_ctx = ctx.lifespan_context["ragflow_ctx"] + if not ragflow_ctx: + raise ValueError("Get RAGFlow Context failed") + connector = ragflow_ctx.conn + + api_key = ctx.session._init_options.capabilities.experimental["headers"]["api_key"] + if not api_key: + raise ValueError("RAGFlow API_KEY is required.") + connector.bind_api_key(api_key) + + if name == "ragflow_retrival": + return connector.retrival(dataset_ids=arguments["dataset_ids"], document_ids=arguments["document_ids"], question=arguments["question"]) + raise ValueError(f"Tool not found: {name}") + + +async def handle_sse(request): + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + await app.run(streams[0], streams[1], app.create_initialization_options(experimental_capabilities={"headers": dict(request.headers)})) + + +class AuthMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + if request.url.path.startswith("/sse") or request.url.path.startswith("/messages"): + api_key = request.headers.get("api_key") + if not api_key: + return JSONResponse({"error": "Missing unauthorization header"}, status_code=401) + return await call_next(request) + + +starlette_app = Starlette( + debug=True, + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ], + middleware=[Middleware(AuthMiddleware)], +) + + +if __name__ == "__main__": + """ + Launch example: + uv run mcp/server/server.py --host=127.0.0.1 --port=9382 --base_url=http://127.0.0.1:9380 + """ + + import argparse + import os + + import uvicorn + from dotenv import load_dotenv + + load_dotenv() + + parser = argparse.ArgumentParser(description="RAGFlow MCP Server, `base_url` and `api_key` are needed.") + parser.add_argument("--base_url", type=str, default="http://127.0.0.1:9380", help="api_url: http://") + parser.add_argument("--host", type=str, default="127.0.0.1", help="RAGFlow MCP SERVER host") + parser.add_argument("--port", type=str, default="9382", help="RAGFlow MCP SERVER port") + args = parser.parse_args() + + BASE_URL = os.environ.get("RAGFLOW_MCP_BASE_URL", args.base_url) + HOST = os.environ.get("RAGFLOW_MCP_HOST", args.host) + PORT = os.environ.get("RAGFLOW_MCP_PORT", args.port) + + print( + r""" +__ __ ____ ____ ____ _____ ______ _______ ____ +| \/ |/ ___| _ \ / ___|| ____| _ \ \ / / ____| _ \ +| |\/| | | | |_) | \___ \| _| | |_) \ \ / /| _| | |_) | +| | | | |___| __/ ___) | |___| _ < \ V / | |___| _ < +|_| |_|\____|_| |____/|_____|_| \_\ \_/ |_____|_| \_\ + """, + flush=True, + ) + print(f"MCP host: {HOST}", flush=True) + print(f"MCP port: {PORT}", flush=True) + print(f"MCP base_url: {BASE_URL}", flush=True) + + uvicorn.run( + starlette_app, + host=HOST, + port=int(PORT), + ) diff --git a/pyproject.toml b/pyproject.toml index 59afacdf3..3132a4a0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,7 @@ dependencies = [ "trio>=0.29.0", "langfuse>=2.60.0", "debugpy>=1.8.13", + "mcp>=1.6.0", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 9644e92f0..8ea35461d 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ wheels = [ [[package]] name = "akshare" -version = "1.16.60" +version = "1.16.77" source = { registry = "https://mirrors.aliyun.com/pypi/simple" } dependencies = [ { name = "aiohttp" }, @@ -171,9 +171,9 @@ dependencies = [ { name = "urllib3" }, { name = "xlrd" }, ] -sdist = { url = "https://mirrors.aliyun.com/pypi/packages/46/0a/cd9dacaec5b437fd10a21f022c90e055adade935375d47230b2e6ae6b609/akshare-1.16.60.tar.gz", hash = "sha256:f7ce998316954e64c737d99da2afb37275f94f6447d4bb9af83691b135b12c81" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/94/90/ae63cd06d24bbfd0e79b0f4e1ac35a28af30ef9c7b9faee22e90deee3ae7/akshare-1.16.77.tar.gz", hash = "sha256:7f8d9f9fad15dda3d2736c3079ee4726e003701f9450574dc653e6047aa76ecc" } wheels = [ - { url = "https://mirrors.aliyun.com/pypi/packages/67/a9/dcd31299b4eb4d098148aef115e104d20d3d38dd36b0336510f44635c533/akshare-1.16.60-py3-none-any.whl", hash = "sha256:d8a5c8611dbffc0deddce7b2c3fdb168f0cd053fc0ebc5f0a4ac775c539db9d2" }, + { url = "https://mirrors.aliyun.com/pypi/packages/51/85/c0ccc7d8b3697997cc36351bb16377a37f31ff92fd82ac2a52c08ce05a0f/akshare-1.16.77-py3-none-any.whl", hash = "sha256:7e330514d2f851bb9938d5b469602918703951cbaa0e0961851ae39249d8a265" }, ] [[package]] @@ -2986,6 +2986,25 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/fd/7d/6a8b31dd07ed856b3eae001c9129670ef75c4698fa1c2a6ac9f00a4a7054/matplotlib-3.10.1-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d3809916157ba871bcdd33d3493acd7fe3037db5daa917ca6e77975a94cef779" }, ] +[[package]] +name = "mcp" +version = "1.6.0" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "anyio" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "pydantic" }, + { name = "pydantic-settings" }, + { name = "sse-starlette" }, + { name = "starlette" }, + { name = "uvicorn" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/95/d2/f587cb965a56e992634bebc8611c5b579af912b74e04eb9164bd49527d21/mcp-1.6.0.tar.gz", hash = "sha256:d9324876de2c5637369f43161cd71eebfd803df5a95e46225cab8d280e366723" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/10/30/20a7f33b0b884a9d14dd3aa94ff1ac9da1479fe2ad66dd9e2736075d2506/mcp-1.6.0-py3-none-any.whl", hash = "sha256:7bd24c6ea042dbec44c754f100984d186620d8b841ec30f1b19eda9b93a634d0" }, +] + [[package]] name = "mdurl" version = "0.1.2" @@ -4351,6 +4370,19 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/a9/f9/b6bcaf874f410564a78908739c80861a171788ef4d4f76f5009656672dfe/pydantic_core-2.23.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:9a5bce9d23aac8f0cf0836ecfc033896aa8443b501c58d0602dbfd5bd5b37753" }, ] +[[package]] +name = "pydantic-settings" +version = "2.8.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "pydantic" }, + { name = "python-dotenv" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/88/82/c79424d7d8c29b994fb01d277da57b0a9b09cc03c3ff875f9bd8a86b2145/pydantic_settings-2.8.1.tar.gz", hash = "sha256:d5c663dfbe9db9d5e1c646b2e161da12f0d734d422ee56f567d0ea2cee4e8585" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/0b/53/a64f03044927dc47aafe029c42a5b7aabc38dfb813475e0e1bf71c4a59d0/pydantic_settings-2.8.1-py3-none-any.whl", hash = "sha256:81942d5ac3d905f7f3ee1a70df5dfb62d5569c12f51a5a647defc1c3d9ee2e9c" }, +] + [[package]] name = "pydash" version = "7.0.7" @@ -4797,6 +4829,7 @@ dependencies = [ { name = "langfuse" }, { name = "markdown" }, { name = "markdown-to-json" }, + { name = "mcp" }, { name = "mini-racer" }, { name = "minio" }, { name = "mistralai" }, @@ -4932,6 +4965,7 @@ requires-dist = [ { name = "langfuse", specifier = ">=2.60.0" }, { name = "markdown", specifier = "==3.6" }, { name = "markdown-to-json", specifier = "==2.1.1" }, + { name = "mcp", specifier = ">=1.6.0" }, { name = "mini-racer", specifier = ">=0.12.4,<0.13.0" }, { name = "minio", specifier = "==7.2.4" }, { name = "mistralai", specifier = "==0.4.2" }, @@ -5790,6 +5824,31 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/6c/60/ddcb8f70eaad61b5e574a8adba10cc8d8c0b8e5ad87f4836c3c52767a2ad/sqlglot-11.7.1-py3-none-any.whl", hash = "sha256:1ed7f5965eb4c917821f8a324af6586432d0019628c2e067958d1470637e1398" }, ] +[[package]] +name = "sse-starlette" +version = "2.2.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "anyio" }, + { name = "starlette" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/71/a4/80d2a11af59fe75b48230846989e93979c892d3a20016b42bb44edb9e398/sse_starlette-2.2.1.tar.gz", hash = "sha256:54470d5f19274aeed6b2d473430b08b4b379ea851d953b11d7f1c4a2c118b419" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/d9/e0/5b8bd393f27f4a62461c5cf2479c75a2cc2ffa330976f9f00f5f6e4f50eb/sse_starlette-2.2.1-py3-none-any.whl", hash = "sha256:6410a3d3ba0c89e7675d4c273a301d64649c03a5ef1ca101f10b47f895fd0e99" }, +] + +[[package]] +name = "starlette" +version = "0.46.2" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "anyio" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/ce/20/08dfcd9c983f6a6f4a1000d934b9e6d626cff8d2eeb77a89a68eef20a2b7/starlette-0.46.2.tar.gz", hash = "sha256:7f7361f34eed179294600af672f565727419830b54b7b084efe44bb82d2fccd5" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/8b/0c/9d30a4ebeb6db2b25a841afbb80f6ef9a854fc3b41be131d249a977b4959/starlette-0.46.2-py3-none-any.whl", hash = "sha256:595633ce89f8ffa71a015caed34a5b2dc1c0cdb3f0f1fbd1e69339cf2abeec35" }, +] + [[package]] name = "statsmodels" version = "0.14.4" @@ -6266,6 +6325,20 @@ socks = [ { name = "pysocks" }, ] +[[package]] +name = "uvicorn" +version = "0.34.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "click" }, + { name = "h11" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/86/37/dd92f1f9cedb5eaf74d9999044306e06abe65344ff197864175dbbd91871/uvicorn-0.34.1.tar.gz", hash = "sha256:af981725fc4b7ffc5cb3b0e9eda6258a90c4b52cb2a83ce567ae0a7ae1757afc" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/5f/38/a5801450940a858c102a7ad9e6150146a25406a119851c993148d56ab041/uvicorn-0.34.1-py3-none-any.whl", hash = "sha256:984c3a8c7ca18ebaad15995ee7401179212c59521e67bfc390c07fa2b8d2e065" }, +] + [[package]] name = "valkey" version = "6.0.2"