# Source code for linuxnet.iptables.targets.marktarget

# Copyright (c) 2021, 2022, 2023, 2024, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides the MarkTarget class which provides access to
the iptables MARK target.
"""

from typing import List, Optional

from ..deps import get_logger
from ..exceptions import IptablesParsingError, IptablesError

from .target import Target, TargetParser

# Module-level logger obtained through the package's get_logger helper.
# It is not referenced by the code visible in this part of the file.
# NOTE(review): the logger name uses "target" (singular) while the module
# is titled "linuxnet.iptables.targets.marktarget" -- confirm that the
# difference is intentional before relying on the name in logging config.
_logger = get_logger("linuxnet.iptables.target.marktarget")


class _MarkOperations:
    """Mixin that records one packet-mark operation and converts it
    to and from its **iptables(8)** textual form.

    An instance holds at most one operation; every method that records
    an operation raises an :class:`IptablesError` if one is already
    present.
    """

    #
    # Operation codes (as returned by get_op())
    #
    SET = 1
    XSET = 2
    AND = 3
    OR = 4
    XOR = 5

    # Mask recorded when set_mark()/set_xmark() are called without a mask
    NOMASK = 0xffffffff

    def __init__(self, *args, **kwargs):
        # Cooperative initialization: forward everything to the next
        # class in the MRO before creating our own (name-mangled) state.
        super().__init__(*args, **kwargs)
        self.__value = self.__mask = self.__op = None

    def __record(self, op, value, mask):
        """Store a new operation after verifying none is present yet;
        returns ``self`` so that the public mutators can be chained.
        """
        self._assert_no_action()
        self.__value = value
        self.__mask = mask
        self.__op = op
        return self

    def get_mark(self) -> Optional[int]:
        """Returns the recorded mark value; ``None`` if no operation
        has been recorded or the operation only uses a mask
        """
        return self.__value

    def get_mask(self) -> Optional[int]:
        """Returns the recorded mask; ``None`` if no operation has been
        recorded
        """
        return self.__mask

    def get_op(self) -> Optional[int]:
        """Returns the recorded operation code

        :rtype: one of the integers `SET`, `XSET`, `AND`, `OR`, `XOR`,
                or ``None`` when no operation has been recorded
        """
        return self.__op

    def set_mark(self, value: int, mask: Optional[int] =None) -> Target:
        """Record the operation::

                mark = (mark AND NOT(mask)) OR value

        When ``mask`` is omitted, `NOMASK` is recorded instead, which
        reduces the operation to ``mark = value``.

        :param value: the mark value
        :param mask: optional mask
        :rtype: this object
        """
        effective_mask = self.NOMASK if mask is None else mask
        return self.__record(self.SET, value, effective_mask)

    def set_xmark(self, value: int, mask: Optional[int] =None) -> Target:
        """Record the operation::

                mark = (mark AND NOT(mask)) XOR value

        When ``mask`` is omitted, `NOMASK` is recorded instead, which
        reduces the operation to ``mark = value``.

        :param value: the mark value
        :param mask: optional mask
        :rtype: this object
        """
        effective_mask = self.NOMASK if mask is None else mask
        return self.__record(self.XSET, value, effective_mask)

    def and_mark(self, mask: int) -> Target:
        """Record a bitwise AND of the mark with ``mask``
        (rendered as ``--and-mark``)

        :rtype: this object
        """
        return self.__record(self.AND, None, mask)

    def or_mark(self, mask: int) -> Target:
        """Record a bitwise OR of the mark with ``mask``
        (rendered as ``--or-mark``)

        :rtype: this object
        """
        return self.__record(self.OR, None, mask)

    def xor_mark(self, mask: int) -> Target:
        """Record a bitwise XOR of the mark with ``mask``
        (rendered as ``--xor-mark``)

        :rtype: this object
        """
        return self.__record(self.XOR, None, mask)

    def _assert_no_action(self) -> None:
        """Raise an IptablesError unless no operation has been recorded.
        """
        if self.__op is None:
            return
        raise IptablesError(f'mark operation already set: {self.__op}')

    def parse_op(self, val: str, field_iter) -> bool:
        """Record the operation named by the keyword ``val``, reading
        its hexadecimal operand from ``field_iter``.

        Returns ``False`` (without consuming anything from
        ``field_iter``) when ``val`` is not a known keyword, ``True``
        otherwise.

        :meta private:
        """
        if val == 'xset':
            # The only keyword whose operand has the form value/mask
            mark_text, mask_text = field_iter.next_value(val).split('/', 1)
            self.set_xmark(int(mark_text, 16), int(mask_text, 16))
            return True
        single_operand_ops = {
            'set': self.set_mark,
            'or': self.or_mark,
            'xor': self.xor_mark,
            'and': self.and_mark,
        }
        recorder = single_operand_ops.get(val)
        if recorder is None:
            return False
        # The operand is consumed before the recorder gets a chance to
        # reject a second operation.
        recorder(int(field_iter.next_value(val), 16))
        return True

    def mark_iptables_args(self, args: List[str]) -> List[str]:
        """Appends the **iptables(8)** arguments for the recorded
        operation to ``args`` and returns that same list.

        Raises an :class:`IptablesError` if no operation has been
        recorded or if the operation code is not recognized.

        :meta private:
        """
        op, value, mask = self.__op, self.__value, self.__mask
        if value is None and mask is None:
            raise IptablesError("no mark operation specified")
        if op in (self.SET, self.XSET):
            args.append('--set-mark' if op == self.SET else '--set-xmark')
            operand = f'0x{value:x}'
            # The mask is only spelled out when it is not the default
            if mask != self.NOMASK:
                operand += f'/0x{mask:x}'
            args.append(operand)
            return args
        mask_only_options = (
            (self.AND, '--and-mark'),
            (self.OR, '--or-mark'),
            (self.XOR, '--xor-mark'),
        )
        for code, option in mask_only_options:
            if op == code:
                args.extend([option, f'0x{mask:x}'])
                return args
        raise IptablesError(f"unexpected mark operation: {self.__op}")


class MarkTarget(_MarkOperations, Target):
    """Target class giving access to the ``MARK`` target; the mark
    operation itself is recorded via the methods inherited from the
    mark-operations mixin.
    """

    def __init__(self, mark: Optional[int] =None):
        """
        :param mark: when not ``None``, the value handed to
            :meth:`set_mark` (with no mask) right after initialization
        """
        super().__init__('MARK', terminates=False)
        if mark is not None:
            self.set_mark(mark)

    def to_iptables_args(self) -> List[str]:
        """Returns the list of **iptables(8)** arguments: those produced
        by the parent class followed by the mark-operation arguments
        """
        return self.mark_iptables_args(super().to_iptables_args())

    @classmethod
    def parse(cls, parser: TargetParser) -> Target:
        """Build a :class:`MarkTarget` from the fields supplied by
        ``parser``; raises an :class:`IptablesParsingError` on a field
        that is not a recognized mark operation.

        :meta private:
        """
        fields = parser.get_field_iter()
        mark_target = MarkTarget()
        for token in fields:
            # parse_op() pulls the operand of a recognized keyword from
            # the same iterator that drives this loop.
            if mark_target.parse_op(token, fields):
                continue
            raise IptablesParsingError(f'unknown MARK argument: {token}')
        return mark_target
# Register MarkTarget with TargetParser under the name 'MARK'; this runs
# when the module is imported.
# NOTE(review): the role of the third argument (also 'MARK') is not
# visible from this file -- confirm against TargetParser.register_target.
TargetParser.register_target('MARK', MarkTarget, 'MARK')