pyrate_limiter.buckets.redis_bucket module

Bucket implementation using Redis

class pyrate_limiter.buckets.redis_bucket.LuaScript

Bases: object

Scripts that deal with bucket operations

PUT_ITEM = "\n    local bucket = KEYS[1]\n    local now = ARGV[1]\n    local space_required = tonumber(ARGV[2])\n    local item_name = ARGV[3]\n    local rates_count = tonumber(ARGV[4])\n\n    for i=1,rates_count do\n        local offset = (i - 1) * 2\n        local interval = tonumber(ARGV[5 + offset])\n        local limit = tonumber(ARGV[5 + offset + 1])\n        local count = redis.call('ZCOUNT', bucket, now - interval, now)\n        local space_available = limit - tonumber(count)\n        if space_available < space_required then\n            return i - 1\n        end\n    end\n\n    local batch = {}\n    -- Each member adds two unpacked arguments; 1000 stays below Lua 5.1 limits.\n    local batch_size = 1000\n\n    for i=1,space_required do\n        batch[#batch + 1] = now\n        batch[#batch + 1] = item_name..i\n\n        if #batch == batch_size * 2 then\n            redis.call('ZADD', bucket, unpack(batch))\n            batch = {}\n        end\n    end\n\n    if #batch > 0 then\n        redis.call('ZADD', bucket, unpack(batch))\n    end\n\n    return -1\n    "
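The control flow of PUT_ITEM can be modeled in plain Python for reading purposes. The sketch below is an illustration only and is not part of the library: put_item_model and the dict-based bucket are hypothetical stand-ins for the script and the Redis sorted set. It mirrors the script's two phases: check each rate's window and return the zero-based index of the first rate without enough space, otherwise add space_required members scored with now and return -1.

```python
from typing import Dict, List, Tuple


def put_item_model(
    bucket: Dict[str, int],
    now: int,
    space_required: int,
    item_name: str,
    rates: List[Tuple[int, int]],
) -> int:
    """Hypothetical pure-Python model of PUT_ITEM.

    bucket maps member name -> score (timestamp), like a sorted set.
    rates is a list of (interval, limit) pairs, as passed in ARGV.
    """
    # Phase 1: every rate must have room for the whole request.
    for index, (interval, limit) in enumerate(rates):
        # ZCOUNT bucket (now - interval) now: both bounds are inclusive.
        count = sum(1 for score in bucket.values() if now - interval <= score <= now)
        if limit - count < space_required:
            return index  # the script returns i - 1 because Lua loops are 1-based

    # Phase 2: add members item_name1..item_nameN, all scored with now.
    # (The script sends these to ZADD in batches of 1000 members.)
    for i in range(1, space_required + 1):
        bucket[f"{item_name}{i}"] = now

    return -1  # success: no rate was violated
```

A return value of -1 therefore means the item was stored, while any non-negative value identifies the position of the rate that rejected it.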
class pyrate_limiter.buckets.redis_bucket.RedisBucket(rates, redis, bucket_key, script_hash)

Bases: AbstractBucket

A bucket that stores its data in Redis.

- Redis’ built-in TIME is not used, since it is non-deterministic.
- In a distributed context, use local server time or a remote time server.
- Each bucket instance uses a dedicated connection to avoid race conditions.
- A bucket can be either sync or async.

bucket_key
count()

Count the number of items in the bucket

flush()

Flush the whole bucket. The failing rate must also be cleared after flushing.

classmethod init(rates, redis, bucket_key)
leak(current_timestamp=None)

Leak the bucket by removing items that are outdated

Return type:

Union[int, Awaitable[int]]
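As an illustration of what "outdated" can mean, the sketch below models a leak on a dict-based stand-in for the sorted set. It assumes an item is outdated once it falls outside the window of the longest rate interval; that cutoff, the strict comparison, and the name leak_model are assumptions of this sketch, not details stated in this reference. The comparison is strict so that the model never removes an item that the inclusive window check in PUT_ITEM would still count.

```python
from typing import Dict


def leak_model(bucket: Dict[str, int], now: int, max_interval: int) -> int:
    """Hypothetical model of a leak: drop items older than the longest window.

    bucket maps member name -> score (timestamp).
    Returns the number of removed items.
    """
    cutoff = now - max_interval
    # Collect names first so the dict is not mutated while iterating over it.
    outdated = [name for name, score in bucket.items() if score < cutoff]
    for name in outdated:
        del bucket[name]
    return len(outdated)
```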

now()

Retrieve current timestamp from the clock backend.

peek(index)

Peek at the rate item at a specific index, in latest-to-earliest order.

NOTE: Peeking from the start of the queue (earliest-to-latest) is not possible because there is no way to tell how many outdated items are still in the queue.

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]
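The latest-to-earliest indexing can be sketched on a dict-based stand-in for the sorted set. This is a reading aid under stated assumptions: peek_model is a hypothetical name, and it returns a plain (name, timestamp) tuple rather than a RateItem. Index 0 is the most recent item, and an out-of-range index yields None.

```python
from typing import Dict, Optional, Tuple


def peek_model(bucket: Dict[str, int], index: int) -> Optional[Tuple[str, int]]:
    """Hypothetical model of peek: index 0 is the newest item."""
    # Order members from the highest score (latest) to the lowest (earliest).
    ordered = sorted(bucket.items(), key=lambda pair: pair[1], reverse=True)
    if index < 0 or index >= len(ordered):
        return None
    return ordered[index]
```

Counting from the newest end keeps the index meaningful even when outdated items have not yet been leaked from the oldest end.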

put(item)

Add an item to the bucket key

Return type:

Union[bool, Awaitable[bool]]
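Because the result is either a bool or an awaitable that resolves to a bool, code that must work with both sync and async buckets needs to handle both shapes. The following is a minimal sketch of one way to do that with the standard library; resolve, sync_put, and async_put are hypothetical names, and the two put functions merely stand in for bucket.put on a sync and an async bucket.

```python
import asyncio
import inspect
from typing import Awaitable, Tuple, Union


async def resolve(result: Union[bool, Awaitable[bool]]) -> bool:
    """Return the bool, awaiting it first if it is awaitable."""
    if inspect.isawaitable(result):
        return await result
    return result


def sync_put() -> bool:
    # Stand-in for put() on a sync bucket.
    return True


async def async_put() -> bool:
    # Stand-in for put() on an async bucket.
    return False


async def main() -> Tuple[bool, bool]:
    return await resolve(sync_put()), await resolve(async_put())


results = asyncio.run(main())
```

The same pattern applies to the other methods whose return type is a Union with an Awaitable, such as leak and peek.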

redis
script_hash