use crate::{msg::*, server::FsProvider, Handle, Inode};
use btlib::{bterr, crypto::ConcretePub, BlockPath, IssuedProcRec, Result};
use bttp::{DeserCallback, Transmitter};
use core::future::{ready, Future, Ready};
use futures::FutureExt;
use paste::paste;
use std::sync::Arc;
/// Expands to the identifier of the extractor callback for a reply variant, formed by
/// prefixing the variant name with `Extract` (e.g. `extractor!(Lookup)` is `ExtractLookup`).
macro_rules! extractor {
    ($name:ident) => {
        paste! { [<Extract $name>] }
    };
}
/// Expands to the identifier of the reply type for a message variant, formed by appending
/// `Reply` to the variant name (e.g. `reply!(Lookup)` is `LookupReply`).
macro_rules! reply {
    ($name:ident) => {
        paste! { [<$name Reply>] }
    };
}
/// Declares the unit struct `Extract<Variant>` and implements `DeserCallback` for it.
///
/// The generated callback accepts an `FsReply` and resolves immediately: the payload of
/// `FsReply::<Variant>` is returned on a match, and any other variant produces an error.
macro_rules! extractor_callback {
    ($kind:ident) => {
        paste! {
            struct [<Extract $kind>];

            impl DeserCallback for extractor!($kind) {
                type Arg<'de> = FsReply<'de>;
                type Return = Result<reply!($kind)>;
                type CallFut<'de> = Ready<Self::Return>;

                fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
                    // No asynchronous work is needed, so the future is already resolved.
                    ready(match arg {
                        FsReply::$kind(payload) => Ok(payload),
                        _ => Err(bterr!("wrong message type sent as reply")),
                    })
                }
            }
        }
    };
}
// Generate an `Extract<Variant>` callback type for each of these reply variants. Each one
// returns the payload of its own `FsReply` variant and an error for any other variant.
extractor_callback!(Lookup);
extractor_callback!(Create);
extractor_callback!(Open);
extractor_callback!(Write);
extractor_callback!(Link);
extractor_callback!(ReadDir);
extractor_callback!(WriteMeta);
/// Callback for requests whose only expected reply is an acknowledgement.
///
/// An `FsReply::Ack` reply yields `Ok(())`; every other variant yields an error.
struct AckCallback;

impl DeserCallback for AckCallback {
    type Arg<'de> = FsReply<'de>;
    type Return = Result<()>;
    type CallFut<'de> = Ready<Self::Return>;

    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
        // The contents of the acknowledgement are not inspected, only its variant.
        ready(match arg {
            FsReply::Ack(_) => Ok(()),
            _ => Err(bterr!("wrong message type sent as reply")),
        })
    }
}
/// Callback which unpacks the payload of an `FsReply::ReadMeta` reply.
///
/// This is written out by hand but has the same shape as the callbacks generated by
/// `extractor_callback!`: the matching variant's payload is returned and any other
/// variant produces an error.
struct ExtractReadMeta;

impl DeserCallback for ExtractReadMeta {
    type Arg<'de> = FsReply<'de>;
    type Return = Result<ReadMetaReply>;
    type CallFut<'de> = Ready<Self::Return>;

    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
        ready(match arg {
            FsReply::ReadMeta(meta) => Ok(meta),
            _ => Err(bterr!("wrong message type sent as reply")),
        })
    }
}
/// Callback for `Read` replies which hands the reply to a caller-supplied closure.
///
/// The closure is an `FnOnce`, so it is kept in an `Option` and moved out the first time
/// [`DeserCallback::call`] runs.
struct ExtractRead<F> {
    /// The caller's closure; `None` once it has been moved out by `call`.
    callback: Option<F>,
}

impl<F> ExtractRead<F> {
    /// Wraps `callback` so that it can be used as a reply callback.
    fn new(callback: F) -> Self {
        Self {
            callback: Some(callback),
        }
    }
}

impl<R, Fut, F> DeserCallback for ExtractRead<F>
where
    F: 'static + Send + FnOnce(ReadReply<'_>) -> Fut,
    Fut: Send + Future<Output = R>,
{
    type Arg<'de> = FsReply<'de> where F: 'de;
    type Return = Result<R>;
    type CallFut<'de> = impl 'de + Send + Future<Output = Self::Return>;

    /// Passes an `FsReply::Read` reply to the stored closure and returns its output.
    ///
    /// # Errors
    ///
    /// Returns an error when the reply is not the `Read` variant, or when the closure has
    /// already been consumed by an earlier invocation of this method.
    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
        // The closure is moved out eagerly, exactly once. `call` takes `&mut self`, so a
        // second invocation is possible; report it as an error rather than panicking.
        let callback = self.callback.take();
        async move {
            let Some(callback) = callback else {
                return Err(bterr!("Read reply callback was already consumed"));
            };
            if let FsReply::Read(reply) = arg {
                Ok(callback(reply).await)
            } else {
                Err(bterr!("wrong variant sent in reply to Read message"))
            }
        }
    }
}
/// Client for the filesystem message protocol, built around a [`Transmitter`].
pub struct FsClient {
    /// Transmitter through which every filesystem message is sent.
    tx: Transmitter,
}

