# Source code for linuxnet.iptables.matches.markmatch

# Copyright (c) 2021, 2022, 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module supports matching against the FW mark
"""

from typing import Iterable, List, Optional, Tuple

from ..exceptions import IptablesError
from ..deps import get_logger

from .match import Match, Criterion, MatchParser

# Logger for this module, obtained from the package's get_logger helper
# using this module's dotted name.
_logger = get_logger('linuxnet.iptables.matches.markmatch')


class MarkCriterion(Criterion):
    """Criterion that compares against a mark.

    It is shared by :class:`MarkMatch` and :class:`ConnmarkMatch` because
    the mark and connmark modules use the same **iptables(8)** option.

    The comparison value is a ``(mark, mask)`` tuple: ``mark`` is an
    integer and ``mask`` is an integer, or ``None`` when there is no mask.
    """

    def __init__(self, match: Match):
        super().__init__(match)
        # Both remain None until equals() provides the comparison value.
        self.__mask = None
        self.__mark = None

    def get_value(self) -> Tuple[int, Optional[int]]:
        """Returns the ``(mark, mask)`` pair this criterion compares against.

        :rtype: tuple of (int, int|None)
        """
        return self.__mark, self.__mask

    def equals(self,                # pylint: disable=arguments-differ
               mark: int, mask: Optional[int] = None) -> Match:
        """Compare for equality with ``mark``, optionally under ``mask``.

        :param mark: the mark value
        :param mask: the mask value (``None`` for no mask)
        :raises IptablesError: if ``mark`` is ``None``
        :returns: the result of ``_set_polarity(True)``
        """
        if mark is None:
            raise IptablesError('mark is None')
        self.__mark, self.__mask = mark, mask
        return self._set_polarity(True)

    def __mark2str(self):
        """Render the mark as ``<mark>`` or ``<mark>/<mask>``, in hex.
        """
        pieces = [f'{self.__mark:#x}']
        if self.__mask is not None:
            pieces.append(f'{self.__mask:#x}')
        return '/'.join(pieces)

    def _crit_iptables_args(self) -> List[str]:
        """Returns the **iptables(8)** arguments for the stored mark.
        """
        rendered = self.__mark2str()
        return ['--mark', rendered]
class MarkMatch(Match):
    """Match against the fwmark.

    The match owns at most one :class:`MarkCriterion`, which is created
    the first time :meth:`mark` is called.
    """

    def __init__(self):
        # Populated lazily by mark(); None until then.
        self.__mark_crit = None

    @staticmethod
    def get_match_name() -> str:
        """Returns the **iptables(8)** match extension name
        """
        return 'mark'

    def get_criteria(self) -> Iterable['Criterion']:
        """Returns the mark match criteria (only one).

        The returned 1-tuple holds ``None`` if :meth:`mark` has not been
        called on this match yet.
        """
        return (self.__mark_crit,)

    def mark(self) -> MarkCriterion:
        """Match against the packet's fwmark.

        The criterion is created on first use; later calls return the
        same object.
        """
        if self.__mark_crit is None:
            self.__mark_crit = MarkCriterion(self)
        return self.__mark_crit

    @classmethod
    def parse(cls, parser: MatchParser) -> Match:
        """Parse the mark criteria::

            mark match [!]<num>[/<num>]

        The 'mark' field has already been consumed.

        The new match is built with ``cls()`` so that a subclass
        inheriting this method gets an instance of itself rather than
        of :class:`MarkMatch`.

        :param parser: the parser positioned just after the 'mark' field
        :returns: the result of the criterion's ``compare()`` call
        :raises ValueError: if the mark or mask is not a valid base-16
            number

        :meta private:
        """
        parser.skip_field('match')
        is_equal, val = parser.parse_next_value()
        # Split "<mark>[/<mask>]"; sep is empty when no mask is present.
        valstr, sep, maskstr = val.partition('/')
        # int(..., 16) accepts both bare hex digits and a 0x prefix.
        value = int(valstr, 16)
        mask = int(maskstr, 16) if sep else None
        return cls().mark().compare(is_equal, value, mask)
# Register MarkMatch with MatchParser under the name 'mark', the same
# name that MarkMatch.get_match_name() returns.
# NOTE(review): presumably this is how the parser finds MarkMatch.parse
# for 'mark' matches -- confirm against MatchParser.
MatchParser.register_match('mark', MarkMatch)