Skip to content

Instantly share code, notes, and snippets.

@farukyildiz
Created July 1, 2021 23:35
Show Gist options
  • Save farukyildiz/e2925c4b91ec7d24d182c48fda20efc9 to your computer and use it in GitHub Desktop.
Save farukyildiz/e2925c4b91ec7d24d182c48fda20efc9 to your computer and use it in GitHub Desktop.
Python lock test
import threading
from contextlib import contextmanager
import time
from concurrent.futures import ThreadPoolExecutor
@contextmanager
def acquire_timeout(lock, timeout):
    """Try to acquire *lock*, waiting at most *timeout* seconds.

    Yields the boolean returned by ``lock.acquire(timeout=timeout)`` so the
    caller can branch on whether the lock was actually obtained.  If it was
    obtained, the lock is released when the ``with`` block exits -- including
    when the block exits by raising an exception.

    Args:
        lock: An object exposing ``acquire(timeout=...)`` and ``release()``,
            such as ``threading.Lock``.
        timeout: Value forwarded unchanged as ``acquire``'s ``timeout``.

    Yields:
        ``True`` if the lock was acquired, ``False`` otherwise.
    """
    acquired = lock.acquire(timeout=timeout)
    try:
        yield acquired
    finally:
        # Without try/finally an exception raised inside the ``with`` body
        # would skip the release and leave the lock held forever.
        if acquired:
            lock.release()
# Usage:
# Module-level lock that every call to _func below contends for.
lock = threading.Lock()
def _func(to):
    """Attempt to take the module-level ``lock`` for up to *to* seconds.

    On success, report it and hold the lock by sleeping *to* seconds.
    On timeout, report that the lock was not available.  *to* serves as both
    the acquire timeout and the hold duration.
    """
    with acquire_timeout(lock, to) as has_lock:
        if not has_lock:
            print('timeout: lock not available')
            # do something else ...
            return
        print('got the lock')
        time.sleep(to)
        # do something ....
# Demo driver: submit three workers that compete for the same lock.  Each
# value is both the acquire timeout and the hold time passed to _func.
# NOTE(review): the first task usually wins the lock and the other two time
# out, but thread scheduling order is not guaranteed.
with ThreadPoolExecutor(max_workers=8) as pool:
    print("thread loop............")
    for seconds in (7, 1, 6):
        pool.submit(_func, seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment