use crate::msg::{Read as ReadMsg, *};
use btlib::{crypto::Creds, BlockPath, Result};
use bttp::{MsgCallback, MsgReceived, Receiver};
use core::future::Future;
use std::{net::IpAddr, ops::Deref, sync::Arc};
/// Interface of a type that services the filesystem messages defined in `crate::msg`.
///
/// Every operation follows the same shape: a method taking `&self`, the `from` path and one
/// message value, paired with a generic associated type naming the future that method returns.
/// Each future is `Send`, may borrow from `self` for `'c` (hence the `Self: 'c` bounds) and
/// resolves to a `Result` carrying the operation's reply (or `()` when there is none).
///
/// The server callback in this file passes as `from` the first value returned by
/// `MsgReceived::into_parts` — presumably the path of the message's sender; confirm against
/// `bttp`.
pub trait FsProvider: Send + Sync {
/// Future returned by [`FsProvider::lookup`].
type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
where
Self: 'c;
/// Handles a [`Lookup`] message, resolving to a [`LookupReply`].
fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c>;
/// Future returned by [`FsProvider::create`].
type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
where
Self: 'c;
/// Handles a [`Create`] message, resolving to a [`CreateReply`].
fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c>;
/// Future returned by [`FsProvider::open`].
type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
where
Self: 'c;
/// Handles an [`Open`] message, resolving to an [`OpenReply`].
fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
/// Value produced by a successful [`FsProvider::read`]. It dereferences to a byte slice,
/// which the server callback in this file sends as the `data` of a `ReadReply`.
type ReadGuard: Send + Sync + Deref<Target = [u8]>;
/// Future returned by [`FsProvider::read`].
type ReadFut<'c>: Send + Future<Output = Result<Self::ReadGuard>>
where
Self: 'c;
/// Handles a `Read` message (imported here as `ReadMsg`), resolving to a
/// [`FsProvider::ReadGuard`].
fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c>;
/// Future returned by [`FsProvider::write`]. Its lifetime parameter is spelled `'r` rather
/// than `'c`; the name has no effect on implementers.
type WriteFut<'r>: Send + Future<Output = Result<WriteReply>>
where
Self: 'r;
/// Handles a [`Write`] message whose payload is a byte slice borrowed for `'c`, resolving to
/// a [`WriteReply`].
fn write<'c>(&'c self, from: &'c Arc<BlockPath>, write: Write<&'c [u8]>) -> Self::WriteFut<'c>;
/// Future returned by [`FsProvider::flush`].
type FlushFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles a [`Flush`] message.
fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c>;
/// Future returned by [`FsProvider::read_dir`].
type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
where
Self: 'c;
/// Handles a [`ReadDir`] message, resolving to a [`ReadDirReply`].
fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c>;
/// Future returned by [`FsProvider::link`].
type LinkFut<'c>: Send + Future<Output = Result<LinkReply>>
where
Self: 'c;
/// Handles a [`Link`] message, resolving to a [`LinkReply`].
fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c>;
/// Future returned by [`FsProvider::unlink`].
type UnlinkFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles an [`Unlink`] message.
fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c>;
/// Future returned by [`FsProvider::read_meta`].
type ReadMetaFut<'c>: Send + Future<Output = Result<ReadMetaReply>>
where
Self: 'c;
/// Handles a [`ReadMeta`] message, resolving to a [`ReadMetaReply`].
fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
/// Future returned by [`FsProvider::write_meta`].
type WriteMetaFut<'c>: Send + Future<Output = Result<WriteMetaReply>>
where
Self: 'c;
/// Handles a [`WriteMeta`] message, resolving to a [`WriteMetaReply`].
fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
-> Self::WriteMetaFut<'c>;
/// Future returned by [`FsProvider::allocate`].
type AllocateFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles an [`Allocate`] message.
fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c>;
/// Future returned by [`FsProvider::close`].
type CloseFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles a [`Close`] message.
fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c>;
/// Future returned by [`FsProvider::forget`].
type ForgetFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles a [`Forget`] message.
fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c>;
/// Future returned by [`FsProvider::lock`].
type LockFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles a [`Lock`] message.
fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c>;
/// Future returned by [`FsProvider::unlock`].
type UnlockFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles an [`Unlock`] message.
fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c>;
/// Future returned by [`FsProvider::add_readcap`].
///
/// NOTE(review): the name is spelled `AddReacapFut` (missing the `d` of "Readcap"). It is
/// part of the public trait, so renaming it would break every existing implementer.
type AddReacapFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles an [`AddReadcap`] message.
fn add_readcap<'c>(
&'c self,
from: &'c Arc<BlockPath>,
msg: AddReadcap,
) -> Self::AddReacapFut<'c>;
/// Future returned by [`FsProvider::grant_access`].
type GrantAccessFut<'c>: Send + Future<Output = Result<()>>
where
Self: 'c;
/// Handles a [`GrantAccess`] message.
fn grant_access<'c>(
&'c self,
from: &'c Arc<BlockPath>,
msg: GrantAccess,
) -> Self::GrantAccessFut<'c>;
}
impl<P: 'static + ?Sized + FsProvider, Ptr: Send + Sync + Deref<Target = P>> FsProvider for Ptr {
type LookupFut<'c> = P::LookupFut<'c> where Self: 'c;
fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
self.deref().lookup(from, msg)
}
type CreateFut<'c> = P::CreateFut<'c> where Self: 'c;
fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
self.deref().create(from, msg)
}
type OpenFut<'c> = P::OpenFut<'c> where Self: 'c;
fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
self.deref().open(from, msg)
}
type ReadGuard = P::ReadGuard;
type ReadFut<'c> = P::ReadFut<'c> where Self: 'c;
fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c> {
self.deref().read(from, msg)
}
type WriteFut<'r> = P::WriteFut<'r> where Self: 'r;
fn write<'c>(&'c self, from: &'c Arc<BlockPath>, write: Write<&'c [u8]>) -> Self::WriteFut<'c> {
self.deref().write(from, write)
}
type FlushFut<'c> = P::FlushFut<'c> where Self: 'c;
fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
self.deref().flush(from, msg)
}
type ReadDirFut<'c> = P::ReadDirFut<'c> where Self: 'c;
fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
self.deref().read_dir(from, msg)
}
type LinkFut<'c> = P::LinkFut<'c> where Self: 'c;
fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c> {
self.deref().link(from, msg)
}
type UnlinkFut<'c> = P::UnlinkFut<'c> where Self: 'c;
fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
self.deref().unlink(from, msg)
}
type ReadMetaFut<'c> = P::ReadMetaFut<'c> where Self: 'c;
fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
self.deref().read_meta(from, msg)
}
type WriteMetaFut<'c> = P::WriteMetaFut<'c> where Self: 'c;
fn write_meta<'c>(
&'c self,
from: &'c Arc<BlockPath>,
msg: WriteMeta,
) -> Self::WriteMetaFut<'c> {
self.deref().write_meta(from, msg)
}
type AllocateFut<'c> = P::AllocateFut<'c> where Self: 'c;
fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
self.deref().allocate(from, msg)
}
type CloseFut<'c> = P::CloseFut<'c> where Self: 'c;
fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
self.deref().close(from, msg)
}
type ForgetFut<'c> = P::ForgetFut<'c> where Self: 'c;
fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
self.deref().forget(from, msg)
}
type LockFut<'c> = P::LockFut<'c> where Self: 'c;
fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
self.deref().lock(from, msg)
}
type UnlockFut<'c> = P::UnlockFut<'c> where Self: 'c;
fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
self.deref().unlock(from, msg)
}
type AddReacapFut<'c> = P::AddReacapFut<'c> where Self: 'c;
fn add_readcap<'c>(
&'c self,
from: &'c Arc<BlockPath>,
msg: AddReadcap,
) -> Self::AddReacapFut<'c> {
self.deref().add_readcap(from, msg)
}
type GrantAccessFut<'c> = P::GrantAccessFut<'c> where Self: 'c;
fn grant_access<'c>(
&'c self,
from: &'c Arc<BlockPath>,
msg: GrantAccess,
) -> Self::GrantAccessFut<'c> {
self.deref().grant_access(from, msg)
}
}
/// Message callback state: a shared handle to the provider that services incoming messages.
struct ServerCallback<P> {
    /// The provider every received message is dispatched to.
    provider: Arc<P>,
}

impl<P> ServerCallback<P> {
    /// Wraps `provider` in a new callback.
    fn new(provider: Arc<P>) -> Self {
        ServerCallback { provider }
    }
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a `P: Clone` bound,
// whereas cloning the `Arc` needs no bound on `P` at all.
impl<P> Clone for ServerCallback<P> {
    /// Returns a callback sharing the same provider (only the `Arc` is cloned).
    fn clone(&self) -> Self {
        Self::new(Arc::clone(&self.provider))
    }
}
/// Adapts an [`FsProvider`] to the [`MsgCallback`] interface.
///
/// Each received [`FsMsg`] is dispatched to the provider method matching its variant, and the
/// result is wrapped in the corresponding [`FsReply`] variant (`Ack` for methods that resolve
/// to `()`).
impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
    type Arg<'de> = FsMsg<'de>;
    type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;

    /// Handles one received message.
    ///
    /// `arg` is split into the `from` path, the message body and an optional replier. The body
    /// is handed to the provider; if a replier is present the reply is sent through it.
    ///
    /// When no replier is present the message is still handled, but no reply is sent and
    /// `Ok(())` is returned. Unwrapping the missing replier would instead panic, and for
    /// every variant except `Read` only after the provider had already done the work.
    /// NOTE(review): this assumes a missing replier means the sender did not ask for a
    /// reply — confirm against `bttp`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the provider method or by sending the reply.
    fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
        async move {
            let (from, body, replier) = arg.into_parts();
            let provider = self.provider.as_ref();
            let reply = match body {
                FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
                FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
                FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
                FsMsg::Read(read) => {
                    // The reply borrows from `guard`, so it has to be sent while the guard is
                    // still alive instead of being returned from this `match`.
                    let guard = provider.read(&from, read).await?;
                    if let Some(mut replier) = replier {
                        replier
                            .reply(FsReply::Read(ReadReply { data: &guard }))
                            .await?;
                    }
                    return Ok(());
                }
                FsMsg::Write(write) => FsReply::Write(provider.write(&from, write).await?),
                FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
                FsMsg::ReadDir(read_dir) => {
                    FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
                }
                FsMsg::Link(link) => FsReply::Link(provider.link(&from, link).await?),
                FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
                FsMsg::ReadMeta(read_meta) => {
                    FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
                }
                FsMsg::WriteMeta(write_meta) => {
                    FsReply::WriteMeta(provider.write_meta(&from, write_meta).await?)
                }
                FsMsg::Allocate(allocate) => {
                    FsReply::Ack(provider.allocate(&from, allocate).await?)
                }
                FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
                FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
                FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
                FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
                FsMsg::AddReadcap(add_readcap) => {
                    FsReply::Ack(provider.add_readcap(&from, add_readcap).await?)
                }
                FsMsg::GrantAccess(grant_access) => {
                    FsReply::Ack(provider.grant_access(&from, grant_access).await?)
                }
            };
            match replier {
                Some(mut replier) => replier.reply(reply).await,
                // Nobody is waiting for an answer; the work is done, so report success
                // rather than panicking.
                None => Ok(()),
            }
        }
    }
}
/// Creates a [`Receiver`] whose message callback dispatches every received `FsMsg` to
/// `provider`.
///
/// `ip_addr` and `creds` are passed to `Receiver::new` unchanged.
///
/// # Errors
///
/// Returns whatever error `Receiver::new` returns.
pub fn new_fs_server<C, P>(ip_addr: IpAddr, creds: Arc<C>, provider: Arc<P>) -> Result<Receiver>
where
    C: 'static + Creds,
    P: 'static + FsProvider,
{
    let callback = ServerCallback::new(provider);
    Receiver::new(ip_addr, creds, callback)
}