Source code for linuxnet.iptables.matches.util
# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides helper classes for implementing Match/Criteria
subclasses.
"""
from typing import Any, List, Optional, Tuple
from ..deps import get_logger
from ..exceptions import IptablesError
from .match import Criterion, Match
_logger = get_logger('linuxnet.iptables.matches.util')
[docs]class GenericCriterion(Criterion):
"""A helper class that can be used by all criteria that
correspond to **iptables(8)** options of the form "[!] option value",
for example, "[!] -p tcp"
"""
def __init__(self, match: Match, iptables_option: str, norm=None):
"""
:param match: the owner :class:`Match`
:param iptables_option: the **iptables(8)** option to use when
generating the iptables arguments
:param norm: an optional callable that normalizes the value
that we are comparing against
"""
super().__init__(match)
self.__value = None
self.__option = iptables_option
self.__norm = norm
[docs] def get_iptables_option(self) -> str:
"""Returns the **iptables(8)** option
"""
return self.__option
[docs] def get_value(self) -> Any:
"""Returns the criterion value
"""
return self.__value
[docs] def equals(self, value) -> Match: # pylint: disable=arguments-differ
"""Compare with the specified value
"""
if self.__norm is None:
self.__value = value
else:
self.__value = self.__norm(value)
return self._set_polarity(True)
def _crit_iptables_args(self) -> List[str]:
"""Convert to **iptables(8)** arguments
"""
return [self.__option, str(self.__value)]
[docs]class GenericPositiveCriterion(GenericCriterion):
"""A subclass of :class:`GenericCriterion` for criteria that do
not support negation.
"""
[docs] def not_equals(self, *args, **kwargs):
"""Indicates that negation is not supported by raising
an :exc:`IptablesError`
"""
raise IptablesError(
f'{self.get_iptables_option()} does not support negation')
[docs]class BooleanCriterion(Criterion):
"""Helper class for criteria that test single bits.
"""
def __init__(self, match: Match, iptables_option: str,
supports_negation=True):
"""
:param match: the :class:`Match` object that owns this ``Criterion``
:param iptables_option: the **iptables(8)** option to use for this
criterion
:param supports_negation: if ``True``, the criteria supports
the meth:not_equals method
"""
super().__init__(match)
self.__option = iptables_option
self.__supports_negation = supports_negation
[docs] def get_value(self) -> bool:
"""Returns the criterion value
"""
return self.is_positive()
[docs] def bit_set(self) -> Match:
"""Check if the bit is set
"""
return self.equals()
[docs] def bit_not_set(self) -> Match:
"""Check if the bit is set
"""
return self.not_equals()
[docs] def equals(self) -> Match: # pylint: disable=arguments-differ
"""Compare with the setting of the bit
"""
return self._set_polarity(True)
[docs] def not_equals(self) -> Match: # pylint: disable=arguments-differ
"""Express a test against the criterion being ``False``
"""
if not self.__supports_negation:
raise IptablesError(
f'{self.__option} does not support negation')
return super().not_equals()
def _crit_iptables_args(self) -> List[str]:
"""Convert to **iptables(8)** arguments
"""
return [self.__option]
class NumberOrRangeCriterion(Criterion):
"""Compare against a number or a number range
"""
def __init__(self, match: Match, iptables_option: str, *, sep: str):
super().__init__(match)
self.__option = iptables_option
self.__sep = sep
self.__first = None
self.__last = None
def get_value(self) -> Tuple[int, Optional[int]]:
"""Returns the value that the criterion is comparing against
:rtype: a tuple of (int, int|None)
"""
return (self.__first, self.__last)
def equals(self, # pylint: disable=arguments-differ
first: int, last: Optional[int] =None) -> Match:
"""Compare with a number (or inclusion in number-range if ``last``
is present)
"""
self.__first = first
self.__last = last
return self._set_polarity(True)
def _crit_iptables_args(self) -> List[str]:
"""Returns **iptables(8)** arguments for the specified port(s)
"""
range_spec = str(self.__first)
if self.__last is not None:
range_spec += f'{self.__sep}{self.__last}'
return [self.__option, range_spec]
class Rate: # pylint: disable=too-few-public-methods
"""Used to express rates
"""
def __init__(self, rate: int, interval: Optional[str] =None):
"""
:param rate: (integer) rate value
:param interval: optional (string) time interval (defaults to `PER_SEC`)
Possible interval values are:
"""
if interval is None:
interval = self.PER_SEC
elif interval not in (self.PER_DAY, self.PER_HOUR,
self.PER_MINUTE, self.PER_MIN, self.PER_SECOND, self.PER_SEC):
raise ValueError(f'bad time interval: {interval}')
if rate <= 0:
raise ValueError(f'bad rate: {rate}')
self.__rate = int(rate)
if interval == self.PER_MINUTE:
interval = self.PER_MIN
elif interval == self.PER_SECOND:
interval = self.PER_SEC
self.__interval = interval
#: day interval
PER_DAY = 'day'
#: hour interval
PER_HOUR = 'hour'
#: minute interval
PER_MIN = 'min'
#: minute interval (normalized to 'min')
PER_MINUTE = 'minute'
#: second interval
PER_SEC = 'sec'
#: second interval (normalized to 'sec')
PER_SECOND = 'second'
def get_rate(self) -> int:
"""Returns the rate value
"""
return self.__rate
def get_interval(self) -> str:
"""Returns the rate interval
"""
return self.__interval
def __eq__(self, other):
return (isinstance(other, Rate) and
self.__rate == other.get_rate() and
self.__interval == other.get_interval())
@classmethod
def str2rate(cls, spec: str) -> 'Rate':
"""Convert the ``spec`` string into a :class:`Rate` object.
The expected format is: ``num/interval`` (e.g. ``10/min``).
Raises a :exc:`ValueError` if ``spec`` cannot be parsed.
"""
fields = spec.split('/')
if len(fields) != 2:
raise ValueError(f"bad rate spec '{spec}'")
return cls(int(fields[0]), fields[1])
def __str__(self):
return f'{self.__rate}/{self.__interval}'