Source code for linuxnet.iptables.matches.conntrackmatch

# Copyright (c) 2021, 2022, 2023, 2024, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides matching against connection tracking attributes
"""

from ipaddress import IPv4Network, IPv4Address, IPv6Network
from typing import Iterable, Optional, Tuple, Union

from ..exceptions import IptablesParsingError
from ..deps import get_logger

from .match import Match, MatchParser
from .packetmatch import ProtocolCriterion, AddressCriterion
from .util import (
                        GenericCriterion,
                        GenericPositiveCriterion,
                        NumberOrRangeCriterion,
                    )

_logger = get_logger('linuxnet.iptables.matches.conntrackmatch')


[docs]class CtStateCriterion(GenericCriterion): """Compare against the connection tracking state The comparison value is a string. """ def __init__(self, match): super().__init__(match, '--ctstate')
[docs]class CtStatusCriterion(GenericCriterion): """Compare against the connection tracking status The comparison value is a string. """ def __init__(self, match): super().__init__(match, '--ctstatus')
[docs]class CtDirectionCriterion(GenericPositiveCriterion): """Compare against the connection tracking direction The comparison value is a string. """ def __init__(self, match): super().__init__(match, '--ctdir')
[docs]class CtOrigSrcCriterion(AddressCriterion): """Compare against the origin's source IP address. The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network` """ def __init__(self, match: Match, *, ipv6: Optional[bool] =None): super().__init__(match, '--ctorigsrc', ipv6)
[docs]class CtOrigDstCriterion(AddressCriterion): """Compare against the origin's destination IP address. The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network` """ def __init__(self, match: Match, *, ipv6: Optional[bool] =None): super().__init__(match, '--ctorigdst', ipv6)
[docs]class CtReplSrcCriterion(AddressCriterion): """Compare against the reply's source IP address. The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network` """ def __init__(self, match: Match, *, ipv6: Optional[bool] =None): super().__init__(match, '--ctreplsrc', ipv6)
[docs]class CtReplDstCriterion(AddressCriterion): """Compare against the reply's destination IP address. The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network` """ def __init__(self, match: Match, *, ipv6: Optional[bool] =None): super().__init__(match, '--ctrepldst', ipv6)
[docs]class CtOrigSrcPortCriterion(NumberOrRangeCriterion): """Compare against the origin's source port (or port range). The comparison value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--ctorigsrcport', sep=':')
[docs]class CtOrigDstPortCriterion(NumberOrRangeCriterion): """Compare against the origin's destination port (or port range). The comparison value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--ctorigdstport', sep=':')
[docs]class CtReplSrcPortCriterion(NumberOrRangeCriterion): """Compare against the reply's source port (or port range). The comparison value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--ctreplsrcport', sep=':')
[docs]class CtReplDstPortCriterion(NumberOrRangeCriterion): """Compare against the origin's destination port (or port range). The comparison value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--ctrepldstport', sep=':')
[docs]class CtExpireCriterion(NumberOrRangeCriterion): """Compare against the remaining lifetime of the connection tracking The comparison value is the tuple (time, end_time) where end_time may be ``None`` (time is measured in seconds) """ def __init__(self, match: Match): super().__init__(match, '--ctexpire', sep=':')
[docs]class ConntrackMatch(Match): # pylint: disable=too-many-instance-attributes """Match against the connection tracking attributes. """ def __init__(self): self.__ctstate_crit = None self.__ctstatus_crit = None self.__ctdir_crit = None self.__ctproto_crit = None self.__ctorigsrc_crit = None self.__ctorigdst_crit = None self.__ctreplsrc_crit = None self.__ctrepldst_crit = None self.__ctorigsrcport_crit = None self.__ctorigdstport_crit = None self.__ctreplsrcport_crit = None self.__ctrepldstport_crit = None self.__ctexpire_crit = None
[docs] @staticmethod def get_match_name() -> str: """Returns the **iptables(8)** match extension name """ return 'conntrack'
[docs] def get_criteria(self) -> Iterable['Criterion']: """Returns the conntrack match criteria: ctstate, ctstatus """ return (self.__ctstate_crit, self.__ctstatus_crit, self.__ctdir_crit, self.__ctproto_crit, self.__ctorigsrc_crit, self.__ctorigdst_crit, self.__ctreplsrc_crit, self.__ctrepldst_crit, self.__ctorigsrcport_crit, self.__ctorigdstport_crit, self.__ctreplsrcport_crit, self.__ctrepldstport_crit, self.__ctexpire_crit)
[docs] def ctstate(self) -> CtStateCriterion: """Match against the connection tracking state """ if self.__ctstate_crit is None: self.__ctstate_crit = CtStateCriterion(self) return self.__ctstate_crit
[docs] def ctstatus(self) -> CtStatusCriterion: """Matching against the connection tracking status """ if self.__ctstatus_crit is None: self.__ctstatus_crit = CtStatusCriterion(self) return self.__ctstatus_crit
[docs] def ctdir(self) -> CtDirectionCriterion: """Matching against the connection tracking status """ if self.__ctdir_crit is None: self.__ctdir_crit = CtDirectionCriterion(self) return self.__ctdir_crit
[docs] def ctproto(self) -> ProtocolCriterion: """Matching against the L4 protocol """ if self.__ctproto_crit is None: self.__ctproto_crit = ProtocolCriterion(self) return self.__ctproto_crit
[docs] def ctorigsrc(self) -> CtOrigSrcCriterion: """Matching against the origin's source IP address """ if self.__ctorigsrc_crit is None: self.__ctorigsrc_crit = CtOrigSrcCriterion(self) return self.__ctorigsrc_crit
[docs] def ctorigdst(self) -> CtOrigDstCriterion: """Matching against the origin's destination IP address """ if self.__ctorigdst_crit is None: self.__ctorigdst_crit = CtOrigDstCriterion(self) return self.__ctorigdst_crit
[docs] def ctreplsrc(self) -> CtReplSrcCriterion: """Matching against the reply's source IP address """ if self.__ctreplsrc_crit is None: self.__ctreplsrc_crit = CtReplSrcCriterion(self) return self.__ctreplsrc_crit
[docs] def ctrepldst(self) -> CtReplDstCriterion: """Matching against the reply's destination IP address """ if self.__ctrepldst_crit is None: self.__ctrepldst_crit = CtReplDstCriterion(self) return self.__ctrepldst_crit
[docs] def ctorigsrcport(self) -> CtOrigSrcPortCriterion: """Matching against the origin's source port """ if self.__ctorigsrcport_crit is None: self.__ctorigsrcport_crit = CtOrigSrcPortCriterion(self) return self.__ctorigsrcport_crit
[docs] def ctorigdstport(self) -> CtOrigDstPortCriterion: """Matching against the origin's destination port """ if self.__ctorigdstport_crit is None: self.__ctorigdstport_crit = CtOrigDstPortCriterion(self) return self.__ctorigdstport_crit
[docs] def ctreplsrcport(self) -> CtReplSrcPortCriterion: """Matching against the replin's source port """ if self.__ctreplsrcport_crit is None: self.__ctreplsrcport_crit = CtReplSrcPortCriterion(self) return self.__ctreplsrcport_crit
[docs] def ctrepldstport(self) -> CtReplDstPortCriterion: """Matching against the replin's destination port """ if self.__ctrepldstport_crit is None: self.__ctrepldstport_crit = CtReplDstPortCriterion(self) return self.__ctrepldstport_crit
[docs] def ctexpire(self) -> CtExpireCriterion: """Matching against the replin's destination port """ if self.__ctrepldstport_crit is None: self.__ctrepldstport_crit = CtReplDstPortCriterion(self) return self.__ctrepldstport_crit
@staticmethod def __parse_addr(field: str) -> Union[IPv4Network, IPv6Network]: """Parse the address reported by iptables into an IPv4Network object. """ if ':' in field: return IPv6Network(field) # Assume IPv4 if '/' in field: return IPv4Network(field) # # Old iptables versions do not report the netmask, so we have to # guess it, e.g. 10.10.0.0 implies 10.10.0.0/16. # This is clearly ambiguous and we only detect /16, /24, and /28 # addr = IPv4Address(field) num = (addr.packed[0] << 24 | addr.packed[1] << 16 | addr.packed[2] << 8 | addr.packed[3]) for prefix in (16, 24, 28): mask = (1 << (32-prefix)) - 1 if (num & mask) == 0: field += f'/{prefix}' break return IPv4Network(field) @staticmethod def __parse_range(field: str) -> Tuple[int, Optional[int]]: """Parse a string of the form <num>[:<num] into a tuple """ if ':' not in field: return (int(field), None) numfields = field.split(':') return (int(numfields[0]), int(numfields[1])) # pylint: disable=too-many-branches, too-many-statements @classmethod def parse(cls, parser: MatchParser) -> Match: """The conntrack match is not identified by name in the iptables output. Instead, the parameters appear by themselves. The first parameter has already been consumed. :meta private: """ criteria_iter = parser.get_iter() # Return the match_name and (optionally) negation to the iterator # so that we can process them as part of the for-loop below. # The for-loop is designed to handle all conntrack-related criteria # (which we expect to appear consecutively). # Because of the rewind, this method is now responsible for handling # StopIteration errors. parser.rewind_match() match = ConntrackMatch() criterion = None negation = None rewind = False # # The loop logic handles criteria that appear twice. This can happen # in the case of consecutive conntrack matches, e.g. # iptables -m conntrack --ctstate NEW -m conntrack --ctstate INVALID # for token in criteria_iter: try: if token == '!': negation = token is_equal = False criterion = next(criteria_iter) else: is_equal = True criterion = token if criterion == 'ctstate': crit = match.ctstate() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) elif criterion == 'ctstatus': crit = match.ctstatus() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) elif criterion == 'ctdir': crit = match.ctdir() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) elif criterion == 'ctproto': crit = match.ctproto() if crit.is_set(): rewind = True break crit.compare(is_equal, next(criteria_iter)) elif criterion == 'ctorigsrc': crit = match.ctorigsrc() if crit.is_set(): rewind = True break crit.compare(is_equal, cls.__parse_addr( next(criteria_iter))) elif criterion == 'ctorigdst': crit = match.ctorigdst() if crit.is_set(): rewind = True break crit.compare(is_equal, cls.__parse_addr( next(criteria_iter))) elif criterion == 'ctreplsrc': crit = match.ctreplsrc() if crit.is_set(): rewind = True break crit.compare(is_equal, cls.__parse_addr( next(criteria_iter))) elif criterion == 'ctrepldst': crit = match.ctrepldst() if crit.is_set(): rewind = True break crit.compare(is_equal, cls.__parse_addr( next(criteria_iter))) elif criterion == 'ctorigsrcport': crit = match.ctorigsrcport() if crit.is_set(): rewind = True break crit.compare(is_equal, *cls.__parse_range( next(criteria_iter))) elif criterion == 'ctorigdstport': crit = match.ctorigdstport() if crit.is_set(): rewind = True break crit.compare(is_equal, *cls.__parse_range( next(criteria_iter))) elif criterion == 'ctreplsrcport': crit = match.ctreplsrcport() if crit.is_set(): rewind = True break crit.compare(is_equal, *cls.__parse_range( next(criteria_iter))) elif criterion == 'ctrepldstport': crit = match.ctrepldstport() if crit.is_set(): rewind = True break crit.compare(is_equal, *cls.__parse_range( next(criteria_iter))) elif criterion == 'ctexpire': crit = match.ctexpire() if crit.is_set(): rewind = True break crit.compare(is_equal, *cls.__parse_range( next(criteria_iter))) else: rewind = True break criterion = None negation = None except StopIteration as stopiter: if negation is not None or criterion is not None: if criterion is None: raise IptablesParsingError( 'negation without criterion') from stopiter raise IptablesParsingError( f'no value for {criterion}') from stopiter if rewind: criteria_iter.put_back(criterion) if negation is not None: criteria_iter.put_back(negation) return match
# pylint: enable=too-many-branches, too-many-statements MatchParser.register_match('ctstate', ConntrackMatch) MatchParser.register_match('ctstatus', ConntrackMatch) MatchParser.register_match('ctproto', ConntrackMatch) MatchParser.register_match('ctorigsrc', ConntrackMatch) MatchParser.register_match('ctorigdst', ConntrackMatch) MatchParser.register_match('ctreplsrc', ConntrackMatch) MatchParser.register_match('ctrepldst', ConntrackMatch) MatchParser.register_match('ctorigsrcport', ConntrackMatch) MatchParser.register_match('ctorigdstport', ConntrackMatch) MatchParser.register_match('ctreplsrcport', ConntrackMatch) MatchParser.register_match('ctrepldstport', ConntrackMatch) MatchParser.register_match('ctdir', ConntrackMatch) MatchParser.register_match('ctexpire', ConntrackMatch)