diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b39ce67c..bfa2451b 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -4,10 +4,21 @@ version = "0.1.0" edition = "2021" [dependencies] -tokio = { version = "1.19", features = ["full"] } -async-std = { version = "1.12", features = ["attributes"] } -async-graphql = "4.0" -actix-web = "4.0" -async-graphql-actix-web = "4.0" -#doublets = { version = "0.1.0-alpha.20", features = ["full"] } -doublets = "0.1.0-beta.3" +actix-web = { version = "4.1.0" } +async-graphql = { version = "4.0.6" } +async-graphql-actix-web = { version = "4.0.6" } +itertools = { version = "0.10.3" } +smallvec = { version = "1.9.0" } +rayon = { version = "1.5.3" } + +tokio = { version = "1.19", features = ["macros", "rt-multi-thread"] } + +doublets = { git = "https://github.com/linksplatform/doublets-rs" } + +mimalloc = { version = "0.1", default-features = false, optional = true } + +[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies] +jemalloc = { package = "tikv-jemallocator", version = "0.5.0", optional = true } + +[features] +default = ["mimalloc"] diff --git a/rust/src/main.rs b/rust/src/main.rs index 55e38179..cdf3de52 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,11 +1,14 @@ #![feature(never_type)] #![feature(result_flattening)] +#![feature(box_syntax)] +#![feature(try_trait_v2)] mod model; +mod store; use crate::model::{ - Links, LinksInsertInput, LinksMutationResponse, LinksOnConflict, LinksOptionExt, MutationRoot, - QueryRoot, + LinkType, Links, LinksInsertInput, LinksMutationResponse, LinksOnConflict, LinksOptionExt, + MutationRoot, QueryRoot, }; use actix_web::{guard, web, App, HttpResponse, HttpServer, Responder}; use async_graphql::{ @@ -14,12 +17,11 @@ use async_graphql::{ }; use async_graphql_actix_web::{GraphQLRequest, GraphQLResponse}; use async_std::sync::RwLock; -use doublets::mem::FileMappedMem; -use doublets::{splited, Link}; +use doublets::mem::FileMapped; +use doublets::{split, Doublets, Link}; use std::{error::Error, fs::File, io, path::Path}; -// todo: wait for fix type infer -type RawStore = splited::Store; +type RawStore = Box>; type Store = RwLock; type Schema = async_graphql::Schema; @@ -35,20 +37,16 @@ async fn index_playground() -> actix_web::Result { } // todo: may be add support async-std files to platform-mem -fn map_db_file>(path: P) -> io::Result { - File::options() - .create(true) - .read(true) - .write(true) - .open(path) - .map(FileMappedMem::new) - .flatten() +fn map_db_file>(path: P) -> io::Result> { + FileMapped::from_path(path) } #[tokio::main] // todo: implement Into for LinksError async fn main() -> Result<(), Box> { - let store = RawStore::new(map_db_file("db.links")?, map_db_file("index.links")?)?; + let store = + split::Store::::new(map_db_file("db.links")?, map_db_file("index.links")?)?; + let store: Box> = box store::Store::new(store); let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription) .data(Store::new(store)) .finish(); @@ -66,3 +64,10 @@ async fn main() -> Result<(), Box> { .await .map_err(|e| e.into()) } + +#[cfg(feature = "mimalloc")] +#[global_allocator] +static MIMALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; + +#[cfg(feature = "jemalloc")] +static JEMALLOC: jemalloc::Jemalloc = jemalloc::Jemalloc; diff --git a/rust/src/model/distinct.rs b/rust/src/model/distinct.rs new file mode 100644 index 00000000..b4ac380d --- /dev/null +++ b/rust/src/model/distinct.rs @@ -0,0 +1,32 @@ +use crate::model::LinkType; +use doublets::Link; +use std::hash::{Hash, Hasher}; + +pub struct DistinctWrapper { + matches: (LinkType, LinkType), + link: Link, +} + +impl DistinctWrapper { + pub fn from_match_link(matches: (LinkType, LinkType), link: Link) -> Self { + Self { matches, link } + } + + pub fn into_link(self) -> Link { + self.link + } +} + +impl Hash for DistinctWrapper { + fn hash(&self, state: &mut H) { + self.matches.hash(state) + } +} + +impl PartialEq for DistinctWrapper { + fn eq(&self, other: &Self) -> bool { + &self.matches == &other.matches + } +} + +impl Eq for DistinctWrapper {} diff --git a/rust/src/model/enum_type/links_select_column.rs b/rust/src/model/enum_type/links_select_column.rs index c4353b53..41e589ba 100644 --- a/rust/src/model/enum_type/links_select_column.rs +++ b/rust/src/model/enum_type/links_select_column.rs @@ -1,6 +1,6 @@ use async_graphql::*; -#[derive(Enum, Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Enum, Debug, Copy, Clone, Eq, PartialEq, Hash)] #[graphql(name = "links_select_column")] pub enum LinksSelectColumn { #[graphql(name = "from_id")] diff --git a/rust/src/model/enum_type/order_by.rs b/rust/src/model/enum_type/order_by.rs index df413e74..68b4e2cc 100644 --- a/rust/src/model/enum_type/order_by.rs +++ b/rust/src/model/enum_type/order_by.rs @@ -1,4 +1,6 @@ +use crate::model::LinkType; use async_graphql::*; +use std::cmp::Ordering; #[derive(Enum, Debug, Copy, Clone, Eq, PartialEq)] #[graphql(name = "order_by")] @@ -16,3 +18,16 @@ pub enum OrderBy { #[graphql(name = "desc_nulls_last")] DescNullsLast, } + +impl OrderBy { + pub fn matches(&self, a: LinkType, b: LinkType) -> Ordering { + match self { + OrderBy::Asc => a.cmp(&b), + OrderBy::AscNullsFirst => a.cmp(&b), + OrderBy::AscNullsLast => a.cmp(&b), + OrderBy::Desc => b.cmp(&a), + OrderBy::DescNullsFirst => b.cmp(&a), + OrderBy::DescNullsLast => b.cmp(&a), + } + } +} diff --git a/rust/src/model/input_object_type/bigint_comparison_exp.rs b/rust/src/model/input_object_type/bigint_comparison_exp.rs index 3829a893..e2ebe7e3 100644 --- a/rust/src/model/input_object_type/bigint_comparison_exp.rs +++ b/rust/src/model/input_object_type/bigint_comparison_exp.rs @@ -1,5 +1,4 @@ use crate::model::{Bigint, LinkType}; -use crate::RawStore; use async_graphql::*; use doublets::Doublets; diff --git a/rust/src/model/input_object_type/links_order_by.rs b/rust/src/model/input_object_type/links_order_by.rs index 6f921114..39e0a314 100644 --- a/rust/src/model/input_object_type/links_order_by.rs +++ b/rust/src/model/input_object_type/links_order_by.rs @@ -1,4 +1,3 @@ -use crate::model::CanAggregateOrderBy; use crate::model::LinksAggregateOrderBy; use crate::model::MpAggregateOrderBy; use crate::model::NumbersOrderBy; @@ -6,7 +5,10 @@ use crate::model::ObjectsOrderBy; use crate::model::OrderBy; use crate::model::SelectorsAggregateOrderBy; use crate::model::StringsOrderBy; +use crate::model::{CanAggregateOrderBy, LinkType}; use async_graphql::*; +use doublets::Link; +use std::cmp::Ordering; #[derive(InputObject, Debug)] #[graphql(name = "links_order_by")] @@ -52,3 +54,23 @@ pub struct LinksOrderBy { pub typed_aggregate: Option, pub value: Option, } + +impl LinksOrderBy { + pub fn matches(&self, a: &Link, b: &Link) -> Ordering { + let mut ord = Ordering::Equal; + + if let Some(id) = self.id { + ord = ord.then_with(|| id.matches(a.index, b.index)); + } + + if let Some(from_id) = self.from_id { + ord = ord.then_with(|| from_id.matches(a.source, b.source)); + } + + if let Some(to_id) = self.to_id { + ord = ord.then_with(|| to_id.matches(a.target, b.target)); + } + + ord + } +} diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs index 5566f5db..01db9c8b 100644 --- a/rust/src/model/mod.rs +++ b/rust/src/model/mod.rs @@ -1,10 +1,11 @@ +mod distinct; mod enum_type; mod input_object_type; mod iterator; mod object_type; mod scalar_type; -use doublets::data::LinksError; +use doublets::data::Error; pub use enum_type::CanSelectColumn; pub use enum_type::LinksConstraint; pub use enum_type::LinksSelectColumn; @@ -234,10 +235,12 @@ pub use scalar_type::Jsonb; pub use scalar_type::LinksOptionExt; use std::marker::PhantomData; +pub use distinct::DistinctWrapper; + pub type LinkType = u64; -pub type LinksResult = Result>; +pub type LinksResult = Result>; -struct LinkTypeAssert(PhantomData); +struct LinkTypeAssert(PhantomData); #[allow(dead_code)] #[allow(non_camel_case_types)] diff --git a/rust/src/model/object_type/links.rs b/rust/src/model/object_type/links.rs index d4e17f90..4bf8794a 100644 --- a/rust/src/model/object_type/links.rs +++ b/rust/src/model/object_type/links.rs @@ -1,4 +1,3 @@ -use crate::model::Bigint; use crate::model::Can; use crate::model::CanAggregate; use crate::model::CanBoolExp; @@ -25,12 +24,14 @@ use crate::model::SelectorsOrderBy; use crate::model::SelectorsSelectColumn; use crate::model::Strings; use crate::model::UpLinksArgs; +use crate::model::{Bigint, LinkType}; use crate::Store; use async_graphql::*; use doublets::Doublets; #[derive(Debug, Clone)] -pub struct Links(pub doublets::Link); +#[repr(transparent)] +pub struct Links(pub doublets::Link); #[Object(name = "links")] impl Links { diff --git a/rust/src/model/object_type/mutation_root.rs b/rust/src/model/object_type/mutation_root.rs index 4a6123a0..9082213f 100644 --- a/rust/src/model/object_type/mutation_root.rs +++ b/rust/src/model/object_type/mutation_root.rs @@ -1,4 +1,3 @@ -use crate::model::Links; use crate::model::LinksBoolExp; use crate::model::LinksIncInput; use crate::model::LinksInsertInput; @@ -44,10 +43,12 @@ use crate::model::StringsOnConflict; use crate::model::StringsPkColumnsInput; use crate::model::StringsSetInput; use crate::model::{Bigint, LinksResult}; -use crate::Store; +use crate::model::{LinkType, Links}; +use crate::{QueryRoot, Store}; use async_graphql::*; -use doublets::data::{LinksError, Query}; +use doublets::data::{Error, Query}; use doublets::{Doublets, Link}; +use smallvec::{smallvec, SmallVec}; use std::io::{Read, Write}; pub use crate::model::LinksOptionExt; @@ -62,9 +63,27 @@ impl MutationRoot { &self, ctx: &Context<'_>, _where: Box, - ) -> Option { - todo!() + ) -> Result> { + let mut store = ctx.data_unchecked::().write().await; + + let ids: Vec<_> = QueryRoot::filter_links(&*store, Some(_where)) + .await + .map(|link| link.index) + .collect(); + + let returning: LinksResult> = ids + .into_iter() + .map(move |id| -> LinksResult<_> { + let link = store.try_get_link(id)?; + store.delete(id)?; + Ok(Links(link)) + }) + .collect(); + returning + .map(|s| Some(LinksMutationResponse(s))) + .map_err(|e| e.into()) } + #[graphql(name = "delete_links_by_pk")] pub async fn delete_links_by_pk(&self, ctx: &Context<'_>, id: Bigint) -> Option { todo!() @@ -230,6 +249,7 @@ impl MutationRoot { ) -> Option { todo!() } + #[graphql(name = "update_links")] pub async fn update_links( &self, @@ -237,9 +257,42 @@ impl MutationRoot { #[graphql(name = "_inc")] inc: Option, #[graphql(name = "_set")] set: Option, _where: Box, - ) -> Option { - todo!() + ) -> Result> { + let mut store = ctx.data_unchecked::().write().await; + + let ids: Vec<_> = QueryRoot::filter_links(&*store, Some(_where)) + .await + .map(|link| link.index) + .collect(); + + let returning: LinksResult> = ids + .into_iter() + .map(move |id| -> LinksResult<_> { + let link = store.try_get_link(id)?; + let (from_id, to_id) = if let Some(inc) = &inc { + ( + link.source + inc.from_id.to_link(), + link.target + inc.from_id.to_link(), + ) + } else if let Some(set) = &set { + (set.from_id.to_link(), set.to_id.to_link()) + } else { + (link.source, link.target) + }; + + if (link.source, link.target) != (from_id, to_id) { + let id = store.update(id, from_id, to_id)?; + Ok(Links(Link::new(id, from_id, to_id))) + } else { + Ok(Links(link)) + } + }) + .collect(); + returning + .map(|s| Some(LinksMutationResponse(s))) + .map_err(|e| e.into()) } + #[graphql(name = "update_links_by_pk")] pub async fn update_links_by_pk( &self, diff --git a/rust/src/model/object_type/query_root.rs b/rust/src/model/object_type/query_root.rs index 0746d259..0cb778ca 100644 --- a/rust/src/model/object_type/query_root.rs +++ b/rust/src/model/object_type/query_root.rs @@ -1,4 +1,3 @@ -use crate::model::Can; use crate::model::CanAggregate; use crate::model::CanBoolExp; use crate::model::CanOrderBy; @@ -37,9 +36,14 @@ use crate::model::StringsBoolExp; use crate::model::StringsOrderBy; use crate::model::StringsSelectColumn; use crate::model::{Bigint, BigintComparisonExp, LinkType}; -use crate::Store; +use crate::model::{Can, DistinctWrapper}; +use crate::{store, RawStore, Store}; use async_graphql::*; -use doublets::Doublets; +use doublets::{Doublet, Doublets, Link}; +use rayon::prelude::*; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use std::mem::ManuallyDrop; #[derive(Debug)] pub struct QueryRoot; @@ -76,17 +80,11 @@ impl QueryRoot { todo!() } - pub async fn links( - &self, - ctx: &Context<'_>, - #[graphql(name = "distinct_on")] distinct_on: Option>, - limit: Option, - offset: Option, - #[graphql(name = "order_by")] order_by: Option>, + #[graphql(skip)] + pub(crate) async fn filter_links( + store: &RawStore, _where: Option>, - ) -> Vec { - let store = ctx.data_unchecked::().read().await; - + ) -> Box> + '_> { let fast_param_impl = |param: Option<&BigintComparisonExp>| -> LinkType { let any = store.constants().any; if let Some(param) = param { @@ -104,14 +102,66 @@ impl QueryRoot { let from_id = fast_param_impl(r#where.from_id.as_deref()); let to_id = fast_param_impl(r#where.to_id.as_deref()); - store + box store .each_iter([id, from_id, to_id]) - .filter(|link| r#where.matches(&*store, link)) - .map(|link| Links(link)) - .collect() + .filter(move |link| r#where.matches(&*store, link)) } else { - store.iter().map(|link| Links(link)).collect() + // todo: come up with something smarter + // store.iter() + box store.iter() + } + } + + pub async fn links( + &self, + ctx: &Context<'_>, + #[graphql(name = "distinct_on")] distinct_on: Option>, + limit: Option, + offset: Option, + #[graphql(name = "order_by")] order_by: Option>, + _where: Option>, + ) -> Vec { + let offset = offset.unwrap_or(0); + let limit = limit.unwrap_or(usize::MAX); + let store = ctx.data_unchecked::().read().await; + let mut links = Self::filter_links(&*store, _where).await; + if let Some(distinct_on) = distinct_on { + let distinct_on: HashSet<_> = distinct_on.into_iter().collect(); + links = box links + .map(move |link| { + let mut from_id = 0; + let mut to_id = 0; + for column in &distinct_on { + match column { + LinksSelectColumn::FromId => from_id = link.source, + LinksSelectColumn::ToId => to_id = link.target, + _ => { + todo!() + } + } + } + DistinctWrapper::from_match_link((from_id, to_id), link) + }) + .collect::>() + .into_iter() + .map(DistinctWrapper::into_link) } + + let mut links: Vec<_> = links.collect(); + if let Some(order_by) = order_by { + links.par_sort_unstable_by(|a, b| { + order_by.iter().fold(Ordering::Equal, |ord, order| { + ord.then_with(|| order.matches(a, b)) + }) + }) + } + + links + .into_par_iter() + .map(Links) + .skip(offset) + .take(limit) + .collect() } #[graphql(name = "links_aggregate")] diff --git a/rust/src/store.rs b/rust/src/store.rs new file mode 100644 index 00000000..01912718 --- /dev/null +++ b/rust/src/store.rs @@ -0,0 +1,119 @@ +use crate::model::{LinkType, LinksResult}; +use doublets::data::{Flow, ReadHandler, WriteHandler}; +use doublets::{ + data::{Flow::Continue, LinksConstants, ToQuery}, + mem::FileMapped, + parts, split, Doublets, Error, Link, Links, +}; +use smallvec::SmallVec; +use std::ops::Try; + +pub struct Store(Inner); + +impl> Store { + pub fn new(inner: Inner) -> Self { + Self(inner) + } + + pub fn iter(&self) -> impl Iterator> + '_ { + self.0.iter() + } + + pub fn each_iter<'a>( + &self, + query: impl ToQuery + 'a, + ) -> impl Iterator> + 'a { + self.0.each_iter(query) + } +} + +impl> Links for Store { + fn constants(&self) -> &LinksConstants { + self.0.constants() + } + + fn count_links(&self, query: &[LinkType]) -> LinkType { + self.0.count_links(query) + } + + fn create_links( + &mut self, + query: &[LinkType], + handler: WriteHandler, + ) -> Result> { + self.0.create_links(query, handler) + } + + fn each_links(&self, query: &[LinkType], handler: ReadHandler) -> Flow { + self.0.each_links(query, handler) + } + + fn update_links( + &mut self, + query: &[LinkType], + change: &[LinkType], + handler: WriteHandler, + ) -> Result> { + self.0.update_links(query, change, handler) + } + + fn delete_links( + &mut self, + query: &[LinkType], + handler: WriteHandler, + ) -> Result> { + self.0.delete_links(query, handler) + } +} + +impl> Doublets for Store { + fn update_by_with( + &mut self, + query: impl ToQuery, + replacement: impl ToQuery, + mut handler: F, + ) -> Result> + where + F: FnMut(Link, Link) -> R, + R: Try, + { + let query = query.to_query(); + let replacement = replacement.to_query(); + + let constants = self.constants().clone(); + let store = &mut self.0; + let (new, from_id, to_id) = ( + query[constants.index_part as usize], + replacement[constants.source_part as usize], + replacement[constants.target_part as usize], + ); + let id = if let Some(old) = store.search(from_id, to_id) { + store.rebase_with(new, old, &mut handler)?; + store.delete_with(new, &mut handler)?; + old + } else { + new + }; + + store.update_with(id, from_id, to_id, handler) + } + + fn delete_by_with( + &mut self, + query: impl ToQuery, + mut handler: F, + ) -> Result> + where + F: FnMut(Link, Link) -> R, + R: Try, + { + let index_part = self.constants().index_part; + let store = &mut self.0; + store.delete_usages_with(query.to_query()[index_part as usize], &mut handler)?; + store.delete_by_with(query, handler) + } + + fn get_link(&self, index: LinkType) -> Option> { + self.0.get_link(index) + } +}