# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides the PacketMatch class which supports
matching against standard packet attributes
"""
from ipaddress import IPv4Network
from typing import List, Optional
from ..exceptions import IptablesParsingError
from ..deps import get_logger
from .match import Match, MatchParser
from .util import BooleanCriterion, GenericCriterion
_logger = get_logger('linuxnet.iptables.matches.packetmatch')
[docs]class OutputInterfaceCriterion(GenericCriterion):
"""Compare with the output interface.
The comparison value is an interface name (a string).
"""
def __init__(self, match: Match):
super().__init__(match, '-o')
[docs]class SourceAddressCriterion(GenericCriterion):
"""Compare with the source address.
The comparison value is an :class:`IPv4Network`.
"""
def __init__(self, match: Match):
super().__init__(match, '-s')
[docs]class DestAddressCriterion(GenericCriterion):
"""Compare with the destination address.
The comparison value is an :class:`IPv4Network`.
"""
def __init__(self, match: Match):
super().__init__(match, '-d')
[docs]class ProtocolCriterion(GenericCriterion):
"""Compare with the protocol.
The comparison value is a protocol name (a string).
"""
def __init__(self, match: Match):
super().__init__(match, '-p')
[docs]class FragmentCriterion(BooleanCriterion):
"""Check if a packet is a fragment.
"""
def __init__(self, match: Match):
super().__init__(match, '-f')
[docs]class PacketMatch(Match):
"""This class provides matching against the following attributes of
a packet:
* input interface
* output interface
* protocol
* source address
* destination address
* fragment bit
"""
def __init__(self):
self.__iif_crit = None
self.__oif_crit = None
self.__proto_crit = None
self.__frag_crit = None
self.__source_crit = None
self.__dest_crit = None
def __eq__(self, other):
return (
isinstance(other, PacketMatch) and
self.input_interface() == other.input_interface() and
self.output_interface() == other.output_interface() and
self.protocol() == other.protocol() and
self.fragment() == other.fragment() and
self.source_address() == other.source_address() and
self.dest_address() == other.dest_address()
)
[docs] def protocol(self) -> ProtocolCriterion:
"""Match against the protocol
"""
if self.__proto_crit is None:
self.__proto_crit = ProtocolCriterion(self)
return self.__proto_crit
[docs] def output_interface(self) -> OutputInterfaceCriterion:
"""Match against the output interface
"""
if self.__oif_crit is None:
self.__oif_crit = OutputInterfaceCriterion(self)
return self.__oif_crit
[docs] def source_address(self) -> SourceAddressCriterion:
"""Match against the source address
"""
if self.__source_crit is None:
self.__source_crit = SourceAddressCriterion(self)
return self.__source_crit
[docs] def dest_address(self) -> DestAddressCriterion:
"""Match against the destination address
"""
if self.__dest_crit is None:
self.__dest_crit = DestAddressCriterion(self)
return self.__dest_crit
[docs] def fragment(self) -> FragmentCriterion:
"""Match if packet has (or has not) the fragment bit set
"""
if self.__frag_crit is None:
self.__frag_crit = FragmentCriterion(self)
return self.__frag_crit
[docs] def to_iptables_args(self) -> List[str]:
"""Generate an **iptables(8)** arguments list for the set criteria
"""
criteria = (
self.__iif_crit,
self.__oif_crit,
self.__proto_crit,
self.__frag_crit,
self.__source_crit,
self.__dest_crit,
)
return self.build_iptables_args(None, criteria)
@classmethod
def _parse(cls, field_iter) -> Optional['PacketMatch']:
"""Parse the following fields, which will be returned in-order
from field_iter:
protocol, options, input-interface, output-interface,
source, destination
Returns a :class:`PacketMatch` object if any criteria for the above
fields are defined, otherwise ``None``
:param field_iter: an iterator that returns the fields of an
**iptables(8)** output line starting with the protocol field
:meta private:
"""
packet_match = PacketMatch()
proto = next(field_iter)
if proto != 'all':
is_equal, proto = MatchParser.parse_value(proto)
packet_match.protocol().compare(is_equal, proto)
opt = next(field_iter)
if opt == '--':
pass
elif opt == '-f':
packet_match.fragment().equals()
elif opt == '!f':
packet_match.fragment().not_equals()
else:
raise IptablesParsingError(f'cannot parse option: {opt}')
iif = next(field_iter)
if iif != '*':
is_equal, interface_name = MatchParser.parse_value(iif)
packet_match.input_interface().compare(is_equal, interface_name)
oif = next(field_iter)
if oif != '*':
is_equal, interface_name = MatchParser.parse_value(oif)
packet_match.output_interface().compare(is_equal, interface_name)
source = next(field_iter)
if source != '0.0.0.0/0':
is_equal, addr = MatchParser.parse_value(source)
packet_match.source_address().compare(is_equal, IPv4Network(addr))
dest = next(field_iter)
if dest != '0.0.0.0/0':
is_equal, addr = MatchParser.parse_value(dest)
packet_match.dest_address().compare(is_equal, IPv4Network(addr))
return packet_match if packet_match.has_criteria() else None