Source code for linuxnet.iptables.matches.ownermatch

# Copyright (c) 2023, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides matching against UID/GID
"""

from typing import List, Optional, Tuple

from ..exceptions import IptablesParsingError
from ..deps import get_logger

from .match import Match, Criterion, MatchParser
from .util import BooleanCriterion

_logger = get_logger('linuxnet.iptables.matches.limitmatch')


class _NumberRangeCriterion(Criterion):
    """Compare against a source/destination port or port-range
    """
    def __init__(self, match: Match, iptables_option: str):
        super().__init__(match)
        self.__option = iptables_option
        self.__from_num = None
        self.__to_num = None

    def get_value(self) -> Tuple[int, Optional[int]]:
        """Returns the value that the criterion is comparing against

        :rtype: a tuple of (int, int|None)
        """
        return (self.__from_num, self.__to_num)

    def equals(self,                    # pylint: disable=arguments-differ
                from_num: int, to_num: Optional[int] =None) -> Match:
        """Compare with a number (or inclusion in number-range if ``to_num``
        is present)
        """
        self.__from_num = from_num
        self.__to_num = to_num
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the specified number(s)
        """
        num_spec = str(self.__from_num)
        if self.__to_num is not None:
            num_spec += f'-{self.__to_num}'
        return [self.__option, num_spec]


[docs]class UidCriterion(_NumberRangeCriterion): """Compare with a uid, or uid range """ def __init__(self, match: Match): super().__init__(match, '--uid-owner')
[docs]class GidCriterion(_NumberRangeCriterion): """Compare with a gid, or gid range """ def __init__(self, match: Match): super().__init__(match, '--gid-owner')
[docs]class SocketExistsCriterion(BooleanCriterion): """Perform a socket existence test """ def __init__(self, match: Match): super().__init__(match, '--socket-exists')
[docs]class OwnerMatch(Match): """Match against userid, groupid, or socket existence. Only numeric userid, groupid values are supported. """ def __init__(self): self.__uid_crit = None self.__gid_crit = None self.__socket_exists_crit = None def __eq__(self, other): return ( isinstance(other, OwnerMatch) and self.uid() == other.uid() and self.gid() == other.gid() and self.socket_exists() == other.socket_exists() )
[docs] def uid(self) -> UidCriterion: """Compare with the UID """ if self.__uid_crit is None: self.__uid_crit = UidCriterion(self) return self.__uid_crit
[docs] def gid(self) -> GidCriterion: """Compare with the GID """ if self.__gid_crit is None: self.__gid_crit = GidCriterion(self) return self.__gid_crit
[docs] def socket_exists(self) -> SocketExistsCriterion: """Match if there is a socket for this packet """ if self.__socket_exists_crit is None: self.__socket_exists_crit = SocketExistsCriterion(self) return self.__socket_exists_crit
[docs] def to_iptables_args(self) -> List[str]: """Returns **iptables(8)** arguments for this match """ criteria = (self.__uid_crit, self.__gid_crit, self.__socket_exists_crit) return self.build_iptables_args('owner', criteria)
def _parse_criteria(self, criteria_iter, is_equal) -> bool: """Parse the owner criteria. Returns ``False`` if the same criterion is encountered again (example: owner UID 10-100 ! owner UID 20) This is an indication of a new match. """ token = next(criteria_iter) if token in ('UID', 'GID'): if token == 'UID': crit = self.uid() else: crit = self.gid() if crit.is_set(): criteria_iter.put_back(token) return False if next(criteria_iter) != 'match': raise IptablesParsingError( f"found token {token}; expected 'match'") from_num = None to_num = None num_spec = next(criteria_iter) if '-' in num_spec: numbers = num_spec.split('-', 1) from_num = int(numbers[0]) to_num = int(numbers[1]) else: from_num = int(num_spec) crit.compare(is_equal, from_num, to_num) elif token == 'socket': crit = self.socket_exists() if crit.is_set(): criteria_iter.put_back(token) return False token = next(criteria_iter) if token != 'exists': raise IptablesParsingError( f"found token {token}; expected 'exists'") crit.compare(is_equal) else: raise IptablesParsingError( f'unexpected token {token} parsing owner match') return True @classmethod def parse(cls, parser: MatchParser) -> Match: """Parse owner; the syntax is a concatenation of a subset of the following forms:: [!] owner UID|GID match <num>[-<num>] [!] owner socket exists The leading 'owner' field (and the preceding '!' if present) has already been consumed when this method is invoked. :meta private: """ # Return the 'owner' field and the negation if present to the # iterator so that we can process them as part of the for-loop below. # The for-loop is designed to handle all owner-related criteria # (which we expect to appear consecutively). # Because of the rewind, this method is now responsible for handling # StopIteration errors. parser.rewind_match() match = OwnerMatch() criteria_iter = parser.get_iter() negation = None for token in criteria_iter: try: if token == '!': negation = token token = next(criteria_iter) if token != 'owner' or not match._parse_criteria( criteria_iter, negation is None): criteria_iter.put_back(token) if negation is not None: criteria_iter.put_back(negation) break except StopIteration as stopiter: if negation is not None or token is not None: if token is None: raise IptablesParsingError( 'negation without criterion') from stopiter raise IptablesParsingError( 'incomplete owner match') from stopiter negation = None token = None return match
MatchParser.register_match('owner', OwnerMatch)