pyrate_limiter.abstracts.bucket module

Implement this class to create a workable bucket for Limiter to use

class pyrate_limiter.abstracts.bucket.AbstractBucket

Bases: ABC

Base bucket interface Assumption: len(rates) always > 0 TODO: allow empty rates

close()

Release any resources held by the bucket.

Subclasses may override this method to perform any necessary cleanup (e.g., closing files, network connections, or releasing locks) when the bucket is no longer needed.

Return type:

None

abstractmethod count()

Count number of items in the bucket

Return type:

Union[int, Awaitable[int]]

failing_rate = None
abstractmethod flush()

Flush the whole bucket - Must remove failing-rate after flushing

Return type:

Optional[Awaitable[None]]

is_async = None
abstractmethod leak(current_timestamp=None)

leaking bucket - removing items that are outdated

Return type:

Union[int, Awaitable[int]]

limiter_lock()

An additional lock to be used by Limiter in-front of the thread lock. Intended for multiprocessing environments where a thread lock is insufficient.

Return type:

Optional[object]

now()

Retrieve current timestamp from the clock backend.

abstractmethod peek(index)

Peek at the rate-item at a specific index in latest-to-earliest order NOTE: The reason we cannot peek from the start of the queue(earliest-to-latest) is we can’t really tell how many outdated items are still in the queue

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]

abstractmethod put(item)

Put an item (typically the current time) in the bucket return true if successful, otherwise false

Return type:

Union[bool, Awaitable[bool]]

property rates
waiting(item)

Calculate time until bucket become availabe to consume an item again

Return type:

Union[int, Awaitable[int]]

class pyrate_limiter.abstracts.bucket.BucketFactory

Bases: ABC

Asbtract BucketFactory class. It is reserved for user to implement/override this class with his own bucket-routing/creating logic

close()
Return type:

None

create(bucket_class, *args, **kwargs)

Creating a bucket dynamically

Return type:

AbstractBucket

dispose(bucket)

Delete a bucket from the factory

Return type:

bool

abstractmethod get(item)

Get the corresponding bucket to this item

Return type:

Union[AbstractBucket, Awaitable[AbstractBucket]]

get_buckets()

Iterator over all buckets in the factory

Return type:

List[AbstractBucket]

property leak_interval

Retrieve leak-interval from inner Leaker task

schedule_leak(new_bucket)

Schedule all the buckets’ leak, reset bucket’s failing rate

Return type:

None

abstractmethod wrap_item(name, weight=1)

Add the current timestamp to the receiving item using any clock backend - Turn it into a RateItem - Can return either a coroutine or a RateItem instance

Return type:

Union[RateItem, Awaitable[RateItem]]

class pyrate_limiter.abstracts.bucket.Leaker(leak_interval)

Bases: object

Responsible for scheduling buckets’ leaking at the background either through a daemon thread (for sync buckets) or a task using asyncio.Task.

The sync worker is an encapsulated daemon thread rather than self (this class used to subclass Thread). A Thread can only be started once, so when the worker exits after every sync bucket has been disposed, re-registering a bucket and calling start() again would raise RuntimeError: threads can only be started once (issue #301). Holding the thread as an attribute lets start() spin up a fresh one on demand.

aio_leak_task = None
async_buckets
close()
deregister(bucket_id)

Deregister a bucket

Return type:

bool

is_alive()

Whether the sync-leak worker thread is currently running.

Return type:

bool

leak_async()
leak_interval = 10000
name = "PyrateLimiter's Leaker"
register(bucket)

Register a new bucket, routing it to the sync or async leak loop.

Prefers the bucket’s declared is_async; only falls back to the side-effecting leak(0) probe when that is None (issue #305).

start()

Start (or restart) the daemon thread that leaks sync buckets.

The worker exits once every sync bucket is disposed; a finished Thread cannot be restarted, so we create a fresh one here instead of re-starting the dead one (which would raise RuntimeError).

Return type:

None

sync_buckets