From cded812b97c19a98ccc073376f9f7dab6583285b Mon Sep 17 00:00:00 2001 From: so95 Date: Thu, 3 Apr 2025 15:51:37 +0700 Subject: [PATCH] Feat: add OpenAI compatible API for agent (#6329) ### What problem does this PR solve? add openai agent _Briefly describe what this PR aims to solve. Include background context that will help reviewers understand the purpose of the PR._ ### Type of change - [ ] Bug Fix (non-breaking change which fixes an issue) - [x] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [ ] Refactoring - [ ] Performance Improvement - [ ] Other (please describe): --------- Co-authored-by: Kevin Hu --- api/apps/sdk/session.py | 54 +++++-- api/db/services/canvas_service.py | 212 +++++++++++++++++++++++++- api/utils/api_utils.py | 40 +++++ docs/references/http_api_reference.md | 144 +++++++++++++++++ 4 files changed, 433 insertions(+), 17 deletions(-) diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 51657128..44d0d8ba 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -17,22 +17,23 @@ import json import re import time +import tiktoken from flask import Response, jsonify, request - +from api.db.services.conversation_service import ConversationService, iframe_completion +from api.db.services.conversation_service import completion as rag_completion +from api.db.services.canvas_service import completion as agent_completion, completionOpenAI from agent.canvas import Canvas from api.db import LLMType, StatusEnum from api.db.db_models import APIToken from api.db.services.api_service import API4ConversationService from api.db.services.canvas_service import UserCanvasService -from api.db.services.canvas_service import completion as agent_completion -from api.db.services.conversation_service import ConversationService, iframe_completion -from api.db.services.conversation_service import completion as rag_completion from api.db.services.dialog_service import DialogService, ask, chat from 
@manager.route('/agents_openai/<agent_id>/chat/completions', methods=['POST'])  # noqa: F821
@validate_request("model", "messages")  # noqa: F821
@token_required
def agents_completion_openai_compatibility(tenant_id, agent_id):
    """Serve an OpenAI-style chat completion request against an agent.

    The request body must carry ``model`` and ``messages`` (enforced by
    ``validate_request``). The most recent ``user`` message is used as the
    question and handed to ``completionOpenAI``; ``stream`` defaults to
    ``True`` and an optional ``id`` in the body is forwarded as the session id.

    Args:
        tenant_id: Tenant resolved by ``token_required``.
        agent_id: Agent id taken from the URL path.

    Returns:
        A ``text/event-stream`` ``Response`` when streaming, otherwise a JSON
        response; an error result when no messages were sent or the tenant
        does not own the agent.
    """
    req = request.json
    messages = req.get("messages", [])
    if not messages:
        return get_error_data_result("You must provide at least one message.")
    if not UserCanvasService.query(user_id=tenant_id, id=agent_id):
        return get_error_data_result(f"You don't own the agent {agent_id}")

    # Tolerate malformed entries (non-dict items, missing "role") instead of
    # failing with a KeyError/TypeError before the request is even dispatched.
    filtered_messages = [
        m for m in messages
        if isinstance(m, dict) and m.get("role") in ("user", "assistant")
    ]
    if not filtered_messages:
        tiktokenenc = tiktoken.get_encoding("cl100k_base")
        notice = "No valid messages found (user or assistant)."
        return jsonify(get_data_openai(
            id=agent_id,
            content=notice,
            finish_reason="stop",
            model=req.get("model", ""),
            completion_tokens=len(tiktokenenc.encode(notice)),
            # No usable message was supplied, so there is nothing to count.
            prompt_tokens=0,
        ))

    # Get the last user message as the question
    question = next(
        (m.get("content") or "" for m in reversed(filtered_messages) if m["role"] == "user"),
        "",
    )
    session_id = req.get("id", "")

    if req.get("stream", True):
        return Response(
            completionOpenAI(tenant_id, agent_id, question, session_id=session_id, stream=True),
            mimetype="text/event-stream",
        )

    # For non-streaming, the generator yields exactly one payload.
    response = next(completionOpenAI(tenant_id, agent_id, question, session_id=session_id, stream=False))
    return jsonify(response)
def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True, **kwargs):
    """Run an agent canvas and yield the answer in OpenAI chat-completion form.

    Args:
        tenant_id: Tenant issuing the request; must equal the agent's ``user_id``.
        agent_id: Id of the agent (canvas) to run.
        question: The user's question for this turn.
        session_id: Existing session to continue. When falsy a new session is
            created and its preset parameters are filled from ``kwargs``.
        stream: When true every yielded item is a ``data: ...`` server-sent
            event string, terminated by ``data: [DONE]``. When false a single
            payload dict is yielded.
        **kwargs: Values for the canvas preset parameters, plus an optional
            ``user_id`` stored on a newly created session.

    Yields:
        SSE-formatted strings (``stream=True``) or one dict (``stream=False``).
        Failures are reported as payloads whose content starts with
        ``**ERROR**`` rather than raised.
    """
    tiktokenenc = tiktoken.get_encoding("cl100k_base")

    def _sse(payload):
        """Serialize one payload as a server-sent-events ``data:`` frame."""
        return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"

    def _abort(payload):
        """Emit a terminal payload in the wire format of the active mode.

        In streaming mode the caller feeds this generator to an HTTP response
        body, so a raw dict must never be yielded there.
        """
        if stream:
            yield _sse(payload)
            yield "data: [DONE]\n\n"
        else:
            yield payload

    e, cvs = UserCanvasService.get_by_id(agent_id)
    if not e:
        yield from _abort(get_data_openai(
            id=session_id,
            model=agent_id,
            content="**ERROR**: Agent not found."
        ))
        return

    if cvs.user_id != tenant_id:
        yield from _abort(get_data_openai(
            id=session_id,
            model=agent_id,
            content="**ERROR**: You do not own the agent"
        ))
        return

    if not isinstance(cvs.dsl, str):
        cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)

    canvas = Canvas(cvs.dsl, tenant_id)
    canvas.reset()
    message_id = str(uuid4())

    if not session_id:
        # New session: fill the canvas preset parameters from kwargs.
        for ele in canvas.get_preset_param() or []:
            supplied = kwargs.get(ele["key"])
            if supplied:
                ele["value"] = supplied
            elif not ele["optional"]:
                notice = f"`{ele['key']}` is required"
                yield from _abort(get_data_openai(
                    id=None,
                    model=agent_id,
                    content=notice,
                    completion_tokens=len(tiktokenenc.encode(notice)),
                    prompt_tokens=len(tiktokenenc.encode(question if question else ""))
                ))
                return
            else:
                # Optional and not supplied: drop any stale value.
                ele.pop("value", None)

        cvs.dsl = json.loads(str(canvas))
        session_id = get_uuid()
        conv = {
            "id": session_id,
            "dialog_id": cvs.id,
            "user_id": kwargs.get("user_id", ""),
            "message": [{"role": "assistant", "content": canvas.get_prologue(), "created_at": time.time()}],
            "source": "agent",
            "dsl": cvs.dsl
        }
        API4ConversationService.save(**conv)
        conv = API4Conversation(**conv)
    else:
        # Existing session: it must exist AND belong to this agent, otherwise a
        # caller could replay a session that was created for a different agent.
        e, conv = API4ConversationService.get_by_id(session_id)
        if not e or conv.dialog_id != agent_id:
            yield from _abort(get_data_openai(
                id=session_id,
                model=agent_id,
                content="**ERROR**: Session not found!"
            ))
            return
        canvas = Canvas(json.dumps(conv.dsl), tenant_id)

    # Record the user turn on both the canvas and the stored conversation.
    canvas.messages.append({"role": "user", "content": question, "id": message_id})
    canvas.add_user_input(question)

    if not conv.message:
        conv.message = []
    conv.message.append({
        "role": "user",
        "content": question,
        "id": message_id
    })

    if not conv.reference:
        conv.reference = []
    conv.reference.append({"chunks": [], "doc_aggs": []})

    final_ans = {"reference": [], "content": ""}
    prompt_tokens = len(tiktokenenc.encode(str(question)))

    def _persist_answer():
        """Append the assistant turn to the canvas and save the conversation."""
        canvas.messages.append({"role": "assistant", "content": final_ans["content"], "created_at": time.time(), "id": message_id})
        canvas.history.append(("assistant", final_ans["content"]))
        if final_ans.get("reference"):
            canvas.reference.append(final_ans["reference"])
        conv.dsl = json.loads(str(canvas))
        API4ConversationService.append_message(conv.id, conv.to_dict())

    def _error_payload(exc):
        """Build the payload reporting an exception raised while running."""
        text = "**ERROR**: " + str(exc)
        return get_data_openai(
            id=session_id,
            model=agent_id,
            content=text,
            finish_reason="stop",
            completion_tokens=len(tiktokenenc.encode(text)),
            prompt_tokens=prompt_tokens
        )

    if stream:
        try:
            completion_tokens = 0
            for ans in canvas.run(stream=True):
                if ans.get("running_status"):
                    # Progress notice from the canvas, forwarded as a chunk.
                    completion_tokens += len(tiktokenenc.encode(ans.get("content", "")))
                    yield _sse(get_data_openai(
                        id=session_id,
                        model=agent_id,
                        content=ans["content"],
                        object="chat.completion.chunk",
                        completion_tokens=completion_tokens,
                        prompt_tokens=prompt_tokens
                    ))
                    continue

                for k in ans.keys():
                    final_ans[k] = ans[k]

                # NOTE(review): every answer chunk carries finish_reason="stop";
                # confirm whether only the last chunk should.
                completion_tokens += len(tiktokenenc.encode(final_ans.get("content", "")))
                yield _sse(get_data_openai(
                    id=session_id,
                    model=agent_id,
                    content=final_ans["content"],
                    object="chat.completion.chunk",
                    finish_reason="stop",
                    completion_tokens=completion_tokens,
                    prompt_tokens=prompt_tokens
                ))

            _persist_answer()
            yield "data: [DONE]\n\n"

        except Exception as exc:
            traceback.print_exc()
            # Keep whatever canvas state exists so the session stays usable.
            conv.dsl = json.loads(str(canvas))
            API4ConversationService.append_message(conv.id, conv.to_dict())
            yield from _abort(_error_payload(exc))

    else:  # Non-streaming mode
        try:
            all_answer_content = ""
            for answer in canvas.run(stream=False):
                if answer.get("running_status"):
                    continue

                final_ans["content"] = "\n".join(answer["content"]) if "content" in answer else ""
                final_ans["reference"] = answer.get("reference", [])
                all_answer_content += final_ans["content"]

            final_ans["content"] = all_answer_content
            _persist_answer()

            # Return the response in OpenAI format
            yield get_data_openai(
                id=session_id,
                model=agent_id,
                content=final_ans["content"],
                finish_reason="stop",
                completion_tokens=len(tiktokenenc.encode(final_ans["content"])),
                prompt_tokens=prompt_tokens,
                param=canvas.get_preset_param()
            )

        except Exception as exc:
            traceback.print_exc()
            conv.dsl = json.loads(str(canvas))
            API4ConversationService.append_message(conv.id, conv.to_dict())
            yield from _abort(_error_payload(exc))
