Source code for linuxnet.iptables.matches.util
# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides helper classes for implementing Match/Criteria
subclasses.
"""
from typing import Any, List, Optional
from ..deps import get_logger
from ..exceptions import IptablesError
from .match import Criterion, Match
_logger = get_logger('linuxnet.iptables.matches.util')
[docs]class GenericCriterion(Criterion):
"""A helper class that can be used by all criteria that
correspond to **iptables(8)** options of the form "[!] option value",
for example, "[!] -p tcp"
"""
def __init__(self, match: Match, iptables_option: str):
super().__init__(match)
self.__value = None
self.__option = iptables_option
[docs] def get_iptables_option(self) -> str:
"""Returns the **iptables(8)** option
"""
return self.__option
[docs] def get_value(self) -> Any:
"""Returns the criterion value
"""
return self.__value
[docs] def equals(self, value) -> Match: # pylint: disable=arguments-differ
"""Compare with the specified value
"""
self.__value = value
return self._set_polarity(True)
def _crit_iptables_args(self) -> List[str]:
"""Convert to **iptables(8)** arguments
"""
return [self.__option, str(self.__value)]
[docs]class BooleanCriterion(Criterion):
"""Helper class for criteria that test single bits.
"""
def __init__(self, match: Match, iptables_option: str,
supports_negation=True):
"""
:param match: the :class:`Match` object that owns this ``Criterion``
:param iptables_option: the **iptables(8)** option to use for this
criterion
:param supports_negation: if ``True``, the criteria supports
the meth:not_equals method
"""
super().__init__(match)
self.__option = iptables_option
self.__supports_negation = supports_negation
[docs] def get_value(self) -> bool:
"""Returns the criterion value
"""
return self.is_positive()
[docs] def bit_set(self) -> Match:
"""Check if the bit is set
"""
return self.equals()
[docs] def bit_not_set(self) -> Match:
"""Check if the bit is set
"""
return self.not_equals()
[docs] def equals(self) -> Match: # pylint: disable=arguments-differ
"""Compare with the setting of the bit
"""
return self._set_polarity(True)
[docs] def not_equals(self) -> Match: # pylint: disable=arguments-differ
"""Express a test against the criterion being ``False``
"""
if not self.__supports_negation:
raise IptablesError(
f'{self.__option} does not support negation')
return super().not_equals()
def _crit_iptables_args(self) -> List[str]:
"""Convert to **iptables(8)** arguments
"""
return [self.__option]
class Rate: # pylint: disable=too-few-public-methods
"""Used to express rates
"""
def __init__(self, rate: int, interval: Optional[str] =None):
"""
:param rate: (integer) rate value
:param interval: optional (string) time interval (defaults to `PER_SEC`)
Possible interval values are:
"""
if interval is None:
interval = self.PER_SEC
elif interval not in (self.PER_DAY, self.PER_HOUR,
self.PER_MINUTE, self.PER_MIN, self.PER_SECOND, self.PER_SEC):
raise ValueError(f'bad time interval: {interval}')
if rate <= 0:
raise ValueError(f'bad rate: {rate}')
self.__rate = int(rate)
if interval == self.PER_MINUTE:
interval = self.PER_MIN
elif interval == self.PER_SECOND:
interval = self.PER_SEC
self.__interval = interval
#: day interval
PER_DAY = 'day'
#: hour interval
PER_HOUR = 'hour'
#: minute interval
PER_MIN = 'min'
#: minute interval (normalized to 'min')
PER_MINUTE = 'minute'
#: second interval
PER_SEC = 'sec'
#: second interval (normalized to 'sec')
PER_SECOND = 'second'
def get_rate(self) -> int:
"""Returns the rate value
"""
return self.__rate
def get_interval(self) -> str:
"""Returns the rate interval
"""
return self.__interval
def __eq__(self, other):
return (isinstance(other, Rate) and
self.__rate == other.get_rate() and
self.__interval == other.get_interval())
@classmethod
def str2rate(cls, spec: str) -> 'Rate':
"""Convert the ``spec`` string into a :class:`Rate` object.
The expected format is: ``num/interval`` (e.g. ``10/min``).
Raises a :exc:`ValueError` if ``spec`` cannot be parsed.
"""
fields = spec.split('/')
if len(fields) != 2:
raise ValueError(f"bad rate spec '{spec}'")
return cls(int(fields[0]), fields[1])
def __str__(self):
return f'{self.__rate}/{self.__interval}'