# Copyright (c) 2021, 2022, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""This module offers two types of classes:
- xxxCriterion
- xxxMatch
An xxxMatch class allows packet matching using one or more match-specific
criteria. An xxxMatch class corresponds to an iptables(8) extension match
module (except for PacketMatch), with the options of that module mapping
to xxxCriterion classes, while the PacketMatch class offers matching
against the common criteria (source/dest address etc.)
"""
# pylint: disable=too-many-lines
from enum import IntFlag
from ipaddress import IPv4Network
from typing import Any, List, Optional, Set, Tuple
from .exceptions import IptablesError, IptablesParsingError
from .deps import get_logger
_logger = get_logger('linuxnet.iptables.match')
class _CriteriaExhaustedError(Exception):
    """Internal signal: there are no more criteria left to parse.
    """
class Criterion:
    """
    This class is used to *express* an **iptables(8)** match criterion;
    it does not perform any comparisons.

    :class:`Criterion` is a superclass that serves the following purposes:

    1) it provides an :meth:`equals` method and a :meth:`not_equals`
       method to express comparison against a value
    2) it keeps track of whether the criterion has been set;
       a criterion is set when either the :meth:`equals` or
       :meth:`not_equals` method is invoked; **a criterion may only**
       **be set once**
    3) it keeps track of whether a criterion is negated or not
    4) its :meth:`to_iptables_args` method generates the '!'
       **iptables(8)** argument, and also checks if the criterion was set

    The :meth:`equals`/:meth:`not_equals` methods of :class:`Criterion`
    subclasses **must invoke** the :meth:`_set_polarity` method of
    :class:`Criterion` to indicate the polarity of the test.
    These methods are also responsible for saving the comparison value
    in the subclass object.

    A :class:`Criterion` has an owner which is an object of a subclass
    of :class:`Match`. The :meth:`equals`/:meth:`not_equals` methods return
    this owner to facilitate building a criteria list:

    ::

        match.crit1().equals('foo').crit2().not_equals('bar')
    """

    def __init__(self, match: 'Match'):
        """
        :param match: the :class:`Match` object that owns this ``Criterion``
        """
        self.__match: 'Match' = match
        # Tri-state: None (not set), True (equals), False (not_equals)
        self.__positive: Optional[bool] = None

    def __eq__(self, other: object):
        """Compare this :class:`Criterion` with ``other``.

        Two criteria are equal when both are unset, or when both are set
        with the same polarity and the same comparison value.
        ``NotImplemented`` is returned if ``other`` is not a
        :class:`Criterion`, so that ``criterion == 5`` evaluates to
        ``False`` instead of raising :class:`AttributeError`.
        """
        if not isinstance(other, Criterion):
            return NotImplemented
        if not self._may_be_equal(other):
            return False
        if not self.is_set():
            # Both are unset (guaranteed by _may_be_equal)
            return True
        return self.get_value() == other.get_value()

    def __ne__(self, other: object):
        """Logical negation of :meth:`__eq__`
        """
        # Go through self.__eq__ so that subclass overrides are honored
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def is_set(self) -> bool:
        """Returns ``True`` if the criterion has been set
        """
        return self.__positive is not None

    def is_positive(self) -> bool:
        """Returns the 'polarity' of the criterion; ``True`` for
        :meth:`equals` or ``False`` for :meth:`not_equals`

        Raises :class:`IptablesError` if the criterion is not set
        """
        if not self.is_set():
            raise IptablesError('criterion not set')
        return self.__positive

    def _may_be_equal(self, other: 'Criterion') -> bool:
        """Returns ``True`` iff:

        * both criteria are set or both criteria are not set
        * if both criteria are set, they have the same polarity
        """
        if self.is_set() ^ other.is_set():
            return False
        if self.is_set():
            # Both set, so compare boolean values
            return self.is_positive() == other.is_positive()
        # None set, so equal
        return True

    def get_value(self) -> Any:
        """Returns the value that this criterion is comparing against.

        This method must be implemented by subclasses.
        """
        raise NotImplementedError

    def _set_polarity(self, polarity: bool) -> 'Match':
        """Set the comparison polarity:

        - ``True`` : equality test
        - ``False`` : inequality test

        Raises an :class:`IptablesError` if the polarity is already set.

        Returns the :class:`Match` object that owns this criterion.
        """
        if self.__positive is not None:
            raise IptablesError(f"attempt to modify {self.__class__.__name__}")
        self.__positive = polarity
        return self.__match

    def equals(self, *args, **kwargs) -> 'Match':
        """Express equality comparison against the argument values.

        The method implementation in this class expects no arguments and
        expresses a bool comparison.

        Subclasses will implement this method to express comparisons
        against a specific value (or values). These values will be the
        arguments of the subclass method and will be stored in the
        subclass object.

        Subclasses overriding this method should invoke the
        :meth:`_set_polarity` method of this class to set the polarity
        to ``True``.

        Returns the :class:`Match` object that owns this criterion.
        """
        _ = (args, kwargs)
        return self._set_polarity(True)

    def not_equals(self, *args, **kwargs) -> 'Match':
        """Express inequality comparison against the argument values.

        The method implementation in this class invokes the :meth:`equals`
        method and then reverses the polarity; subclasses should not need
        to override it.

        Subclasses overriding this method should invoke the
        :meth:`_set_polarity` method of this class to set the polarity
        to ``False``.

        Returns the :class:`Match` object that owns this criterion.
        """
        # equals() validates/stores the value and rejects a second set;
        # only after it succeeds is the polarity flipped.
        _ = self.equals(*args, **kwargs)
        self.__positive = False
        return self.__match

    def compare(self, is_equal: bool, *args, **kwargs) -> 'Match':
        """Alternative method used for comparisons. It invokes
        :meth:`equals` (or :meth:`not_equals`) with ``args`` and ``kwargs``
        if ``is_equal`` is ``True`` (or ``False``).
        """
        if is_equal:
            return self.equals(*args, **kwargs)
        return self.not_equals(*args, **kwargs)

    def _crit_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments for the criterion,
        except for polarity.

        This method must be implemented by subclasses.
        """
        raise NotImplementedError

    def to_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments; a leading ``'!'``
        is included for a negated criterion.

        Raises :class:`IptablesError` if the criterion is not set
        """
        retval = [] if self.is_positive() else ['!']
        retval += self._crit_iptables_args()
        return retval
class Match:
    """Common ancestor of all the match-specific classes.
    """

    @staticmethod
    def build_iptables_args(match_name: Optional[str],
                            crit_iterable) -> List[str]:
        """Assemble an **iptables(8)** argument list out of a match name
        and a number of :class:`Criterion` objects.

        Only criteria that are set contribute arguments; ``None`` entries
        are skipped.

        :param match_name: optional match name; when present (and at
            least one criterion is set) the returned list starts with
            '-m' ``match_name``
        :param crit_iterable: an iterable of :class:`Criterion` objects
            (or ``None`` placeholders)
        """
        crit_args = [
            arg
            for crit in crit_iterable
            if crit is not None and crit.is_set()
            for arg in crit.to_iptables_args()
        ]
        if crit_args and match_name:
            return ['-m', match_name] + crit_args
        return crit_args

    def has_criteria(self) -> bool:
        """Returns ``True`` if at least one criterion of this match is set
        """
        iptables_args = self.to_iptables_args()
        return bool(iptables_args)

    def to_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments

        Subclasses are required to provide an implementation.
        """
        raise NotImplementedError

    def __eq__(self, other: 'Match'):
        """Identity comparison; value-based equality is left to subclasses
        """
        return other is self

    def __ne__(self, other: 'Match'):
        is_same = self.__eq__(other)
        return not is_same
