mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-07-31 02:01:57 +08:00
support S3 for file storage (#15)
* feat: support S3 for file storage * doc: fmt env template
This commit is contained in:
parent
6858ec5f38
commit
f4d1c724dd
3
.env
3
.env
@ -1,3 +0,0 @@
|
|||||||
HOST=127.0.0.1
|
|
||||||
PORT=8000
|
|
||||||
DATABASE_URL="postgresql://infiniflow:infiniflow@localhost/docgpt"
|
|
@ -1,3 +1,9 @@
|
|||||||
|
# Database
|
||||||
HOST=127.0.0.1
|
HOST=127.0.0.1
|
||||||
PORT=8000
|
PORT=8000
|
||||||
DATABASE_URL="postgresql://infiniflow:infiniflow@localhost/docgpt"
|
DATABASE_URL="postgresql://infiniflow:infiniflow@localhost/docgpt"
|
||||||
|
|
||||||
|
# S3 Storage
|
||||||
|
S3_BASE_URL="https://play.min.io"
|
||||||
|
S3_ACCESS_KEY="Q3AM3UQ867SPQQA43P2F"
|
||||||
|
S3_SECRET_KEY="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
|
@ -23,6 +23,7 @@ dotenvy = "0.15.7"
|
|||||||
listenfd = "1.0.1"
|
listenfd = "1.0.1"
|
||||||
chrono = "0.4.31"
|
chrono = "0.4.31"
|
||||||
migration = { path = "./migration" }
|
migration = { path = "./migration" }
|
||||||
|
minio = "0.1.0"
|
||||||
futures-util = "0.3.29"
|
futures-util = "0.3.29"
|
||||||
actix-multipart-extract = "0.1.5"
|
actix-multipart-extract = "0.1.5"
|
||||||
|
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use actix_multipart_extract::{File, Multipart, MultipartForm};
|
use actix_multipart_extract::{File, Multipart, MultipartForm};
|
||||||
use actix_web::{ HttpResponse, post, web };
|
use actix_web::{get, HttpResponse, post, web};
|
||||||
use chrono::{Utc, FixedOffset};
|
use chrono::{Utc, FixedOffset};
|
||||||
|
use minio::s3::args::{BucketExistsArgs, MakeBucketArgs, UploadObjectArgs};
|
||||||
use sea_orm::DbConn;
|
use sea_orm::DbConn;
|
||||||
use crate::api::JsonResponse;
|
use crate::api::JsonResponse;
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
@ -11,6 +12,9 @@ use crate::errors::AppError;
|
|||||||
use crate::service::doc_info::{ Mutation, Query };
|
use crate::service::doc_info::{ Mutation, Query };
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
const BUCKET_NAME: &'static str = "docgpt-upload";
|
||||||
|
|
||||||
|
|
||||||
fn now() -> chrono::DateTime<FixedOffset> {
|
fn now() -> chrono::DateTime<FixedOffset> {
|
||||||
Utc::now().with_timezone(&FixedOffset::east_opt(3600 * 8).unwrap())
|
Utc::now().with_timezone(&FixedOffset::east_opt(3600 * 8).unwrap())
|
||||||
}
|
}
|
||||||
@ -69,53 +73,50 @@ async fn upload(
|
|||||||
data: web::Data<AppState>
|
data: web::Data<AppState>
|
||||||
) -> Result<HttpResponse, AppError> {
|
) -> Result<HttpResponse, AppError> {
|
||||||
let uid = payload.uid;
|
let uid = payload.uid;
|
||||||
async fn add_number_to_filename(
|
let file_name = payload.file_field.name.as_str();
|
||||||
file_name: String,
|
async fn add_number_to_filename(file_name: &str, conn:&DbConn, uid:i64, parent_id:i64) -> String {
|
||||||
conn: &DbConn,
|
|
||||||
uid: i64,
|
|
||||||
parent_id: i64
|
|
||||||
) -> String {
|
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
let mut new_file_name = file_name.to_string();
|
let mut new_file_name = file_name.to_string();
|
||||||
let arr: Vec<&str> = file_name.split(".").collect();
|
let arr: Vec<&str> = file_name.split(".").collect();
|
||||||
let suffix = String::from(arr[arr.len()-1]);
|
let suffix = String::from(arr[arr.len()-1]);
|
||||||
let preffix = arr[..arr.len()-1].join(".");
|
let preffix = arr[..arr.len()-1].join(".");
|
||||||
let mut docs = Query::find_doc_infos_by_name(
|
let mut docs = Query::find_doc_infos_by_name(conn, uid, &new_file_name, Some(parent_id)).await.unwrap();
|
||||||
conn,
|
|
||||||
uid,
|
|
||||||
&new_file_name,
|
|
||||||
Some(parent_id)
|
|
||||||
).await.unwrap();
|
|
||||||
while docs.len()>0 {
|
while docs.len()>0 {
|
||||||
i += 1;
|
i += 1;
|
||||||
new_file_name = format!("{}_{}.{}", preffix, i, suffix);
|
new_file_name = format!("{}_{}.{}", preffix, i, suffix);
|
||||||
docs = Query::find_doc_infos_by_name(
|
docs = Query::find_doc_infos_by_name(conn, uid, &new_file_name, Some(parent_id)).await.unwrap();
|
||||||
conn,
|
|
||||||
uid,
|
|
||||||
&new_file_name,
|
|
||||||
Some(parent_id)
|
|
||||||
).await.unwrap();
|
|
||||||
}
|
}
|
||||||
new_file_name
|
new_file_name
|
||||||
}
|
}
|
||||||
let fnm = add_number_to_filename(
|
let fnm = add_number_to_filename(file_name, &data.conn, uid, payload.did).await;
|
||||||
payload.file_field.name.clone(),
|
|
||||||
&data.conn,
|
|
||||||
uid,
|
|
||||||
payload.did
|
|
||||||
).await;
|
|
||||||
|
|
||||||
std::fs::create_dir_all(format!("./upload/{}/", uid));
|
let s3_client = &data.s3_client;
|
||||||
let filepath = format!("./upload/{}/{}-{}", payload.uid, payload.did, fnm.clone());
|
let buckets_exists = s3_client
|
||||||
let mut f = std::fs::File::create(&filepath)?;
|
.bucket_exists(&BucketExistsArgs::new(BUCKET_NAME)?)
|
||||||
f.write(&payload.file_field.bytes)?;
|
.await?;
|
||||||
|
if !buckets_exists {
|
||||||
|
s3_client
|
||||||
|
.make_bucket(&MakeBucketArgs::new(BUCKET_NAME)?)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
s3_client
|
||||||
|
.upload_object(
|
||||||
|
&mut UploadObjectArgs::new(
|
||||||
|
BUCKET_NAME,
|
||||||
|
fnm.as_str(),
|
||||||
|
format!("/{}/{}-{}", payload.uid, payload.did, fnm).as_str()
|
||||||
|
)?
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let location = format!("/{}/{}", BUCKET_NAME, fnm);
|
||||||
let doc = Mutation::create_doc_info(&data.conn, Model {
|
let doc = Mutation::create_doc_info(&data.conn, Model {
|
||||||
did:Default::default(),
|
did:Default::default(),
|
||||||
uid: uid,
|
uid: uid,
|
||||||
doc_name: fnm,
|
doc_name: fnm,
|
||||||
size: payload.file_field.bytes.len() as i64,
|
size: payload.file_field.bytes.len() as i64,
|
||||||
location: filepath,
|
location,
|
||||||
r#type: "doc".to_string(),
|
r#type: "doc".to_string(),
|
||||||
created_at: now(),
|
created_at: now(),
|
||||||
updated_at: now(),
|
updated_at: now(),
|
||||||
|
@ -3,15 +3,23 @@ use thiserror::Error;
|
|||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub(crate) enum AppError {
|
pub(crate) enum AppError {
|
||||||
#[error("`{0}`")] User(#[from] UserError),
|
#[error("`{0}`")]
|
||||||
|
User(#[from] UserError),
|
||||||
|
|
||||||
#[error("`{0}`")] Json(#[from] serde_json::Error),
|
#[error("`{0}`")]
|
||||||
|
Json(#[from] serde_json::Error),
|
||||||
|
|
||||||
#[error("`{0}`")] Actix(#[from] actix_web::Error),
|
#[error("`{0}`")]
|
||||||
|
Actix(#[from] actix_web::Error),
|
||||||
|
|
||||||
#[error("`{0}`")] Db(#[from] sea_orm::DbErr),
|
#[error("`{0}`")]
|
||||||
|
Db(#[from] sea_orm::DbErr),
|
||||||
|
|
||||||
#[error("`{0}`")] Std(#[from] std::io::Error),
|
#[error("`{0}`")]
|
||||||
|
MinioS3(#[from] minio::s3::error::Error),
|
||||||
|
|
||||||
|
#[error("`{0}`")]
|
||||||
|
Std(#[from] std::io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
@ -28,7 +36,8 @@ pub(crate) enum UserError {
|
|||||||
#[error("`password` field of `User` cannot contain whitespaces!")]
|
#[error("`password` field of `User` cannot contain whitespaces!")]
|
||||||
PasswordInvalidCharacter,
|
PasswordInvalidCharacter,
|
||||||
|
|
||||||
#[error("Could not find any `User` for id: `{0}`!")] NotFound(i64),
|
#[error("Could not find any `User` for id: `{0}`!")]
|
||||||
|
NotFound(i64),
|
||||||
|
|
||||||
#[error("Failed to login user!")]
|
#[error("Failed to login user!")]
|
||||||
LoginFailed,
|
LoginFailed,
|
||||||
@ -46,8 +55,7 @@ pub(crate) enum UserError {
|
|||||||
impl ResponseError for AppError {
|
impl ResponseError for AppError {
|
||||||
fn status_code(&self) -> actix_web::http::StatusCode {
|
fn status_code(&self) -> actix_web::http::StatusCode {
|
||||||
match self {
|
match self {
|
||||||
AppError::User(user_error) =>
|
AppError::User(user_error) => match user_error {
|
||||||
match user_error {
|
|
||||||
UserError::EmptyUsername => actix_web::http::StatusCode::UNPROCESSABLE_ENTITY,
|
UserError::EmptyUsername => actix_web::http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||||
UserError::UsernameInvalidCharacter => {
|
UserError::UsernameInvalidCharacter => {
|
||||||
actix_web::http::StatusCode::UNPROCESSABLE_ENTITY
|
actix_web::http::StatusCode::UNPROCESSABLE_ENTITY
|
||||||
@ -61,11 +69,9 @@ impl ResponseError for AppError {
|
|||||||
UserError::Empty => actix_web::http::StatusCode::NOT_FOUND,
|
UserError::Empty => actix_web::http::StatusCode::NOT_FOUND,
|
||||||
UserError::LoginFailed => actix_web::http::StatusCode::NOT_FOUND,
|
UserError::LoginFailed => actix_web::http::StatusCode::NOT_FOUND,
|
||||||
UserError::InvalidToken => actix_web::http::StatusCode::UNAUTHORIZED,
|
UserError::InvalidToken => actix_web::http::StatusCode::UNAUTHORIZED,
|
||||||
}
|
},
|
||||||
AppError::Json(_) => actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
AppError::Actix(fail) => fail.as_response_error().status_code(),
|
AppError::Actix(fail) => fail.as_response_error().status_code(),
|
||||||
AppError::Db(_) => actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
|
_ => actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
AppError::Std(_) => actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
39
src/main.rs
39
src/main.rs
@ -13,18 +13,22 @@ use actix_web::dev::ServiceRequest;
|
|||||||
use actix_web::error::ErrorUnauthorized;
|
use actix_web::error::ErrorUnauthorized;
|
||||||
use actix_web_httpauth::extractors::bearer::BearerAuth;
|
use actix_web_httpauth::extractors::bearer::BearerAuth;
|
||||||
use listenfd::ListenFd;
|
use listenfd::ListenFd;
|
||||||
|
use minio::s3::client::Client;
|
||||||
|
use minio::s3::creds::StaticProvider;
|
||||||
|
use minio::s3::http::BaseUrl;
|
||||||
use sea_orm::{Database, DatabaseConnection};
|
use sea_orm::{Database, DatabaseConnection};
|
||||||
use migration::{Migrator, MigratorTrait};
|
use migration::{Migrator, MigratorTrait};
|
||||||
use crate::errors::UserError;
|
use crate::errors::{AppError, UserError};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct AppState {
|
struct AppState {
|
||||||
conn: DatabaseConnection,
|
conn: DatabaseConnection,
|
||||||
|
s3_client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn validator(
|
pub(crate) async fn validator(
|
||||||
req: ServiceRequest,
|
req: ServiceRequest,
|
||||||
credentials: BearerAuth
|
credentials: BearerAuth,
|
||||||
) -> Result<ServiceRequest, Error> {
|
) -> Result<ServiceRequest, Error> {
|
||||||
if let Some(token) = req.get_identity() {
|
if let Some(token) = req.get_identity() {
|
||||||
println!("{}, {}",credentials.token(), token);
|
println!("{}, {}",credentials.token(), token);
|
||||||
@ -37,7 +41,7 @@ pub(crate) async fn validator(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_web::main]
|
#[actix_web::main]
|
||||||
async fn main() -> std::io::Result<()> {
|
async fn main() -> Result<(), AppError> {
|
||||||
std::env::set_var("RUST_LOG", "debug");
|
std::env::set_var("RUST_LOG", "debug");
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
@ -48,12 +52,29 @@ async fn main() -> std::io::Result<()> {
|
|||||||
let port = env::var("PORT").expect("PORT is not set in .env file");
|
let port = env::var("PORT").expect("PORT is not set in .env file");
|
||||||
let server_url = format!("{host}:{port}");
|
let server_url = format!("{host}:{port}");
|
||||||
|
|
||||||
|
let s3_base_url = env::var("S3_BASE_URL").expect("S3_BASE_URL is not set in .env file");
|
||||||
|
let s3_access_key = env::var("S3_ACCESS_KEY").expect("S3_ACCESS_KEY is not set in .env file");;
|
||||||
|
let s3_secret_key = env::var("S3_SECRET_KEY").expect("S3_SECRET_KEY is not set in .env file");;
|
||||||
|
|
||||||
// establish connection to database and apply migrations
|
// establish connection to database and apply migrations
|
||||||
// -> create post table if not exists
|
// -> create post table if not exists
|
||||||
let conn = Database::connect(&db_url).await.unwrap();
|
let conn = Database::connect(&db_url).await.unwrap();
|
||||||
Migrator::up(&conn, None).await.unwrap();
|
Migrator::up(&conn, None).await.unwrap();
|
||||||
|
|
||||||
let state = AppState { conn };
|
let static_provider = StaticProvider::new(
|
||||||
|
s3_access_key.as_str(),
|
||||||
|
s3_secret_key.as_str(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
let s3_client = Client::new(
|
||||||
|
s3_base_url.parse::<BaseUrl>()?,
|
||||||
|
Some(Box::new(static_provider)),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let state = AppState { conn, s3_client };
|
||||||
|
|
||||||
// create server and try to serve over socket if possible
|
// create server and try to serve over socket if possible
|
||||||
let mut listenfd = ListenFd::from_env();
|
let mut listenfd = ListenFd::from_env();
|
||||||
@ -61,20 +82,18 @@ async fn main() -> std::io::Result<()> {
|
|||||||
App::new()
|
App::new()
|
||||||
.service(Files::new("/static", "./static"))
|
.service(Files::new("/static", "./static"))
|
||||||
.app_data(web::Data::new(state.clone()))
|
.app_data(web::Data::new(state.clone()))
|
||||||
.wrap(
|
.wrap(IdentityService::new(
|
||||||
IdentityService::new(
|
|
||||||
CookieIdentityPolicy::new(&[0; 32])
|
CookieIdentityPolicy::new(&[0; 32])
|
||||||
.name("auth-cookie")
|
.name("auth-cookie")
|
||||||
.login_deadline(Duration::seconds(120))
|
.login_deadline(Duration::seconds(120))
|
||||||
.secure(false)
|
.secure(false),
|
||||||
)
|
))
|
||||||
)
|
|
||||||
.wrap(
|
.wrap(
|
||||||
CookieSession::signed(&[0; 32])
|
CookieSession::signed(&[0; 32])
|
||||||
.name("session-cookie")
|
.name("session-cookie")
|
||||||
.secure(false)
|
.secure(false)
|
||||||
// WARNING(alex): This uses the `time` crate, not `std::time`!
|
// WARNING(alex): This uses the `time` crate, not `std::time`!
|
||||||
.expires_in_time(Duration::seconds(60))
|
.expires_in_time(Duration::seconds(60)),
|
||||||
)
|
)
|
||||||
.wrap(middleware::Logger::default())
|
.wrap(middleware::Logger::default())
|
||||||
.configure(init)
|
.configure(init)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user