Skip to content

Instantly share code, notes, and snippets.

@mikitsu
Created November 7, 2020 17:23
Show Gist options
  • Save mikitsu/5166e8bfb072ee943daa5defb5ff0349 to your computer and use it in GitHub Desktop.
Save mikitsu/5166e8bfb072ee943daa5defb5ff0349 to your computer and use it in GitHub Desktop.
Simple Python FUSE filesystem for firejail sandboxes

FirejailFS

Requirements: Python, fusepy and firejail.

Usage: FirejailFS.py sandboxname /path/to/mountpoint

The filesystem supports everything available via firejail's own --get, --put and --ls options. The main intended usage is to be able to use your shell's path completition features when getting a file from a sandbox. Limitations include no correct timestamps, no possibility to delete files (which includes not being able to properly move them out of the sandbox), no link counts and questionable support for non-regular files.

#!/usr/bin/env python3
"""Firejail FS"""
import re
import argparse
import subprocess
import tempfile
import stat
import time
import functools
import logging
import shlex
import os
import shutil
import pathlib
import errno
import pwd
import grp
import fuse
def lru_cache_time(seconds, maxsize=128):
"""
Adds time aware caching to lru_cache
adapted from https://stackoverflow.com/a/57300326
"""
def dec(func):
@functools.lru_cache(maxsize)
def inner_wrapper(__ttl, *args, **kwargs):
"""
Main wrapper, note that the first argument (ttl) is not passed down.
This is because no function should bother to know that this is here.
"""
return func(*args, **kwargs)
@functools.wraps(inner_wrapper,
assigned=functools.WRAPPER_ASSIGNMENTS
+ ('cache_clear', 'cache_info'))
def wrapper(*args, **kwargs):
"""outer wrapper adds __ttl to cached inner wrapper call"""
return inner_wrapper(round(time.time() / seconds), *args, **kwargs)
return wrapper
return dec
def parse_ls_output(data):
"""seems to be the best firejail provides..."""
def parse_mode(text_mode):
# reverse-copied from
# https://github.com/coreutils/gnulib/blob/master/lib/filemode.c
# commented-out lines don't exist in (my?) Python
mode = {
'-': stat.S_IFREG,
'd': stat.S_IFDIR,
'b': stat.S_IFBLK,
'c': stat.S_IFCHR,
'l': stat.S_IFLNK,
'p': stat.S_IFIFO,
's': stat.S_IFSOCK,
# 'C': stat.S_IFCTG,
'D': stat.S_IFDOOR,
# 'm': stat.S_IFMPB,
# 'n': stat.S_IFNWK,
'P': stat.S_IFPORT,
'w': stat.S_IFWHT,
}.get(text_mode[0], 0)
if text_mode[3] in 'sS':
mode |= stat.S_ISUID
if text_mode[6] in 'sS':
mode |= stat.S_ISGID
if text_mode[9] in 'tT':
mode |= stat.S_ISVTX
text_mode = re.sub(
'(?i)s|t',
lambda m: '-' if m.group(0).isupper() else 'x',
text_mode[1:],
)
for ent, char in zip((*['USR']*3, *['GRP']*3, *['OTH']*3), text_mode):
if char != '-':
mode |= getattr(stat, f'S_I{char.upper()}{ent}')
return mode
r = {}
for line in data.splitlines():
# usernames: https://serverfault.com/a/578264
m = re.match(
r'([-dbclpsDPw][-rwxsStT]{9}) '
'([\w.-]+) +([\w.-]+) +(\d+) (.+)', line)
if m is None:
continue # files with newlines around
mode, owner, group, size, name = m.groups()
r[name] = dict(
st_mode=parse_mode(mode),
st_uid=pwd.getpwnam(owner).pw_uid,
st_gid=grp.getgrnam(group).gr_gid,
st_size=int(size),
)
return r
class FirejailFS(fuse.Operations):
def __init__(self, sandbox):
self.sandbox = sandbox
self.open_files = {}
self.tmpdir = pathlib.Path(tempfile.mkdtemp())
self.datadir = pathlib.Path(tempfile.mkdtemp())
self.fd = 0
self.invalidate_caches = self.get_dir_info.cache_clear
def __del__(self):
shutil.rmtree(self.tmpdir)
shutil.rmtree(self.datadir)
def firejail_command(self, command, *args, **kwargs):
cmdline = ('firejail', f'--{command}={self.sandbox}') + args
logging.info('calling %s', shlex.join(cmdline))
r = subprocess.run(
cmdline,
capture_output=True,
**kwargs,
)
if r.returncode:
if b'Error: Cannot access' in r.stderr:
raise fuse.FuseOSError(errno.EACCES)
elif b'Warning: cannot open source file' in r.stderr:
raise fuse.FuseOSError(errno.EACCES)
elif b'is an invalid filename' in r.stderr:
raise fuse.FuseOSError(errno.EINVAL)
else:
raise RuntimeError('firejail exited with status '
f'{r.returncode} and stderr "{r.stderr}"')
return r.stdout.decode(errors='replace')
@lru_cache_time(2)
def get_dir_info(self, path):
return parse_ls_output(self.firejail_command('ls', path))
### --- Start FUSE operations ---
def create(self, path, mode):
outer_tmp_fd, outer_path = tempfile.mkstemp(dir=self.datadir)
os.chmod(outer_path, mode)
os.close(outer_tmp_fd)
self.fd += 1
self.open_files[self.fd] = open(outer_path, 'w+b')
self.flush(path, self.fd)
self.invalidate_caches()
logging.info(f'created {path}')
return self.fd
def flush(self, path, fh):
f = self.open_files[fh]
if '+' in f.mode or 'w' in f.mode:
logging.debug(f'flushing {path}')
os.fsync(f.fileno())
self.firejail_command('put', f.name, path)
self.invalidate_caches()
# fsync not implemented
def getattr(self, path, fh=None):
logging.debug(f'stat {path}')
defaults = dict(
st_ctime=time.time(),
st_mtime=time.time(),
st_atime=time.time(),
)
if path == '/':
stat_result = os.stat('/')
return {k: getattr(stat_result, k) for k in dir(stat_result)
if k.startswith('st_')}
dirname, basename = os.path.split(path)
try:
data = self.get_dir_info(dirname)[basename]
except KeyError:
raise fuse.FuseOSError(errno.ENOENT)
return {**data, **defaults}
# getxattr not implemented
# listxattr not implemented
# mkdir not implemented
def open(self, path, flags):
if flags & os.O_PATH:
raise fuse.FuseOSError(errno.ENOSYS)
flags &= (os.O_RDONLY | os.O_RDWR | os.O_WRONLY)
mode = {
os.O_RDONLY: 'rb',
os.O_RDWR: 'rb+',
os.O_WRONLY | os.O_APPEND: 'ab',
os.O_WRONLY | os.O_TRUNC: 'wb',
os.O_RDWR | os.O_TRUNC: 'wb+',
os.O_WRONLY: 'rb+', # probably closest I can get
}[flags]
outer_path = tempfile.mktemp(dir=self.datadir)
directory, base_name = os.path.split(path)
if base_name not in self.readdir(directory, None):
return self.create(path, 0o644) # should probably read umask here...
self.firejail_command('get', path, cwd=self.tmpdir)
shutil.move(self.tmpdir/base_name, outer_path)
self.fd += 1
self.open_files[self.fd] = open(outer_path, mode)
return self.fd
def read(self, path, size, offset, fh):
f = self.open_files[fh]
f.seek(offset)
return f.read(size)
def readdir(self, path, fh):
return self.get_dir_info(path).keys()
# readlink not implemented
def release(self, path, fh):
logging.debug(f'releasing {path}')
f = self.open_files.pop(fh)
if '+' in f.mode or 'w' in f.mode:
# I have no idea why, but the following two lines are important
f.seek(0)
f.read()
os.fsync(f.fileno())
self.firejail_command('put', f.name, path)
self.invalidate_caches()
os.unlink(f.name)
# removexattr not implemented
# rename not implemented
# rmdir not implemented
# setxattr not implemented
# statfs not implemented
# symlink not implemented
def truncate(self, path, length, fh=None):
release = False
if fh is None:
fh = self.open(path, os.O_WRONLY)
release = True
self.open_files[fh].truncate(length)
if release:
self.release(path, fh)
# unlink not implemented
# utimens not implemented
def write(self, path, data, offset, fh):
logging.debug(f'write {data} to {path}')
f = self.open_files[fh]
f.seek(offset)
return f.write(data)
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument('sandbox')
parser.add_argument('mountpoint')
parser.add_argument(
'--loglevel',
default='INFO',
type=str.upper,
choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'),
)
return parser.parse_args()
if __name__ == '__main__':
args = get_args()
logging.basicConfig(
level=getattr(logging, args.loglevel),
format='%(asctime)s - %(levelname)s: %(message)s',
)
fuse = fuse.FUSE(FirejailFS(args.sandbox), args.mountpoint, foreground=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment