Extensibility

linuxnet.iptables supports only a subset of the match extensions and rule targets that are available in the Linux kernel.

Additional matches can be supported by subclassing the Match and Criterion classes.

Additional targets can be supported by subclassing the Target class.

Supporting a new match

Supporting a new iptables match requires the addition of a new Match subclasss and one or more Criterion subclasses. This will be illustrated via example; the example uses the implementation of the existing TcpmssMatch class which supports the iptables tcpmss match extension:

iptables -m tcpmss --mss value

The example code includes the TcpmssMatch class and the associated MssCriterion class.

All classes needed for implementing a new match are available in the linux.iptables.extension module.

  1# Copyright (c) 2021-2023, Panagiotis Tsirigotis
  2
  3"""
  4Example module implementing matching against the TCP MSS
  5"""
  6
  7from typing import List, Optional, Tuple
  8from linuxnet.iptables.extension import (
  9        Match,
 10        Criterion,
 11        MatchParser,
 12        IptablesError)
 13
 14class TcpmssMatch(Match):
 15    """Match against the MSS field of the TCP header
 16    """
 17    def __init__(self):
 18        """This match uses a single criterion of type MssCritetion defined
 19        below
 20        """
 21        self.__mss_crit = None
 22
 23    def __eq__(self, other):
 24        return isinstance(other, TcpmssMatch) and self.mss() == other.mss()
 25
 26    def mss(self) -> 'MssCriterion':
 27        """Match against the MSS field of the TCP header
 28        """
 29        if self.__mss_crit is None:
 30            self.__mss_crit = MssCriterion(self)
 31        return self.__mss_crit
 32
 33    def to_iptables_args(self) -> List[str]:
 34        """Returns **iptables(8)** arguments for this match
 35        """
 36        return self.build_iptables_args('tcpmss', [self.__mss_crit])
 37
 38    @classmethod
 39    def parse(cls, parser: MatchParser) -> Match:
 40        """Parse tcpmss; example::
 41
 42            tcpmss match !10:20
 43
 44        The 'tcpmss' field has already been consumed by the parser.
 45        """
 46        parser.skip_field('match')
 47        criteria_iter = parser.get_iter()
 48        is_equal, val = parser.parse_value(next(criteria_iter))
 49        if ':' in val:
 50            mstr, estr = val.split(':', 1)
 51            mssval = int(mstr)
 52            endval = int(estr)
 53        else:
 54            mssval = int(val)
 55            endval = None
 56        return TcpmssMatch().mss().compare(is_equal, mssval, endval)
 57
 58# Registration of class, executed upon module import
 59MatchParser.register_match('tcpmss', TcpmssMatch)
 60
 61
 62class MssCriterion(Criterion):
 63    """Compare with MSS field of the TCP header.
 64
 65    The comparison value is a tuple (int, int|None) to compare against
 66    a specific MSS value or a range of values.
 67    """
 68    def __init__(self, match):
 69        super().__init__(match)
 70        self.__mssval = None
 71        self.__endval = None
 72
 73    def get_value(self) -> Tuple[int, Optional[int]]:
 74        """Returns the value that the criterion is comparing against.
 75
 76        :rtype: tuple of (int, int|None)
 77        """
 78        return (self.__mssval, self.__endval)
 79
 80    def equals(self, mssval: int, endval: Optional[int] =None) -> Match:
 81        """Check for equality against ``mssval``, or range equality
 82        if ``endval`` is present.
 83
 84        :param mssval: the MSS value
 85        :param endval: range of MSS values from ``mssval`` to this value
 86        """
 87        if mssval is None:
 88            raise IptablesError('mssval is None')
 89        self.__mssval = mssval
 90        if endval is not None and endval < mssval:
 91            raise IptablesError('bad range: endval < mssval')
 92        self.__endval = endval
 93        return self._set_polarity(True)
 94
 95    def _crit_iptables_args(self) -> List[str]:
 96        """Returns **iptables(8)** arguments for the criterion
 97        """
 98        mss = f'{self.__mssval}'
 99        if self.__endval is not None:
100            mss += f':{self.__endval}'
101        return ['--mss', mss]

Adding a Match subclass

A new Match subclass must implement the following methods:

  1. to_iptables_args(): this method should return the iptables(8) arguments for the match. In our example, the return value might look like this:

    ['-m', 'tcpmss', '--mss', '500']
    

    if the MssCriterion value was set to 500.

    Notice the use of the Match.build_iptables_args() method; this is a helper method that will generate an empty list if none of the criteria in its argument list have been set.

  2. parse(): this is a classmethod that takes a single argument of type MatchParser and returns an instance of the new subclass. This method is responsible for parsing the iptables -Lxnv output for this match. Notice the use of the -Lxnv options when implementing this method.

    This method should raise an IptablesParsingError if unable to parse the iptables output.

    This method may raise a CriteriaExhaustedError to terminate the match parsing process.

  3. One or more methods returning instances of the match-specific criteria. In the example, the relevant method is TcpmssMatch.mss() (the TcpmssMatch supports a single criterion) which returns an instance of the MssCriterion class.

The new subclass should implement the special __eq__() method; two Match objects should compare equal if they are instances of the same Match subclass and their criteria are equal.

Finally, the new subclass must be registered with the MatchParser class by invoking the method MatchParser.register_class(), and specifying the keyword in the iptables(8) output that identifies the particular match (in this example, that keyword is tcpmss)

Adding a Criterion subclass

A new Criterion subclass must implement the following methods:

  1. equals(): this method expresses an equality comparison against a specific value. The method stores this value to be used later to generate the iptables arguments. In our example, the MssCriterion comparison value is an integer or an integer range.

    Notice the invocation of Match._set_polarity() at the end of this method; this is required and it serves two purposes:

    • it marks the criterion as having been set

    • it identifies that the criterion expresses an equality comparison (i.e. if the polarity were set to False, the criterion would express an inequality comparison)

  2. a method that returns the iptables arguments for implementing this criterion. In our example this method is MssCriterion._crit_iptables_args(); it returns a list with the relevant iptables option and the stored value, e.g. ['--mss', '500']. This method is invoked by the TcpmssMatch.to_iptables_args() method.

Supporting a new target

Supporting a new iptables target extension requires the addition of a new Target subclasss. This will be illustrated via example; the example uses the implementation of the existing RejectTarget class which supports the iptables REJECT target extension.

All classes needed for implementing a new target are available in the linux.iptables.extension module.

 1# Copyright (c) 2021-2023, Panagiotis Tsirigotis
 2
 3"""
 4Example module implementing a class to support the REJECT target
 5"""
 6
 7from typing import List, Optional
 8from linuxnet.iptables.extension import Target, TargetParser
 9
10class RejectTarget(Target):
11    """This class provides access to the ``REJECT`` target
12    """
13    def __init__(self, reject_with: Optional[str] =None):
14        """
15        :param reject_with: optional ``ICMP`` message type
16        """
17        super().__init__('REJECT', terminates=True)
18        self.__reject_with = reject_with
19
20    def to_iptables_args(self) -> List[str]:
21        """Returns a list of **iptables(8)** arguments
22        """
23        retval = super().to_iptables_args()
24        if self.__reject_with is not None:
25            retval += ['--reject-with', self.__reject_with]
26        return retval
27
28    @classmethod
29    def parse(cls, field_iter) -> Target:
30        """Parse the REJECT target options
31        """
32        icmp_message = field_iter.next_value('reject-with')
33        return RejectTarget(reject_with=icmp_message)
34
35TargetParser.register_target('REJECT', RejectTarget, 'reject-with')

A new Target subclass must implement the following methods:

  1. to_iptables_args(): this method should return the iptables(8) arguments for the target. In our example, the return value might look like this:

    ['-j', 'REJECT', '--reject-with', 'icmp-port-unreachable']
    

    assuming the RejectTarget object was initialized with the particular ICMP message.

  2. parse(): this is a classmethod that takes a single argument of type RuleFieldIterator and returns an instance of the new subclass.

    This method should raise an IptablesParsingError if unable to parse the iptables output.

The new subclass must be registered with the TargetParser class by invoking the method TargetParser.register_target(), and specifying the target name.

If there is a field in the iptables output that identifies the beginning of the target options, this field can be passed as an argument to the register_target() method. In our example, this was the reject-with field. Specifying such a field is optional. If specified, it allows the RuleFieldIterator argument of the parse() method to be positioned right after that field. Check the documentation of TargetParser.register_target() for possible arguments to this method.


Base classes

Match

The Match class is the parent class of all match-related classes.

class Match[source]

Parent class for all match-specific subclasses.

static build_iptables_args(match_name: Optional[str], crit_iterable) List[str][source]

Helper method to build an iptables(8) argument list from a match name and a list of Criterion objects. iptables(8) arguments will be extracted from each criterion that is set.

Parameters:
  • match_name – optional match name that will result in adding -m match_name at the beginning of the argument list; there must be at least one set criterion for the match name to be included in the return value

  • crit_iterable – an iterable containing Criterion objects

