Source code for padlock.distributed.cassandra

import calendar
import datetime
from zope.interface import  implements
from zope.component import getUtility
from time_uuid import TimeUUID
from padlock import ILock
from padlock.distributed.retry_policy import IRetryPolicy

try:
    from pycassa import ConsistencyLevel, ColumnFamily, NotFoundException
except ImportError:
    # pycassa must be available for any of this to work
    ConsistencyLevel = ColumnFamily = NotFoundException = None


# args that we'll read from the keyword arguments and pass to the column family constructor
_cf_args = [
    'read_consistency_level',
    'write_consistency_level',
    'autopack_names',
    'autopack_values',
    'autopack_keys',
    'column_class_name',
    'super_column_name_class',
    'default_validation_class',
    'column_validators',
    'key_validation_class',
    'dict_class',
    'buffer_size',
    'column_bufer_size',
    'timestamp'
]


class BusyLockException(Exception):
    """Raised when the row is already locked by a different, non-expired lock column."""
class StaleLockException(Exception):
    """Raised when an expired lock column is found and the lock is set to fail on stale locks."""
class CassandraDistributedRowLock(object):
    """
    A lock that is implemented in the row of a Cassandra column family.

    Shamelessly lifted from Netflix's Astyanax library:
    https://github.com/Netflix/astyanax/blob/master/src/main/java/com/netflix/astyanax/recipes/locks/ColumnPrefixDistributedRowLock.java

    Here is their license agreement:

    Copyright 2011 Netflix

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    """
    implements(ILock)

    def __init__(self, pool, column_family, key, **kwargs):
        """
        Set up a lock on row ``key`` of ``column_family``.

        :param pool: connection pool, used only when ``column_family`` is not
            already a ``ColumnFamily`` instance.
        :param column_family: a ``ColumnFamily`` instance, or a value handed to
            the ``ColumnFamily`` constructor together with ``pool`` and any
            keyword arguments whose names are listed in ``_cf_args``.
        :param key: row key that holds the lock columns.

        Recognised keyword arguments (defaults in parentheses):
        ``consistency_level`` (``ConsistencyLevel.LOCAL_QUORUM``),
        ``prefix`` (``'_lock_'``), ``lock_id`` (a fresh time UUID string),
        ``fail_on_stale_lock`` (``False``), ``lock_column`` (``None``),
        ``timeout`` in seconds (``60.0``), ``ttl`` (``None``),
        ``backoff_policy`` (the ``IRetryPolicy`` utility named ``'run_once'``)
        and ``allow_retry`` (``True``).
        """
        self.pool = pool
        if isinstance(column_family, ColumnFamily):
            self.column_family = column_family
        else:
            cf_kwargs = {k: kwargs.get(k) for k in _cf_args if k in kwargs}
            self.column_family = ColumnFamily(self.pool, column_family, **cf_kwargs)
        self.key = key
        # NOTE(review): consistency_level is stored but never handed to the
        # batch()/get() calls below, so reads and writes use the column
        # family's own consistency settings -- confirm whether that is intended.
        self.consistency_level = kwargs.get('consistency_level', ConsistencyLevel.LOCAL_QUORUM)
        self.prefix = kwargs.get('prefix', '_lock_')
        self.lock_id = kwargs.get('lock_id', str(TimeUUID.with_utcnow()))
        self.fail_on_stale_lock = kwargs.get('fail_on_stale_lock', False)
        self.lock_column = kwargs.get('lock_column', None)
        self.timeout = kwargs.get('timeout', 60.0)  # seconds
        self.ttl = kwargs.get('ttl', None)
        # Resolve the default policy lazily so that a caller who supplies a
        # policy does not depend on the 'run_once' utility being registered.
        backoff_policy = kwargs.get('backoff_policy')
        if backoff_policy is None:
            backoff_policy = getUtility(IRetryPolicy, 'run_once')
        self.backoff_policy = backoff_policy
        # Stored for callers; not read anywhere in this class.
        self.allow_retry = kwargs.get('allow_retry', True)
        # Names of expired lock columns found by verify_lock(), removed on release.
        self.locks_to_delete = set()

    def acquire(self):
        """
        Write this lock's column and verify that no other live lock exists.

        On ``BusyLockException`` the lock column is released again and the
        attempt is repeated for as long as the backoff policy allows it.

        :raises ValueError: if ``ttl`` is set and ``timeout`` is greater than it.
        :raises BusyLockException: if the row stays locked by someone else and
            the backoff policy refuses another retry.
        :raises StaleLockException: propagated from :meth:`verify_lock`.
        """
        if self.ttl is not None and self.timeout > self.ttl:
            raise ValueError("Timeout {} must be less than TTL {}".format(self.timeout, self.ttl))

        retry = self.backoff_policy.duplicate()
        while True:
            try:
                cur_time = self.utcnow()
                mutation = self.column_family.batch()
                self.fill_lock_mutation(mutation, cur_time, self.ttl)
                mutation.send()
                self.verify_lock(cur_time)
                self.acquire_time = self.utcnow()
                return
            except BusyLockException:
                # Take our own column back out before retrying or giving up.
                self.release()
                if not retry.allow_retry():
                    # Bare raise keeps the original traceback.
                    raise

    def verify_lock(self, cur_time):
        """
        Check every column of the lock row against ``cur_time``.

        Columns whose non-zero timeout value lies before ``cur_time`` are
        treated as stale and queued in ``locks_to_delete``; any other column
        that is not this lock's own column means the row is busy.

        :param cur_time: current time in microseconds, as returned by :meth:`utcnow`.
        :raises ValueError: if no lock column has been written yet.
        :raises StaleLockException: if a stale column is found and
            ``fail_on_stale_lock`` is set.
        :raises BusyLockException: if another live lock column is present.
        """
        if self.lock_column is None:
            raise ValueError("verify_lock() called without attempting to take the lock")

        cols = self.read_lock_columns()
        for k, v in cols.iteritems():
            if v != 0 and cur_time > v:
                if self.fail_on_stale_lock:
                    raise StaleLockException("Stale lock on row '{}'. Manual cleanup required.".format(self.key))
                self.locks_to_delete.add(k)
            elif k != self.lock_column:
                raise BusyLockException("Lock already acquired for row '{}' with lock column '{}'".format(self.key, k))

    def release(self):
        """
        Remove this lock's column and any queued stale columns from the row.

        Does nothing when there is neither a current lock column nor a queued
        stale column.
        """
        # The original test was `not len(self.locks_to_delete) or ...`, which
        # sent an empty removal when nothing was pending and skipped the
        # removal when only stale locks were queued.
        if self.locks_to_delete or self.lock_column is not None:
            mutation = self.column_family.batch()
            self.fill_release_mutation(mutation, False)
            mutation.send()

    def read_lock_columns(self):
        """
        Read the lock row and return ``{column name: timeout value}``.

        A missing row yields an empty dict.
        """
        # NOTE(review): every column of the row is read and parsed; self.prefix
        # is not used to filter, so this presumably expects the row to hold
        # lock columns only -- verify against how the column family is used.
        try:
            # column_count must be an integer; the original passed the float 1e9.
            cols = self.column_family.get(self.key, column_count=1000000000)
        except NotFoundException:
            cols = {}
        return {k: self.read_timeout_value(v) for k, v in cols.iteritems()}

    def release_locks(self, force=False):
        """
        Remove expired lock columns from the row, or all of them when forced.

        :param force: when true, remove every lock column regardless of its
            timeout value.
        :returns: the lock columns as they were read before removal.
        """
        locks = self.read_lock_columns()
        now = self.utcnow()
        # The original built a list and called .add() on it (AttributeError).
        cols_to_remove = [k for k, v in locks.iteritems() if force or 0 < v < now]
        if cols_to_remove:
            self.column_family.batch().remove(self.key, cols_to_remove).send()
        return locks

    def utcnow(self):
        """Return the current UTC time as an integer number of microseconds since the epoch."""
        d = datetime.datetime.utcnow()
        return long(calendar.timegm(d.timetuple())) * 1000000 + long(d.microsecond)

    def fill_lock_mutation(self, mutation, time, ttl):
        """
        Add the insert for this lock's column to ``mutation``.

        :param mutation: batch object that receives the insert.
        :param time: current time in microseconds, or ``None`` to store a
            timeout value of 0 (which :meth:`verify_lock` never treats as stale).
        :param ttl: passed through as the insert's ``ttl`` when not ``None``.
        :returns: the name of the lock column.
        :raises ValueError: if a lock column already exists and no longer
            equals ``prefix + lock_id``.
        """
        expected_column = self.prefix + self.lock_id
        if self.lock_column is None:
            self.lock_column = expected_column
        elif self.lock_column != expected_column:
            raise ValueError("Can't change prefix or lock_id after acquiring the lock")

        if time is None:
            timeout_val = 0
        else:
            # self.timeout is in seconds; stored values are in microseconds.
            timeout_val = time + long(self.timeout * 1e6)

        kw = {}
        if ttl is not None:
            kw['ttl'] = ttl
        mutation.insert(self.key, {self.lock_column: self.generate_timeout_value(timeout_val)}, **kw)
        return self.lock_column

    def generate_timeout_value(self, timeout_val):
        """Serialise a timeout value for storage in the lock column."""
        return repr(timeout_val)

    def read_timeout_value(self, col):
        """Parse a stored lock column value back into an integer timeout."""
        return long(col)

    def fill_release_mutation(self, mutation, exclude_current_lock=False):
        """
        Add the removal of queued stale columns, and this lock's own column
        unless excluded, to ``mutation``; then reset the local lock state.

        :param mutation: batch object that receives the removal.
        :param exclude_current_lock: when true, this lock's own column is left
            out of the removal (the local ``lock_column`` is still reset).
        """
        cols_to_delete = list(self.locks_to_delete)
        if not exclude_current_lock and self.lock_column is not None:
            cols_to_delete.append(self.lock_column)
        # Never issue a removal with an empty column list.
        if cols_to_delete:
            mutation.remove(self.key, cols_to_delete)
        self.locks_to_delete.clear()
        self.lock_column = None

    def __enter__(self):
        """Acquire the lock and return it, for use in a ``with`` statement."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock on leaving the ``with`` block; exceptions propagate."""
        self.release()