class MatchNone(Match):
    """Special :class:`Match` subclass that stands for the absence of
    any :class:`Match` objects.
    """

    def to_iptables_args(self) -> List[str]:
        """Returns an (always empty) list of **iptables(8)** arguments
        """
        no_args: List[str] = []
        return no_args
class _GenericCriterion(Criterion):
    """A generic criterion that can be used by all criteria that
    correspond to **iptables(8)** options of the form "[!] option value",
    for example, "[!] -p tcp"
    """

    def __init__(self, match: Match, iptables_option: str):
        """
        :param match: the :class:`Match` object that owns this criterion
        :param iptables_option: the **iptables(8)** option that precedes
            the value (e.g. ``-p``)
        """
        super().__init__(match)
        self.__value = None
        self.__option = iptables_option

    def get_value(self) -> Any:
        """Returns the criterion value (``None`` if not set)
        """
        return self.__value

    def equals(self, value) -> Match:   # pylint: disable=arguments-differ
        """Compare with the specified value

        Raises :class:`IptablesError` if the criterion is already set;
        in that case the previously stored value is left untouched.

        Returns the :class:`Match` object that owns this criterion.
        """
        # Set the polarity first: it raises if the criterion is already
        # set, and a rejected call must not clobber the stored value.
        owner = self._set_polarity(True)
        self.__value = value
        return owner

    def _crit_iptables_args(self) -> List[str]:
        """Convert to **iptables(8)** arguments: the option followed by
        the string form of the value
        """
        return [self.__option, str(self.__value)]
#
#######################################################################
#
# Matching against standard packet attributes
#
class OutputInterfaceCriterion(_GenericCriterion):
    """Criterion on the output interface (``-o``); used by
    :class:`PacketMatch`.

    The comparison value is a string.
    """

    def __init__(self, match: Match):
        super().__init__(match, '-o')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, OutputInterfaceCriterion):
            return False
        return super().__eq__(other)
class SourceAddressCriterion(_GenericCriterion):
    """Criterion on the source address (``-s``); used by
    :class:`PacketMatch`.

    The comparison value is an :class:`IPv4Network`.
    """

    def __init__(self, match: Match):
        super().__init__(match, '-s')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, SourceAddressCriterion):
            return False
        return super().__eq__(other)
class DestAddressCriterion(_GenericCriterion):
    """Criterion on the destination address (``-d``); used by
    :class:`PacketMatch`.

    The comparison value is an :class:`IPv4Network`.
    """

    def __init__(self, match: Match):
        super().__init__(match, '-d')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, DestAddressCriterion):
            return False
        return super().__eq__(other)
class ProtocolCriterion(_GenericCriterion):
    """Criterion on the protocol (``-p``); used by :class:`PacketMatch`.

    The comparison value is a string.
    """

    def __init__(self, match: Match):
        super().__init__(match, '-p')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, ProtocolCriterion):
            return False
        return super().__eq__(other)
class FragmentCriterion(Criterion):
    """Criterion on whether a packet is a fragment.

    The :meth:`equals` and :meth:`not_equals` methods inherited from
    :class:`Criterion` are invoked without arguments to express matching
    when the fragment bit is set or is not set, respectively.

    This criterion is used by :class:`PacketMatch`.
    """

    def get_value(self) -> bool:
        """Returns the 'polarity' of the criterion
        """
        polarity = self.is_positive()
        return polarity

    def __eq__(self, other):
        # There is no separate value: set-ness and polarity decide equality
        if not isinstance(other, FragmentCriterion):
            return False
        return self._may_be_equal(other)

    def _crit_iptables_args(self) -> List[str]:
        """Convert to **iptables(8)** arguments (polarity excluded)
        """
        return ['-f']
class InputInterfaceCriterion(_GenericCriterion):
    """Compare with the input interface; used by :class:`PacketMatch`.

    The comparison value is a string.
    """

    def __init__(self, match: Match):
        super().__init__(match, '-i')

    def __eq__(self, other):
        return (isinstance(other, InputInterfaceCriterion) and
                super().__eq__(other))


class PacketMatch(Match):
    """This class provides matching against the following attributes of
    a packet:

    * input interface
    * output interface
    * protocol
    * fragment bit
    * source address
    * destination address
    """

    def __init__(self):
        # Criteria are created lazily by the accessor methods
        self.__iif_crit = None
        self.__oif_crit = None
        self.__proto_crit = None
        self.__frag_crit = None
        self.__source_crit = None
        self.__dest_crit = None

    def __eq__(self, other):
        return (
            isinstance(other, PacketMatch) and
            self.input_interface() == other.input_interface() and
            self.output_interface() == other.output_interface() and
            self.protocol() == other.protocol() and
            self.fragment() == other.fragment() and
            self.source_address() == other.source_address() and
            self.dest_address() == other.dest_address()
        )

    def protocol(self) -> ProtocolCriterion:
        """Match against the protocol
        """
        if self.__proto_crit is None:
            self.__proto_crit = ProtocolCriterion(self)
        return self.__proto_crit

    def input_interface(self) -> InputInterfaceCriterion:
        """Match against the input interface
        """
        # This accessor is required by __eq__ and _parse
        if self.__iif_crit is None:
            self.__iif_crit = InputInterfaceCriterion(self)
        return self.__iif_crit

    def output_interface(self) -> OutputInterfaceCriterion:
        """Match against the output interface
        """
        if self.__oif_crit is None:
            self.__oif_crit = OutputInterfaceCriterion(self)
        return self.__oif_crit

    def source_address(self) -> SourceAddressCriterion:
        """Match against the source address
        """
        if self.__source_crit is None:
            self.__source_crit = SourceAddressCriterion(self)
        return self.__source_crit

    def dest_address(self) -> DestAddressCriterion:
        """Match against the destination address
        """
        if self.__dest_crit is None:
            self.__dest_crit = DestAddressCriterion(self)
        return self.__dest_crit

    def fragment(self) -> FragmentCriterion:
        """Match if packet has (or has not) the fragment bit set
        """
        if self.__frag_crit is None:
            self.__frag_crit = FragmentCriterion(self)
        return self.__frag_crit

    def to_iptables_args(self) -> List[str]:
        """Generate an **iptables(8)** arguments list for the set criteria
        """
        criteria = (
            self.__iif_crit,
            self.__oif_crit,
            self.__proto_crit,
            self.__frag_crit,
            self.__source_crit,
            self.__dest_crit,
        )
        # No match name: these are options of iptables(8) itself, not of
        # an extension match module.
        return self.build_iptables_args(None, criteria)

    @classmethod
    def _parse(cls, field_iter) -> Optional['PacketMatch']:
        """Parse the following fields, which will be returned in-order
        from field_iter:

            protocol, options, input-interface, output-interface,
            source, destination

        Returns a PacketMatch object if any criteria for the above
        fields are defined, otherwise None.

        Raises :class:`IptablesParsingError` if the options field is not
        one of ``--``, ``-f``, ``!f``.

        :param field_iter: an iterator that returns the fields of an
            **iptables(8)** output line starting with the protocol field
        """
        packet_match = PacketMatch()
        # 'all' means that there is no protocol criterion
        proto = next(field_iter)
        if proto != 'all':
            is_equal, proto = _parse_value(proto)
            packet_match.protocol().compare(is_equal, proto)
        opt = next(field_iter)
        if opt == '--':
            pass
        elif opt == '-f':
            packet_match.fragment().equals()
        elif opt == '!f':
            packet_match.fragment().not_equals()
        else:
            raise IptablesParsingError(f'cannot parse option: {opt}')
        # '*' means any interface
        iif = next(field_iter)
        if iif != '*':
            is_equal, interface_name = _parse_value(iif)
            packet_match.input_interface().compare(is_equal, interface_name)
        oif = next(field_iter)
        if oif != '*':
            is_equal, interface_name = _parse_value(oif)
            packet_match.output_interface().compare(is_equal, interface_name)
        # '0.0.0.0/0' means any address
        source = next(field_iter)
        if source != '0.0.0.0/0':
            is_equal, srcaddr = _parse_value(source, IPv4Network)
            packet_match.source_address().compare(is_equal, srcaddr)
        dest = next(field_iter)
        if dest != '0.0.0.0/0':
            is_equal, destaddr = _parse_value(dest, IPv4Network)
            packet_match.dest_address().compare(is_equal, destaddr)
        return packet_match if packet_match.has_criteria() else None
#
#######################################################################
#
# Matching against packet marks (either fwmark or ctmark)
#
class MarkCriterion(Criterion):
    """A criterion for a mark, used by :class:`MarkMatch` and
    :class:`ConnmarkMatch`
    since the **iptables(8)** option used by the mark/connmark modules is
    the same.

    The comparison value is a tuple consisting of an (integer) mark value
    and an integer mask value (``None`` in case of no mask).
    """

    def __init__(self, match: Match):
        """
        :param match: the :class:`Match` object that owns this criterion
        """
        super().__init__(match)
        self.__mark = None
        self.__mask = None

    def __eq__(self, other):
        return isinstance(other, MarkCriterion) and super().__eq__(other)

    def get_value(self):
        """Returns the value that the criterion is comparing against.

        :rtype: tuple of (int, int|None)
        """
        return (self.__mark, self.__mask)

    def equals(self,                    # pylint: disable=arguments-differ
               mark: int, mask: Optional[int] = None) -> Match:
        """Check for equality against ``mark`` and optionally ``mask``

        Raises :class:`IptablesError` if the criterion is already set;
        in that case the previously stored mark/mask are left untouched.

        :param mark: the mark value
        :param mask: the mask value
        """
        # Set the polarity first: it raises if the criterion is already
        # set, and a rejected call must not clobber the stored mark/mask.
        owner = self._set_polarity(True)
        self.__mark = mark
        self.__mask = mask
        return owner

    def __mark2str(self):
        """Convert the mark/mask to a string; both values will be in hex
        """
        markstr = f'{self.__mark:#x}'
        if self.__mask is not None:
            markstr += f'/{self.__mask:#x}'
        return markstr

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified mark
        """
        return ['--mark', self.__mark2str()]
class MarkMatch(Match):
    """Match against the fwmark
    """

    def __init__(self):
        # Created lazily by mark()
        self.__mark_crit = None

    def __eq__(self, other):
        return isinstance(other, MarkMatch) and self.mark() == other.mark()

    def mark(self) -> MarkCriterion:
        """Match against the packet's fwmark.
        """
        if self.__mark_crit is None:
            self.__mark_crit = MarkCriterion(self)
        return self.__mark_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args('mark', [self.__mark_crit])

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the mark criteria; the expected tokens are the keyword
        ``match`` followed by ``mark`` or ``mark/mask`` (both in hex).

        Raises :class:`IptablesParsingError` if the ``match`` keyword
        is missing.
        """
        val = next(criteria_iter)
        if val != 'match':
            raise IptablesParsingError("missing 'match' keyword")

        def str2mark(markstr: str) -> Tuple[int, Optional[int]]:
            """Convert 'mark[/mask]' (hex) into a (mark, mask) tuple
            """
            mark_part, sep, mask_part = markstr.partition('/')
            mask = int(mask_part, 16) if sep else None
            return (int(mark_part, 16), mask)

        # NOTE(review): relies on _parse_next_value returning the result
        # of xform unchanged -- confirm against its definition.
        is_equal, (mark, mask) = _parse_next_value(criteria_iter,
                                                   xform=str2mark)
        return MarkMatch().mark().compare(is_equal, mark, mask)
class ConnmarkMatch(Match):
    """Match against the ctmark
    """

    def __init__(self):
        # Created lazily by mark()
        self.__mark_crit = None

    def __eq__(self, other):
        return isinstance(other, ConnmarkMatch) and self.mark() == other.mark()

    def mark(self) -> MarkCriterion:
        """Match against the packet's ctmark
        """
        if self.__mark_crit is None:
            self.__mark_crit = MarkCriterion(self)
        return self.__mark_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args('connmark', [self.__mark_crit])

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the mark criteria; the expected tokens are the keyword
        ``match`` followed by ``mark`` or ``mark/mask`` (both in hex).

        Raises :class:`IptablesParsingError` if the ``match`` keyword
        is missing.
        """
        val = next(criteria_iter)
        if val != 'match':
            raise IptablesParsingError("missing 'match' keyword")

        def str2mark(markstr: str) -> Tuple[int, Optional[int]]:
            """Convert 'mark[/mask]' (hex) into a (mark, mask) tuple
            """
            mark_part, sep, mask_part = markstr.partition('/')
            mask = int(mask_part, 16) if sep else None
            return (int(mark_part, 16), mask)

        # NOTE(review): relies on _parse_next_value returning the result
        # of xform unchanged -- confirm against its definition.
        is_equal, (mark, mask) = _parse_next_value(criteria_iter,
                                                   xform=str2mark)
        return ConnmarkMatch().mark().compare(is_equal, mark, mask)
#
#######################################################################
#
# Matching against connection tracking attributes
#
class CtStateCriterion(_GenericCriterion):
    """Criterion on the connection tracking state (``--ctstate``)

    The comparison value is a string.
    """

    def __init__(self, match):
        super().__init__(match, '--ctstate')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, CtStateCriterion):
            return False
        return super().__eq__(other)
class CtStatusCriterion(_GenericCriterion):
    """Criterion on the connection tracking status (``--ctstatus``)

    The comparison value is a string.
    """

    def __init__(self, match):
        super().__init__(match, '--ctstatus')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, CtStatusCriterion):
            return False
        return super().__eq__(other)
class ConntrackMatch(Match):
    """Match against the connection tracking attributes.
    """

    def __init__(self):
        # Criteria are created lazily by the accessor methods
        self.__ctstate_crit = None
        self.__ctstatus_crit = None

    def __eq__(self, other):
        return (isinstance(other, ConntrackMatch) and
                self.ctstate() == other.ctstate() and
                self.ctstatus() == other.ctstatus())

    def ctstate(self) -> CtStateCriterion:
        """Match against the connection tracking state
        """
        if self.__ctstate_crit is None:
            self.__ctstate_crit = CtStateCriterion(self)
        return self.__ctstate_crit

    def ctstatus(self) -> CtStatusCriterion:
        """Match against the connection tracking status
        """
        if self.__ctstatus_crit is None:
            self.__ctstatus_crit = CtStatusCriterion(self)
        return self.__ctstatus_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args(
            'conntrack', [self.__ctstate_crit, self.__ctstatus_crit])

    @classmethod
    def _parse(cls, criteria_iter, match_name: str,
               negation: Optional[str]) -> Match:
        """Match against ctstate, ctstatus

        Consecutive conntrack criteria are consumed from ``criteria_iter``;
        the first token that is not recognized is returned to the iterator
        (together with its ``'!'`` prefix, if there was one).
        """
        # Return the match_name and (optionally) negation to the iterator
        # so that we can process them as part of the for-loop below.
        # The for-loop is designed to handle all conntrack-related criteria
        # (which we expect to appear consecutively).
        criteria_iter.put_back(match_name)
        if negation is not None:
            criteria_iter.put_back(negation)
        match = ConntrackMatch()
        for criterion in criteria_iter:
            negation = None
            if criterion == '!':
                # Remember the actual token: if the criterion that follows
                # is not ours, this exact token must be handed back.
                negation = criterion
                criterion = next(criteria_iter)
            if criterion == 'ctstate':
                is_equal, val = _parse_value(next(criteria_iter))
                if negation is not None:
                    is_equal = False
                match.ctstate().compare(is_equal, val)
            elif criterion == 'ctstatus':
                is_equal, val = _parse_value(next(criteria_iter))
                if negation is not None:
                    is_equal = False
                match.ctstatus().compare(is_equal, val)
            else:
                # Not a conntrack criterion; restore the iterator so that
                # the tokens come out again in their original order.
                criteria_iter.put_back(criterion)
                if negation is not None:
                    criteria_iter.put_back(negation)
                break
        return match
#
#######################################################################
#
# Matching against connection tracking state
#
class StateCriterion(_GenericCriterion):
    """Criterion on the connection tracking state (``--state``)

    The comparison value is a string.
    """

    def __init__(self, match):
        super().__init__(match, '--state')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, StateCriterion):
            return False
        return super().__eq__(other)
class StateMatch(Match):
    """Match against the connection tracking state

    This match is accessed via the state module, but it is not clear
    how its functionality is different from the conntrack module's
    --ctstate option.
    """

    def __init__(self):
        # Created on first use by state()
        self.__state_crit = None

    def __eq__(self, other):
        if not isinstance(other, StateMatch):
            return False
        return self.state() == other.state()

    def state(self) -> StateCriterion:
        """Match against the connection tracking state
        """
        crit = self.__state_crit
        if crit is None:
            crit = self.__state_crit = StateCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        criteria = [self.__state_crit]
        return self.build_iptables_args('state', criteria)

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Build a :class:`StateMatch` from the next token of
        ``criteria_iter``; a non-``None`` ``negation`` forces an
        inequality test.
        """
        state_token = next(criteria_iter)
        is_equal, state_value = _parse_value(state_token)
        positive = False if negation is not None else is_equal
        return StateMatch().state().compare(positive, state_value)
#
#######################################################################
#
# Matching against protocol-related attributes, where protocol
# is TCP, UDP, ICMP
#
class MssCriterion(_GenericCriterion):
    """Criterion on the MSS field of the TCP header (``--mss``).

    The comparison value is a string in the form ``value``[:``value``]
    """

    def __init__(self, match):
        super().__init__(match, '--mss')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, MssCriterion):
            return False
        return super().__eq__(other)
class TcpmssMatch(Match):
    """Match against the MSS field of the TCP header
    """

    def __init__(self):
        # Created on first use by mss()
        self.__mss_crit = None

    def __eq__(self, other):
        if not isinstance(other, TcpmssMatch):
            return False
        return self.mss() == other.mss()

    def mss(self) -> MssCriterion:
        """Match against the MSS field of the TCP header
        """
        crit = self.__mss_crit
        if crit is None:
            crit = self.__mss_crit = MssCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        criteria = [self.__mss_crit]
        return self.build_iptables_args('tcpmss', criteria)

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse tcpmss: the keyword ``match`` followed by the MSS value.

        Raises :class:`IptablesParsingError` if the keyword is missing.
        """
        keyword = next(criteria_iter)
        if keyword != 'match':
            raise IptablesParsingError("missing 'match' keyword")
        # NB: the value has the syntax <num>[:<num>] but it is kept
        # as a string for the time being
        mss_token = next(criteria_iter)
        is_equal, mss_spec = _parse_value(mss_token)
        return TcpmssMatch().mss().compare(is_equal, mss_spec)
class TcpFlag(IntFlag):
    """TCP flag names mapped to their bit values.
    """
    FIN = 1 << 0
    SYN = 1 << 1
    RST = 1 << 2
    PSH = 1 << 3
    ACK = 1 << 4
    URG = 1 << 5
class TcpFlagsCriterion(Criterion):
    """A criterion for comparing against packets with an arbitrary set of
    TCP flags set.

    As a special case, it also serves for comparing against SYN packets.

    The value is the tuple (flags-checked, flags-set); both flags-checked
    and flags-set are collections of :class:`TcpFlag` members (both are
    ``None`` for a SYN-only criterion).
    """

    def __init__(self, match: Match, syn_only=False):
        """
        :param match: the :class:`Match` object that owns this object
        :param syn_only: optional boolean value indicating a check only
            against the **SYN** flag
        """
        super().__init__(match)
        # If syn_only is True, then flags_checked/flags_set will be None
        self.__syn_only = syn_only
        self.__flags_checked = None
        self.__flags_set = None

    def __eq__(self, other):
        """Two criteria are equal if they are of the same kind (SYN-only
        or not) and, per :class:`Criterion` equality, are both unset or
        are both set with the same polarity and flags.
        """
        if not isinstance(other, TcpFlagsCriterion):
            return False
        if self.is_syn_only() != other.is_syn_only():
            return False
        # The parent class checks set-ness, polarity and value, so that
        # e.g. '--syn' does not compare equal to '! --syn'.
        return super().__eq__(other)

    def get_value(self) -> Any:
        """Returns the value that the criterion is comparing against
        """
        return (self.__flags_checked, self.__flags_set)

    def is_syn_only(self):
        """Returns ``True`` if the criterion is only meant to check
        for the SYN flag (but note that it may not be set yet)
        """
        return self.__syn_only

    def equals(self,                    # pylint: disable=arguments-differ
               flags_checked: Optional[Set[TcpFlag]] = None,
               flags_set: Optional[List[TcpFlag]] = None) -> Match:
        """Perform flags comparison

        A SYN-only criterion accepts no arguments; otherwise both
        arguments are required.

        Raises :class:`IptablesError` if the arguments do not fit the
        kind of criterion, or if the criterion is already set (in which
        case the previously stored flags are left untouched).

        :param flags_checked: the TCP flags to examine
        :param flags_set: the TCP flags (out of ``flags_checked``) that
            must be set
        """
        if self.__syn_only:
            if not (flags_checked is None and flags_set is None):
                raise IptablesError("cannot set flags in SYN criterion")
            return self._set_polarity(True)
        if flags_checked is None:
            raise IptablesError("need to specify flags to check")
        if flags_set is None:
            raise IptablesError("need to specify flags that are set")
        # Set the polarity first: it raises if the criterion is already
        # set, and a rejected call must not clobber the stored flags.
        owner = self._set_polarity(True)
        self.__flags_checked = flags_checked
        self.__flags_set = flags_set
        return owner

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified TCP flags
        """
        if self.__syn_only:
            return ['--syn']
        return ['--tcp-flags',
                ','.join([f.name for f in self.__flags_checked]),
                ','.join([f.name for f in self.__flags_set])]
class _PortCriterion(Criterion):
    """Compare against a source/destination port or port-range

    The comparison value is the tuple (port, last_port); ``last_port``
    is ``None`` when comparing against a single port.
    """

    def __init__(self, match: Match, iptables_option: str):
        """
        :param match: the :class:`Match` object that owns this criterion
        :param iptables_option: the **iptables(8)** option that precedes
            the port specification
        """
        super().__init__(match)
        self.__option = iptables_option
        self.__port = None
        self.__last_port = None

    def get_value(self) -> Any:
        """Returns the value that the criterion is comparing against
        """
        return (self.__port, self.__last_port)

    def equals(self,                    # pylint: disable=arguments-differ
               port: int, last_port: Optional[int] = None) -> Match:
        """Compare with a port (or inclusion in port-range if ``last_port``
        is present)

        Raises :class:`IptablesError` if the criterion is already set;
        in that case the previously stored port(s) are left untouched.
        """
        # Set the polarity first: it raises if the criterion is already
        # set, and a rejected call must not clobber the stored port(s).
        owner = self._set_polarity(True)
        self.__port = port
        self.__last_port = last_port
        return owner

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified port(s)
        """
        port_spec = str(self.__port)
        if self.__last_port is not None:
            port_spec += f':{self.__last_port}'
        return [self.__option, port_spec]
class SourcePortCriterion(_PortCriterion):
    """Criterion on the source port, or on inclusion in a port-range
    (``--sport``)
    """

    def __init__(self, match: Match):
        super().__init__(match, '--sport')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, SourcePortCriterion):
            return False
        return super().__eq__(other)
class DestPortCriterion(_PortCriterion):
    """Criterion on the destination port, or on inclusion in a port-range
    (``--dport``)
    """

    def __init__(self, match: Match):
        super().__init__(match, '--dport')

    def __eq__(self, other):
        # Only criteria of the same kind can be equal
        if not isinstance(other, DestPortCriterion):
            return False
        return super().__eq__(other)
class _PortParser:      # pylint: disable=too-few-public-methods
    """Helper for parsing the TCP/UDP port criteria
    """

    SOURCE_PORT_PREFIX = ('spt:', 'spts:')
    DEST_PORT_PREFIX = ('dpt:', 'dpts:')
    PORT_PREFIX = SOURCE_PORT_PREFIX + DEST_PORT_PREFIX

    @classmethod
    def parse(cls, port_match_str: str, match: Match):
        """Set the source or destination port criterion of ``match``
        according to ``port_match_str`` (a single port or a
        ``first:last`` range following the prefix)
        """
        is_source = port_match_str.startswith(cls.SOURCE_PORT_PREFIX)
        port_crit = match.source_port() if is_source else match.dest_port()
        # Everything past the first ':' is the port specification
        raw_spec = port_match_str.split(':', 1)[1]
        is_equal, port_spec = _parse_value(raw_spec)
        # One number for a single port, two for a port range
        port_numbers = [int(piece) for piece in port_spec.split(':', 1)]
        port_crit.compare(is_equal, *port_numbers)
class TcpMatch(Match):
    """Match against the fields of the TCP header
    """

    def __init__(self):
        # Criteria are created lazily by the accessor methods; note that
        # syn() and tcp_flags() share the same slot.
        self.__flags_crit = None
        self.__src_port_crit = None
        self.__dest_port_crit = None

    @staticmethod
    def _same_criterion(mine: Optional[Criterion],
                        theirs: Optional[Criterion]) -> bool:
        """Compare two optional criteria; a criterion that was never
        created is equivalent to one that is not set.
        """
        mine_set = mine is not None and mine.is_set()
        theirs_set = theirs is not None and theirs.is_set()
        if mine_set and theirs_set:
            return mine == theirs
        return mine_set == theirs_set

    def __eq__(self, other):
        if not isinstance(other, TcpMatch):
            return False
        # Compare the stored criteria directly instead of going through
        # the accessors: tcp_flags() would create a non-SYN criterion as
        # a side effect, causing a later syn().equals() call to fail.
        # pylint: disable=protected-access
        return (
            self._same_criterion(self.__flags_crit,
                                 other.__flags_crit) and
            self._same_criterion(self.__src_port_crit,
                                 other.__src_port_crit) and
            self._same_criterion(self.__dest_port_crit,
                                 other.__dest_port_crit)
        )

    def syn(self) -> TcpFlagsCriterion:
        """Criterion for matching against a SYN packet
        """
        if self.__flags_crit is None:
            self.__flags_crit = TcpFlagsCriterion(self, syn_only=True)
        return self.__flags_crit

    def tcp_flags(self) -> TcpFlagsCriterion:
        """Compare with TCP flags
        """
        if self.__flags_crit is None:
            self.__flags_crit = TcpFlagsCriterion(self)
        return self.__flags_crit

    def source_port(self) -> SourcePortCriterion:
        """Match against the source port
        """
        if self.__src_port_crit is None:
            self.__src_port_crit = SourcePortCriterion(self)
        return self.__src_port_crit

    def dest_port(self) -> DestPortCriterion:
        """Match against the destination port
        """
        if self.__dest_port_crit is None:
            self.__dest_port_crit = DestPortCriterion(self)
        return self.__dest_port_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        criteria = (self.__flags_crit, self.__src_port_crit,
                    self.__dest_port_crit)
        return self.build_iptables_args('tcp', criteria)

    @classmethod
    def __parse_tcp_flags_num(cls, numstr: str) -> Set[TcpFlag]:
        """Parse a hex-value numstr (e.g. 0x11) into a set of TCP flags.

        Raises :class:`IptablesParsingError` if ``numstr`` is not a
        valid hex number.
        """
        try:
            flag_mask = int(numstr, 16)
        except ValueError as valerr:
            raise IptablesParsingError(
                "Bad TCP flag mask: " + numstr) from valerr
        return {flag for flag in TcpFlag if flag_mask & flag}

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the TCP criteria

        Flags and port criteria are consumed from ``criteria_iter``; the
        first token that is not recognized is returned to the iterator.

        Raises :class:`IptablesParsingError` if a flags specification
        cannot be parsed.
        """
        match = TcpMatch()
        for val in criteria_iter:
            if val.startswith('flags:'):
                flag_spec = val.split(':', 1)[1]
                is_equal, flag_spec = _parse_value(flag_spec)
                if '/' not in flag_spec:
                    raise IptablesParsingError(
                        f"no '/' in TCP flags: {flag_spec}")
                mask, comp = flag_spec.split('/', 1)
                flags_checked = cls.__parse_tcp_flags_num(mask)
                flags_set = cls.__parse_tcp_flags_num(comp)
                # Checking FIN,SYN,RST,ACK with only SYN set is what
                # the --syn option expresses
                if (flags_set == {TcpFlag.SYN} and
                        flags_checked == {TcpFlag.FIN, TcpFlag.SYN,
                                          TcpFlag.RST, TcpFlag.ACK}):
                    match.syn().compare(is_equal)
                else:
                    match.tcp_flags().compare(is_equal,
                                              flags_checked, flags_set)
            elif val.startswith(_PortParser.PORT_PREFIX):
                _PortParser.parse(val, match)
            else:
                criteria_iter.put_back(val)
                break
        return match
