Skip to content

Instantly share code, notes, and snippets.

@ultrafunkamsterdam
Created August 25, 2024 20:46
Show Gist options
  • Save ultrafunkamsterdam/b83149076468695ffd7fc88cb50fd2b6 to your computer and use it in GitHub Desktop.
Save ultrafunkamsterdam/b83149076468695ffd7fc88cb50fd2b6 to your computer and use it in GitHub Desktop.
Fastapi File Streaming Response (HTML Video etc.)
import mimetypes
from pathlib import Path
import fastapi
import aiofiles
router = fastapi.APIRouter()
@router.get('{path:path}')
async def streaming_endpoint(path: str, range: str = fastapi.Header()):
chunk_size = 2 ** 20 # 1 MB
disk_path = Path(path)
if not disk_path.exists():
raise fastapi.HTTPException(404, "not found")
filesize = disk_path.stat().st_size
typ, *ext = mimetypes.guess_type(disk_path.as_uri())
if not typ:
typ = "application/octet-stream"
media_type = typ
start, end = range.split('=')[-1].split('-')
start = int(start)
end = (
int(end)
if end
else (start + chunk_size - 1)
)
size = end - start
async with aiofiles.open(disk_path, mode='rb') as fh:
await fh.seek(start)
data = await fh.read(size + 1)
headers = {
'Content-Range': f'bytes {start}-{end}/{filesize}',
'Accept-Ranges': 'bytes'
}
return fastapi.responses.Response(data, status_code=206, headers=headers, media_type=media_type)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment