Python Lock Implementation Notes —— Python 中锁的实现
文章目录
分布式锁
portalocker.RedisLock
实现方式
- redis pubsub channel
创建
import portalocker

lock = portalocker.RedisLock('some_lock_channel_name')

with lock:
    print('do something here')

Redis 连接设定
- 使用 redis.client.Redis 类 实现
- 用 RedisLock 可选关键字参数
redis_kwargs: typing.Optional[typing.Dict] = None 指定,参考:
class Redis:
    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs='required', ssl_ca_certs=None,
                 ssl_check_hostname=False,
                 max_connections=None, single_connection_client=False,
                 health_check_interval=0, client_name=None, username=None):
python-redis-lock pip 包实现
- 参考:GitHub - ionelmc/python-redis-lock: Lock context manager implemented via redis SETNX/BLPOP.
- 模块:redis_lock
例子
# Example: try to take a named Redis-backed lock without waiting for it.
import redis_lock
from redis import Redis

conn = Redis()
lock = redis_lock.Lock(conn, "name-of-the-lock")

# blocking=False makes acquire() return immediately with True/False
# instead of waiting for the current holder to release.
if lock.acquire(blocking=False):
    try:
        print("Got the lock.")
    finally:
        # Release even if the critical section raises, so the lock is not
        # left held after a failure.
        lock.release()
else:
    print("Someone else has the lock.")
文件锁
- fcntl 模块
- portalocker 模块
filelock 模块
文件夹锁
例子
import fcntl
import os


class LockDirectory:
    """Context manager holding an exclusive ``flock`` on a directory.

    The lock is taken on a read-only file descriptor opened on the path in
    ``__enter__`` and is dropped when that descriptor is closed in
    ``__exit__``.

    Args:
        directory: Path to lock. It must exist when the object is created.
        blocking: When true (the default) ``__enter__`` waits until the lock
            can be taken. When false it adds ``LOCK_NB`` and fails
            immediately if the lock is already held.

    Raises:
        FileNotFoundError: From the constructor if ``directory`` does not
            exist.
        Exception: From ``__enter__`` if ``flock`` fails; the underlying
            ``OSError`` is attached as ``__cause__``.
    """

    def __init__(self, directory, *, blocking=True):
        # An explicit raise instead of ``assert``: assertions are stripped
        # when Python runs with -O, which would silently skip the check.
        if not os.path.exists(directory):
            raise FileNotFoundError(
                'Directory to lock does not exist: %r' % (directory,)
            )
        self.directory = directory
        self.blocking = blocking
        # Set only while the lock is held.
        self.dir_fd = None

    def __enter__(self):
        fd = os.open(self.directory, os.O_RDONLY)
        operation = fcntl.LOCK_EX
        if not self.blocking:
            operation |= fcntl.LOCK_NB
        try:
            fcntl.flock(fd, operation)
        except OSError as ex:
            # Close the descriptor so a failed attempt does not leak it.
            os.close(fd)
            raise Exception(
                'Somebody else is locking %r - quitting.' % self.directory
            ) from ex
        self.dir_fd = fd
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the descriptor drops the lock taken in __enter__; the guard
        # makes a stray __exit__ without a successful __enter__ a no-op.
        if self.dir_fd is not None:
            fd, self.dir_fd = self.dir_fd, None
            os.close(fd)
文章作者
上次更新 2022-03-07 (de34a70)