1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// SPDX-License-Identifier: AGPL-3.0-or-later
use crate::fuse_fs::FuseFs;

use btfproto::server::FsProvider;
use btlib::{bterr, BlockPath, Result};
use fuse_backend_rs::{
    api::server::Server,
    transport::{self, FuseChannel, FuseSession},
};
use futures::future::FutureExt;
use log::{debug, error};
use std::path::PathBuf;
use std::{
    ffi::{c_char, c_int, CString},
    fs::File,
    os::{fd::FromRawFd, unix::ffi::OsStrExt},
    path::Path,
    sync::Arc,
};
use tokio::{sync::oneshot, task::JoinSet};

pub use private::FuseDaemon;

mod private {
    use std::{fs::DirBuilder, num::NonZeroUsize};

    use super::*;

    #[link(name = "fuse3")]
    extern "C" {
        /// Opens a channel to the kernel.
        fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
    }

    /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use
    /// to communicate with the kernel is returned.
    fn mount_at<P: AsRef<Path>>(mnt_point: P, mnt_options: &str) -> Result<File> {
        let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?;
        let options = CString::new(mnt_options)?;
        // Safety: mountpoint and options are both valid C strings.
        let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
        // According to the fuse3 docs only -1 will be returned on error.
        // source: http://libfuse.github.io/doxygen/fuse_8h.html#a9e8c9af40b22631f9f2636019cd073b6
        if raw_fd < 0 {
            return Err(bterr!("an error occurred in fuse_open_channel"));
        }
        // Safety: raw_rd is positive, indicating that fuse_open_channel succeeded.
        let file = unsafe { File::from_raw_fd(raw_fd) };
        Ok(file)
    }

    pub struct FuseDaemon {
        set: JoinSet<()>,
        session: FuseSession,
    }

    impl FuseDaemon {
        const FSNAME: &str = "btfuse";
        const FSTYPE: &str = "bt";

        pub fn new<P: 'static + FsProvider>(
            mntdir: PathBuf,
            mntoptions: &str,
            num_tasks: Option<NonZeroUsize>,
            fallback_path: Arc<BlockPath>,
            mounted_signal: Option<oneshot::Sender<()>>,
            provider: P,
        ) -> Result<Self> {
            let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
            let session = Self::session(mntdir, mntoptions)?;
            if let Some(tx) = mounted_signal {
                tx.send(())
                    .map_err(|_| bterr!("failed to send mounted signal"))?;
            }
            let mut set = JoinSet::new();
            let num_tasks = if let Some(num_tasks) = num_tasks {
                num_tasks
            } else {
                std::thread::available_parallelism()?
            };
            log::debug!("spawning {num_tasks} blocking tasks");
            for task_num in 0..num_tasks.get() {
                let server = server.clone();
                let channel = session.new_channel()?;
                let future = tokio::task::spawn_blocking(move || {
                    Self::server_loop(task_num, server, channel)
                });
                let future = future.map(|result| {
                    if let Err(err) = result {
                        error!("server_loop produced an error: {err}");
                    }
                });
                set.spawn(future);
            }
            Ok(Self { set, session })
        }

        fn session<T: AsRef<Path>>(mntdir: T, mntoptions: &str) -> Result<FuseSession> {
            DirBuilder::new()
                .recursive(true)
                .create(mntdir.as_ref())
                .map_err(|err| {
                    bterr!(err).context(format!(
                        "failed to create mntdir: '{}'",
                        mntdir.as_ref().display()
                    ))
                })?;
            let mut session = FuseSession::new(mntdir.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
            session.set_fuse_file(mount_at(mntdir, mntoptions)?);
            Ok(session)
        }

        /// Opens a channel to the kernel and processes messages received in an infinite loop.
        fn server_loop<P: 'static + FsProvider>(
            task_num: usize,
            server: Arc<Server<FuseFs<P>>>,
            mut channel: FuseChannel,
        ) {
            loop {
                match channel.get_request() {
                    Ok(Some((reader, writer))) => {
                        // Safety: reader and writer are not mutated while the future returned from
                        // async_handle_message is alive.
                        if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
                            error!("error while handling FUSE message: {err}");
                        }
                    }
                    Ok(None) => break,
                    Err(err) => {
                        match err {
                            // Occurs when the file system is unmounted.
                            transport::Error::SessionFailure(_) => break,
                            _ => error!("unexpected  error from Channel::get_request: {err}"),
                        }
                    }
                }
            }
            debug!("server_loop {task_num} exiting");
        }

        pub async fn finished(&mut self) {
            while self.set.join_next().await.is_some() {}
        }
    }

    impl Drop for FuseDaemon {
        fn drop(&mut self) {
            if let Err(err) = self.session.wake() {
                error!("failed to wake FuseSession: {err}");
            }
        }
    }
}