Source code for linuxnet.iptables.parsing

# Copyright (c) 2021, 2022, 2023, 2025, Panagiotis Tsirigotis

# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.

"""
This module provides classes related to the parsing of the iptables output:

    - LookaheadIterator
    - RuleFieldIterator
"""

from collections import deque
from typing import Iterable, List, Optional, Tuple, Union

from .exceptions import IptablesParsingError
from .deps import get_logger


_logger = get_logger("linuxnet.iptables.parsing")


[docs]class LookaheadIterator: """A LookaheadIterator is an iterator that provides the ability to put back previously returned tokens. Conceptual view of the LookaheadIterator:: deque +---------------+ +---+---+---+---+---+ | back-iterator | | T | T | T |...| T | +---------------+ +---+---+---+---+---+ ^ | Cursor * Tokens to the right of the cursor have been consumed. * Tokens up to, but not including, the cursor are previously consumed tokens that have been put back. * New tokens are obtained from the back-iterator. * The value of the cursor indicates the number of put-back tokens. * The maximum size of the deque is equal to the lookahead. """ def __init__(self, iterable, lookahead: int): """ :param iterable: an iterable object from which we create the back-iterator :param lookahead: number of tokens of look ahead """ self.__iter = iter(iterable) if lookahead <= 0: raise ValueError(f'bad lookahead value {lookahead}') self.__tokens = deque(maxlen=lookahead) self.__cursor = 0 def __iter__(self): return self def __next__(self): if self.__cursor == 0: token = next(self.__iter) self.__tokens.appendleft(token) else: self.__cursor -= 1 token = self.__tokens[self.__cursor] return token
[docs] def peek(self) -> Optional[str]: """Returns the next token, but does not consume it """ try: token = self.__next__() self.put_back(token) return token except StopIteration: return None
[docs] def put_back(self, token: str, *, replace_token=False) -> None: """This method puts the previously returned token back to the iterator, so that the token can be returned again. :param token: the token to put back to the iterator; when ``replace_token`` is ``False``, this must be **the** token previously returned by the iterator: identity is checked, not equality :param replace_token: if ``True``, it allows ``token`` to be different than then one previously returned by the iterator """ if self.__cursor == len(self.__tokens): # Either there are no consumed tokens, or this is an attempt to # put back one more tokens than those already consumed. raise ValueError('not a consumed token') if token is not self.__tokens[self.__cursor]: if not replace_token: raise ValueError( f'wrong token: expected={self.__tokens[self.__cursor]}, ' f'putback={token}') self.__tokens[self.__cursor] = token self.__cursor += 1
[docs] def rewind(self, step=1) -> 'LookaheadIterator': """Put back last ``step`` tokens A :exc:`ValueError` will be raised if there are not enough tokens to put back. """ avail = len(self.__tokens) - self.__cursor if step > avail: raise ValueError(f'unable to rewind {step} token(s)') self.__cursor += step return self
[docs]class RuleFieldIterator(LookaheadIterator): """ This iterator is used to parse the target-related fields of a rule. When instantiated, the iterator may still be 'inside' the match fields due to encountering an unsupported match. The target-specific parsing code should advance the iterator until it finds the beginning of the target-related fields. The methods :meth:`forward` and :meth:`forward_to` may be used for this. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__stored_fields = []
[docs] def next_field(self, attr: Optional[str] =None) -> str: """Returns the next field which holds the value for ``attr`` Raises an :exc:`IptablesParsingError` if the iterator has run out of fields. """ try: return next(self) except StopIteration as stopit: if attr is not None: raise IptablesParsingError( f"missing value for '{attr}'") from stopit raise IptablesParsingError('no more fields') from stopit
[docs] def forward(self, field_name: Optional[Union[str,Tuple]], *, prefix_match: Optional[bool] =False) -> 'RuleFieldIterator': """Forward past the field identified by ``field_name``, i.e. the next field to be returned by the iterator will be the one after ``field_name``. :param field_name: the name of field to reach; if ``None``, this method call is a no-op :param prefix_match: if ``True``, match the first field with a ``field_name`` prefix :rtype: this object Raises an :exc:`IptablesParsingError` if no match is found. """ if field_name is None: return self try: while True: field = next(self) if prefix_match: if field.startswith(field_name): return self elif isinstance(field_name, tuple) and field in field_name: return self elif field == field_name: return self self.__stored_fields.append(field) except StopIteration as stopit: raise IptablesParsingError( f"missing '{field_name}'") from stopit
[docs] def forward_to(self, fields: Iterable[str]) -> Optional[str]: """Forward to a field among those in ``fields``; this field will be returned, if found. The next field returned by the iterator will be the one found. :param fields: field to match; if ``None`` or empty, this method call is a no-op :rtype: the matching field, or ``None`` """ if not fields: return None try: while True: field = next(self) if field in fields: return field self.__stored_fields.append(field) except StopIteration: return None
[docs] def next_value(self, attr: str) -> str: """Returns the next field which holds the value for ``attr`` """ try: return next(self) except StopIteration as stopit: raise IptablesParsingError( f"missing value for '{attr}'") from stopit
[docs] def store_field(self, field: str) -> None: """Store the specified field. Target subclasses that choose to do their own forwarding to find the field that starts the target options (instead of using the :meth:`forward` method) should invoke this method to 'store' the fields they skip. """ self.__stored_fields.append(field)
def store_rest(self) -> None: """Store any remaining fields :meta private: """ self.__stored_fields.extend(self) def get_stored_fields(self) -> List[str]: """Returns any stored fields :meta private: """ return self.__stored_fields