Source code for linuxnet.iptables.targets.target

# Copyright (c) 2021, 2022, 2023, 2024, 2025, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module contains the Target base class for implementing subclasses
for iptables target extensions.
"""

import traceback

from typing import List, Optional, Tuple, Union

from ..deps import get_logger
from ..exceptions import IptablesError
from ..parsing import RuleFieldIterator

_logger = get_logger("linuxnet.iptables.targets.target")

[docs]class Target: """Parent class for all targets. """ def __init__(self, target_name: str, terminates: bool): """ :param target_name: the name of the target :param terminates: if ``True``, this target terminates processing """ self.__target_name = target_name self.__terminates = terminates def __str__(self): return f'Target({self.__target_name})'
[docs] def is_terminating(self) -> bool: """Returns ``True`` if this is a terminating target """ return self.__terminates
[docs] def get_target_name(self) -> str: """Returns the target name """ return self.__target_name
def __eq__(self, other): """Target comparison is only by name. This implies that we do not distinguish between targets with the same name but different options. """ return (isinstance(other, Target) and self.__target_name == other.get_target_name()) def __ne__(self, other): return not self.__eq__(other)
[docs] def to_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments """ if not self.__target_name: return [] return [self.__target_name]
class TargetNone(Target): # pylint: disable=too-few-public-methods """A target that is not there. This class is intended to be used for comparison purposes. """ def __init__(self): super().__init__("", terminates=False) def to_iptables_args(self) -> List[str]: """Returns a list of **iptables(8)** arguments """ return []
[docs]class Targets: """This class provides a namespace for all target classes """ #: Special ``ACCEPT`` target ACCEPT = Target('ACCEPT', terminates=True) #: Special ``DROP`` target DROP = Target('DROP', terminates=True) #: Special ``QUEUE`` target QUEUE = Target('QUEUE', terminates=True) #: Special ``RETURN`` target RETURN = Target('RETURN', terminates=True) __SPECIAL_TARGET_MAP = { 'ACCEPT' : ACCEPT, 'DROP' : DROP, 'QUEUE' : QUEUE, 'RETURN' : RETURN, }
[docs] @classmethod def get_special(cls, target_name: str) -> Optional[Target]: """Returns the :class:`Target` object for the special target identified by ``target_name``. The special targets are: - ``ACCEPT`` - ``DROP`` - ``RETURN`` - ``QUEUE`` """ return cls.__SPECIAL_TARGET_MAP.get(target_name)
[docs] @classmethod def from_policy(cls, policy: str) -> Target: """Return the :class:`Target` object for one the special targets that can be used as a policy target. These include: - ``ACCEPT`` - ``DROP`` - ``QUEUE`` """ if policy == 'ACCEPT': return cls.ACCEPT if policy == 'DROP': return cls.DROP if policy == 'QUEUE': return cls.QUEUE raise IptablesError(f"No target for policy '{policy}'")
class UnparsedTarget(Target): # pylint: disable=too-few-public-methods """We use this class for targets we cannot parse. This allows us to process **iptables(8)** output without triggering parsing errors. An error will be triggered lazily if/when an object of this class is used to generate an **iptables(8)** command line. """ def __init__(self, target_name: str, field_iter): """ :param target_name: the target name :param field_iter: iterator returning fields of a line """ super().__init__(target_name, terminates=False) self.__options = [] for field in field_iter: if field == target_name: self.__options = list(field_iter) break field_iter.store_field(field) def get_target_options(self) -> List[str]: """Returns target options """ return self.__options def is_terminating(self) -> bool: """Raises an :exc:`IptablesError` since we don't know if this target is terminating or not. """ raise IptablesError( f"unknown if unparsed target {self.get_target_name()} " "is terminating or not") def to_iptables_args(self) -> List[str]: """Since this is an unparsed target, it cannot be expressed in **iptables(8)** arguments. """ raise IptablesError(f'unable to parse options of {self}')
[docs]class ChainTarget(Target): """This class handles a target that is a chain """ def __init__(self, *, chain=None, real_chain_name: Optional[str] =None): """ Either ``chain`` or ``real_chain_name`` must be present (and not ``None``). If both are present, the chain's real name must be equal to ``real_chain_name``. The target name is set to the real chain name. :param chain: a :class:`Chain` object :param real_chain_name: a string """ if real_chain_name is not None: if chain is not None and chain.get_real_name() != real_chain_name: raise IptablesError( f"chain name '{chain.get_real_name()}' does not match " f"provided name '{real_chain_name}'") target_name = real_chain_name else: if chain is None: raise IptablesError( 'attempt to create ChainTarget without providing ' 'chain object or chain name') target_name = chain.get_real_name() super().__init__(target_name, terminates=False) self.__chain = chain
[docs] def get_chain(self) -> Optional['Chain']: """Returns the :class:`Chain` object """ return self.__chain
[docs] def resolve_chain(self, pft, log_failure=True) -> Optional['Chain']: """Resolve the target name to the :class:`Chain` object, and return that object. :param pft: the :class:`IptablesPacketFilterTable` object that is expected to contain the chain :param log_failure: if ``True`` and resolution fails, log a warning :rtype: a :class:`Chain` object or ``None`` """ if self.__chain is None: real_chain_name = self.get_target_name() self.__chain = pft.get_chain_by_rcn(real_chain_name) if self.__chain is None and log_failure: _logger.warning("%s: unable to resolve chain name %s", self.resolve_chain.__qualname__, real_chain_name) _logger.warning("Call stack:\n%s", ''.join(traceback.extract_stack().format()[:-1])) return self.__chain
[docs]class TargetParser: """This class handles target parsing """ # Key: string # Value: tuple of (Target subclass, start_field, prefix_match) _target_class_map = {} def __init__(self, target_name: Optional[str], field_iter: RuleFieldIterator, *, ipv6: bool): """ :param target_name: the target name :param field_iter: a :class:`RuleFieldIterator` :param ipv6: if ``True``, then we are parsing the output of **ip6tables(8)** """ self.__target_name = target_name self.__iter = field_iter self.__ipv6 = ipv6
[docs] def is_ipv6_output(self): """Returns ``True`` if parsing the output of **ip6tables(8)** output """ return self.__ipv6
[docs] def get_field_iter(self) -> RuleFieldIterator: """Returns the :class:`RuleFieldIterator` instance that iterates over the fields of the rule. """ return self.__iter
[docs] @classmethod def register_target(cls, target_name:str, target_klass, start_field: Optional[Union[str, Tuple]] =None, prefix_match: Optional[bool] =False): """Register a class to handle parsing for a target. :param target_name: this is the target name that appears in the ``iptables -L`` output. :param target_klass: a subclass of :class:`Target` :param start_field: the field in the **iptables(8)** output that is the beginning of the target fields; if present, the iterator passed to :meth:`parse` of the ``target_class`` will be forwarded past the field that matches ``start_field``; ``start_field`` may also be specified as tuple of field names :param prefix_match: if ``True``, ``start_field`` is the prefix of the field that is the beginning of the target fields """ cls._target_class_map[target_name] = (target_klass, start_field, prefix_match)
[docs] def parse_target(self, is_goto: bool) -> Optional[Target]: """Parses the specified target name and options. Returns a (subclass of) :class:`Target`, or ``None`` if there is no target name. """ target_name = self.__target_name field_iter = self.__iter # # 1. No target name # if target_name is None: field_iter.store_rest() return None # # 2. Potential chain target: either goto, or target name is not # all upper-case # if is_goto: field_iter.store_rest() return ChainTarget(real_chain_name=target_name) if not target_name.isupper(): field_iter.store_rest() return ChainTarget(real_chain_name=target_name) # # 3. Special target # target = Targets.get_special(target_name) if target is not None: field_iter.store_rest() return target # # 4. Target extension # tupval = self._target_class_map.get(target_name) if tupval is not None: klass, start_field, prefix_match = tupval # NB: will only advance if start_field is not None field_iter.forward(start_field, prefix_match=prefix_match) return klass.parse(self) # # 5. Unparsed # return UnparsedTarget(target_name, field_iter)