Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created May 19, 2024 19:31
Show Gist options
  • Save mooreniemi/8d37e5be1729b308ea99f9b25d114fc7 to your computer and use it in GitHub Desktop.
Save mooreniemi/8d37e5be1729b308ea99f9b25d114fc7 to your computer and use it in GitHub Desktop.
`SharedFile` using `unsafe`
[package]
name = "delta_fake"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
fd-lock = "4.0.2"
nix = "0.28.0"
rand = "0.8.5"
use anyhow::{Context, Result};
use libc::{flock, F_RDLCK, F_SETLK, F_SETLKW, F_UNLCK, F_WRLCK, SEEK_SET};
use nix::libc;
use std::fs::{File, OpenOptions};
use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::{fmt, io};
/// Reads up to `buffer.len()` bytes from `fd` at the absolute byte `offset` using `pread`.
///
/// The offset is passed explicitly to the system call, so this wrapper never calls `seek`.
///
/// # Returns
/// * `Ok(n)` with the byte count reported by `pread` (may be less than `buffer.len()`).
/// * `Err(io::Error)` built from the OS error when `pread` reports a negative result.
fn seekless_read(fd: i32, buffer: &mut [u8], offset: i64) -> io::Result<usize> {
    let capacity = buffer.len();
    let destination = buffer.as_mut_ptr() as *mut libc::c_void;
    // SAFETY: `destination` points at `capacity` writable bytes borrowed exclusively from
    // `buffer` for the duration of the call.
    let transferred = unsafe { libc::pread(fd, destination, capacity, offset) };
    // A negative count is the failure signal; it is the only value the conversion rejects.
    usize::try_from(transferred).map_err(|_| io::Error::last_os_error())
}
/// Writes up to `buffer.len()` bytes to `fd` at the absolute byte `offset` using `pwrite`.
///
/// The offset is passed explicitly to the system call, so this wrapper never calls `seek`.
///
/// # Returns
/// * `Ok(n)` with the byte count reported by `pwrite` (may be less than `buffer.len()`).
/// * `Err(io::Error)` built from the OS error when `pwrite` reports a negative result.
fn seekless_write(fd: i32, buffer: &[u8], offset: i64) -> io::Result<usize> {
    let pending = buffer.len();
    let source = buffer.as_ptr() as *const libc::c_void;
    // SAFETY: `source` points at `pending` readable bytes borrowed from `buffer` for the
    // duration of the call.
    let transferred = unsafe { libc::pwrite(fd, source, pending, offset) };
    // A negative count is the failure signal; it is the only value the conversion rejects.
    usize::try_from(transferred).map_err(|_| io::Error::last_os_error())
}
/// Attempts to acquire a lock on a file descriptor with a specified timeout.
///
/// # Arguments
/// * `fd` - The file descriptor on which to acquire the lock.
/// * `lock` - The lock structure describing the type and specifics of the lock.
/// * `timeout_override` - Duration to keep trying before giving up.
///
/// # Returns
/// * `Ok(())` if the lock was successfully acquired.
/// * `Err(io::Error)` if the lock could not be acquired within the timeout.
fn acquire_lock_with_timeout(
fd: RawFd,
lock: &mut flock,
timeout_override: Option<Duration>,
) -> io::Result<()> {
let timeout = timeout_override.unwrap_or(Duration::from_millis(1));
let start_time = Instant::now();
let action = if lock.l_type == libc::F_WRLCK {
"write"
} else {
"read"
};
loop {
if unsafe { libc::fcntl(fd, F_SETLKW, &mut *lock) } != -1 {
return Ok(());
} else if let Some(error_code) = io::Error::last_os_error().raw_os_error() {
match error_code {
libc::EACCES | libc::EAGAIN => {
if start_time.elapsed() > timeout {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("{} lock acquisition timed out", action),
));
}
println!("Timeout not reached, will sleep to acquire lock.");
thread::sleep(Duration::from_micros(10));
}
_ => {
eprintln!(
"Failed to acquire {} lock, unexpected error code: {}.",
action, error_code
);
return Err(io::Error::last_os_error());
}
}
}
}
}
/// A cheaply clonable handle to one open file, intended to be handed to several threads.
///
/// Every clone shares the same underlying `File` through the `Arc`. The `read` and `write`
/// methods take a byte-range lock on the raw file descriptor before touching the file.
/// The file cursor is shared state as well, so those methods never `seek`; they go through
/// wrapped `pread` / `pwrite` calls, which are given the target offset explicitly.
///
/// NOTE(review): POSIX record locks are documented as being owned per process, so they may
/// not exclude threads inside the same process from each other. Confirm before relying on
/// this for intra-process synchronisation.
#[derive(Clone)]
struct SharedFile {
    /// The shared open file; only its raw descriptor is used for locking and I/O.
    file: Arc<File>,
    /// The path the file was opened with, kept for diagnostics.
    path: String,
}

