pub use private::Trailered;
mod private {
use std::{
io::{self, BufReader, Seek, SeekFrom},
marker::PhantomData,
};
use btserde::{read_from, write_to};
use positioned_io::{ReadAt, Size, WriteAt};
use serde::{de::DeserializeOwned, Serialize};
use crate::{
bterr, BoxInIoErr, Cursor, Decompose, FlushMeta, Result, Sectored, SizeExt, WriteInteg,
};
/// Wrapper around an inner stream `T` which stores a body of bytes followed by a serialized
/// trailer of type `D`.
///
/// As written by `write_trailer` and parsed by `new`, the inner stream is laid out as: the body
/// bytes, then the serialized trailer, then an `i64` holding the negative distance from the end
/// of the stream back to the start of the trailer.
pub struct Trailered<T, D> {
/// The wrapped stream which all reads and writes are forwarded to.
inner: T,
/// Length in bytes of the body. The trailer is written starting at this offset.
body_len: u64,
/// Ties the trailer type `D` to this struct without storing a value of that type.
phantom: PhantomData<D>,
/// Scratch buffer which is cleared and reused each time the trailer is serialized.
write_buf: Vec<u8>,
}
impl<T: ReadAt + Size, D: DeserializeOwned> Trailered<T, D> {
    /// Wraps `inner` without reading anything from it, starting with a body length of zero.
    pub fn empty(inner: T) -> Trailered<T, D> {
        Trailered {
            inner,
            body_len: 0,
            phantom: PhantomData,
            write_buf: Vec::new(),
        }
    }

    /// Wraps `inner` and reads back the trailer stored at its end, if there is one.
    ///
    /// When `inner` reports a size of zero the result is an empty wrapper and `None`. Otherwise
    /// the last eight bytes are read as an `i64` offset (relative to the end of the stream) at
    /// which the trailer starts; the trailer is deserialized from there and that position
    /// becomes the body length.
    ///
    /// # Errors
    ///
    /// Returns an error if the size of `inner` cannot be determined, if `inner` is non-empty but
    /// too small to hold the offset field or the region the offset refers to, if the stored
    /// offset does not point before the offset field itself, or if seeking or deserialization
    /// fails.
    pub fn new(inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
        // Size in bytes of the offset field which terminates the stream.
        const OFFSET_LEN: i64 = std::mem::size_of::<i64>() as i64;

        let end = inner.size_or_err()?;
        if end == 0 {
            return Ok((Self::empty(inner), None));
        }
        let mut reader = BufReader::new(Cursor::new(inner));
        if end < OFFSET_LEN as u64 {
            return Err(bterr!("inner stream is non-empty but too small"));
        }
        reader.seek(SeekFrom::End(-OFFSET_LEN))?;
        let trailer_offset: i64 = read_from(&mut reader)?;
        // `write_trailer` always stores -(trailer length + OFFSET_LEN), so any value greater
        // than -OFFSET_LEN can only come from corrupt data. Without this check the "trailer"
        // would be read from inside (or past) the offset field.
        if trailer_offset > -OFFSET_LEN {
            return Err(bterr!(
                "trailer offset {trailer_offset} does not point before the offset field"
            ));
        }
        if end < trailer_offset.unsigned_abs() {
            return Err(bterr!("inner stream is non-empty but too small"));
        }
        // The absolute position where the trailer starts is also where the body ends.
        let body_len = reader.seek(SeekFrom::End(trailer_offset))?;
        let trailer: D = read_from(&mut reader)?;
        // Unwrap the `BufReader` and then the `Cursor` to recover the original stream.
        let inner = reader.into_inner().into_inner();
        Ok((
            Trailered {
                inner,
                body_len,
                phantom: PhantomData,
                write_buf: Vec::new(),
            },
            Some(trailer),
        ))
    }
}
impl<T, D> Trailered<T, D> {
    /// Records that `written` bytes were written starting at `pos`, growing `body_len` if the
    /// write ended past the current end of the body.
    ///
    /// Returns `written` unchanged so a caller can hand it straight back as its own result.
    fn update_body_len(&mut self, pos: u64, written: usize) -> usize {
        // Saturate rather than overflow: a plain `+` would panic in debug builds and wrap in
        // release builds when `pos` is close to `u64::MAX`.
        let end_of_write = pos.saturating_add(written as u64);
        self.body_len = self.body_len.max(end_of_write);
        written
    }
}
impl<T: ReadAt, D> ReadAt for Trailered<T, D> {
    /// Reads from the body at `pos`, never returning bytes located at or after `body_len`.
    ///
    /// At most `min(buf.len(), body_len - pos)` bytes are requested from the inner stream, so a
    /// read starting exactly at `body_len` is forwarded with an empty buffer.
    ///
    /// # Errors
    ///
    /// Returns an error if `pos` is greater than `body_len`, or if the inner read fails.
    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
        if pos > self.body_len {
            return Err(bterr!("pos {pos} is past the end of body ({})", self.body_len).into());
        }
        let available = self.body_len - pos;
        // If the remaining body does not fit in a `usize` (possible on 32-bit targets) it is
        // necessarily larger than `buf`, so the buffer length is the limit. Previously this
        // case was reported as an error even though the read itself was perfectly valid.
        let limit = usize::try_from(available).map_or(buf.len(), |avail| avail.min(buf.len()));
        self.inner.read_at(pos, &mut buf[..limit])
    }
}
impl<T: WriteAt, D: Serialize> Trailered<T, D> {
    /// Writes `buf` to the inner stream at `pos` and extends the body length to cover whatever
    /// was actually written. Returns the number of bytes written.
    pub fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
        self.inner
            .write_at(pos, buf)
            .map(|count| self.update_body_len(pos, count))
    }

    /// Serializes `trailer`, followed by the negative distance from the end of the stream back
    /// to the start of the trailer, and writes both immediately after the body.
    ///
    /// NOTE(review): nothing here truncates `inner`. If an earlier, longer trailer extended
    /// past the end of the bytes written now, its tail appears to stay in place — confirm that
    /// the inner stream or the callers account for this.
    fn write_trailer(&mut self, trailer: &D) -> io::Result<()> {
        const OFFSET_LEN: usize = std::mem::size_of::<i64>();

        // Reuse the scratch buffer rather than allocating on every flush.
        self.write_buf.clear();
        write_to(trailer, &mut self.write_buf)?;
        // The stored offset covers the trailer bytes plus the offset field itself.
        let total_len = (self.write_buf.len() + OFFSET_LEN) as u64;
        let distance: i64 = total_len.try_into().box_err()?;
        write_to(&(-distance), &mut self.write_buf)?;
        self.inner.write_all_at(self.body_len, &self.write_buf)
    }

    /// Writes `trailer` after the body and then flushes the inner stream.
    pub fn flush(&mut self, trailer: &D) -> io::Result<()> {
        self.write_trailer(trailer)
            .and_then(|()| self.inner.flush())
    }
}
impl<T: WriteInteg + Size, D: Serialize> Trailered<T, D> {
    /// Writes `trailer` after the body and then forwards `integrity` to the inner stream's
    /// `flush_integ`.
    pub fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> {
        self.write_trailer(trailer)
            .and_then(|()| self.inner.flush_integ(integrity))
    }
}
impl<T, D> Decompose<T> for Trailered<T, D> {
fn into_inner(self) -> T {
self.inner
}
}
impl<U, T: AsRef<U>, D> AsRef<U> for Trailered<T, D> {
    /// Borrows a `U` by delegating to the inner stream's `AsRef<U>` implementation.
    fn as_ref(&self) -> &U {
        <T as AsRef<U>>::as_ref(&self.inner)
    }
}
impl<U, T: AsMut<U>, D> AsMut<U> for Trailered<T, D> {
    /// Mutably borrows a `U` by delegating to the inner stream's `AsMut<U>` implementation.
    fn as_mut(&mut self) -> &mut U {
        <T as AsMut<U>>::as_mut(&mut self.inner)
    }
}
impl<T: FlushMeta, D> Trailered<T, D> {
    /// Forwards to the inner stream's `flush_meta` and returns its result unchanged.
    pub fn flush_meta(&mut self) -> Result<()> {
        T::flush_meta(&mut self.inner)
    }
}
impl<T: Sectored, D> Sectored for Trailered<T, D> {
    /// Reports the sector size of the inner stream.
    fn sector_sz(&self) -> usize {
        T::sector_sz(&self.inner)
    }
}
impl<T, D> Size for Trailered<T, D> {
    /// Reports the length of the body only; the inner stream is not consulted, so the bytes
    /// occupied by the trailer are not counted. This never fails.
    fn size(&self) -> io::Result<Option<u64>> {
        let body_len = self.body_len;
        Ok(Some(body_len))
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use positioned_io::ReadAt;
use crate::{test_helpers::BtCursor as Cursor, Decompose};
#[test]
fn trailered_new_empty() {
    // Opening an empty stream must succeed and yield no trailer.
    let (_, trailer) = Trailered::<_, String>::new(Cursor::new(Vec::new()))
        .expect("Trailered::new failed");
    assert_eq!(None, trailer);
}
#[test]
fn trailered_new_inner_too_short_is_error() {
    // Five bytes are fewer than the eight needed for the trailing `i64` offset, so opening
    // the stream has to fail.
    let too_short = Cursor::new([0u8; 5]);
    assert!(Trailered::<_, u128>::new(too_short).is_err())
}
#[test]
fn trailered_trailer_persisted() {
    const EXPECTED: &str = "Everyone deserves to be remembered,";
    // First pass: start from an empty stream and flush the trailer into it.
    let (mut writer, initial) = Trailered::<_, String>::new(Cursor::new(Vec::new()))
        .expect("Trailered::new failed");
    assert!(initial.is_none());
    writer.flush(&EXPECTED.to_string()).expect("flush failed");
    let cursor = writer.into_inner();
    // Second pass: reopen the same stream and check that the trailer comes back intact.
    let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
    assert_eq!(EXPECTED, trailer.unwrap());
}
#[test]
fn trailered_written_data_persisted() {
    const EXPECTED: &[u8] = b"and every life has something to teach us.";
    // First pass: write the body, flush a trailer after it, and recover the raw stream.
    let cursor = {
        let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new()))
            .expect("failed to create first trailered");
        trailered.write_at(0, EXPECTED).expect("write failed");
        trailered.flush(&1).expect("flush failed");
        trailered.into_inner()
    };
    // Second pass: reopen the stream and read the body back.
    let (trailered, _) =
        Trailered::<_, u8>::new(cursor).expect("failed to create second trailered");
    let mut actual = vec![0u8; EXPECTED.len()];
    let read = trailered.read_at(0, &mut actual).expect("read failed");
    // Check the reported byte count too, so a short read is called out explicitly rather than
    // only showing up as a buffer mismatch.
    assert_eq!(EXPECTED.len(), read);
    assert_eq!(EXPECTED, actual);
}
#[test]
fn trailered_read_limited_to_body_len() {
    const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0];
    let inner = Cursor::new(Vec::new());
    let (mut trailered, _) = Trailered::new(inner).expect("failed to create Trailered");
    trailered.write_at(0, &[1u8; 5]).expect("write failed");
    trailered.flush(&1u8).expect("flush failed");
    // Ask for more bytes than the body holds: only the five body bytes may be filled in, and
    // the rest of the buffer must keep its initial zeros instead of exposing trailer bytes.
    let mut actual = vec![0u8; EXPECTED.len()];
    trailered.read_at(0, &mut actual).expect("read failed");
    assert_eq!(EXPECTED, actual);
}
}