Source code for linuxnet.iptables.matches.recentmatch

# Copyright (c) 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides access to the ``recent`` match
"""

from enum import Enum
from ipaddress import IPv4Address
from typing import Iterable, List, Optional

from ..deps import get_logger
from ..exceptions import IptablesError, IptablesParsingError

from .match import Criterion, Match, MatchParser
from .util import GenericPositiveCriterion, BooleanCriterion

_logger = get_logger('linuxnet.iptables.matches.recentmatch')


[docs]class RecentMatchAction(Enum): """Actions for the ``recent`` match. The value of each is the corresponding iptables option. """ #: SET action SET = "--set" #: UPDATE action UPDATE = "--update" #: CHECK action CHECK = "--rcheck" #: REMOVE action REMOVE = "--remove"
[docs]class ActionCriterion(Criterion): """Specify the action to take """ def __init__(self, match: Match): super().__init__(match) self.__action = None
[docs] def get_value(self) -> RecentMatchAction: """Returns the action """ return self.__action
[docs] def equals(self, # pylint: disable=arguments-differ action: RecentMatchAction, *, match_if_found=True) -> Match: """ :param action: identifies the action to take :param match_if_found: if ``False``, when the packet address is present in the identified list, the ``recent`` match will cause the rule to **fail** to match the packet """ self.__action = action return self._set_polarity(match_if_found)
[docs] def not_equals(self, *args, **kwargs): """This criterion does not support inequality testing. This method will raise an :exc:`IptablesError` """ raise IptablesError("criterion does not support negation")
def _crit_iptables_args(self) -> List[str]: return [self.__action.value]
[docs]class AddressSelection(Enum): """Identify whether we are comparing against the packet's source or destination address """ #: select packet source address SOURCE_ADDRESS = '--rsource' #: select packet destination address DEST_ADDRESS = '--rdest'
[docs]class AddressSelectionCriterion(Criterion): """Compare against the packet's source or destination address """ def __init__(self, match: Match): super().__init__(match) self.__selection = None
[docs] def get_value(self) -> AddressSelection: """Returns the value of the criterion (identification of which packet address is selected) """ return self.__selection
[docs] def equals(self, # pylint: disable=arguments-differ selection: AddressSelection) -> Match: self.__selection = selection return self._set_polarity(True)
[docs] def not_equals(self, *args, **kwargs): """This criterion does not support inequality comparison. This method raises an :exc:`IptablesError` """ raise IptablesError("inequality comparison not supported")
def _crit_iptables_args(self) -> List[str]: """Returns **iptables(8)** arguments for the specified selection. """ if self.__selection is not None: return [self.__selection.value] return []
class MaskCriterion(Criterion): """Apply the specified mask. The value is an :class:`IPv4Address`. """ SKIP_MASK = IPv4Address('255.255.255.255') def __init__(self, match: Match): super().__init__(match) self.__mask = None def get_value(self) -> Optional[IPv4Address]: """Returns the mask """ return self.__mask def equals(self, mask) -> Match: # pylint: disable=arguments-differ """Use the specified source mask """ self.__mask = mask self._set_polarity(True) def not_equals(self, *args, **kwargs): """This :class:`Criterion` method is not supported """ raise IptablesError('method not_equals() not supported') def _crit_iptables_args(self) -> List[str]: """Returns **iptables(8)** arguments for the specified rate """ if self.__mask == self.SKIP_MASK: return [] return ['--mask', str(self.__mask)]
[docs]class RecentMatch(Match): """Match against list of IP addresses. """ #: SET action SET = RecentMatchAction.SET #: UPDATE action UPDATE = RecentMatchAction.UPDATE #: CHECK action CHECK = RecentMatchAction.CHECK #: REMOVE action REMOVE = RecentMatchAction.REMOVE #: select packet source address SOURCE_ADDRESS = AddressSelection.SOURCE_ADDRESS #: select packet destination address DEST_ADDRESS = AddressSelection.DEST_ADDRESS def __init__(self): self.__action_crit = None self.__name_crit = None self.__seconds_crit = None self.__hitcount_crit = None self.__same_ttl_crit = None self.__address_selection_crit = None self.__reap_crit = None self.__mask_crit = None
[docs] @staticmethod def get_match_name() -> str: """Returns the **iptables(8)** match extension name """ return 'recent'
[docs] def get_criteria(self) -> Iterable['Criterion']: """Returns the recent match criteria. """ # We create a temporary AddressSelectionCriterion with # the default match behavior. address_selection_crit = self.__address_selection_crit if address_selection_crit is None: address_selection_crit = AddressSelectionCriterion(self) address_selection_crit.equals(self.SOURCE_ADDRESS) # We create a temporary MaskCritetion with the default mask mask_crit = self.__mask_crit if mask_crit is None: mask_crit = MaskCriterion(self) mask_crit.equals(MaskCriterion.SKIP_MASK) return (self.__action_crit, self.__name_crit, self.__seconds_crit, self.__hitcount_crit, self.__same_ttl_crit, address_selection_crit, self.__reap_crit, mask_crit)
[docs] def name(self) -> GenericPositiveCriterion: """Identify the list name """ if self.__name_crit is None: self.__name_crit = GenericPositiveCriterion(self, '--name') return self.__name_crit
[docs] def action(self) -> ActionCriterion: """Identify the action """ if self.__action_crit is None: self.__action_crit = ActionCriterion(self) return self.__action_crit
[docs] def address_selection(self) -> AddressSelectionCriterion: """Identify the address selection """ if self.__address_selection_crit is None: self.__address_selection_crit = AddressSelectionCriterion(self) return self.__address_selection_crit
[docs] def seconds(self) -> GenericPositiveCriterion: """Specify number of seconds """ if self.__seconds_crit is None: self.__seconds_crit = GenericPositiveCriterion(self, '--seconds') return self.__seconds_crit
[docs] def hitcount(self) -> GenericPositiveCriterion: """Specify a hitcount """ if self.__hitcount_crit is None: self.__hitcount_crit = GenericPositiveCriterion(self, '--hitcount') return self.__hitcount_crit
[docs] def same_ttl(self) -> BooleanCriterion: """Specify same-TTL comparison. """ if self.__same_ttl_crit is None: self.__same_ttl_crit = BooleanCriterion(self, "--rttl", supports_negation=False) return self.__same_ttl_crit
[docs] def reap(self) -> BooleanCriterion: """Specify old address reaping """ if self.__reap_crit is None: self.__reap_crit = BooleanCriterion(self, "--reap", supports_negation=False) return self.__reap_crit
[docs] def mask(self) -> MaskCriterion: """Specify a source mask """ if self.__mask_crit is None: self.__mask_crit = MaskCriterion(self) return self.__mask_crit
# pylint: disable=too-many-branches @classmethod def parse(cls, parser: MatchParser) -> Match: """Possible output:: recent: CHECK TTL-Match name: foobar side: source LOG flags 0 level 4 !recent: SET name: foobar side: source LOG flags 0 level 4 !recent: UPDATE seconds: 4 hit_count: 3 name: foobar side: source recent: REMOVE name: foobar side: dest/* silly */ LOG flags 0 level 4 The 'recent' part is already consumed. The parser has also recorded if there was a '!' present. Note the output bug when the value of 'side:' is 'dest'. We look for this and put-back the rest of the string. :meta private: """ criteria_iter = parser.get_iter() match_if_found = parser.get_negation() is None match = RecentMatch() # # The action is always present at the beginning # actionstr = next(criteria_iter) if actionstr == 'CHECK': match.action().equals(cls.CHECK, match_if_found=match_if_found) elif actionstr == 'SET': match.action().equals(cls.SET, match_if_found=match_if_found) elif actionstr == 'UPDATE': match.action().equals(cls.UPDATE, match_if_found=match_if_found) elif actionstr == 'REMOVE': match.action().equals(cls.REMOVE, match_if_found=match_if_found) else: raise IptablesParsingError( f'recent match unexpected action: {actionstr}') for val in criteria_iter: if val == 'TTL-Match': match.same_ttl().equals() elif val == 'name:': match.name().equals(next(criteria_iter)) elif val == 'hit_count:': match.hitcount().equals(int(next(criteria_iter))) elif val == 'seconds:': match.seconds().equals(int(next(criteria_iter))) elif val == 'side:': address_selection = next(criteria_iter) if address_selection == 'source': match.address_selection().equals(cls.SOURCE_ADDRESS) elif address_selection.startswith('dest'): match.address_selection().equals(cls.DEST_ADDRESS) rest = address_selection[4:] if rest: criteria_iter.put_back(rest, replace_token=True) elif val == 'reap': match.reap().equals() elif val == 'mask:': match.mask().equals(IPv4Address(next(criteria_iter))) else: criteria_iter.put_back(val) break return match
# pylint: enable=too-many-branches MatchParser.register_match('recent:', RecentMatch)