impl fmt::Debug for SharedFile {
    /// Formats the handle showing only its `path`; the `file` field is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("SharedFile");
        repr.field("path", &self.path);
        repr.finish()
    }
}
impl SharedFile {
fn new(path: &str) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
Ok(Self {
file: Arc::new(file),
path: path.to_string(),
})
}
/// This is approximate because we are not guaranteeing that in-flight writes are commited.
fn approx_len(self) -> Result<u64> {
let file = Arc::clone(&self.file);
// NOTE: we do not want to use any technique with `seek` even if it is faster
Ok(file.metadata().context("reading file metadata")?.len())
}
/// Depends on `approx_len` so is not guaranteed to append exactly at end.
fn append(&self, data: &[u8]) -> Result<()> {
let end = <SharedFile as Clone>::clone(&self).approx_len()?;
// NOTE: we do not want to use any technique with `seek` even if it is faster
self.write(end, data)?;
Ok(())
}
/// Read lock a given range of a file and read from it.
fn read(&self, start: u64, length: usize) -> Result<Vec<u8>> {
let file = Arc::clone(&self.file);
// we avoid seek because this would be shared across threads!
let file = file.try_clone()?;
let mut buffer = vec![0u8; length];
let mut lock = flock {
l_type: F_RDLCK,
l_whence: SEEK_SET as i16,
l_start: start as libc::off_t,
l_len: length as libc::off_t,
l_pid: 0,
};
acquire_lock_with_timeout(file.as_raw_fd(), &mut lock, None)?;
seekless_read(file.as_raw_fd(), &mut buffer, start.try_into().unwrap())?;
lock.l_type = F_UNLCK;
unsafe {
libc::fcntl(file.as_raw_fd(), F_SETLK, &mut lock);
}
Ok(buffer)
}
/// Write lock a given range of a file and write to it.
fn write(&self, start: u64, mut data: &[u8]) -> Result<()> {
let file = Arc::clone(&self.file);
let file = file.try_clone()?;
let mut lock = flock {
l_type: F_WRLCK,
l_whence: SEEK_SET as i16,
l_start: start as libc::off_t,
l_len: data.len() as libc::off_t,
l_pid: 0,
};
acquire_lock_with_timeout(file.as_raw_fd(), &mut lock, None)?;
seekless_write(file.as_raw_fd(), &mut data, start.try_into().unwrap())?;
lock.l_type = F_UNLCK;
unsafe {
libc::fcntl(file.as_raw_fd(), F_SETLK, &mut lock);
}
Ok(())
}
}
/// Demo driver: seeds `/tmp/foo.txt`, then runs two readers and two writers on separate
/// threads against overlapping byte ranges, and finally prints the resulting file contents.
fn main() -> Result<()> {
    const PATH: &str = "/tmp/foo.txt";

    let shared_file = SharedFile::new(PATH)?;
    shared_file.append(b"Initial data, ")?;
    // can use this to position randomly inside
    let initial_len = shared_file.clone().approx_len()?;
    println!("initial length of file: {}", initial_len);

    // Spawns a thread that reads `len` bytes from the start of the file and prints them.
    let reader = |label: &'static str, len: usize, failure: &'static str| {
        let file = shared_file.clone();
        thread::spawn(move || {
            let data = file.read(0, len).expect(failure);
            println!("{}: {:?}", label, String::from_utf8_lossy(&data));
        })
    };
    // Spawns a thread that overwrites the bytes starting at offset 5 with `payload`.
    let writer = |payload: &'static [u8], failure: &'static str, done: &'static str| {
        let file = shared_file.clone();
        thread::spawn(move || {
            file.write(5, payload).expect(failure);
            println!("{}", done);
        })
    };

    // Array elements are evaluated left to right, so threads start in this order.
    let workers = [
        reader("Reader 1", 10, "handle1 read data"),
        writer(b"Hello", "handle2 wrote data", "Writer 1 wrote 'Hello'"),
        reader("Reader 2", 7, "handle3 read data"),
        writer(b"Hallo", "handle4 wrote data", "Writer 2 wrote 'Hallo'"),
    ];
    for worker in workers {
        worker.join().unwrap();
    }

    let final_content = std::fs::read_to_string(PATH)?;
    println!("Final content of the file: {}", final_content);
    Ok(())
}
@mooreniemi
Copy link
Author

This example is, of course, a huge "foot gun", given that you are empowered to write over whatever sections you want. In practice, if you have variable-length data (or data whose length changes as you update it), you'd still need to pay the cost of tracking what's active so you don't corrupt what exists.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment