# Copyright (c) 2021, 2022, 2023, 2024, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides the PacketMatch class which supports
matching against standard packet attributes
"""
from ipaddress import IPv4Network, IPv6Network, IPv4Address, IPv6Address
from typing import Iterable, List, Optional, Union
from ..exceptions import IptablesError, IptablesParsingError
from ..deps import get_logger
from .match import Criterion, Match, MatchParser
from .util import BooleanCriterion, GenericCriterion
_logger = get_logger('linuxnet.iptables.matches.packetmatch')
class OutputInterfaceCriterion(GenericCriterion):
    """Criterion comparing against the output interface of a packet.

    The comparison value is an interface name (a string).
    """

    def __init__(self, match: Match):
        """
        :param match: the owner :class:`Match`
        """
        # ``-o`` is the option handed to the generic criterion machinery
        iptables_option = '-o'
        super().__init__(match, iptables_option)
class AddressCriterion(Criterion):
    """Compare against an IPv4/IPv6 address.

    The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network`
    """

    def __init__(self, match: Match, iptables_option: str,
                 ipv6: Optional[bool]):
        """
        :param match: the owner :class:`Match`
        :param iptables_option: the **iptables(8)** option to use when
            generating the iptables arguments
        :param ipv6: if ``True``, only IPv6 values are accepted; if
            ``False``, only IPv4 values are accepted; if ``None``, the
            address family is adopted from the first value passed to
            :meth:`equals`
        """
        super().__init__(match)
        self.__option = iptables_option
        self.__ipv6 = ipv6
        self.__value = None

    def get_iptables_option(self) -> str:
        """Returns the **iptables(8)** option
        """
        return self.__option

    def get_value(self) -> Union[IPv4Network, IPv6Network, None]:
        """Returns the criterion value (``None`` if no value has been
        set via :meth:`equals`)
        """
        return self.__value

    def equals(self,  # pylint: disable=arguments-differ
               value: Union[IPv4Network, IPv6Network,
                            IPv4Address, IPv6Address, str]) -> Match:
        """Compare with the specified value, which can be specified as
        an :class:`IPv4Network`, an :class:`IPv6Network`,
        an :class:`IPv4Address`, an :class:`IPv6Address`, or as
        a string. Internally the value is always stored as
        an :class:`IPv4Network`, or an :class:`IPv6Network`.

        :param value: the address/network to compare against
        :rtype: the result of ``_set_polarity(True)``
        :raises IptablesError: if a string value cannot be parsed, if the
            value is of an unsupported type, or if its address family
            conflicts with the family this criterion expects
        """
        if isinstance(value, str):
            # Only IPv6 textual addresses contain a colon
            network_class = IPv6Network if ':' in value else IPv4Network
            try:
                value = network_class(value)
            except ValueError as ex:
                # AddressValueError and NetmaskValueError are both
                # subclasses of ValueError
                raise IptablesError(
                    f"{value} cannot be parsed as IPv4/IPv6 address") from ex
        elif isinstance(value, IPv4Address):
            value = IPv4Network(value)
        elif isinstance(value, IPv6Address):
            value = IPv6Network(value)
        if isinstance(value, IPv4Network):
            if self.__ipv6 is None:
                # No family configured yet: lock onto IPv4
                self.__ipv6 = False
            elif self.__ipv6:
                raise IptablesError(
                    f"criterion expects IPv6 address (got {value})")
        elif isinstance(value, IPv6Network):
            if self.__ipv6 is None:
                # No family configured yet: lock onto IPv6
                self.__ipv6 = True
            elif not self.__ipv6:
                raise IptablesError(
                    f"criterion expects IPv4 address (got {value})")
        else:
            raise IptablesError(
                f"criterion expects IPv4/IPv6 address (got {value})")
        self.__value = value
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Convert to **iptables(8)** arguments: the option followed by
        the string form of the stored value
        """
        return [self.__option, str(self.__value)]
class SourceAddressCriterion(AddressCriterion):
    """Criterion comparing against the source address of a packet.

    The comparison value is an :class:`IPv4Network` or
    an :class:`IPv6Network`
    """

    def __init__(self, match: Match, *, ipv6: Optional[bool] = None):
        """
        :param match: the owner :class:`PacketMatch`
        :param ipv6: if ``True``, assume IPv6 addresses, otherwise assume
            IPv4 addresses
        """
        # Source addresses are expressed with the ``-s`` option
        iptables_option = '-s'
        super().__init__(match, iptables_option, ipv6)
class DestAddressCriterion(AddressCriterion):
    """Compare with the destination address.

    The comparison value is an :class:`IPv4Network` or an :class:`IPv6Network`
    """

    def __init__(self, match: Match, *, ipv6: Optional[bool] = None):
        """
        :param match: the owner :class:`PacketMatch`
        :param ipv6: if ``True``, assume IPv6 addresses; if ``False``,
            assume IPv4 addresses; ``None`` (the default, matching
            :class:`SourceAddressCriterion`) is passed through to
            :class:`AddressCriterion` unchanged
        """
        super().__init__(match, '-d', ipv6)
class ProtocolCriterion(Criterion):
    """Compare with the protocol.

    The comparison value is a protocol name (a string); it may also
    be a number in string form if there is no mapping of that number
    to a protocol name in ``/etc/protocols``.
    """

    # Class-level cache populated from /etc/protocols on first lookup.
    # Key: protocol number
    # Value: protocol name
    __proto_map = {}
    # Becomes True once a load of /etc/protocols has been attempted,
    # whether or not it succeeded
    __proto_map_ready = False

    def __init__(self, match: Match):
        """
        :param match: the owner :class:`Match`
        """
        super().__init__(match)
        self.__proto_name = None

    @classmethod
    def __load_proto_map(cls) -> None:
        """Fill the number-to-name cache from ``/etc/protocols``.

        Failures are logged and otherwise ignored; the cache is marked
        ready in all cases so that the file is not processed again.
        """
        try:
            with open("/etc/protocols", encoding="utf-8") as protofile:
                for raw_line in protofile:
                    # Discard any trailing comment, then tokenize
                    fields = raw_line.partition('#')[0].split()
                    if len(fields) < 2:
                        continue
                    name, number = fields[0], fields[1]
                    try:
                        cls.__proto_map[int(number)] = name
                    except ValueError:
                        # Second field is not a number; skip the line
                        pass
        except Exception:   # pylint: disable=broad-except
            _logger.exception("unable to process /etc/protocols")
        finally:
            cls.__proto_map_ready = True

    @classmethod
    def __getprotobynumber(cls, protonum: int) -> Optional[str]:
        """Returns the protocol name for the specified protocol number,
        or ``None`` if the number is not in the cache
        """
        if not cls.__proto_map_ready:
            cls.__load_proto_map()
        return cls.__proto_map.get(protonum)

    def get_value(self) -> Optional[str]:
        """Return protocol name
        """
        return self.__proto_name

    def equals(self, proto) -> Match:   # pylint: disable=arguments-differ
        """Compare with the specified protocol.

        :param proto: the parameter can be a string or an integer; if it
            is an integer, it will be converted to the corresponding
            protocol name, if possible, otherwise it will be used as-is
            in string form (i.e. 199 will be converted to "199")
        :raises IptablesError: if ``proto`` is neither a string nor an
            integer
        """
        if not isinstance(proto, (str, int)):
            raise IptablesError(f'unexpected argument type: {proto}')
        if isinstance(proto, str):
            # The string may hold a number; if so, try to map it to a name
            try:
                resolved = self.__getprotobynumber(int(proto)) or proto
            except ValueError:
                resolved = proto
        else:
            resolved = self.__getprotobynumber(int(proto)) or str(proto)
        self.__proto_name = resolved
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified protocol
        """
        return ['-p', self.__proto_name]
class FragmentCriterion(BooleanCriterion):
    """Criterion checking whether a packet is a fragment.
    """

    def __init__(self, match: Match):
        """
        :param match: the owner :class:`Match`
        """
        # ``-f`` is the option handed to the boolean criterion machinery
        iptables_option = '-f'
        super().__init__(match, iptables_option)
class PacketMatch(Match):
    """This class provides matching against the following attributes of
    a packet:

        * input interface
        * output interface
        * protocol
        * source address
        * destination address
        * fragment bit (IPv4-only)
    """

    def __init__(self, *, ipv6=False):
        """
        :param ipv6: optional boolean to indicate IPv6 address matching when
            ``True``; the default is IPv4
        """
        self._ipv6 = ipv6
        # Criteria are created lazily by the accessor methods below
        self.__iif_crit = None
        self.__oif_crit = None
        self.__proto_crit = None
        self.__frag_crit = None
        self.__source_crit = None
        self.__dest_crit = None

    def __eq__(self, other):
        """Compare with another match.

        Returns ``False`` if ``other`` is not an instance of this object's
        type; otherwise defers to the parent class comparison.

        :raises IptablesError: if the two objects differ in their
            IPv4/IPv6 setting
        """
        if not isinstance(other, type(self)):
            return False
        if self._ipv6 != other._ipv6:
            raise IptablesError(
                f'Protocol mismatch: {self._ipv6=}, {other._ipv6=}')
        return super().__eq__(other)

    @staticmethod
    def get_match_name() -> Optional[str]:
        """Returns the **iptables(8)** match extension name. In the case of
        the standard packet match, there is no name.
        """
        return None

    def get_criteria(self) -> Iterable['Criterion']:
        """Returns the packet match criteria: input-interface, output-interface,
        protocol, fragmented, source, destination.

        A criterion that has not been requested yet is reported as ``None``.
        """
        return (
            self.__iif_crit,
            self.__oif_crit,
            self.__proto_crit,
            self.__frag_crit,
            self.__source_crit,
            self.__dest_crit,
        )

    def protocol(self) -> ProtocolCriterion:
        """Match against the protocol
        """
        if self.__proto_crit is None:
            self.__proto_crit = ProtocolCriterion(self)
        return self.__proto_crit

    def input_interface(self) -> GenericCriterion:
        """Match against the input interface

        The comparison value is an interface name (a string).
        """
        if self.__iif_crit is None:
            # Mirrors OutputInterfaceCriterion, but with the ``-i`` option
            self.__iif_crit = GenericCriterion(self, '-i')
        return self.__iif_crit

    def output_interface(self) -> OutputInterfaceCriterion:
        """Match against the output interface
        """
        if self.__oif_crit is None:
            self.__oif_crit = OutputInterfaceCriterion(self)
        return self.__oif_crit

    def source_address(self) -> SourceAddressCriterion:
        """Match against the source address
        """
        if self.__source_crit is None:
            self.__source_crit = SourceAddressCriterion(self, ipv6=self._ipv6)
        return self.__source_crit

    def dest_address(self) -> DestAddressCriterion:
        """Match against the destination address
        """
        if self.__dest_crit is None:
            self.__dest_crit = DestAddressCriterion(self, ipv6=self._ipv6)
        return self.__dest_crit

    def fragment(self) -> FragmentCriterion:
        """Match if packet has (or has not) the fragment bit set

        :raises IptablesError: if this is an IPv6 match
        """
        if self._ipv6:
            raise IptablesError(
                'PacketMatch has no fragment criterion for IPv6')
        if self.__frag_crit is None:
            self.__frag_crit = FragmentCriterion(self)
        return self.__frag_crit

    @classmethod
    def _parse(cls, field_iter, *, ipv6: bool) -> Optional['PacketMatch']:
        """Parse the following fields, which will be returned in-order
        from field_iter:

            protocol, options, input-interface, output-interface,
            source, destination

        Returns a :class:`PacketMatch` object if any criteria for the above
        fields are defined, otherwise ``None``

        :param field_iter: an iterator that returns the fields of an
            **iptables(8)** output line starting with the protocol field
        :param ipv6: ``True`` if the fields come from IPv6 output; in that
            case the options field is not consumed
        :raises IptablesParsingError: if the options field has an
            unexpected value

        :meta private:
        """
        packet_match = PacketMatch(ipv6=ipv6)
        proto = next(field_iter)
        # The absence of a specific protocol is indicated via 'all'
        # (iptables-1.8.5), or '0' (iptables-nft-1.8.10)
        if proto not in ('all', '0'):
            is_equal, proto = MatchParser.parse_value(proto)
            packet_match.protocol().compare(is_equal, proto)
        # We currently expect the 'option' field to be absent for IPv6
        if not ipv6:
            opt = next(field_iter)
            if opt == '--':
                pass
            elif opt == '-f':
                packet_match.fragment().equals()
            elif opt == '!f':
                packet_match.fragment().not_equals()
            else:
                raise IptablesParsingError(f'cannot parse option: {opt}')
        # '*' stands for "any interface"
        iif = next(field_iter)
        if iif != '*':
            is_equal, interface_name = MatchParser.parse_value(iif)
            packet_match.input_interface().compare(is_equal, interface_name)
        oif = next(field_iter)
        if oif != '*':
            is_equal, interface_name = MatchParser.parse_value(oif)
            packet_match.output_interface().compare(is_equal, interface_name)
        source = next(field_iter)
        # The all-zero network of the relevant family means "any address"
        if ipv6:
            anyipstr = '::/0'
            klass = IPv6Network
        else:
            anyipstr = '0.0.0.0/0'
            klass = IPv4Network
        if source != anyipstr:
            is_equal, addr = MatchParser.parse_value(source)
            packet_match.source_address().compare(is_equal, klass(addr))
        dest = next(field_iter)
        if dest != anyipstr:
            is_equal, addr = MatchParser.parse_value(dest)
            packet_match.dest_address().compare(is_equal, klass(addr))
        return packet_match if packet_match.has_criteria() else None