Skip to content

Instantly share code, notes, and snippets.

@achinta
Created January 29, 2021 13:06
Show Gist options
  • Save achinta/e5413753d385ad8c84fc0489a01560b5 to your computer and use it in GitHub Desktop.
Save achinta/e5413753d385ad8c84fc0489a01560b5 to your computer and use it in GitHub Desktop.
Lock File Decorator
from pathlib import Path
import time
import json
from typing import Dict
def lock_file(func):
'''
Decorator which
'''
def wrapper(path: Path, data):
# before writing ensure there is no lock
assert path.exists()
lock_path = Path(path.resolve().as_posix() + '.LCK')
# how long we have been waiting
wait_secs = 0
while lock_path.exists() and wait_secs < 10:
print(f'lock exists for {path}. Sleeping....')
time.sleep(1)
wait_secs += 1
lock_path.touch()
print(f'created lock file for {path}')
# call the decorated function
func(path, data)
# remove the lock
if lock_path.exists():
print(f'Done. Removing lock for {path}. Sleeping')
lock_path.unlink()
return wrapper
@lock_file
def update_json(path: Path, data: Dict):
with open(path, 'r') as f:
file_json = json.load(f)
file_json.update(data)
with open(path, 'w') as f:
json.dump(file_json, f)
@lock_file
def update_json_slow(path: Path, data: Dict):
time.sleep(8)
with open(path, 'r') as f:
file_json = json.load(f)
file_json.update(data)
with open(path, 'w') as f:
json.dump(file_json, f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment