Source code for linuxnet.iptables.matches.tcpmatch

# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module supports matching against TCP packets
"""

from enum import IntFlag
from typing import Iterable, List, Optional, Set, Tuple

from ..exceptions import IptablesError, IptablesParsingError
from ..deps import get_logger

from .match import Match, Criterion, MatchParser
from .util import GenericCriterion, NumberOrRangeCriterion

_logger = get_logger('linuxnet.iptables.matches.tcpmatch')


[docs]class TcpFlag(IntFlag): """Names and values for the TCP flags. """ #: ``FIN`` bit FIN = 0x1 #: ``SYN`` bit SYN = 0x2 #: ``RST`` bit RST = 0x4 #: ``PSH`` bit PSH = 0x8 #: ``ACK`` bit ACK = 0x10 #: ``URG`` bit URG = 0x20
[docs]class TcpFlagsCriterion(Criterion): """A criterion for comparing against packets with an arbitrary set of TCP flags set, or for comparing against ``SYN`` packets. This is an either-or use, determined at the time of object instantiation. The value is the tuple (flags-checked, flags-set); both flags-checked and flags-set are comma-separated lists of TCP flag names as defined in :class:`TcpFlag` """ def __init__(self, match: Match, syn_only=False): """ :param match: the :class:`Match` object that owns this object :param syn_only: optional boolean value indicating a check only against the ``SYN`` flag """ super().__init__(match) # If syn_only is True, then flags_checked/flags_set will be None self.__syn_only = syn_only self.__flags_checked = None self.__flags_set = None def __eq__(self, other): if not isinstance(other, TcpFlagsCriterion): return False if not self._may_be_equal(other): return False if self.is_syn_only() ^ other.is_syn_only(): return self._any or other._any return self.get_value() == other.get_value()
[docs] def get_value(self) -> Tuple[Set[TcpFlag], Set[TcpFlag]]: """Returns the value that the criterion is comparing against """ return (self.__flags_checked, self.__flags_set)
[docs] def is_syn_only(self) -> bool: """Returns ``True`` if the criterion is only meant to check for the SYN flag (but note that it may not be set yet) """ return self.__syn_only
[docs] def bit_set(self) -> Match: """This method can be used if this criterion implements a SYN-only comparison to check if the packet flags include only the SYN bit. """ if not self.__syn_only: raise IptablesError('not a syn-only criterion') return self.equals()
[docs] def bit_not_set(self) -> Match: """This method can be used if this criterion implements a SYN-only comparison to check for the non-existence of the SYN bit """ return self.not_equals()
[docs] def equals(self, # pylint: disable=arguments-differ flags_checked: Optional[Set[TcpFlag]] =None, flags_set: Optional[List[TcpFlag]] =None) -> Match: """Perform flags comparison """ if self.__syn_only: if not (flags_checked is None and flags_set is None): raise IptablesError("cannot set flags in SYN criterion") return self._set_polarity(True) if flags_checked is None: raise IptablesError("need to specify flags to check") if flags_set is None: raise IptablesError("need to specify flags that are set") self.__flags_checked = frozenset(flags_checked) self.__flags_set = frozenset(flags_set) return self._set_polarity(True)
def _crit_iptables_args(self) -> List[str]: """Returns **iptables(8)** arguments for the specified TCP flags """ if self.__syn_only: return ['--syn'] return ['--tcp-flags', ','.join([f.name for f in self.__flags_checked]), ','.join([f.name for f in self.__flags_set])]
[docs]class SourcePortCriterion(NumberOrRangeCriterion): """Compare with a source port or check for inclusion in port-range The value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--sport', sep=':')
[docs]class DestPortCriterion(NumberOrRangeCriterion): """Compare against a destination port or check for inclusion in port-range The value is the tuple (port, last_port) where last_port may be ``None`` """ def __init__(self, match: Match): super().__init__(match, '--dport', sep=':')
[docs]class TcpOptionCriterion(GenericCriterion): """Compare against a TCP option number. The value is an integer. """ def __init__(self, match: Match): super().__init__(match, '--tcp-option')
class _PortParser: # pylint: disable=too-few-public-methods """Helper class used to parse TCP/UDP port criteria """ SOURCE_PORT_PREFIX = ('spt:', 'spts:') DEST_PORT_PREFIX = ('dpt:', 'dpts:') PORT_PREFIX = SOURCE_PORT_PREFIX + DEST_PORT_PREFIX @classmethod def parse(cls, port_match_str: str, match: Match): """Add the proper criterion to 'match' """ if port_match_str.startswith(cls.SOURCE_PORT_PREFIX): port_crit = match.source_port() else: port_crit = match.dest_port() port_spec = port_match_str.split(':', 1)[1] is_equal, port_spec = MatchParser.parse_value(port_spec) if ':' not in port_spec: port_crit.compare(is_equal, int(port_spec)) return ports = port_spec.split(':', 1) port_crit.compare(is_equal, int(ports[0]), int(ports[1]))
[docs]class TcpMatch(Match): """Match against the fields of the TCP header """ def __init__(self): self.__flags_crit = None self.__src_port_crit = None self.__dest_port_crit = None self.__tcp_option_crit = None
[docs] @staticmethod def get_match_name() -> str: """Returns the **iptables(8)** match extension name, in this case, ``tcp`` """ return 'tcp'
[docs] def get_criteria(self) -> Iterable['Criterion']: """Returns the TCP match criteria: flags, source-port, dest-port, tcp-option """ return (self.__flags_crit, self.__src_port_crit, self.__dest_port_crit, self.__tcp_option_crit)
[docs] def syn(self) -> TcpFlagsCriterion: """Criterion for matching against a SYN packet """ if self.__flags_crit is None: self.__flags_crit = TcpFlagsCriterion(self, syn_only=True) return self.__flags_crit
[docs] def tcp_flags(self) -> TcpFlagsCriterion: """Compare with TCP flags """ if self.__flags_crit is None: self.__flags_crit = TcpFlagsCriterion(self) return self.__flags_crit
[docs] def source_port(self) -> SourcePortCriterion: """Matching against the source port """ if self.__src_port_crit is None: self.__src_port_crit = SourcePortCriterion(self) return self.__src_port_crit
[docs] def dest_port(self) -> DestPortCriterion: """Match against the destination port """ if self.__dest_port_crit is None: self.__dest_port_crit = DestPortCriterion(self) return self.__dest_port_crit
[docs] def tcp_option(self) -> TcpOptionCriterion: """Match against a TCP option """ if self.__tcp_option_crit is None: self.__tcp_option_crit = TcpOptionCriterion(self) return self.__tcp_option_crit
@staticmethod def __parse_tcp_flags_num(numstr: int) -> Set[TcpFlag]: """Parse a hex-value numstr (e.g. 0x11) into a set of TCP flags. """ try: flag_mask = int(numstr, 16) flags = {flag for flag in TcpFlag if flag_mask & flag} return flags except ValueError as valerr: raise IptablesParsingError( "Bad TCP flag mask: " + numstr) from valerr @classmethod def parse(cls, parser: MatchParser) -> Match: """Parse the TCP criteria. The iptables output looks like this:: tcp dpt:20 option=!10 flags:0x17/0x02 The 'tcp' field has already been consumed. :meta private: """ criteria_iter = parser.get_iter() match = TcpMatch() for val in criteria_iter: if val.startswith('flags:'): flag_spec = val.split(':', 1)[1] is_equal, flag_spec = parser.parse_value(flag_spec) if '/' not in flag_spec: raise IptablesParsingError( f"no '/' in TCP flags: {flag_spec}") mask, comp = flag_spec.split('/', 1) flags_checked = cls.__parse_tcp_flags_num(mask) flags_set = cls.__parse_tcp_flags_num(comp) if (flags_set == {TcpFlag.SYN} and flags_checked == {TcpFlag.FIN, TcpFlag.SYN, TcpFlag.RST, TcpFlag.ACK}): match.syn().compare(is_equal) else: match.tcp_flags().compare(is_equal, flags_checked, flags_set) elif val.startswith(_PortParser.PORT_PREFIX): _PortParser.parse(val, match) elif val.startswith('option'): # The comparison looks either like 'option=10' or 'option=!10' is_equal, value = parser.parse_value(val[len('option')+1:]) match.tcp_option().compare(is_equal, int(value)) else: criteria_iter.put_back(val) break return match
MatchParser.register_match('tcp', TcpMatch)