Source code for linuxnet.iptables.matches.conntrackmatch

# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides matching against connection tracking attributes
"""

from typing import List

from ..exceptions import IptablesParsingError
from ..deps import get_logger

from .match import Match, MatchParser
from .util import GenericCriterion

_logger = get_logger('linuxnet.iptables.matches.conntrackmatch')


[docs]class CtStateCriterion(GenericCriterion): """Compare against the connection tracking state The comparison value is a string. """ def __init__(self, match): super().__init__(match, '--ctstate')
[docs]class CtStatusCriterion(GenericCriterion): """Compare against the connection tracking status The comparison value is a string. """ def __init__(self, match): super().__init__(match, '--ctstatus')
[docs]class ConntrackMatch(Match): """Match against the connection tracking attributes. """ def __init__(self): self.__ctstate_crit = None self.__ctstatus_crit = None def __eq__(self, other): return (isinstance(other, ConntrackMatch) and self.ctstate() == other.ctstate() and self.ctstatus() == other.ctstatus() )
[docs] def ctstate(self) -> CtStateCriterion: """Match against the connection tracking state """ if self.__ctstate_crit is None: self.__ctstate_crit = CtStateCriterion(self) return self.__ctstate_crit
[docs] def ctstatus(self) -> CtStatusCriterion: """Matching against the connection tracking status """ if self.__ctstatus_crit is None: self.__ctstatus_crit = CtStatusCriterion(self) return self.__ctstatus_crit
[docs] def to_iptables_args(self) -> List[str]: """Returns **iptables(8)** arguments for this match """ return self.build_iptables_args('conntrack', [self.__ctstate_crit, self.__ctstatus_crit])
# pylint: disable=too-many-branches @classmethod def parse(cls, parser: MatchParser) -> Match: """Match against ctstate, ctstatus:: [!] ctstate <state> [!] ctstatus <status> The ctstate/ctstatus etc. (and the optional '!') has already been consumed. :meta private: """ criteria_iter = parser.get_iter() # Return the match_name and (optionally) negation to the iterator # so that we can process them as part of the for-loop below. # The for-loop is designed to handle all conntrack-related criteria # (which we expect to appear consecutively). # Because of the rewind, this method is now responsible for handling # StopIteration errors. parser.rewind_match() match = ConntrackMatch() criterion = None negation = None rewind = False for token in criteria_iter: try: if token == '!': negation = token is_equal = False criterion = next(criteria_iter) else: is_equal = True criterion = token if criterion == 'ctstate': crit = match.ctstate() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) elif criterion == 'ctstatus': crit = match.ctstatus() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) else: rewind = True break criterion = None negation = None except StopIteration as stopiter: if negation is not None or criterion is not None: if criterion is None: raise IptablesParsingError( 'negation without criterion') from stopiter raise IptablesParsingError( f'no value for {criterion}') from stopiter if rewind: criteria_iter.put_back(criterion) if negation is not None: criteria_iter.put_back(negation) return match
# pylint: enable=too-many-branches MatchParser.register_match('ctstate', ConntrackMatch) MatchParser.register_match('ctstatus', ConntrackMatch)