# Source code for linuxnet.iptables.matches.setmatch

# Copyright (c) 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides matching against IP sets defined by ipset(8)
"""

from typing import Iterable, List, Tuple

from ..deps import get_logger

from .match import Match, Criterion, MatchParser
from .util import BooleanCriterion

# Module-level logger object, obtained from the package's ``deps.get_logger``
# helper under this module's dotted name.
_logger = get_logger('linuxnet.iptables.matches.setmatch')


class MatchSetCriterion(Criterion):
    """Criterion naming the **ipset(8)** set, and the flags, to match against.

    The criterion value is the pair ``(set-name, flags)``.
    """

    def __init__(self, match: Match):
        super().__init__(match)
        # Both fields remain None until equals() stores a value in them.
        self.__name = None
        self.__flags = None

    def get_value(self) -> Tuple[str, str]:
        """Return the criterion value as the tuple (set-name, flags).

        :rtype: tuple of ``(str, str)``
        """
        return self.__name, self.__flags

    def equals(self,                # pylint: disable=arguments-differ
               name: str, flags: str) -> Match:
        """Match against the **ipset(8)** set ``name`` using ``flags``.

        :param name: **ipset(8)** name
        :param flags: comma-separated list of **src** and/or **dst**
            specifications
        :returns: the result of marking this criterion with positive
            polarity
        """
        self.__name, self.__flags = name, flags
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Return the **iptables(8)** arguments for this criterion:
        ``--match-set`` followed by the set name and the flags.
        """
        args = ["--match-set"]
        args.extend((self.__name, self.__flags))
        return args
class _CounterCriterion(Criterion):
    """Base criterion comparing a counter against an integer value.

    ``unit_name`` selects the option prefix used when generating
    **iptables(8)** arguments (e.g. ``--<unit_name>-eq``).
    """

    _EQ_COMP = '=='
    _LT_COMP = '<'
    _GT_COMP = '>'

    def __init__(self, match: Match, unit_name: str):
        super().__init__(match)
        self.__unit = unit_name
        # Value and comparison operator are unset until one of
        # equals()/less_than()/greater_than() is invoked.
        self.__value = None
        self.__comp = None

    def __set_comparison(self, value: int, comp: str) -> Match:
        """Record the value and comparison operator, then mark the
        criterion with positive polarity.
        """
        self.__value = value
        self.__comp = comp
        return self._set_polarity(True)

    def get_value(self) -> Tuple[int, str]:
        """Return the value compared against together with the
        comparison operation (as a string).

        :rtype: tuple of ``(int, str)``
        """
        return self.__value, self.__comp

    def equals(self, value: int) -> Match:  # pylint: disable=arguments-differ
        """Compare for equality: counter == ``value``.

        :param value: the counter value
        """
        return self.__set_comparison(value, self._EQ_COMP)

    def less_than(self, value: int) -> Match:
        """Compare for: counter < ``value``.

        :param value: the counter value
        """
        return self.__set_comparison(value, self._LT_COMP)

    def greater_than(self, value: int) -> Match:
        """Compare for: counter > ``value``.

        :param value: the counter value
        """
        return self.__set_comparison(value, self._GT_COMP)

    def _crit_iptables_args(self) -> List[str]:
        """Return the **iptables(8)** arguments for the recorded
        comparison.
        """
        # Anything that is neither the equality nor the less-than
        # operator is emitted as a greater-than comparison.
        if self.__comp == self._EQ_COMP:
            suffix = "eq"
        elif self.__comp == self._LT_COMP:
            suffix = "lt"
        else:
            suffix = "gt"
        return [f"--{self.__unit}-{suffix}", str(self.__value)]
class PacketCounterCriterion(_CounterCriterion):
    """Packet counter comparison criterion; instances are handed out by
    :class:`SetMatch`.

    Uses ``packets`` as the counter unit name.
    """

    def __init__(self, match: Match):
        super().__init__(match, unit_name="packets")
class ByteCounterCriterion(_CounterCriterion):
    """Byte counter comparison criterion; instances are handed out by
    :class:`SetMatch`.

    Uses ``bytes`` as the counter unit name.
    """

    def __init__(self, match: Match):
        super().__init__(match, unit_name="bytes")
