improve sqlite integration
- add BEFORE_DISCONNECT and AFTER_CONNECT to run some pragmas on connection creation and destruction - add STRICT keyword to create table statements and fix not existing DATETIME in sqlite - use env::temp_dir to get a temp directory to run the database tests against which should make this runnable on windows as well - create/update version of FileModel in database when updating it
This commit is contained in:
parent
851c74cd2d
commit
c53bdb0ad3
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,2 +1,4 @@
|
||||
/target
|
||||
.idea
|
||||
casket.sqlite
|
||||
data
|
||||
|
||||
@ -4,11 +4,17 @@ use crate::db::repository::Repository;
|
||||
use crate::errors::to_internal_error;
|
||||
use axum::http::StatusCode;
|
||||
use problem_details::ProblemDetails;
|
||||
use r2d2::PooledConnection;
|
||||
use r2d2::{CustomizeConnection, PooledConnection};
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use rusqlite::{Connection, OptionalExtension};
|
||||
use rusqlite_migration::{Migrations, M};
|
||||
use std::path::PathBuf;
|
||||
const BEFORE_DISCONNECT: &str = "PRAGMA analysis_limit=400;
|
||||
PRAGMA optimize;";
|
||||
|
||||
const AFTER_CONNECT: &str = "PRAGMA foreign_keys=1;
|
||||
PRAGMA journal_mode=WAL;
|
||||
PRAGMA synchronous=NORMAL;";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Sqlite {
|
||||
@ -18,8 +24,15 @@ pub struct Sqlite {
|
||||
|
||||
impl Sqlite {
|
||||
pub fn from_path(path: &PathBuf) -> Sqlite {
|
||||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionCustomizer))
|
||||
.build(
|
||||
SqliteConnectionManager::file(path)
|
||||
.with_init(|connection| connection.execute_batch(AFTER_CONNECT)),
|
||||
)
|
||||
.unwrap();
|
||||
Sqlite {
|
||||
pool: r2d2::Pool::new(SqliteConnectionManager::file(path)).unwrap(),
|
||||
pool,
|
||||
path: path.clone(),
|
||||
}
|
||||
}
|
||||
@ -30,14 +43,24 @@ impl Sqlite {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ConnectionCustomizer;
|
||||
|
||||
impl<E> CustomizeConnection<Connection, E> for ConnectionCustomizer {
|
||||
fn on_release(&self, connection: Connection) {
|
||||
connection.execute_batch(BEFORE_DISCONNECT).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl Repository for Sqlite {
|
||||
async fn migrate(&self) -> Result<(), ProblemDetails> {
|
||||
let migrations = Migrations::new(vec![
|
||||
M::up("CREATE TABLE users(uuid TEXT PRIMARY KEY, created_at DATETIME DEFAULT CURRENT_TIMESTAMP);"),
|
||||
M::up("CREATE TABLE files(id INTEGER PRIMARY KEY, user_id TEXT, file_path TEXT, version INTEGER, FOREIGN KEY(user_id) REFERENCES users(uuid));"),
|
||||
M::up("CREATE TABLE users(uuid TEXT PRIMARY KEY, created_at TEXT DEFAULT CURRENT_TIMESTAMP) STRICT;"),
|
||||
M::up("CREATE TABLE files(id INTEGER PRIMARY KEY, user_id TEXT, file_path TEXT, version INTEGER, FOREIGN KEY(user_id) REFERENCES users(uuid)) STRICT;"),
|
||||
M::up("CREATE INDEX files_user_path_idx ON files(user_id, file_path);"),
|
||||
]);
|
||||
let mut connection = Connection::open(&self.path).unwrap();
|
||||
connection.execute_batch(AFTER_CONNECT).unwrap();
|
||||
migrations
|
||||
.to_latest(&mut connection)
|
||||
.map_err(to_internal_error)
|
||||
@ -82,18 +105,14 @@ impl Repository for Sqlite {
|
||||
let updated = conn
|
||||
.execute(
|
||||
"UPDATE files SET version = ?1 WHERE id = ?2 AND version = ?3",
|
||||
(
|
||||
file_model.version + 1,
|
||||
file_model.id,
|
||||
file_model.version,
|
||||
),
|
||||
(file_model.version + 1, file_model.id, file_model.version),
|
||||
)
|
||||
.unwrap();
|
||||
if updated != 1 {
|
||||
if updated == 1 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ProblemDetails::from_status_code(StatusCode::CONFLICT)
|
||||
.with_detail("File version was updated by another application!"))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,23 +145,28 @@ impl Repository for Sqlite {
|
||||
mod tests {
|
||||
use crate::db::repository::{insert, Repository};
|
||||
use crate::db::sqlite::Sqlite;
|
||||
use std::path::PathBuf;
|
||||
use std::env;
|
||||
use tracing_log::log::debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
const FILE_PATH: &str = "test";
|
||||
|
||||
async fn create_repository() -> Sqlite {
|
||||
let sqlite = Sqlite::from_path(&PathBuf::from(
|
||||
"/tmp/".to_owned() + &*Uuid::new_v4().to_string(),
|
||||
));
|
||||
let path = env::temp_dir().join(Uuid::new_v4().to_string());
|
||||
debug!("Using this file for database tests: {:?}", path);
|
||||
let sqlite = Sqlite::from_path(&path);
|
||||
sqlite.migrate().await.unwrap();
|
||||
sqlite
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_creation() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::DEBUG)
|
||||
.init();
|
||||
let repo = create_repository().await;
|
||||
let user_id = Uuid::new_v4().to_string();
|
||||
repo.create_user(&user_id).await.unwrap();
|
||||
const FILE_PATH: &'static str = "test";
|
||||
let created = repo
|
||||
.create_file(insert::File {
|
||||
user_id: user_id.clone(),
|
||||
@ -161,7 +185,6 @@ mod tests {
|
||||
let repo = create_repository().await;
|
||||
let user_id = Uuid::new_v4().to_string();
|
||||
repo.create_user(&user_id).await.unwrap();
|
||||
const FILE_PATH: &'static str = "test";
|
||||
let created = repo
|
||||
.create_file(insert::File {
|
||||
user_id: user_id.clone(),
|
||||
@ -170,6 +193,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(repo.bump_version(created).await.is_ok())
|
||||
assert!(repo.bump_version(created).await.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use crate::db::repository::{insert, Repository};
|
||||
use crate::errors::to_internal_error;
|
||||
use crate::files::build_system_path;
|
||||
use crate::{errors, AppState};
|
||||
@ -12,7 +13,7 @@ use futures_util::{StreamExt, TryStreamExt};
|
||||
use problem_details::ProblemDetails;
|
||||
use std::io;
|
||||
use std::io::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs::{create_dir_all, File};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
@ -46,15 +47,36 @@ async fn handle_single_file_upload<'field_lifetime>(
|
||||
Some(field_name) => {
|
||||
let path = PathBuf::from(field_name);
|
||||
let filesystem_path = build_system_path(&state.config.files.directory, user_id, &path)?;
|
||||
save_file(filesystem_path, map_error_to_io_error(field))
|
||||
save_file(&filesystem_path, map_error_to_io_error(field))
|
||||
.await
|
||||
.map_err(to_internal_error)
|
||||
.map_err(to_internal_error)?;
|
||||
update_file_version(state, user_id, &filesystem_path).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_file_version(
|
||||
state: &AppState,
|
||||
user_id: &str,
|
||||
filesystem_path: &Path,
|
||||
) -> Result<(), ProblemDetails> {
|
||||
let repository = state.get_repository();
|
||||
let path_string = filesystem_path.to_string_lossy();
|
||||
match repository.get_file(user_id, path_string.as_ref()).await {
|
||||
Err(error) => Err(to_internal_error(error)),
|
||||
Ok(None) => repository
|
||||
.create_file(insert::File {
|
||||
user_id: user_id.to_string(),
|
||||
path: path_string.to_string(),
|
||||
})
|
||||
.await
|
||||
.map(|_| ()),
|
||||
Ok(Some(file)) => repository.bump_version(file).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn save_file(
|
||||
path: PathBuf,
|
||||
path: &PathBuf,
|
||||
mut content: impl Stream<Item = Result<Bytes, Error>> + Unpin,
|
||||
) -> Result<(), Error> {
|
||||
create_dir_all(path.parent().unwrap()).await?;
|
||||
|
||||
@ -26,7 +26,8 @@ pub struct AppState {
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn get_repo(&self) -> &impl Repository {
|
||||
#[must_use]
|
||||
pub fn get_repository(&self) -> &impl Repository {
|
||||
&self.sqlite
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user