class UdpMatch(Match):
    """Match against the fields of the UDP header
    """

    def __init__(self):
        # Both criteria are created on first use
        self.__src_port_crit = None
        self.__dest_port_crit = None

    def __eq__(self, other):
        if not isinstance(other, UdpMatch):
            return False
        if not self.source_port() == other.source_port():
            return False
        return self.dest_port() == other.dest_port()

    def source_port(self) -> SourcePortCriterion:
        """Match against the source port
        """
        crit = self.__src_port_crit
        if crit is None:
            crit = self.__src_port_crit = SourcePortCriterion(self)
        return crit

    def dest_port(self) -> DestPortCriterion:
        """Match against the destination port
        """
        crit = self.__dest_port_crit
        if crit is None:
            crit = self.__dest_port_crit = DestPortCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args(
            'udp', (self.__src_port_crit, self.__dest_port_crit))

    @classmethod
    def _parse(             # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the UDP criteria; port criteria are consumed until a
        token that is not a port specification shows up (that token is
        returned to the iterator).
        """
        match = UdpMatch()
        for val in criteria_iter:
            if not val.startswith(_PortParser.PORT_PREFIX):
                criteria_iter.put_back(val)
                break
            _PortParser.parse(val, match)
        return match
class IcmpTypeCriterion(Criterion):
    """Compare with the ICMP type.

    The comparison value is the tuple
    (icmp-type-name, icmp-type-value, icmp-code); icmp-type-name
    is a string, icmp-type-value is an integer, and icmp-code is a string.
    """

    # ICMP type numbers mapped to the corresponding iptables(8)
    # --icmp-type parameter values
    __VAL2NAME_MAP = {
        0 : 'echo-reply',
        3 : 'destination-unreachable',
        4 : 'source-quench',
        5 : 'redirect',
        8 : 'echo-request',
        9 : 'router-advertisement',
        10 : 'router-solicitation',
        11 : 'time-exceeded',
        12 : 'parameter-problem',
        13 : 'timestamp-request',
        14 : 'timestamp-reply',
        17 : 'address-mask-request',
        18 : 'address-mask-reply',
    }
    # The inverse mapping: type name to type number
    __NAME2VAL_MAP = {
        type_name : type_num for type_num, type_name in __VAL2NAME_MAP.items()
    }

    def __init__(self, match: Match):
        super().__init__(match)
        # Invariants maintained by equals():
        #   - when __icmp_type_name is set, __icmp_type_value and
        #     __icmp_code are both None
        #   - when __icmp_type_value is set, __icmp_code is optional
        self.__icmp_type_name = None
        self.__icmp_type_value = None
        self.__icmp_code = None

    def __eq__(self, other):
        if not isinstance(other, IcmpTypeCriterion):
            return False
        return super().__eq__(other)

    def get_value(self) -> Any:
        """Returns the (name, value, code) tuple that the criterion
        is comparing against
        """
        return (self.__icmp_type_name,
                self.__icmp_type_value,
                self.__icmp_code)

    def equals(self,        # pylint: disable=arguments-differ
               icmp_type_name: Optional[str] = None,
               icmp_type_value: Optional[int] = None,
               icmp_code: Optional[str] = None) -> Match:
        """Check for equality against icmp_type_name, or
        icmp_type_value and icmp_code.

        Exactly one of icmp_type_name/icmp_type_value must be present;
        icmp_code is optional.

        :raises IptablesError: if both or neither of
            icmp_type_name/icmp_type_value are given, or if
            icmp_type_name is not a known name
        """
        have_name = icmp_type_name is not None
        have_value = icmp_type_value is not None
        if have_name and have_value:
            raise IptablesError(
                'cannot specify both ICMP type name and value')
        if not (have_name or have_value):
            raise IptablesError(
                'must specify either ICMP type name or value')
        if have_name:
            if icmp_type_name not in self.__NAME2VAL_MAP:
                raise IptablesError(f'unknown ICMP type name: {icmp_type_name}')
            if icmp_code is not None:
                # A code can only be expressed in the numeric
                # type/code form of --icmp-type, so switch from the
                # name to its number
                icmp_type_value = self.__NAME2VAL_MAP[icmp_type_name]
                icmp_type_name = None
        elif icmp_code is None:
            # Numeric type without a code: use the name when one is known
            icmp_type_name = self.__VAL2NAME_MAP.get(icmp_type_value)
            if icmp_type_name is not None:
                icmp_type_value = None
        self.__icmp_type_name = icmp_type_name
        self.__icmp_type_value = icmp_type_value
        self.__icmp_code = icmp_code
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified ICMP type

        :raises IptablesError: if neither a type name nor a type value
            has been recorded
        """
        if self.__icmp_type_name is not None:
            return ['--icmp-type', self.__icmp_type_name]
        if self.__icmp_type_value is None:
            raise IptablesError('no ICMP type present')
        type_spec = f'{self.__icmp_type_value}'
        if self.__icmp_code is not None:
            type_spec += f'/{self.__icmp_code}'
        return ['--icmp-type', type_spec]
class IcmpMatch(Match):
    """Match against the fields of the ICMP header
    """

    def __init__(self):
        # Created on the first call to icmp_type()
        self.__icmp_type_crit = None

    def __eq__(self, other):
        return (isinstance(other, IcmpMatch) and
                self.icmp_type() == other.icmp_type())

    def icmp_type(self) -> IcmpTypeCriterion:
        """Criterion for matching against the ICMP type
        """
        if self.__icmp_type_crit is None:
            self.__icmp_type_crit = IcmpTypeCriterion(self)
        return self.__icmp_type_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args('icmp', [self.__icmp_type_crit])

    @classmethod
    def _parse(     # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the ICMP criteria

        :param criteria_iter: iterator over the remaining fields; it must
            also support ``put_back()``
        :param match_name: the field that selected this parser
            (``icmp`` or ``icmptype``)
        :param negation: unused
        :raises IptablesParsingError: if the fields do not have the
            expected form
        """
        # iptables 1.4.7 output:
        #       icmp type 3
        #       icmp !type 3
        # iptables 1.8.4 output:
        #       icmptype 3
        #       icmp !type 3
        # In both cases the type may be followed by 'code <N>' when
        # the rule also specifies an ICMP code.
        if match_name == 'icmp':
            is_equal, val = _parse_value(next(criteria_iter))
            if val != 'type':
                raise IptablesParsingError(f"expecting 'type' instead of {val}")
        elif match_name == 'icmptype':
            is_equal = True
        else:
            raise IptablesParsingError(f"ICMP unable to parse '{match_name}'")
        match = IcmpMatch()
        crit_args = {'icmp_type_value': int(next(criteria_iter))}
        # Look one field ahead for the optional ICMP code; running out of
        # fields here is fine since the type alone is a complete criterion.
        following = next(criteria_iter, None)
        if following == 'code':
            # The code is kept as a string (that is what equals() expects).
            # A missing code value propagates StopIteration to the caller.
            crit_args['icmp_code'] = next(criteria_iter)
        elif following is not None:
            # Not ours (this includes the 'codes <min>-<max>' range form,
            # which is not handled here); leave it for the caller
            criteria_iter.put_back(following)
        return match.icmp_type().compare(is_equal, **crit_args)
#
#######################################################################
#
# Rate limit
#
class RateLimitCriterion(Criterion):
    """Compare with a rate limit

    The comparison value is an integer (events per day)
    """

    # (number of such intervals in a day, interval name), ordered from
    # the shortest interval to the longest
    __INTERVAL_LIST = [
        (86400, 'sec'),
        (1440, 'min'),
        (24, 'hour'),
        (1, 'day'),
    ]

    @classmethod
    def spec2rate(cls, spec: str) -> int:
        """Convert a rate spec which has the form <num>/<interval> into
        a rate number which is per-day

        :raises ValueError: if the spec is malformed or the interval
            is not recognized
        """
        parts = spec.split('/')
        if len(parts) != 2:
            raise ValueError(f"bad rate spec '{spec}'")
        count_field, unit = parts
        count = int(count_field)
        for per_day, interval_name in cls.__INTERVAL_LIST:
            if interval_name == unit:
                return count * per_day
        raise ValueError(f"unknown interval '{unit}'")

    @classmethod
    def rate2spec(cls, rate: int) -> str:
        """Convert a rate number which is per-day to the form
        <num>/<interval>

        The first interval of the interval list that divides the rate
        evenly is used.

        :raises ValueError: if no interval divides the rate evenly
        """
        for per_day, interval_name in cls.__INTERVAL_LIST:
            quotient, remainder = divmod(rate, per_day)
            if remainder == 0:
                return f'{quotient:d}/{interval_name}'
        raise ValueError(f"bad rate value: {rate}")

    def __init__(self, match: Match):
        super().__init__(match)
        # When present, the rate is always expressed in events/day
        self.__rate = None

    def __eq__(self, other):
        if not isinstance(other, RateLimitCriterion):
            return False
        return super().__eq__(other)

    def get_value(self) -> int:
        """Returns the rate that the criterion is comparing against
        """
        return self.__rate

    def equals(self, rate: int) -> Match:  # pylint: disable=arguments-differ
        """Compare with the specified rate

        :param rate: the rate; must be positive
        :raises IptablesError: if the rate is not positive
        """
        if rate <= 0:
            raise IptablesError(f'invalid rate: {rate}')
        self.__rate = rate
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified rate

        :raises IptablesError: if the criterion has not been set
        """
        if not self.is_set():
            raise IptablesError('limit not set')
        rate_spec = self.rate2spec(self.__rate)
        return ['--limit', rate_spec]
class BurstCriterion(_GenericCriterion):
    """Compare with the burst limit

    The comparison value is an integer
    """

    def __init__(self, match: Match):
        # The option name is handed to the generic criterion base class
        option_name = '--limit-burst'
        super().__init__(match, option_name)

    def __eq__(self, other):
        if not isinstance(other, BurstCriterion):
            return False
        return super().__eq__(other)
class LimitMatch(Match):
    """Match against a rate limit with a maximum burst

    The two criteria are created on first access.
    """

    def __init__(self):
        self.__rate_limit_crit = None
        self.__limit_burst_crit = None

    def __eq__(self, other):
        if not isinstance(other, LimitMatch):
            return False
        same_limit = self.limit() == other.limit()
        return same_limit and self.burst() == other.burst()

    def limit(self) -> RateLimitCriterion:
        """Criterion for the rate limit (instantiated on the first call)
        """
        crit = self.__rate_limit_crit
        if crit is None:
            crit = self.__rate_limit_crit = RateLimitCriterion(self)
        return crit

    def burst(self) -> BurstCriterion:
        """Criterion for the burst limit (instantiated on the first call)
        """
        crit = self.__limit_burst_crit
        if crit is None:
            crit = self.__limit_burst_crit = BurstCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns the **iptables(8)** arguments that express this match
        """
        return self.build_iptables_args(
            'limit', (self.__rate_limit_crit, self.__limit_burst_crit))

    @classmethod
    def _parse(     # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse limit.

        The expected fields are: ``avg <rate-spec> burst <number>``

        :raises IptablesParsingError: if the ``avg`` or ``burst``
            keyword is not where it is expected
        """
        keyword = next(criteria_iter)
        if keyword != 'avg':
            _logger.error(
                "%s: parsing limit: expected 'avg'; found '%s'",
                cls._parse.__qualname__, keyword)
            raise IptablesParsingError("missing 'avg' field")
        match = LimitMatch()
        rate_spec = next(criteria_iter)
        # limit does not support '!'
        match.limit().equals(RateLimitCriterion.spec2rate(rate_spec))
        keyword = next(criteria_iter)
        if keyword != 'burst':
            _logger.error(
                "%s: parsing limit: expected 'burst'; found '%s'",
                cls._parse.__qualname__, keyword)
            raise IptablesParsingError("missing 'burst' field")
        burst_value = int(next(criteria_iter))
        return match.burst().equals(burst_value)
#
#######################################################################
#
# Packet type
#
class PacketTypeCriterion(_GenericCriterion):
    """Compare with the packet type

    The comparison value is a string.
    """

    def __init__(self, match: Match):
        # The option name is handed to the generic criterion base class
        option_name = '--pkt-type'
        super().__init__(match, option_name)

    def __eq__(self, other):
        if not isinstance(other, PacketTypeCriterion):
            return False
        return super().__eq__(other)
class PacketTypeMatch(Match):
    """Match against the packet type
    """

    def __init__(self):
        # Created on the first call to packet_type()
        self.__packet_type_crit = None

    def __eq__(self, other):
        if not isinstance(other, PacketTypeMatch):
            return False
        return self.packet_type() == other.packet_type()

    def packet_type(self) -> PacketTypeCriterion:
        """Criterion for the packet type (instantiated on the first call)
        """
        crit = self.__packet_type_crit
        if crit is None:
            crit = self.__packet_type_crit = PacketTypeCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns the **iptables(8)** arguments that express this match
        """
        criteria = [self.__packet_type_crit]
        return self.build_iptables_args('pkttype', criteria)

    @classmethod
    def _parse(     # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse PKTTYPE.

        The expected fields are a comparator (``=`` or ``!=``) followed
        by the packet type.

        :raises IptablesParsingError: if the comparator is not recognized
        """
        comparator = next(criteria_iter)
        # Comparator token -> polarity of the comparison
        is_equal = {'=': True, '!=': False}.get(comparator)
        if is_equal is None:
            _logger.error(
                "%s: parsing PKTTYPE: expected comparator; found '%s'",
                cls._parse.__qualname__, comparator)
            raise IptablesParsingError("missing comparator field")
        return PacketTypeMatch().packet_type().compare(
            is_equal, next(criteria_iter))
#
#######################################################################
#
# Comment
#
#
#######################################################################
#
# TTL
#
class TtlCriterion(Criterion):
    """A criterion for a TTL value comparison used by :class:`TtlMatch`.
    """

    # Comparison operations, as reported by get_value()
    _EQ_COMP = '=='
    _LT_COMP = '<'
    _GT_COMP = '>'

    def __init__(self, match: Match):
        super().__init__(match)
        # TTL value and comparison operation; both are None until one
        # of the comparison methods of this class is invoked
        self.__value = None
        self.__comp = None

    def __eq__(self, other):
        if not isinstance(other, TtlCriterion):
            return False
        return super().__eq__(other)

    def get_value(self):
        """Returns the value that the criterion is comparing against
        and the comparison operation (as a string)

        :rtype: tuple of (int, str)
        """
        return (self.__value, self.__comp)

    def equals(self, value: int) -> Match:  # pylint: disable=arguments-differ
        """Check if the packet TTL is equal to ``value``

        :param value: the TTL value
        """
        self.__value, self.__comp = value, self._EQ_COMP
        return self._set_polarity(True)

    def less_than(self, value: int) -> Match:
        """Check if the packet TTL is less than ``value``

        :param value: the TTL value
        """
        self.__value, self.__comp = value, self._LT_COMP
        return self._set_polarity(True)

    def greater_than(self, value: int) -> Match:
        """Check if the packet TTL is greater than ``value``

        :param value: the TTL value
        """
        self.__value, self.__comp = value, self._GT_COMP
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified TTL comparison
        """
        if self.__comp == self._EQ_COMP:
            option = '--ttl-eq'
        elif self.__comp == self._LT_COMP:
            option = '--ttl-lt'
        else:
            # Any other comparison is emitted as greater-than
            option = '--ttl-gt'
        return [option, str(self.__value)]
class TtlMatch(Match):
    """Match against the packet TTL value
    """

    # Comparison token found in the parsed fields -> name of the
    # TtlCriterion method that expresses it
    def __init__(self):
        # Created on the first call to ttl()
        self.__ttl_crit = None

    def __eq__(self, other):
        if not isinstance(other, TtlMatch):
            return False
        return self.ttl() == other.ttl()

    def ttl(self) -> TtlCriterion:
        """Returns the TTL criterion (instantiated on the first call)
        """
        crit = self.__ttl_crit
        if crit is None:
            crit = self.__ttl_crit = TtlCriterion(self)
        return crit

    def to_iptables_args(self) -> List[str]:
        """Returns the **iptables(8)** arguments that express this match
        """
        criteria = [self.__ttl_crit]
        return self.build_iptables_args('ttl', criteria)

    @classmethod
    def _parse(     # pylint: disable=unused-argument
            cls, criteria_iter, match_name: str,
            negation: Optional[str]) -> Match:
        """Parse the TTL criterion.

        The expected fields are: ``match TTL <comparison> <value>``

        :raises _CriteriaExhaustedError: if the first field is not
            ``match``; the field is put back before raising
        :raises IptablesParsingError: if ``TTL`` is missing or the
            comparison is not recognized
        """
        field = next(criteria_iter)
        if field != 'match':
            # It must be a TTL target
            criteria_iter.put_back(field)
            raise _CriteriaExhaustedError()
        field = next(criteria_iter)
        if field != 'TTL':
            raise IptablesParsingError(f"expected 'TTL', found {field} ")
        comp = next(criteria_iter)
        value = int(next(criteria_iter))
        # Comparison token -> TtlCriterion method expressing it
        method_by_comp = {
            '==': 'equals',
            '!=': 'not_equals',
            '>': 'greater_than',
            '<': 'less_than',
        }
        method_name = method_by_comp.get(comp)
        if method_name is None:
            raise IptablesParsingError(f"bad TTL comparison: '{comp}' ")
        return getattr(TtlMatch().ttl(), method_name)(value)
#
#######################################################################
#
# Criteria parsing
#
# pylint: disable=protected-access
# Dispatch table used by parse_criteria(): the key is the field that
# introduces a match, the value is the parser for that match. A parser
# is invoked as parser(field_iter, match_name, negation), where
# match_name is the key that selected it.
# Some parsers are registered under more than one key (e.g. 'icmp' and
# 'icmptype' both select IcmpMatch._parse, which inspects match_name).
_parser_map = {
    'connmark' : ConnmarkMatch._parse,
    'ctstate' : ConntrackMatch._parse,
    'ctstatus' : ConntrackMatch._parse,
    'icmp' : IcmpMatch._parse,
    'icmptype' : IcmpMatch._parse,
    'limit:' : LimitMatch._parse,
    'PKTTYPE' : PacketTypeMatch._parse,
    'mark' : MarkMatch._parse,
    'state' : StateMatch._parse,
    'tcp' : TcpMatch._parse,
    'tcpmss' : TcpmssMatch._parse,
    'TTL' : TtlMatch._parse,
    'udp' : UdpMatch._parse,
    '/*' : CommentMatch._parse
    }
# pylint: enable=protected-access
def _parse_value(value: str, xform=None) -> Tuple[bool, Any]:
"""Check if the specified value starts with '!' indicating negation.
Returns the tuple (is_negative, value) where the optional '!'
has been stripped from the argument 'value'; the 'value'
may also be transformed by the xform callable; this allows
returning a value of an arbitrary type.
"""
is_equal = True
if value[0] == '!':
is_equal = False
value = value[1:]
if xform is not None:
return is_equal, xform(value)
return is_equal, value
def _parse_next_value(field_iter, xform=None) -> Tuple[bool, Any]:
"""Parse the next value from the iterator.
Allow for the following syntax:
! value (2 fields)
!value (1 field)
Returns the tuple (is_negative, value)
the 'value' may also be transformed by the xform callable; this allows
returning a value of an arbitrary type.
"""
value = next(field_iter)
if value == '!':
is_equal = False
value = next(field_iter)
elif value[0] == '!':
is_equal = False
value = value[1:]
else:
is_equal = True
if xform is not None:
return is_equal, xform(value)
return is_equal, value
def parse_criteria(field_iter) -> List[Match]:
    """Parse criteria and return a match list.

    Parsing stops at the first field for which no match can be
    produced; that field (and a preceding '!', if there was one) is
    put back into ``field_iter``.

    :param field_iter: iterator over the fields; it must also support
        ``put_back()``
    :raises IptablesParsingError: if the fields run out in the middle
        of a match
    """
    matches = []
    # Name of the match being parsed; None when between matches, which
    # is how the StopIteration handler tells a clean end from a
    # truncated match
    current_name = None
    try:
        for current_name in field_iter:
            #
            # For state/ctstate (maybe others), newer versions of iptables
            # (e.g. 1.8.4) display negation as:
            #       ! state NEW
            # where older versions (e.g. 1.4.7) would show all states
            # except NEW
            #
            negation = None
            if current_name == '!':
                negation, current_name = current_name, next(field_iter)
            #
            # Only a subset of the possible criteria is parsed here;
            # this needs to be expanded.
            #
            parser = _parser_map.get(current_name)
            match = None
            if parser is not None:
                try:
                    match = parser(field_iter, current_name, negation)
                except _CriteriaExhaustedError:
                    # The parser determined that this is not a match
                    pass
            if match is None:
                # This could be a match criterion that we don't know
                # about, a target name, or a target option.
                # Restore the fields and let the caller figure it out.
                field_iter.put_back(current_name)
                if negation is not None:
                    field_iter.put_back(negation)
                break
            matches.append(match)
            current_name = None
    except StopIteration as stopiter:
        if current_name is not None:
            raise IptablesParsingError(
                'insufficient number of values for '
                f'match {current_name}') from stopiter
    return matches