class SetMatch(Match):
    """Match packets against IP sets defined via **ipset(8)**.

    Each criterion object is created lazily, the first time its
    accessor method is called.
    """

    def __init__(self):
        # One slot per supported criterion; None means "not yet requested".
        self.__match_set_crit = None
        self.__return_nomatch_crit = None
        self.__update_counters_crit = None
        self.__update_subcounters_crit = None
        self.__packet_counter_crit = None
        self.__byte_counter_crit = None

    @staticmethod
    def get_match_name():
        """Return the name of the **iptables(8)** match extension.
        """
        return 'set'

    def get_criteria(self) -> Iterable[Criterion]:
        """Return the criteria of the set match.

        Entries for criteria that were never requested are ``None``.
        """
        return (
            self.__match_set_crit,
            self.__return_nomatch_crit,
            self.__update_counters_crit,
            self.__update_subcounters_crit,
            self.__packet_counter_crit,
            self.__byte_counter_crit,
        )

    def match_set(self) -> MatchSetCriterion:
        """Return the criterion identifying the IP set and its flags.
        """
        crit = self.__match_set_crit
        if crit is None:
            crit = self.__match_set_crit = MatchSetCriterion(self)
        return crit

    def return_nomatch(self) -> BooleanCriterion:
        """Return the criterion for the ``--return-nomatch`` option
        (created without negation support).
        """
        crit = self.__return_nomatch_crit
        if crit is None:
            crit = self.__return_nomatch_crit = BooleanCriterion(
                self, "--return-nomatch", supports_negation=False)
        return crit

    def update_counters(self) -> BooleanCriterion:
        """Return the criterion for the ``--update-counters`` option,
        which controls update of packet/byte counters.
        """
        crit = self.__update_counters_crit
        if crit is None:
            crit = self.__update_counters_crit = BooleanCriterion(
                self, "--update-counters")
        return crit

    def update_subcounters(self) -> BooleanCriterion:
        """Return the criterion for the ``--update-subcounters`` option,
        which controls update of packet/byte counters of the matching
        element in the member set of a list type.
        """
        crit = self.__update_subcounters_crit
        if crit is None:
            crit = self.__update_subcounters_crit = BooleanCriterion(
                self, "--update-subcounters")
        return crit

    def packet_counter(self) -> PacketCounterCriterion:
        """Return the criterion comparing against the set's packet counter.
        """
        crit = self.__packet_counter_crit
        if crit is None:
            crit = self.__packet_counter_crit = PacketCounterCriterion(self)
        return crit

    def byte_counter(self) -> ByteCounterCriterion:
        """Return the criterion comparing against the set's byte counter.
        """
        crit = self.__byte_counter_crit
        if crit is None:
            crit = self.__byte_counter_crit = ByteCounterCriterion(self)
        return crit

    @classmethod
    def parse(cls,                  # pylint: disable=too-many-branches
              parser: MatchParser) -> Match:
        """Build a :class:`SetMatch` from the parser's token stream.

        Possible output::

            match-set foo6 src,src,dst,dst,src,dst
            match-set foo6 dst,src return-nomatch
            match-set foo6 dst,src return-nomatch packets-eq 10
            match-set foo6 dst,src return-nomatch packets-eq 10 ! bytes-eq 512
            match-set foo6 dst,src ! update-counters
            ! match-set foo6 dst,src ! update-counters ! update-subcounters
            match-set foo6 dst,src return-nomatch packets-lt 10 bytes-gt 512

        :meta private:
        """
        tokens = parser.get_iter()
        # The parser reports a negation that preceded 'match-set', if any.
        set_is_positive = parser.get_negation() is None
        match = SetMatch()
        set_name = next(tokens)
        flags = next(tokens)
        match.match_set().compare(set_is_positive, set_name, flags)
        for token in tokens:
            # A '!' token negates the option that follows it.
            negated = token == '!'
            if negated:
                token = next(tokens)
            if token == 'return-nomatch':
                match.return_nomatch().equals()
                continue
            if token == 'update-counters':
                match.update_counters().compare(not negated)
                continue
            if token == 'update-subcounters':
                match.update_subcounters().compare(not negated)
                continue
            if token in ('packets-eq', 'packets-lt', 'packets-gt'):
                counter = match.packet_counter()
            elif token in ('bytes-eq', 'bytes-lt', 'bytes-gt'):
                counter = match.byte_counter()
            else:
                # Not a set-match token: return it to the iterator (and
                # rewind if a '!' was consumed before it), then stop.
                tokens.put_back(token)
                if negated:
                    tokens.rewind()
                break
            number = int(next(tokens))
            if token.endswith('-eq'):
                # Only the equality comparison takes the negation into
                # account.
                counter.compare(not negated, number)
            elif token.endswith('-lt'):
                counter.less_than(number)
            else:
                counter.greater_than(number)
        return match
# Register SetMatch with MatchParser under the 'match-set' keyword, which
# is the leading token of the set match output handled by SetMatch.parse().
MatchParser.register_match('match-set', SetMatch)