impl FsClient {
    /// Creates a client which sends its messages through `tx`.
    pub fn new(tx: Transmitter) -> Self {
        FsClient { tx }
    }

    /// Consumes the client and returns the wrapped transmitter.
    pub fn into_inner(self) -> Transmitter {
        let FsClient { tx } = self;
        tx
    }

    /// Returns a shared reference to the wrapped transmitter.
    pub fn get_ref(&self) -> &Transmitter {
        let FsClient { tx } = self;
        tx
    }

    /// Returns an exclusive reference to the wrapped transmitter.
    pub fn get_mut(&mut self) -> &mut Transmitter {
        let FsClient { tx } = self;
        tx
    }
}
impl FsProvider for FsClient {
type LookupFut<'c> = impl 'c + Send + Future<Output = Result<LookupReply>>;
fn lookup<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
self.tx
.call(FsMsg::Lookup(msg), extractor!(Lookup))
.map(|e| e?)
}
type CreateFut<'c> = impl 'c + Send + Future<Output = Result<CreateReply>>;
fn create<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
self.tx
.call(FsMsg::Create(msg), extractor!(Create))
.map(|e| e?)
}
type OpenFut<'c> = impl 'c + Send + Future<Output = Result<OpenReply>>;
fn open<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
self.tx.call(FsMsg::Open(msg), extractor!(Open)).map(|e| e?)
}
type ReadGuard = Vec<u8>;
type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadGuard>>;
fn read<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
let callback = ExtractRead::new(|reply: ReadReply<'_>| {
ready(reply.data.to_vec())
});
self.tx.call(FsMsg::Read(msg), callback).map(|e| e?)
}
type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>>;
fn write<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Write<&'c [u8]>) -> Self::WriteFut<'c> {
self.tx
.call(FsMsg::Write(msg), extractor!(Write))
.map(|e| e?)
}
type FlushFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn flush<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
self.tx.call(FsMsg::Flush(msg), AckCallback).map(|e| e?)
}
type ReadDirFut<'c> = impl 'c + Send + Future<Output = Result<ReadDirReply>>;
fn read_dir<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
self.tx
.call(FsMsg::ReadDir(msg), extractor!(ReadDir))
.map(|e| e?)
}
type LinkFut<'c> = impl 'c + Send + Future<Output = Result<LinkReply>>;
fn link<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c> {
self.tx.call(FsMsg::Link(msg), extractor!(Link)).map(|e| e?)
}
type UnlinkFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn unlink<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
self.tx.call(FsMsg::Unlink(msg), AckCallback).map(|e| e?)
}
type ReadMetaFut<'c> = impl 'c + Send + Future<Output = Result<ReadMetaReply>>;
fn read_meta<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
self.tx
.call(FsMsg::ReadMeta(msg), extractor!(ReadMeta))
.map(|e| e?)
}
type WriteMetaFut<'c> = impl 'c + Send + Future<Output = Result<WriteMetaReply>>;
fn write_meta<'c>(
&'c self,
_from: &'c Arc<BlockPath>,
msg: WriteMeta,
) -> Self::WriteMetaFut<'c> {
self.tx
.call(FsMsg::WriteMeta(msg), extractor!(WriteMeta))
.map(|e| e?)
}
type AllocateFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn allocate<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
self.tx.call(FsMsg::Allocate(msg), AckCallback).map(|e| e?)
}
type CloseFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn close<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
self.tx.call(FsMsg::Close(msg), AckCallback).map(|e| e?)
}
type ForgetFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn forget<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
self.tx.call(FsMsg::Forget(msg), AckCallback).map(|e| e?)
}
type LockFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn lock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
self.tx.call(FsMsg::Lock(msg), AckCallback).map(|e| e?)
}
type UnlockFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn unlock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
self.tx.call(FsMsg::Unlock(msg), AckCallback).map(|e| e?)
}
type AddReacapFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn add_readcap<'c>(
&'c self,
_from: &'c Arc<BlockPath>,
msg: AddReadcap,
) -> Self::AddReacapFut<'c> {
self.tx
.call(FsMsg::AddReadcap(msg), AckCallback)
.map(|e| e?)
}
type GrantAccessFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
fn grant_access<'c>(
&'c self,
_from: &'c Arc<BlockPath>,
msg: GrantAccess,
) -> Self::GrantAccessFut<'c> {
self.tx
.call(FsMsg::GrantAccess(msg), AckCallback)
.map(|e| e?)
}
}
impl FsClient {
pub async fn lookup(&self, parent: Inode, name: &str) -> Result<LookupReply> {
let msg = FsMsg::Lookup(Lookup { parent, name });
self.tx.call(msg, extractor!(Lookup)).await?
}
pub async fn create(
&self,
parent: Inode,
name: &str,
flags: Flags,
mode: u32,
umask: u32,
) -> Result<CreateReply> {
let msg = FsMsg::Create(Create {
parent,
name,
flags,
mode,
umask,
});
self.tx.call(msg, extractor!(Create)).await?
}
pub async fn open(&self, inode: Inode, flags: Flags) -> Result<OpenReply> {
let msg = FsMsg::Open(Open { inode, flags });
self.tx.call(msg, extractor!(Open)).await?
}
pub async fn read<R, Fut, F>(
&self,
inode: Inode,
handle: Handle,
offset: u64,
size: u64,
callback: F,
) -> Result<R>
where
F: 'static + Send + FnOnce(ReadReply<'_>) -> Fut,
Fut: Send + Future<Output = R>,
{
let msg = FsMsg::Read(Read {
inode,
handle,
offset,
size,
});
let callback = ExtractRead::new(callback);
self.tx.call(msg, callback).await?
}
pub async fn write(
&self,
inode: Inode,
handle: Handle,
offset: u64,
data: &[u8],
) -> Result<WriteReply> {
let msg = FsMsg::Write(Write {
inode,
handle,
offset,
data,
});
self.tx.call(msg, extractor!(Write)).await?
}
pub async fn flush(&self, inode: Inode, handle: Handle) -> Result<()> {
let msg = FsMsg::Flush(Flush { inode, handle });
self.tx.call(msg, AckCallback).await?
}
pub async fn read_dir(
&self,
inode: Inode,
handle: Handle,
limit: u32,
state: u64,
) -> Result<ReadDirReply> {
let msg = FsMsg::ReadDir(ReadDir {
inode,
handle,
limit,
state,
});
self.tx.call(msg, ExtractReadDir).await?
}
pub async fn link(&self, inode: Inode, new_parent: Inode, name: &str) -> Result<LinkReply> {
let msg = FsMsg::Link(Link {
inode,
new_parent,
name,
});
self.tx.call(msg, extractor!(Link)).await?
}
pub async fn unlink(&self, parent: Inode, name: &str) -> Result<()> {
let msg = FsMsg::Unlink(Unlink { parent, name });
self.tx.call(msg, AckCallback).await?
}
pub async fn read_meta(&self, inode: Inode, handle: Option<Handle>) -> Result<ReadMetaReply> {
let msg = FsMsg::ReadMeta(ReadMeta { inode, handle });
self.tx.call(msg, ExtractReadMeta).await?
}
pub async fn write_meta(
&self,
inode: Inode,
handle: Option<Handle>,
attrs: Attrs,
attrs_set: AttrsSet,
) -> Result<WriteMetaReply> {
let msg = FsMsg::WriteMeta(WriteMeta {
inode,
handle,
attrs,
attrs_set,
});
self.tx.call(msg, ExtractWriteMeta).await?
}
pub async fn allocate(&self, inode: Inode, handle: Handle, size: u64) -> Result<()> {
let msg = FsMsg::Allocate(Allocate {
inode,
handle,
offset: None,
size,
});
self.tx.call(msg, AckCallback).await?
}
pub async fn close(&self, inode: Inode, handle: Handle) -> Result<()> {
let msg = FsMsg::Close(Close { inode, handle });
self.tx.call(msg, AckCallback).await?
}
pub async fn forget(&self, inode: Inode, count: u64) -> Result<()> {
let msg = FsMsg::Forget(Forget { inode, count });
self.tx.call(msg, AckCallback).await?
}
pub async fn lock(&self, inode: Inode, handle: Handle, desc: LockDesc) -> Result<()> {
let msg = FsMsg::Lock(Lock {
inode,
handle,
desc,
});
self.tx.call(msg, AckCallback).await?
}
pub async fn unlock(&self, inode: Inode, handle: Handle) -> Result<()> {
let msg = FsMsg::Unlock(Unlock { inode, handle });
self.tx.call(msg, AckCallback).await?
}
pub async fn add_readcap(
&self,
inode: Inode,
handle: Handle,
pub_creds: ConcretePub,
) -> Result<()> {
let msg = FsMsg::AddReadcap(AddReadcap {
inode,
handle,
pub_creds,
});
self.tx.call(msg, AckCallback).await?
}
pub async fn grant_access(&self, inode: Inode, record: IssuedProcRec) -> Result<()> {
let msg = FsMsg::GrantAccess(GrantAccess { inode, record });
self.tx.call(msg, AckCallback).await?
}
}
/// Borrows the transmitter wrapped by the client.
impl AsRef<Transmitter> for FsClient {
    fn as_ref(&self) -> &Transmitter {
        &self.tx
    }
}
/// Mutably borrows the transmitter wrapped by the client.
impl AsMut<Transmitter> for FsClient {
    fn as_mut(&mut self) -> &mut Transmitter {
        &mut self.tx
    }
}
impl From<Transmitter> for FsClient {
fn from(value: Transmitter) -> Self {
Self::new(value)
}
}