def get_data_openai(id=None,
                    created=None,
                    model=None,
                    prompt_tokens=0,
                    completion_tokens=0,
                    content=None,
                    finish_reason=None,
                    object="chat.completion",
                    param=None,
                    ):
    """Build a chat-completion payload shaped like OpenAI's response object.

    Args:
        id: Identifier placed in the payload (rendered with ``str``).
        created: Unix timestamp for the ``created`` field. When omitted, or
            when it cannot be converted to ``int``, the current time is used.
        model: Value of the ``model`` field.
        prompt_tokens: Token count attributed to the prompt.
        completion_tokens: Token count attributed to the completion.
        content: Assistant message text.
        finish_reason: Value of the choice's ``finish_reason``.
        object: Value of the ``object`` field.
        param: Extra value returned under the ``param`` key.

    Returns:
        A dict with ``id``, ``object``, ``created``, ``model``, ``param``,
        ``usage`` and a single-element ``choices`` list.
    """
    # The previous expression (`int(time.time()) if created else None`) threw
    # away a caller-supplied timestamp and produced null when none was given.
    try:
        created_at = int(time.time()) if created is None else int(created)
    except (TypeError, ValueError):
        created_at = int(time.time())

    return {
        "id": f"{id}",
        "object": object,
        "created": created_at,
        "model": model,
        "param": param,
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
            "completion_tokens_details": {
                "reasoning_tokens": 0,
                "accepted_prediction_tokens": 0,
                "rejected_prediction_tokens": 0
            }
        },
        "choices": [
            {
                "message": {
                    "role": "assistant",
                    "content": content
                },
                "logprobs": None,
                "finish_reason": finish_reason,
                "index": 0
            }
        ]
    }
