Skip to content

Instantly share code, notes, and snippets.

@pepoluan
Last active September 10, 2024 08:16
Show Gist options
  • Save pepoluan/ad9d9f9f818361ccaf07a4af541603ba to your computer and use it in GitHub Desktop.
Save pepoluan/ad9d9f9f818361ccaf07a4af541603ba to your computer and use it in GitHub Desktop.
sponge utility -- in Python
#!/usr/bin/env python3
# SPDX-License-Identifier: MPL-2.0
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
from __future__ import annotations
import argparse
import sys
import tempfile
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Protocol, cast
if TYPE_CHECKING:
from contextlib import AbstractContextManager
# Default value (in bytes) of the --chunksize option: 1 MiB.
DEFA_CHUNK_SIZE: int = 1 << 20
class _Options(Protocol):
    """Static-typing view of the namespace returned by the argument parser.

    This class is never instantiated; the parsed namespace is merely cast to it
    so attribute access on the options can be type-checked.
    """

    # Positional "target" argument (parsed with type=Path).
    target: Path
    # Value of --tmpfile: the Ellipsis object when the option was not used,
    # None when it was given without a file name, otherwise the file name.
    # noinspection PyUnresolvedReferences
    tmpfile: Ellipsis | None | str
    # Value of --chunksize (bytes per read when copying out of a temp file).
    chunksize: int
    # True when --no-clobber was given.
    no_clobber: bool
def _get_options() -> _Options:
    """Parse the command line and return the resulting options.

    Returns:
        The ``argparse`` namespace, cast to ``_Options`` for type checking.

    Raises:
        SystemExit: raised by ``argparse`` on invalid arguments, including a
            ``--chunksize`` that is not a positive integer.
    """
    parser = argparse.ArgumentParser(
        description="Soaks up all input (from STDIN) before saving to file (or re-emitting to STDOUT)",
        epilog=(
            "WARNING: If size of input is particularly large, consider using --tmpfile or you might run out of memory"
        ),
    )
    parser.add_argument(
        "--chunksize",
        "-c",
        metavar="BYTES",
        type=int,
        default=DEFA_CHUNK_SIZE,
        help=f"Chunk size when using a temp file. Default = {DEFA_CHUNK_SIZE:_}",
    )
    parser.add_argument(
        "--no-clobber",
        "-n",
        action="store_true",
        default=False,
        help="Do not clobber existing file. No effect if target is STDOUT",
    )
    parser.add_argument(
        "--tmpfile",
        "-t",
        metavar="FILE",
        nargs="?",
        # Ellipsis marks "option not used"; None (argparse's value for a bare
        # --tmpfile) then means "auto-generate a temp file".
        default=Ellipsis,
        help=(
            "Use a tempfile to absorb stdin. Can optionally specify name of temp file. NOTE: If this option is "
            "specified without a filename, you MUST specify '--' before specifying the target file. WARNING: If "
            "tempfile exists, it will clobber the contents of the file!"
        ),
    )
    parser.add_argument("target", type=Path, nargs="?", help="Target file. If not specified, or is '-', dump to stdout")

    opts = parser.parse_args()
    if opts.chunksize < 1:
        # The temp-file copy loop stops at the first empty read, and read(0)
        # is always empty: a chunk size of 0 would truncate the target and
        # silently drop all of the absorbed input.
        parser.error("--chunksize must be a positive integer")
    return cast(_Options, opts)
def _absorb(sponge: BinaryIO) -> None:
    """Copy the binary STDIN stream into *sponge*, 8192 bytes per read.

    Copying stops at the first read that returns nothing.
    """
    source = sys.stdin.buffer
    while True:
        block_of_bytes = source.read(8192)
        if not block_of_bytes:
            break
        sponge.write(block_of_bytes)
def _main(opts: _Options) -> None:
    """Absorb all of STDIN, then write it to the target file or to STDOUT.

    The input is buffered in memory (``opts.tmpfile is Ellipsis``), in an
    auto-generated temp file (``opts.tmpfile is None``) or in the file named by
    ``opts.tmpfile``. The target is opened only after the input has been fully
    absorbed. An auto-generated temp file is always removed afterwards, even
    when absorbing or writing fails; a user-named temp file is left in place.

    Args:
        opts: Parsed command-line options.

    Raises:
        SystemExit: with status 1 when ``opts.no_clobber`` is set and the
            target file already exists (checked before any input is read).
    """
    if opts.target and opts.target.name != "-":
        if opts.no_clobber and opts.target.exists():
            print(f"ERROR: Target file exists: '{opts.target}'", file=sys.stderr, flush=True)
            sys.exit(1)

        # We cannot open the target directly at this point, because doing so would truncate the file,
        # thus defeating the purpose of sponge when both stdin and stdout are backed by the same file.
        # E.g. :
        #   awk '{ some_awk_program }' file1 | sponge file1
        # Opening is therefore deferred behind a function that is called after the input is absorbed.
        def _output_context() -> AbstractContextManager:
            return Path(opts.target).open("w+b")  # noqa: SIM115

    else:

        def _output_context() -> AbstractContextManager:
            return sys.stdout.buffer

    if opts.tmpfile is Ellipsis:
        # Ellipsis is not the same as user specifying "..." on the command line
        # It is an object that users are unable to specify as an option, acting as the default value for the option
        # if user does not use the "--tmpfile" option.
        sponge_context = BytesIO()
    elif opts.tmpfile is None:
        # Auto-generated file; delete=False because removal is handled explicitly below, after the file is closed.
        sponge_context = tempfile.NamedTemporaryFile(delete=False)
    else:
        sponge_context = Path(opts.tmpfile).open("w+b")  # noqa: SIM115

    try:
        with sponge_context as sponge:
            _absorb(sponge)
            with _output_context() as fout:
                if opts.tmpfile is Ellipsis:
                    # Use fast .getvalue() method if using in-memory buffer
                    fout.write(sponge.getvalue())
                else:
                    sponge.seek(0)
                    while outgoing := sponge.read(opts.chunksize):
                        fout.write(outgoing)
    finally:
        # Runs on failure too, so an auto-generated temp file never outlives the process.
        if opts.tmpfile is None:
            Path(sponge_context.name).unlink(missing_ok=True)
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the sponge.
    _main(_get_options())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment