use crate::fuse_fs::FuseFs;
use btfproto::server::FsProvider;
use btlib::{bterr, BlockPath, Result};
use fuse_backend_rs::{
api::server::Server,
transport::{self, FuseChannel, FuseSession},
};
use futures::future::FutureExt;
use log::{debug, error};
use std::path::PathBuf;
use std::{
ffi::{c_char, c_int, CString},
fs::File,
os::{fd::FromRawFd, unix::ffi::OsStrExt},
path::Path,
sync::Arc,
};
use tokio::{sync::oneshot, task::JoinSet};
pub use private::FuseDaemon;
mod private {
use std::{fs::DirBuilder, num::NonZeroUsize};
use super::*;
// Foreign declaration resolved by linking against the `fuse3` library.
#[link(name = "fuse3")]
extern "C" {
/// Takes the mount point and the mount options as NUL-terminated C strings.
///
/// The only caller in this module (`mount_at`) treats a negative return value as failure and
/// any other value as a raw file descriptor.
/// NOTE(review): the exact contract (descriptor ownership, option syntax) is defined by
/// libfuse and is not visible here — confirm against the libfuse documentation.
fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
}
fn mount_at<P: AsRef<Path>>(mnt_point: P, mnt_options: &str) -> Result<File> {
let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?;
let options = CString::new(mnt_options)?;
let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
if raw_fd < 0 {
return Err(bterr!("an error occurred in fuse_open_channel"));
}
let file = unsafe { File::from_raw_fd(raw_fd) };
Ok(file)
}
/// Owns a FUSE session together with the set of tasks which serve requests for it.
///
/// Dropping the value wakes the session (see the `Drop` impl in this module).
pub struct FuseDaemon {
/// Tasks spawned by `new`, one per FUSE channel; `finished` waits for all of them.
set: JoinSet<()>,
/// Session created by `new`; it hands out the channels the tasks read from.
session: FuseSession,
}
impl FuseDaemon {
/// Name passed as the second argument of `FuseSession::new` when the session is created.
// The `'static` lifetime is written out because eliding it in an associated constant triggers
// the `elided_lifetimes_in_associated_constant` future-compatibility lint on some toolchains.
const FSNAME: &'static str = "btfuse";
/// Value passed as the third argument of `FuseSession::new` when the session is created.
const FSTYPE: &'static str = "bt";
pub fn new<P: 'static + FsProvider>(
mntdir: PathBuf,
mntoptions: &str,
num_tasks: Option<NonZeroUsize>,
fallback_path: Arc<BlockPath>,
mounted_signal: Option<oneshot::Sender<()>>,
provider: P,
) -> Result<Self> {
let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
let session = Self::session(mntdir, mntoptions)?;
if let Some(tx) = mounted_signal {
tx.send(())
.map_err(|_| bterr!("failed to send mounted signal"))?;
}
let mut set = JoinSet::new();
let num_tasks = if let Some(num_tasks) = num_tasks {
num_tasks
} else {
std::thread::available_parallelism()?
};
log::debug!("spawning {num_tasks} blocking tasks");
for task_num in 0..num_tasks.get() {
let server = server.clone();
let channel = session.new_channel()?;
let future = tokio::task::spawn_blocking(move || {
Self::server_loop(task_num, server, channel)
});
let future = future.map(|result| {
if let Err(err) = result {
error!("server_loop produced an error: {err}");
}
});
set.spawn(future);
}
Ok(Self { set, session })
}
/// Creates `mntdir` (and any missing parents), then builds a [`FuseSession`] for it whose FUSE
/// file is the one returned by `mount_at`.
///
/// # Errors
/// Returns an error if the directory cannot be created, if `FuseSession::new` fails, or if
/// `mount_at` fails.
fn session<T: AsRef<Path>>(mntdir: T, mntoptions: &str) -> Result<FuseSession> {
    let mount_dir = mntdir.as_ref();
    let dir_created = DirBuilder::new().recursive(true).create(mount_dir);
    dir_created.map_err(|err| {
        let detail = format!("failed to create mntdir: '{}'", mount_dir.display());
        bterr!(err).context(detail)
    })?;
    // The session object is created before the mount is attempted, so a failure here leaves
    // nothing mounted.
    let mut fuse_session = FuseSession::new(mount_dir, Self::FSNAME, Self::FSTYPE, false)?;
    let fuse_file = mount_at(mount_dir, mntoptions)?;
    fuse_session.set_fuse_file(fuse_file);
    Ok(fuse_session)
}
fn server_loop<P: 'static + FsProvider>(
task_num: usize,
server: Arc<Server<FuseFs<P>>>,
mut channel: FuseChannel,
) {
loop {
match channel.get_request() {
Ok(Some((reader, writer))) => {
if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
error!("error while handling FUSE message: {err}");
}
}
Ok(None) => break,
Err(err) => {
match err {
transport::Error::SessionFailure(_) => break,
_ => error!("unexpected error from Channel::get_request: {err}"),
}
}
}
}
debug!("server_loop {task_num} exiting");
}
/// Waits until every task in the join set has finished.
///
/// The individual task results are discarded.
pub async fn finished(&mut self) {
    while let Some(_joined) = self.set.join_next().await {}
}
}
impl Drop for FuseDaemon {
fn drop(&mut self) {
if let Err(err) = self.session.wake() {
error!("failed to wake FuseSession: {err}");
}
}
}
}