+ +#### Request + +- Method: POST +- URL: `/api/v1/agents_openai/{agent_id}/chat/completions` +- Headers: + - `'Content-Type: application/json'` + - `'Authorization: Bearer <YOUR_API_KEY>'` +- Body: + - `"model"`: `string` + - `"messages"`: `object list` + - `"stream"`: `boolean` + +##### Request example + +```bash +curl --request POST \ + --url http://{address}/api/v1/agents_openai/{agent_id}/chat/completions \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer <YOUR_API_KEY>' \ + --data '{ + "model": "model", + "messages": [{"role": "user", "content": "Say this is a test!"}], + "stream": true + }' +``` + +##### Request Parameters + +- `model` (*Body parameter*) `string`, *Required* + The model used to generate the response. The server will parse this automatically, so you can set it to any value for now. + +- `messages` (*Body parameter*) `list[object]`, *Required* + A list of historical chat messages used to generate the response. This must contain at least one message with the `user` role. + +- `stream` (*Body parameter*) `boolean` + Whether to receive the response as a stream. Set this to `false` explicitly if you prefer to receive the entire response in one go instead of as a stream. + +#### Response + +Stream: + +```json +{ + "id": "chatcmpl-3a9c3572f29311efa69751e139332ced", + "choices": [ + { + "delta": { + "content": "This is a test. If you have any specific questions or need information, feel", + "role": "assistant", + "function_call": null, + "tool_calls": null + }, + "finish_reason": null, + "index": 0, + "logprobs": null + } + ], + "created": 1740543996, + "model": "model", + "object": "chat.completion.chunk", + "system_fingerprint": "", + "usage": null +} +// omit duplicated information +{"choices":[{"delta":{"content":" free to ask, and I will do my best to provide an answer based on","role":"assistant"}}]} +{"choices":[{"delta":{"content":" the knowledge I have. 
If your question is unrelated to the provided knowledge base,","role":"assistant"}}]} +{"choices":[{"delta":{"content":" I will let you know.","role":"assistant"}}]} +// the last chunk +{ + "id": "chatcmpl-3a9c3572f29311efa69751e139332ced", + "choices": [ + { + "delta": { + "content": null, + "role": "assistant", + "function_call": null, + "tool_calls": null + }, + "finish_reason": "stop", + "index": 0, + "logprobs": null + } + ], + "created": 1740543996, + "model": "model", + "object": "chat.completion.chunk", + "system_fingerprint": "", + "usage": { + "prompt_tokens": 18, + "completion_tokens": 225, + "total_tokens": 243 + } +} +``` + +Non-stream: + +```json +{ + "choices":[ + { + "finish_reason":"stop", + "index":0, + "logprobs":null, + "message":{ + "content":"This is a test. If you have any specific questions or need information, feel free to ask, and I will do my best to provide an answer based on the knowledge I have. If your question is unrelated to the provided knowledge base, I will let you know.", + "role":"assistant" + } + } + ], + "created":1740543499, + "id":"chatcmpl-3a9c3572f29311efa69751e139332ced", + "model":"model", + "object":"chat.completion", + "usage":{ + "completion_tokens":246, + "completion_tokens_details":{ + "accepted_prediction_tokens":246, + "reasoning_tokens":18, + "rejected_prediction_tokens":0 + }, + "prompt_tokens":18, + "total_tokens":264 + } +} +``` + +Failure: + ```json { "code": 102,