use crate::fuse_fs::FuseFs;
use btfproto::server::FsProvider;
use btlib::{bterr, BlockPath, Result};
use fuse_backend_rs::{
    api::server::Server,
    transport::{self, FuseChannel, FuseSession},
};
use futures::future::FutureExt;
use log::{debug, error};
use std::path::PathBuf;
use std::{
    ffi::{c_char, c_int, CString},
    fs::File,
    os::{fd::FromRawFd, unix::ffi::OsStrExt},
    path::Path,
    sync::Arc,
};
use tokio::{sync::oneshot, task::JoinSet};
pub use private::FuseDaemon;
mod private {
    use std::{fs::DirBuilder, num::NonZeroUsize};
    use super::*;
    // Foreign declarations resolved against the system library linked as `fuse3`.
    #[link(name = "fuse3")]
    extern "C" {
        /// Raw binding used by `mount_at` to obtain the FUSE device descriptor.
        ///
        /// Both arguments must be valid NUL-terminated C strings. The caller in this
        /// file treats a negative return value as failure and any other value as a
        /// file descriptor it takes ownership of.
        // NOTE(review): exact mount semantics and option syntax are defined by libfuse3,
        // not by this file — confirm against the libfuse documentation.
        fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
    }
    fn mount_at<P: AsRef<Path>>(mnt_point: P, mnt_options: &str) -> Result<File> {
        let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?;
        let options = CString::new(mnt_options)?;
        let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
        if raw_fd < 0 {
            return Err(bterr!("an error occurred in fuse_open_channel"));
        }
        let file = unsafe { File::from_raw_fd(raw_fd) };
        Ok(file)
    }
    /// Owns a FUSE session together with the tasks that serve its requests.
    ///
    /// Created with `FuseDaemon::new`; `FuseDaemon::finished` waits for the tasks to
    /// complete, and dropping the value wakes the session.
    // NOTE: Rust drops fields in declaration order, so `set` is dropped before
    // `session`; keep that in mind before reordering the fields.
    pub struct FuseDaemon {
        /// The tasks spawned by `new`, one per server loop.
        set: JoinSet<()>,
        /// The session from which each server loop's channel was created.
        session: FuseSession,
    }
    impl FuseDaemon {
        const FSNAME: &str = "btfuse";
        const FSTYPE: &str = "bt";
        pub fn new<P: 'static + FsProvider>(
            mntdir: PathBuf,
            mntoptions: &str,
            num_tasks: Option<NonZeroUsize>,
            fallback_path: Arc<BlockPath>,
            mounted_signal: Option<oneshot::Sender<()>>,
            provider: P,
        ) -> Result<Self> {
            let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
            let session = Self::session(mntdir, mntoptions)?;
            if let Some(tx) = mounted_signal {
                tx.send(())
                    .map_err(|_| bterr!("failed to send mounted signal"))?;
            }
            let mut set = JoinSet::new();
            let num_tasks = if let Some(num_tasks) = num_tasks {
                num_tasks
            } else {
                std::thread::available_parallelism()?
            };
            log::debug!("spawning {num_tasks} blocking tasks");
            for task_num in 0..num_tasks.get() {
                let server = server.clone();
                let channel = session.new_channel()?;
                let future = tokio::task::spawn_blocking(move || {
                    Self::server_loop(task_num, server, channel)
                });
                let future = future.map(|result| {
                    if let Err(err) = result {
                        error!("server_loop produced an error: {err}");
                    }
                });
                set.spawn(future);
            }
            Ok(Self { set, session })
        }
        fn session<T: AsRef<Path>>(mntdir: T, mntoptions: &str) -> Result<FuseSession> {
            DirBuilder::new()
                .recursive(true)
                .create(mntdir.as_ref())
                .map_err(|err| {
                    bterr!(err).context(format!(
                        "failed to create mntdir: '{}'",
                        mntdir.as_ref().display()
                    ))
                })?;
            let mut session = FuseSession::new(mntdir.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
            session.set_fuse_file(mount_at(mntdir, mntoptions)?);
            Ok(session)
        }
        fn server_loop<P: 'static + FsProvider>(
            task_num: usize,
            server: Arc<Server<FuseFs<P>>>,
            mut channel: FuseChannel,
        ) {
            loop {
                match channel.get_request() {
                    Ok(Some((reader, writer))) => {
                        if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
                            error!("error while handling FUSE message: {err}");
                        }
                    }
                    Ok(None) => break,
                    Err(err) => {
                        match err {
                            transport::Error::SessionFailure(_) => break,
                            _ => error!("unexpected  error from Channel::get_request: {err}"),
                        }
                    }
                }
            }
            debug!("server_loop {task_num} exiting");
        }
        pub async fn finished(&mut self) {
            while self.set.join_next().await.is_some() {}
        }
    }
    impl Drop for FuseDaemon {
        fn drop(&mut self) {
            if let Err(err) = self.session.wake() {
                error!("failed to wake FuseSession: {err}");
            }
        }
    }
}