Extensibility¶
linuxnet.iptables supports only a subset of the match extensions and rule targets that are available in the Linux kernel.
Additional matches can be supported by subclassing the
Match and Criterion classes.
Additional targets can be supported by subclassing the
Target class.
Supporting a new match¶
Supporting a new iptables match requires the addition of a new
Match subclass and one or more Criterion
subclasses. This will be illustrated via example; the example
uses the implementation of the existing TcpmssMatch class which
supports the iptables tcpmss match extension:
iptables -m tcpmss --mss value
The example code includes the TcpmssMatch class and the
associated MssCriterion class.
All classes needed for implementing a new match are available in the linuxnet.iptables.extension module.
# Copyright (c) 2021-2023, Panagiotis Tsirigotis

"""
Example module implementing matching against the TCP MSS
"""

from typing import List, Optional, Tuple
from linuxnet.iptables.extension import (
    Match,
    Criterion,
    MatchParser,
    IptablesError)

class TcpmssMatch(Match):
    """Match against the MSS field of the TCP header
    """
    def __init__(self):
        """This match uses a single criterion of type MssCriterion defined
        below
        """
        self.__mss_crit = None

    def __eq__(self, other):
        return isinstance(other, TcpmssMatch) and self.mss() == other.mss()

    def mss(self) -> 'MssCriterion':
        """Match against the MSS field of the TCP header
        """
        # The criterion is created lazily, on first access
        if self.__mss_crit is None:
            self.__mss_crit = MssCriterion(self)
        return self.__mss_crit

    def to_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for this match
        """
        return self.build_iptables_args('tcpmss', [self.__mss_crit])

    @classmethod
    def parse(cls, parser: MatchParser) -> Match:
        """Parse tcpmss; example::

            tcpmss match !10:20

        The 'tcpmss' field has already been consumed by the parser.
        """
        parser.skip_field('match')
        criteria_iter = parser.get_iter()
        is_equal, val = parser.parse_value(next(criteria_iter))
        # The value is either a single MSS or a 'low:high' range
        if ':' in val:
            mstr, estr = val.split(':', 1)
            mssval = int(mstr)
            endval = int(estr)
        else:
            mssval = int(val)
            endval = None
        return TcpmssMatch().mss().compare(is_equal, mssval, endval)

# Registration of class, executed upon module import
MatchParser.register_match('tcpmss', TcpmssMatch)


class MssCriterion(Criterion):
    """Compare with MSS field of the TCP header.

    The comparison value is a tuple (int, int|None) to compare against
    a specific MSS value or a range of values.
    """
    def __init__(self, match):
        super().__init__(match)
        self.__mssval = None
        self.__endval = None

    def get_value(self) -> Tuple[int, Optional[int]]:
        """Returns the value that the criterion is comparing against.

        :rtype: tuple of (int, int|None)
        """
        return (self.__mssval, self.__endval)

    def equals(self, mssval: int, endval: Optional[int] = None) -> Match:
        """Check for equality against ``mssval``, or range equality
        if ``endval`` is present.

        :param mssval: the MSS value
        :param endval: range of MSS values from ``mssval`` to this value
        """
        if mssval is None:
            raise IptablesError('mssval is None')
        self.__mssval = mssval
        if endval is not None and endval < mssval:
            raise IptablesError('bad range: endval < mssval')
        self.__endval = endval
        # Required: marks the criterion as set, with positive polarity
        return self._set_polarity(True)

    def _crit_iptables_args(self) -> List[str]:
        """Returns **iptables(8)** arguments for the criterion
        """
        mss = f'{self.__mssval}'
        if self.__endval is not None:
            mss += f':{self.__endval}'
        return ['--mss', mss]
Adding a Match subclass¶
A new Match subclass must implement the following methods:
- to_iptables_args(): this method should return the iptables(8) arguments for the match. In our example, the return value might look like this:
  ['-m', 'tcpmss', '--mss', '500']
  if the MssCriterion value was set to 500.
  Notice the use of the Match.build_iptables_args() method; this is a helper method that will generate an empty list if none of the criteria in its argument list have been set.
- parse(): this is a classmethod that takes a single argument of type MatchParser and returns an instance of the new subclass. This method is responsible for parsing the iptables -Lxnv output for this match. Notice the use of the -Lxnv options when implementing this method.
  This method should raise an IptablesParsingError if unable to parse the iptables output.
  This method may raise a CriteriaExhaustedError to terminate the match parsing process.
- One or more methods returning instances of the match-specific criteria. In the example, the relevant method is TcpmssMatch.mss() (TcpmssMatch supports a single criterion), which returns an instance of the MssCriterion class.
The new subclass should implement the special __eq__()
method; two Match objects should compare equal if they are
instances of the same Match subclass and their criteria are equal.
Finally, the new subclass must be registered with the MatchParser
class by invoking the method MatchParser.register_match(), and specifying
the keyword in the iptables(8) output that identifies the particular
match (in this example, that keyword is tcpmss).
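The value handling inside parse() can be tried out without the library. The sketch below is a minimal stand-in, assuming a field of the form `500`, `10:20`, or `!10:20`; the helper name `parse_mss_field` is hypothetical and is not part of linuxnet.iptables.

```python
from typing import Optional, Tuple


def parse_mss_field(field: str) -> Tuple[bool, int, Optional[int]]:
    """Split a tcpmss field such as '!10:20' into (is_equal, mss, end).

    Hypothetical helper for illustration; not part of linuxnet.iptables.
    """
    # A leading '!' marks a negated comparison.
    is_equal = not field.startswith('!')
    value = field if is_equal else field[1:]
    if ':' in value:
        # 'low:high' denotes a range of MSS values.
        low, high = value.split(':', 1)
        return (is_equal, int(low), int(high))
    return (is_equal, int(value), None)
```

A malformed value makes `int()` raise `ValueError`; a real parse() method would presumably turn that into an IptablesParsingError, as described above.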
Adding a Criterion subclass¶
A new Criterion subclass must implement the following methods:
- equals(): this method expresses an equality comparison against a specific value. The method stores this value to be used later to generate the iptables arguments. In our example, the MssCriterion comparison value is an integer or an integer range.
  Notice the invocation of Criterion._set_polarity() at the end of this method; this is required and it serves two purposes:
  - it marks the criterion as having been set
  - it identifies that the criterion expresses an equality comparison (i.e. if the polarity were set to False, the criterion would express an inequality comparison)
- a method that returns the iptables arguments for implementing this criterion. In our example this method is MssCriterion._crit_iptables_args(); it returns a list with the relevant iptables option and the stored value, e.g. ['--mss', '500']. This method is invoked by the TcpmssMatch.to_iptables_args() method.
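The role of the polarity can be illustrated with a self-contained sketch. `SketchCriterion` below is a hypothetical stand-in, not the library's Criterion class: it returns itself rather than an owning Match, and its `to_args()` method is an assumed name. It only shows how "set" state and polarity could drive argument generation.

```python
from typing import List, Optional


class SketchCriterion:
    """Stand-in for a criterion; NOT the linuxnet.iptables Criterion class."""

    def __init__(self, option: str):
        self.option = option
        self.value: Optional[str] = None
        self.positive: Optional[bool] = None   # None means "not set"

    def _set_polarity(self, positive: bool) -> 'SketchCriterion':
        # Setting the polarity is what marks the criterion as set.
        self.positive = positive
        return self

    def equals(self, value: str) -> 'SketchCriterion':
        self.value = value                 # store the comparison value
        return self._set_polarity(True)

    def not_equals(self, value: str) -> 'SketchCriterion':
        self.equals(value)                 # same value ...
        return self._set_polarity(False)   # ... with reversed polarity

    def to_args(self) -> List[str]:
        if self.positive is None:
            return []                      # unset criteria contribute nothing
        prefix = [] if self.positive else ['!']
        return prefix + [self.option, self.value]
```

For example, `SketchCriterion('--mss').not_equals('500').to_args()` yields `['!', '--mss', '500']`.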
Supporting a new target¶
Supporting a new iptables target extension requires the addition of a new
Target subclass. This will be illustrated via example; the example
uses the implementation of the existing RejectTarget class which
supports the iptables REJECT target extension.
All classes needed for implementing a new target are available in the linuxnet.iptables.extension module.
# Copyright (c) 2021-2023, Panagiotis Tsirigotis

"""
Example module implementing a class to support the REJECT target
"""

from typing import List, Optional
from linuxnet.iptables.extension import Target, TargetParser

class RejectTarget(Target):
    """This class provides access to the ``REJECT`` target
    """
    def __init__(self, reject_with: Optional[str] = None):
        """
        :param reject_with: optional ``ICMP`` message type
        """
        super().__init__('REJECT', terminates=True)
        self.__reject_with = reject_with

    def to_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments
        """
        retval = super().to_iptables_args()
        if self.__reject_with is not None:
            retval += ['--reject-with', self.__reject_with]
        return retval

    @classmethod
    def parse(cls, field_iter) -> Target:
        """Parse the REJECT target options
        """
        icmp_message = field_iter.next_value('reject-with')
        return RejectTarget(reject_with=icmp_message)

# Registration of class, executed upon module import
TargetParser.register_target('REJECT', RejectTarget, 'reject-with')
A new Target subclass must implement the following methods:
- to_iptables_args(): this method should return the iptables(8) arguments for the target. In our example, the return value might look like this:
  ['-j', 'REJECT', '--reject-with', 'icmp-port-unreachable']
  assuming the RejectTarget object was initialized with the particular ICMP message.
- parse(): this is a classmethod that takes a single argument of type RuleFieldIterator and returns an instance of the new subclass.
  This method should raise an IptablesParsingError if unable to parse the iptables output.
The new subclass must be registered with the
TargetParser class by invoking the method
TargetParser.register_target(), and specifying
the target name.
If there is a field in the iptables output
that identifies the beginning of the target options, this field
can be passed as an argument to the register_target() method.
In our example, this was the reject-with field.
Specifying such a field is optional. If specified, it
allows the RuleFieldIterator argument of the
parse() method to be positioned right after that field.
Check the documentation of TargetParser.register_target()
for possible arguments to this method.
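The effect of registering a start field can be sketched with a small, self-contained registry. Everything here (`SketchTargetRegistry`, `SketchReject`, and the use of a plain list in place of a RuleFieldIterator) is hypothetical and only illustrates the idea of positioning the fields right after the start field before calling parse().

```python
from typing import Dict, List, Optional, Tuple


class SketchTargetRegistry:
    """Hypothetical name -> (class, start_field) registry; not TargetParser."""

    def __init__(self) -> None:
        self._targets: Dict[str, Tuple[type, Optional[str]]] = {}

    def register(self, name: str, klass: type,
                 start_field: Optional[str] = None) -> None:
        self._targets[name] = (klass, start_field)

    def parse(self, name: str, fields: List[str]):
        klass, start_field = self._targets[name]
        if start_field is not None:
            # Position the field list right after the start field.
            fields = fields[fields.index(start_field) + 1:]
        return klass.parse(fields)


class SketchReject:
    """Stand-in target holding an optional ICMP message type."""

    def __init__(self, reject_with: Optional[str] = None):
        self.reject_with = reject_with

    @classmethod
    def parse(cls, fields: List[str]) -> 'SketchReject':
        return cls(reject_with=fields[0] if fields else None)


registry = SketchTargetRegistry()
registry.register('REJECT', SketchReject, 'reject-with')
target = registry.parse(
    'REJECT', ['tcp', 'dpt:22', 'reject-with', 'icmp-port-unreachable'])
```

Because the registry skips everything up to and including `reject-with`, the parse() method of the stand-in class only sees the option value.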
Base classes¶
Match¶
The Match class is the parent class of all match-related classes.
- class Match
  Parent class for all match-specific subclasses.
  - static build_iptables_args(match_name: Optional[str], crit_iterable) → List[str]
    Helper method to build an iptables(8) argument list from a match name and a list of Criterion objects. iptables(8) arguments will be extracted from each criterion that is set.
    - Parameters:
      match_name – optional match name that will result in adding -m match_name at the beginning of the argument list; there must be at least one set criterion for the match name to be included in the return value
      crit_iterable – an iterable containing Criterion objects
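The documented behavior of the helper can be approximated as follows. This is a sketch under stated assumptions, not the library's code: the function name `sketch_build_args` is hypothetical, and each criterion is represented simply by its argument list, or `None` when it has not been set.

```python
from typing import Iterable, List, Optional


def sketch_build_args(match_name: Optional[str],
                      crit_args: Iterable[Optional[List[str]]]) -> List[str]:
    """Hypothetical stand-in: each entry is a criterion's argument list,
    or None when that criterion has not been set."""
    args: List[str] = []
    for one in crit_args:
        if one is not None:
            args.extend(one)
    # The '-m name' prefix is emitted only when some criterion is set.
    if args and match_name is not None:
        return ['-m', match_name] + args
    return args
```

With no set criterion the result is an empty list, so a match that was created but never configured contributes nothing to the command line.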
Criterion¶
The Criterion class is the parent class of all classes implementing
match-specific criteria.
Objects of subclasses of Criterion are never directly instantiated
by the user; they are instantiated
as needed by the Match subclasses.
- class Criterion(match: Match)
  This class is used to express an iptables(8) match criterion; it does not perform any comparisons.
  Criterion is a superclass that serves the following purposes:
  - it provides an equals() method and a not_equals() method to express comparison against a value
  - it keeps track of whether the criterion has been set; a criterion is set when either the equals() or not_equals() method is invoked; a criterion may only be set once
  - it keeps track of whether a criterion is negated or not
  - its iptables_to_args() method generates the ‘!’ iptables(8) argument, and also checks if the criterion was set
  The equals()/not_equals() methods of Criterion subclasses must invoke the _set_polarity() method of Criterion to indicate the polarity of the test. These methods are also responsible for saving the comparison value in the subclass object.
  A Criterion has an owner which is an object of a subclass of Match. The equals()/not_equals() methods return this object to facilitate building a criteria list:
    match.crit1().equals('foo').crit2().not_equals('bar')
  - Parameters:
    match – the Match object that owns this Criterion
  - is_positive() → bool
    Returns the ‘polarity’ of the criterion; True for equals() or False for not_equals().
    Raises IptablesError if the criterion is not set.
  - equals(*args, **kwargs) → Match
    Express equality comparison against the argument values.
    Subclasses will implement this method to express comparisons against a specific value (or values). These values will be the arguments of the subclass method and will be stored in the subclass object.
    Subclasses overriding this method should invoke the _set_polarity() method of this class to set the polarity to True.
    Returns this Match object.
  - not_equals(*args, **kwargs) → Match
    Express inequality comparison against the argument values.
    The arguments of this method are the same as those of the equals() method.
    This method invokes the equals() method and then reverses the polarity.
    Returns this Match object.
  - compare(is_equal: bool, *args, **kwargs) → Match
    Alternative method used for comparisons. It invokes equals() (or not_equals()) with args and kwargs if is_equal is True (or False).
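The chaining idiom and the compare() dispatch can be sketched with two small stand-in classes. `SketchCrit` and `SketchMatch` are hypothetical names and do not reproduce the library's classes; the point is only that returning the owner from equals()/not_equals() is what makes the chained form possible.

```python
class SketchCrit:
    """Stand-in criterion that reports back to its owner."""

    def __init__(self, owner):
        self.owner = owner
        self.value = None
        self.positive = None

    def equals(self, value):
        self.value, self.positive = value, True
        return self.owner          # returning the owner enables chaining

    def not_equals(self, value):
        self.equals(value)
        self.positive = False      # same value, reversed polarity
        return self.owner

    def compare(self, is_equal, value):
        # Dispatch on the flag, as a parser would after reading '!'.
        return self.equals(value) if is_equal else self.not_equals(value)


class SketchMatch:
    """Hypothetical owner object; not the library's Match class."""

    def __init__(self):
        self.src = SketchCrit(self)
        self.dst = SketchCrit(self)


match = SketchMatch()
result = match.src.equals('a').dst.compare(False, 'b')
```

Here `result` is the same object as `match`, with `src` set positively and `dst` set negatively.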
Target¶
Parser classes¶
MatchParser¶
A MatchParser instance is used to parse the match part of
a rule in the iptables(8) output.
- class MatchParser(field_iter: LookaheadIterator)
  This class handles match parsing
  - get_iter() → LookaheadIterator
    Returns the field iterator
  - static parse_value(value: str) → Tuple[bool, str]
    Check if the specified value starts with ‘!’ indicating negation. Returns the tuple (is_negative, value) where the optional ‘!’ has been stripped from the argument ‘value’
  - parse_next_value() → Tuple[bool, str]
    Parse the next value from the iterator. Allow for the following syntax:
      ! value     (2 fields)
      !value      (1 field)
    Returns the tuple (is_negative, value)
  - skip_field(expected: str)
    Skip the next field, if it is equal to expected. Otherwise, raise an IptablesParsingError exception.
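The two negation syntaxes can be handled along the following lines. This is a minimal sketch, not the MatchParser implementation: the function name `sketch_next_value` is hypothetical, it works on a plain iterator of strings, and it is assumed here that the first element of the returned tuple is `True` when the value is negated.

```python
from typing import Iterator, Tuple


def sketch_next_value(fields: Iterator[str]) -> Tuple[bool, str]:
    """Return (negated, value) for the next value in ``fields``.

    Hypothetical stand-in, not MatchParser.parse_next_value().
    """
    field = next(fields)
    if field == '!':
        # Two-field form: '!' followed by the value.
        return (True, next(fields))
    if field.startswith('!'):
        # One-field form: '!value'.
        return (True, field[1:])
    return (False, field)
```

Both `iter(['!', '10:20'])` and `iter(['!10:20'])` produce the same result, which is what lets the calling code ignore the difference between the two output styles.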
LookaheadIterator¶
- class LookaheadIterator(iterable, lookahead: int)
  A LookaheadIterator is an iterator that provides the ability to put back previously returned tokens.
  Conceptual view of the LookaheadIterator:

                             deque
    +---------------+   +---+---+---+---+---+
    | back-iterator |   | T | T | T |...| T |
    +---------------+   +---+---+---+---+---+
                                  ^
                                  |
                                Cursor

  Tokens to the right of the cursor have been consumed.
  Tokens up to, but not including, the cursor are previously consumed tokens that have been put back.
  New tokens are obtained from the back-iterator.
  The value of the cursor indicates the number of put-back tokens.
  The maximum size of the deque is equal to the lookahead.
  - Parameters:
    iterable – an iterable object from which we create the back-iterator
    lookahead – number of tokens of look ahead
  - put_back(token: str) → None
    Put back the specified token. This must be a token previously returned by the iterator (identity is checked, not equality)
  - rewind(step=1) → LookaheadIterator
    Put back last step tokens.
    A ValueError will be raised if there are not enough tokens to put back.
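A bounded put-back iterator of this kind can be sketched with `collections.deque`. The class below is a simplified stand-in, not the library's LookaheadIterator: the name `SketchLookahead` and its internal layout are assumptions, and only `rewind()` is implemented.

```python
from collections import deque
from typing import Deque, Iterable, Iterator


class SketchLookahead:
    """Iterator with bounded put-back; a stand-in, not LookaheadIterator."""

    def __init__(self, iterable: Iterable[str], lookahead: int):
        self._source: Iterator[str] = iter(iterable)
        # Most recently consumed tokens, newest on the right.
        self._history: Deque[str] = deque(maxlen=lookahead)
        self._pending = 0      # number of put-back tokens

    def __iter__(self) -> 'SketchLookahead':
        return self

    def __next__(self) -> str:
        if self._pending:
            # Replay a token that was put back.
            token = self._history[len(self._history) - self._pending]
            self._pending -= 1
            return token
        token = next(self._source)
        self._history.append(token)
        return token

    def rewind(self, step: int = 1) -> 'SketchLookahead':
        if step > len(self._history) - self._pending:
            raise ValueError('not enough tokens to put back')
        self._pending += step
        return self
```

The `maxlen` of the deque plays the role of the lookahead: older tokens fall off the left end and can no longer be put back.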
TargetParser¶
A TargetParser instance is used to parse the target part of
a rule in the iptables(8) output.
- class TargetParser(target_name: Optional[str], field_iter: RuleFieldIterator)
  This class handles target parsing
  - Parameters:
    target_name – the target name
    field_iter – a RuleFieldIterator
  - classmethod register_target(target_name: str, target_klass, start_field: Optional[str] = None, prefix_match: Optional[bool] = False)
    Register a class to handle parsing for a target.
    - Parameters:
      target_name – this is the target name that appears in the iptables -L output.
      target_klass – a subclass of Target
      start_field – the field in the iptables(8) output that is the beginning of the target fields; if present, the iterator passed to parse() of the target_klass will be forwarded past the field that matches start_field
      prefix_match – if True, start_field is the prefix of the field that is the beginning of the target fields
RuleFieldIterator¶
- class RuleFieldIterator(*args, **kwargs)
  Bases: LookaheadIterator
  This iterator is used to parse the target-related fields of a rule. When instantiated, the iterator may still be ‘inside’ the match fields due to encountering an unsupported match.
  The target-specific parsing code should advance the iterator until it finds the beginning of the target-related fields. The methods forward() and forward_to() may be used for this.
  - Parameters:
    iterable – an iterable object from which we create the back-iterator
    lookahead – number of tokens of look ahead
  - next_field(attr: Optional[str] = None) → str
    Returns the next field which holds the value for attr.
    Raises an IptablesParsingError if the iterator has run out of fields.
  - forward(field_name: Optional[str], *, prefix_match: Optional[bool] = False) → RuleFieldIterator
    Forward past the field identified by field_name, i.e. the next field to be returned by the iterator will be the one after field_name.
    - Parameters:
      field_name – the name of field to reach; if None, this method call is a no-op
      prefix_match – if True, match the first field with a field_name prefix
    - Return type:
      this object
    Raises an IptablesParsingError if no match is found.
  - forward_to(fields: Iterable[str]) → Optional[str]
    Forward to a field among those in fields; this field will be returned, if found. The next field returned by the iterator will be the one found.
    - Parameters:
      fields – fields to match; if None or empty, this method call is a no-op
    - Return type:
      the matching field, or None
  - store_field(field: str) → None
    Store the specified field.
    Target subclasses that choose to do their own forwarding to find the field that starts the target options (instead of using the forward() method) should invoke this method to ‘store’ the fields they skip.
  - peek() → Optional[str]
    Returns the next token, but does not consume it
  - put_back(token: str) → None
    Put back the specified token. This must be a token previously returned by the iterator (identity is checked, not equality)
  - rewind(step=1) → LookaheadIterator
    Put back last step tokens.
    A ValueError will be raised if there are not enough tokens to put back.
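The difference between forward() and forward_to() is where the iterator ends up: after the named field, or on the field that was found. The sketch below illustrates this with a list-backed cursor. `SketchFields` is a hypothetical stand-in, not RuleFieldIterator; it raises `LookupError` where the library documents IptablesParsingError, it omits prefix matching, and leaving the position unchanged when forward_to() finds nothing is an assumption.

```python
from typing import Iterable, List, Optional


class SketchFields:
    """List-backed field cursor; a hypothetical stand-in."""

    def __init__(self, fields: List[str]):
        self.fields = fields
        self.pos = 0           # index of the next field to be returned

    def forward(self, field_name: Optional[str]) -> 'SketchFields':
        # Afterwards, the next field is the one AFTER field_name.
        if field_name is None:
            return self
        try:
            self.pos = self.fields.index(field_name, self.pos) + 1
        except ValueError:
            raise LookupError(f'field {field_name!r} not found') from None
        return self

    def forward_to(self, candidates: Iterable[str]) -> Optional[str]:
        # Afterwards, the next field IS the one that was found.
        wanted = set(candidates or ())
        for i in range(self.pos, len(self.fields)):
            if self.fields[i] in wanted:
                self.pos = i
                return self.fields[i]
        return None
```

Given the fields `['tcp', 'dpt:22', 'reject-with', 'icmp-port-unreachable']`, `forward('reject-with')` leaves the cursor on the ICMP message type, while `forward_to(['reject-with'])` leaves it on `reject-with` itself.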