Source code for linuxnet.iptables.matches.limitmatch
# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides matching against rate limits
"""
from typing import Iterable, List
from ..exceptions import IptablesError
from ..deps import get_logger
from .match import Match, Criterion, MatchParser
from .util import GenericCriterion
from .util import Rate as UtilRate
_logger = get_logger('linuxnet.iptables.matches.limitmatch')
[docs]class RateLimitCriterion(Criterion):
"""Compare with a rate limit
The comparison value is a :class:`LimitMatch.Rate` object
"""
def __init__(self, match: Match):
super().__init__(match)
self.__rate = None
[docs] def get_value(self) -> 'LimitMatch.Rate':
"""Returns the value that the criterion is comparing against
"""
return self.__rate
[docs] def equals(self, # pylint: disable=arguments-differ
rate: UtilRate) -> Match:
"""Compare with the specified rate (a :class:`LimitMatch.Rate`
object)
"""
self.__rate = rate
return self._set_polarity(True)
[docs] def not_equals(self, *args, **kwargs):
"""This :class:`Criterion` method is not supported because the
limit match does not support '!'
"""
raise IptablesError('method not_equals() not supported')
def _crit_iptables_args(self) -> List[str]:
"""Returns **iptables(8)** arguments for the specified rate
"""
return ['--limit', str(self.__rate)]
[docs]class BurstCriterion(GenericCriterion):
"""Compare with the burst limit
The comparison value is an integer
"""
def __init__(self, match: Match):
super().__init__(match, '--limit-burst')
[docs] def not_equals(self, *args, **kwargs):
"""This :class:`Criterion` method is not supported because the
limit match does not support '!'
"""
raise IptablesError('method not_equals() not supported')
[docs]class LimitMatch(Match):
"""Match against a rate limit with a maximum burst
"""
# Make this available to users
Rate = UtilRate
def __init__(self):
self.__rate_limit_crit = None
self.__limit_burst_crit = None
[docs] @staticmethod
def get_match_name() -> str:
"""Returns the **iptables(8)** match extension name
"""
return 'limit'
[docs] def get_criteria(self) -> Iterable['Criterion']:
"""Returns the limit match criteria: rate-limit, burst
Note that the burst criterion will be ``None`` if the rate-limit
criterion has not been set.
"""
if (self.__rate_limit_crit is not None and
self.__rate_limit_crit.is_set()):
burst_crit = self.__limit_burst_crit
else:
burst_crit = None
return (self.__rate_limit_crit, burst_crit)
[docs] def limit(self) -> RateLimitCriterion:
"""Compare with the rate limit
"""
if self.__rate_limit_crit is None:
self.__rate_limit_crit = RateLimitCriterion(self)
return self.__rate_limit_crit
[docs] def burst(self) -> BurstCriterion:
"""Compare with the burst limit
"""
if self.__limit_burst_crit is None:
self.__limit_burst_crit = BurstCriterion(self)
return self.__limit_burst_crit
@classmethod
def parse(cls, parser: MatchParser) -> Match:
"""Parse limit match::
limit: avg <num>/<interval> burst <num>
The 'limit:' has already been consumed.
:meta private:
"""
parser.skip_field('avg')
criteria_iter = parser.get_iter()
match = LimitMatch()
rate_spec = next(criteria_iter)
match.limit().equals(cls.Rate.str2rate(rate_spec))
parser.skip_field('burst')
burst = int(next(criteria_iter))
return match.burst().equals(burst)
MatchParser.register_match('limit:', LimitMatch)