Source code for linuxnet.iptables.matches.match

# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module contains the base classes for implementing match-specific
subclasses:
        - Match
        - Criterion

A class derived from Match (with the exception of PacketMatch) corresponds
to an iptables(8) extension match module, with the options of that module
mapping to Criterion subclasses.
The PacketMatch class is also derived from Match and offers matching
against the common criteria (source/dest address etc.)
"""

from typing import Any, List, Optional, Tuple

from ..exceptions import IptablesError, IptablesParsingError
from ..deps import get_logger
from ..parsing import LookaheadIterator

_logger = get_logger('linuxnet.iptables.matches.match')


[docs]class Criterion: """ This class is used to *express* an **iptables(8)** match criterion; it does not perform any comparisons. :class:`Criterion` is a superclass that serves the following purposes: 1) it provides an :meth:`equals` method and a :meth:`not_equals` method to express comparison against a value 2) it keeps track of whether the criterion has been set; a criterion is set when either the :meth:`equals` or :meth:`not_equals` method is invoked; **a criterion may only** **be set once** 3) it keeps track of whether a criterion is negated or not 4) its :meth:`iptables_to_args` method generates the '!' **iptables(8)** argument, and also checks if the criterion was set The :meth:`equals`/:meth:`not_equals` methods of :class:`Criterion` subclasses **must** invoke the :meth:`_set_polarity` method of :class:`Criterion` to indicate the polarity of the test. These methods are also responsible for saving the comparison value in the subclass object. A :class:`Criterion` has an owner which is an object of a subclass of :class:`Match`. The :meth:`equals`/:meth:`not_equals` methods return this object to facilitate building a criteria list: :: match.crit1().equals('foo').crit2().not_equals('bar') """ def __init__(self, match: 'Match'): """ :param match: the :class:`Match` object that owns this ``Criterion`` """ self.__match: 'Match' = match self.__positive = None def __eq__(self, other: 'Criterion') -> bool: """Returns ``True`` iff: * both criteria are of the same type * both criteria are set or both criteria are not set * if both criteria are set, they have the same polarity, and the same value """ if not isinstance(other, type(self)): return False if self.is_set() ^ other.is_set(): return False if self.is_set(): # Both set, so compare boolean values return (self.is_positive() == other.is_positive() and self.get_value() == other.get_value()) # None set, so equal return True def __ne__(self, other: 'Criterion'): return not self.__eq__(other)
[docs] def is_set(self) -> bool: """Returns ``True`` if the criterion has been set """ return self.__positive is not None
[docs] def is_positive(self) -> bool: """Returns the 'polarity' of the criterion; ``True`` for :meth:`equals` or ``False`` for :meth:`not_equals` Raises :class:`IptablesError` if the criterion is not set """ if not self.is_set(): raise IptablesError('criterion not set') return self.__positive
def _may_be_equal(self, other: 'Criterion') -> bool: """This is a helper method for derived classes that choose to implement the __eq__ operator. Returns ``True`` iff: * both criteria are set or both criteria are not set * if both criteria are set, they have the same polarity """ if self.is_set() ^ other.is_set(): return False if self.is_set(): # Both set, so compare boolean values return self.is_positive() == other.is_positive() # None set, so equal return True
[docs] def get_value(self) -> Any: """Returns the value that this criterion is comparing against """ raise NotImplementedError
def _set_polarity(self, polarity: bool) -> 'Match': """Set the comparison polarity: - ``True`` : equality test - ``False`` : inequality test Raises an :class:`IptablesError` if the polarity is already set. Returns this object. """ if self.__positive is not None: raise IptablesError(f"attempt to modify {self.__class__.__name__}") self.__positive = polarity return self.__match
[docs] def equals(self, *args, **kwargs) -> 'Match': """Express equality comparison against the argument values. Subclasses will implement this method to express comparisons against a specific value (or values). These values will be the arguments of the subclass method and will be stored in the subclass object. Subclasses overriding this method should invoke the :meth:`_set_polarity` method of this class to set the polarity to ``True``. Returns this :class:`Match` object. """ raise NotImplementedError
[docs] def not_equals(self, *args, **kwargs) -> 'Match': """Express inequality comparison against the argument values. The arguments of this method are the same as those of the :meth:`equals` method. This method invokes the :meth:`equals` method and then reverses the polarity. Returns this :class:`Match` object. """ # # The implementation of this method works as-is and normally # subclasses should not need to override it. # # Subclasses overriding this method should invoke the # :meth:`_set_polarity` method of this class to set the polarity # to ``False``. # _ = self.equals(*args, **kwargs) self.__positive = False return self.__match
[docs] def compare(self, is_equal: bool, *args, **kwargs) -> 'Match': """Alternative method used for comparisons. It invokes :meth:`equals` (or :meth:`not_equals`) with ``args`` and ``kwargs`` if ``is_equal`` is ``True`` (or ``False``). """ if is_equal: # pylint: disable=no-else-return return self.equals(*args, **kwargs) else: return self.not_equals(*args, **kwargs)
def _crit_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments for the criterion, except for polarity """ raise NotImplementedError def to_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments This method should be invoked only for criteria that are set, i.e. the caller is expected to check with :meth:`Criterion.is_set` prior to invoking this method. :meta private: """ retval = [] if self.is_positive() else ['!'] retval += self._crit_iptables_args() return retval
[docs]class Match: """Parent class for all match-specific subclasses. """
[docs] @staticmethod def build_iptables_args(match_name: Optional[str], crit_iterable) -> List[str]: """Helper method to build an **iptables(8)** argument list from a match name and a list of :class:`Criterion` objects. **iptables(8)** arguments will be extracted from each criterion that is set. :param match_name: optional match name that will result in adding ``-m match_name`` at the beginning of the argument list; there must be at least one set criterion for the match name to be included in the return value :param crit_iterable: an iterable containing :class:`Criterion` objects """ args = [] for crit in crit_iterable: if crit is not None and crit.is_set(): args += crit.to_iptables_args() if not args: return args if not match_name: return args return ['-m', match_name] + args
[docs] def has_criteria(self) -> bool: """Returns ``True`` if the match has any criteria set """ return bool(self.to_iptables_args())
[docs] def to_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments This method must be implemented by subclasses. """ raise NotImplementedError
def __eq__(self, other: 'Match'): """We rely on subclasses to define equality by value """ return self is other def __ne__(self, other: 'Match'): return not self.__eq__(other)
class MatchNone(Match): """This is a special class to indicate the absence of any :class:`Match` objects. This class is intended to be used for comparison purposes. """ def to_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments """ return [] class CriteriaExhaustedError(Exception): """Exception raised to indicate that criteria parsing has completed """
[docs]class MatchParser: """This class handles match parsing """ # Key: string # Value: Match subclass _match_class_map = {} def __init__(self, field_iter: LookaheadIterator): self.__iter = field_iter self.__match_name = None self.__negation = None
[docs] def get_iter(self) -> LookaheadIterator: """Returns the field iterator """ return self.__iter
[docs] def get_match_name(self) -> Optional[str]: """Returns the match name, if any """ return self.__match_name
[docs] def get_negation(self) -> Optional[str]: """Returns the negation string, if any """ return self.__negation
[docs] @staticmethod def parse_value(value: str) -> Tuple[bool, str]: """Check if the specified value starts with '!' indicating negation. Returns the tuple (is_negative, value) where the optional '!' has been stripped from the argument 'value' """ is_equal = True if value[0] == '!': is_equal = False value = value[1:] return is_equal, value
[docs] def parse_next_value(self) -> Tuple[bool, str]: """Parse the next value from the iterator. Allow for the following syntax:: ! value (2 fields) !value (1 field) Returns the tuple (is_negative, value) """ value = next(self.__iter) if value == '!': return False, next(self.__iter) return self.parse_value(value)
[docs] def skip_field(self, expected: str): """Skip the next field, if it is equal to ``expected``. Otherwise, raise an :exc:`IptablesParsingError` exception. """ val = next(self.__iter) if val != expected: _logger.error("parsing '%s': expected '%s'; found '%s'", self.__match_name, expected, val) raise IptablesParsingError(f"missing '{expected}' field")
[docs] def rewind_match(self): """Returns the match name, and negation string if any, back to the iterator. """ if self.__match_name is None: raise IptablesParsingError('attempt to rewind with no match') self.__iter.put_back(self.__match_name) self.__match_name = None if self.__negation is not None: self.__iter.put_back(self.__negation) self.__negation = None
[docs] def parse_matches(self) -> List[Match]: """This method traverses the match part of the rule invoking the match-specific classes based on the name of the match. """ match_list = [] try: for token in self.__iter: # # Newer iptables versions have the '!' as a standalone field # instead of as a prefix of the value. # if token == '!': self.__negation = token self.__match_name = next(self.__iter) else: self.__match_name = token match = None klass = self._match_class_map.get(self.__match_name) if klass is not None: try: match = klass.parse(self) except CriteriaExhaustedError: pass if match is None: # We don't know if it is a match criterion that we don't # know about, a target name, or a target option. # Let the caller figure it out. self.__iter.put_back(self.__match_name) self.__match_name = None if self.__negation is not None: self.__iter.put_back(self.__negation) self.__negation = None break match_list.append(match) self.__match_name = None self.__negation = None except StopIteration as stopiter: if self.__match_name is not None: raise IptablesParsingError( 'insufficient number of values for ' f'match {self.__match_name}') from stopiter return match_list
[docs] @classmethod def register_match(cls, ident:str, klass) -> None: """Register the given class (which should be a subclass of the :class:`Match` class). The ``ident`` string is the match name that appears in the ``iptables -L`` output. """ cls._match_class_map[ident] = klass