has_criteria() bool[source]

Returns True if the match has any criteria set

to_iptables_args() List[str][source]

Returns a list of iptables(8) arguments

This method must be implemented by subclasses.

Criterion

The Criterion class is the parent class of all classes implementing match-specific criteria. Objects of subclasses of Criterion are never directly instantiated by the user; they are instantiated as needed by the Match subclasses.

class Criterion(match: Match)[source]

This class is used to express an iptables(8) match criterion; it does not perform any comparisons.

Criterion is a superclass that serves the following purposes:
  1. it provides an equals() method and a not_equals() method to express comparison against a value

  2. it keeps track of whether the criterion has been set; a criterion is set when either the equals() or not_equals() method is invoked; a criterion may only be set once

  3. it keeps track of whether a criterion is negated or not

  4. its iptables_to_args() method generates the ‘!’ iptables(8) argument, and also checks if the criterion was set

The equals()/not_equals() methods of Criterion subclasses must invoke the _set_polarity() method of Criterion to indicate the polarity of the test. These methods are also responsible for saving the comparison value in the subclass object.

A Criterion has an owner which is an object of a subclass of Match. The equals()/not_equals() methods return this object to facilitate building a criteria list:

match.crit1().equals('foo').crit2().not_equals('bar')
Parameters:

match – the Match object that owns this Criterion

is_set() bool[source]

Returns True if the criterion has been set

is_positive() bool[source]

Returns the ‘polarity’ of the criterion; True for equals() or False for not_equals()

Raises IptablesError if the criterion is not set

get_value() Any[source]

Returns the value that this criterion is comparing against

equals(*args, **kwargs) Match[source]

Express equality comparison against the argument values.

Subclasses will implement this method to express comparisons against a specific value (or values). These values will be the arguments of the subclass method and will be stored in the subclass object.

Subclasses overriding this method should invoke the _set_polarity() method of this class to set the polarity to True.

Returns this Match object.

not_equals(*args, **kwargs) Match[source]

Express inequality comparison against the argument values.

The arguments of this method are the same as those of the equals() method.

This method invokes the equals() method and then reverses the polarity.

Returns this Match object.

compare(is_equal: bool, *args, **kwargs) Match[source]

Alternative method used for comparisons. It invokes equals() (or not_equals()) with args and kwargs if is_equal is True (or False).

Target

class Target(target_name: str, terminates: bool)[source]

Parent class for all targets.

Parameters:
  • target_name – the name of the target

  • terminates – if True, this target terminates processing

is_terminating() bool[source]

Returns True if this is a terminating target

get_target_name() str[source]

Returns the target name

to_iptables_args() List[str][source]

Returns a list of iptables(8) arguments


Parser classes

MatchParser

A MatchParser instance is used to parse the match part of a rule in the iptables(8) output.

class MatchParser(field_iter: LookaheadIterator)[source]

This class handles match parsing

get_iter() LookaheadIterator[source]

Returns the field iterator

get_match_name() Optional[str][source]

Returns the match name, if any

get_negation() Optional[str][source]

Returns the negation string, if any

static parse_value(value: str) Tuple[bool, str][source]

Check if the specified value starts with ‘!’ indicating negation. Returns the tuple (is_negative, value) where the optional ‘!’ has been stripped from the argument ‘value’

parse_next_value() Tuple[bool, str][source]

Parse the next value from the iterator. Allow for the following syntax:

! value  (2 fields)
!value (1 field)

Returns the tuple (is_negative, value)

skip_field(expected: str)[source]

Skip the next field, if it is equal to expected. Otherwise, raise an IptablesParsingError exception.

rewind_match()[source]

Returns the match name, and negation string if any, back to the iterator.

parse_matches() List[Match][source]

This method traverses the match part of the rule invoking the match-specific classes based on the name of the match.

classmethod register_match(ident: str, klass) None[source]

Register the given class (which should be a subclass of the Match class). The ident string is the match name that appears in the iptables -L output.

LookaheadIterator

class LookaheadIterator(iterable, lookahead: int)[source]

A LookaheadIterator is an iterator that provides the ability to put back previously returned tokens.

Conceptual view of the LookaheadIterator:

                        deque
+---------------+  +---+---+---+---+---+
| back-iterator |  | T | T | T |...| T |
+---------------+  +---+---+---+---+---+
                         ^
                         |
                       Cursor
  • Tokens to the right of the cursor have been consumed.

  • Tokens up to, but not including, the cursor are previously consumed tokens that have been put back.

  • New tokens are obtained from the back-iterator.

  • The value of the cursor indicates the number of put-back tokens.

  • The maximum size of the deque is equal to the lookahead.

