# Source code for linuxnet.iptables.targets.connmarktarget

# Copyright (c) 2021, 2022, 2023, 2024, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides the ConnmarkTarget class which provides access to
the iptables CONNMARK target.
"""

from typing import List, Optional

from ..deps import get_logger
from ..exceptions import IptablesParsingError, IptablesError

from .target import Target, TargetParser
from .marktarget import _MarkOperations

# Module-level logger obtained through the package's get_logger() helper.
# NOTE(review): the logger name uses "target" (singular) while this module
# appears to live under "linuxnet.iptables.targets" -- confirm whether the
# difference is intentional before relying on the name for log filtering.
_logger = get_logger("linuxnet.iptables.target.connmarktarget")


class ConnmarkTarget(_MarkOperations, Target):
    """This class provides access to the ``CONNMARK`` target.

    A target object carries at most one action: setting the connection
    mark (via the inherited ``set_mark()``), saving the packet mark into
    the connection mark, or restoring the connection mark into the packet
    mark. The ``nfmask``/``ctmask`` values only apply to the save and
    restore actions.
    """

    def __init__(self, *,
                 mark: Optional[int] = None,
                 restore_mark=False,
                 save_mark=False,
                 nfmask: Optional[int] = None,
                 ctmask: Optional[int] = None):
        """
        :param mark: value used to set the ctmark value (associated with
            a connection)
        :param restore_mark: if ``True``, copy the connection mark to the
            packet mark
        :param save_mark: if ``True``, copy the packet mark to the
            connection mark
        :param nfmask: applies to the save/restore operation
            (see **iptables(8)**); defaults to ``0xffffffff`` if not present
        :param ctmask: applies to the save/restore operation
            (see **iptables(8)**); defaults to ``0xffffffff`` if not present
        :raises IptablesError: if more than one of ``mark``,
            ``restore_mark``, ``save_mark`` is specified

        ``nfmask`` and ``ctmask`` are ignored unless ``restore_mark`` or
        ``save_mark`` is requested.
        """
        super().__init__('CONNMARK', terminates=False)
        self.__save_mark = False
        self.__restore_mark = False
        self.__nfmask = self.NOMASK
        self.__ctmask = self.NOMASK
        # Normalize the flags so that a truthy non-bool value counts as
        # exactly one requested action and is stored as a real bool.
        restoring = bool(restore_mark)
        saving = bool(save_mark)
        if sum((mark is not None, restoring, saving)) > 1:
            raise IptablesError('can either set, save, or restore mark')
        if mark is not None:
            self.set_mark(mark)
        elif restoring or saving:
            self.__restore_mark = restoring
            self.__save_mark = saving
            if nfmask is not None:
                self.__nfmask = nfmask
            if ctmask is not None:
                self.__ctmask = ctmask

    def _assert_no_action(self) -> None:
        """Raise an IptablesError if an action has already been specified.

        The save/restore actions are checked here; the check is then
        delegated to the parent class.
        """
        if self.__restore_mark:
            raise IptablesError('restore action present')
        if self.__save_mark:
            raise IptablesError('save action present')
        super()._assert_no_action()

    def __set_masks(self, nfmask: Optional[int],
                    ctmask: Optional[int]) -> None:
        """Store the save/restore masks, replacing ``None`` with ``NOMASK``.
        """
        self.__nfmask = nfmask if nfmask is not None else self.NOMASK
        self.__ctmask = ctmask if ctmask is not None else self.NOMASK

    def is_restoring_mark(self) -> bool:
        """Returns ``True`` if this target object is set to restore
        the mark, i.e. copy the connection mark to the packet mark
        """
        return self.__restore_mark

    def is_saving_mark(self) -> bool:
        """Returns ``True`` if this target object is set to save
        the mark, i.e. copy the packet mark to the connection mark
        """
        return self.__save_mark

    def restore_mark(self, *,
                     nfmask: Optional[int] = None,
                     ctmask: Optional[int] = None) -> None:
        """Sets this target object to restore the mark.

        :param nfmask: defaults to ``0xffffffff`` if not present
        :param ctmask: defaults to ``0xffffffff`` if not present
        :raises IptablesError: if an action has already been specified
        """
        self._assert_no_action()
        self.__restore_mark = True
        self.__set_masks(nfmask, ctmask)

    def save_mark(self, *,
                  nfmask: Optional[int] = None,
                  ctmask: Optional[int] = None) -> None:
        """Sets this target object to save the mark.

        :param nfmask: defaults to ``0xffffffff`` if not present
        :param ctmask: defaults to ``0xffffffff`` if not present
        :raises IptablesError: if an action has already been specified
        """
        self._assert_no_action()
        self.__save_mark = True
        self.__set_masks(nfmask, ctmask)

    def get_nfmask(self) -> int:
        """Returns the nfmask
        """
        return self.__nfmask

    def get_ctmask(self) -> int:
        """Returns the ctmask
        """
        return self.__ctmask

    def to_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments
        """
        retval = super().to_iptables_args()
        if not (self.__restore_mark or self.__save_mark):
            # Neither save nor restore: let the inherited mark-operation
            # logic contribute its arguments (if any).
            return self.mark_iptables_args(retval)
        retval += ['--restore-mark' if self.__restore_mark
                   else '--save-mark']
        # Masks are only emitted when they differ from the default.
        if self.__nfmask != self.NOMASK:
            retval += ['--nfmask', f'0x{self.__nfmask:x}']
        if self.__ctmask != self.NOMASK:
            retval += ['--ctmask', f'0x{self.__ctmask:x}']
        return retval

    @staticmethod
    def _parse_mask_value(maskstr: str) -> int:
        """Convert a mask field of the iptables output into an integer.

        A leading ``~`` is dropped and the remaining hex digits are used
        as-is (the value is not bitwise-inverted).

        :raises IptablesParsingError: if the field is not a hex number
        """
        # startswith() rather than maskstr[0] so that an empty field is
        # reported as a parsing error instead of raising IndexError.
        digits = maskstr[1:] if maskstr.startswith('~') else maskstr
        try:
            return int(digits, 16)
        except ValueError as valerr:
            raise IptablesParsingError(
                f'bad CONNMARK mask: {maskstr}') from valerr

    @classmethod
    def _parse_masks(cls, field_iter: 'RuleFieldIterator'):
        """Parse the nfmask/ctmask and return a kwargs dictionary
        with possible keys 'nfmask', 'ctmask' and the corresponding values.

        A ``StopIteration`` escapes if a mask keyword is the last field;
        :meth:`parse` converts it into an :exc:`IptablesParsingError`.

        :raises IptablesParsingError: on an unexpected field or a
            malformed mask value
        """
        kwargs = {}
        for val in field_iter:
            if val in ('nfmask', 'ctmask'):
                kwargs[val] = cls._parse_mask_value(next(field_iter))
            elif val == 'mask':
                # NOTE(review): iptables appears to print a single
                # 'mask 0x..' field when nfmask and ctmask are equal
                # (libxt_CONNMARK.c) -- confirm against the iptables
                # versions that need to be supported.
                mask = cls._parse_mask_value(next(field_iter))
                kwargs['nfmask'] = mask
                kwargs['ctmask'] = mask
            else:
                raise IptablesParsingError(
                    f'unexpected CONNMARK field: {val}')
        return kwargs

    @classmethod
    def parse(cls, parser: TargetParser) -> Target:
        """Parse the CONNMARK target options::

            CONNMARK restore ctmask 0x1f nfmask ~0xfffff
            CONNMARK save nfmask 0xfffff ctmask ~0x1f
            CONNMARK xset 0x20/0xff

        :raises IptablesParsingError: if the action is unknown, a field
            is unexpected or malformed, or the options end prematurely

        :meta private:
        """
        field_iter = parser.get_field_iter()
        target = ConnmarkTarget()
        try:
            action = next(field_iter)
            # parse_op() is tried first; a true result means it has
            # consumed the action, so nothing else is left to do.
            if target.parse_op(action, field_iter):
                pass
            elif action == 'restore':
                target.restore_mark(**cls._parse_masks(field_iter))
            elif action == 'save':
                target.save_mark(**cls._parse_masks(field_iter))
            else:
                raise IptablesParsingError(
                    f'unknown CONNMARK action: {action}')
        except StopIteration as stopiter:
            raise IptablesParsingError(
                'insufficient CONNMARK options') from stopiter
        return target
# Register ConnmarkTarget with TargetParser at import time.
# NOTE(review): presumably the first argument is the target name and the
# last one is the token that prefixes the target's options in iptables
# output (cf. the formats listed in ConnmarkTarget.parse) -- confirm
# against TargetParser.register_target.
TargetParser.register_target('CONNMARK', ConnmarkTarget, 'CONNMARK')