Parameters:
  • iterable – an iterable object from which we create the back-iterator

  • lookahead – number of tokens of look ahead

peek() Optional[str][source]

Returns the next token, but does not consume it

put_back(token: str) None[source]

Put back the specified token. This must be a token previously returned by the iterator (identity is checked, not equality)

rewind(step=1) LookaheadIterator[source]

Put back last step tokens

A ValueError will be raised if there are not enough tokens to put back.

TargetParser

A TargetParser instance is used to parse the target part of a rule in the iptables(8) output.

class TargetParser(target_name: Optional[str], field_iter: RuleFieldIterator)[source]

This class handles target parsing

Parameters:
classmethod register_target(target_name: str, target_klass, start_field: Optional[str] = None, prefix_match: Optional[bool] = False)[source]

Register a class to handle parsing for a target.

Parameters:
  • target_name – this is the target name that appears in the iptables -L output.

  • target_klass – a subclass of Target

  • start_field – the field in the iptables(8) output that is the beginning of the target fields; if present, the iterator passed to parse() of the target_class will be forwarded past the field that matches start_field

  • prefix_match – if True, start_field is the prefix of the field that is the beginning of the target fields

parse_target(is_goto: bool) Optional[Target][source]

Parses the specified target name and options. Returns a (subclass of) Target, or None if there is no target name.

RuleFieldIterator

class RuleFieldIterator(*args, **kwargs)[source]

Bases: LookaheadIterator

This iterator is used to parse the target-related fields of a rule. When instantiated, the iterator may still be ‘inside’ the match fields due to encountering an unsupported match.

The target-specific parsing code should advance the iterator until it finds the beginning of the target-related fields. The methods forward() and forward_to() may be used for this.

Parameters:
  • iterable – an iterable object from which we create the back-iterator

  • lookahead – number of tokens of look ahead

next_field(attr: Optional[str] = None) str[source]

Returns the next field which holds the value for attr

Raises an IptablesParsingError if the iterator has run out of fields.

forward(field_name: Optional[str], *, prefix_match: Optional[bool] = False) RuleFieldIterator[source]

Forward past the field identified by field_name, i.e. the next field to be returned by the iterator will be the one after field_name.

Parameters:
  • field_name – the name of field to reach; if None, this method call is a no-op

  • prefix_match – if True, match the first field with a field_name prefix

Return type:

this object

Raises an IptablesParsingError if no match is found.

forward_to(fields: Iterable[str]) Optional[str][source]

Forward to a field among those in fields; this field will be returned, if found. The next field returned by the iterator will be the one found.

Parameters:

fields – field to match; if None or empty, this method call is a no-op

Return type:

the matching field, or None

next_value(attr: str) str[source]

Returns the next field which holds the value for attr

store_field(field: str) None[source]

Store the specified field.

Target subclasses that choose to do their own forwarding to find the field that starts the target options (instead of using the forward() method) should invoke this method to ‘store’ the fields they skip.

peek() Optional[str]

Returns the next token, but does not consume it

put_back(token: str) None

Put back the specified token. This must be a token previously returned by the iterator (identity is checked, not equality)

rewind(step=1) LookaheadIterator

Put back last step tokens

A ValueError will be raised if there are not enough tokens to put back.


Utility classes

BooleanCriterion

class BooleanCriterion(match: Match, iptables_option: str, supports_negation=True)[source]

Bases: Criterion

Helper class for criteria that test single bits.

Parameters:
  • match – the Match object that owns this Criterion

  • iptables_option – the iptables(8) option to use for this criterion

  • supports_negation – if True, the criteria supports the meth:not_equals method

get_value() bool[source]

Returns the criterion value

bit_set() Match[source]

Check if the bit is set

bit_not_set() Match[source]

Check if the bit is set

equals() Match[source]

Compare with the setting of the bit

not_equals() Match[source]

Express a test against the criterion being False

GenericCriterion

class GenericCriterion(match: Match, iptables_option: str)[source]

Bases: Criterion

A helper class that can be used by all criteria that correspond to iptables(8) options of the form “[!] option value”, for example, “[!] -p tcp”

Parameters:

match – the Match object that owns this Criterion

get_iptables_option() str[source]

Returns the iptables(8) option

get_value() Any[source]

Returns the criterion value

equals(value) Match[source]

Compare with the specified value