# Copyright (c) 2021, 2022, 2023, 2024, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""This module provides the ChainRule class
"""
from typing import Iterator, List, Optional
from .exceptions import IptablesError, IptablesParsingError
from .matches import Match, MatchNone, PacketMatch, CommentMatch
from .matches.match import MatchParser
from .parsing import LookaheadIterator, RuleFieldIterator
from .targets import Target, ChainTarget, UnparsedTarget, TargetNone
from .targets.target import TargetParser
from .deps import get_logger
# Module-level logger, obtained under the "linuxnet.iptables.rule" name
_logger = get_logger("linuxnet.iptables.rule")
class ChainRule:        # pylint: disable=too-many-instance-attributes
    """This class represents a rule in an **iptables(8)** chain.

    A :class:`ChainRule` has a (possibly empty) list of :class:`Match`
    objects and an optional :class:`Target` object.

    Multiple :class:`Match` objects of the same type can be included
    in a rule. Since multiple :class:`Match` objects imply a logical-AND,
    including objects of the same type may be useful when using negation.
    However, there can be at most one :class:`PacketMatch` object included.

    A :class:`ChainRule` object is iterable, returning the rule's
    :class:`Match` instances.
    """

    def __init__(self, *,
                 match: Optional[Match] = None,
                 match_list: Optional[List[Match]] = None,
                 target: Optional[Target] = None,
                 uses_goto: Optional[bool] = False,
                 goto_chain: Optional['Chain'] = None):
        """
        :param match: optional :class:`Match` object; if present, it is added
            to the rule's list of :class:`Match` objects
        :param match_list: optional list of :class:`Match` objects;
            if present, it is appended to the rule's list of :class:`Match`
            objects (the caller's list is not modified or retained)
        :param target: a :class:`Target` object; either this parameter or
            the ``goto_chain`` parameter may be specified
        :param uses_goto: if ``True``, rule processing continues at the
            specified target (which **must** be a :class:`ChainTarget`)
            short-circuiting any rules following this one in the chain
        :param goto_chain: an optional :class:`Chain` object that is
            the target of this rule via a ``goto`` (instead of ``jump``);
            either this parameter or the ``target`` parameter may be
            specified

        Raises an :exc:`IptablesError` if both ``target`` and
        ``goto_chain`` are specified, or if more than one
        :class:`PacketMatch` is present in the resulting match list.
        """
        if goto_chain is not None and target is not None:
            raise IptablesError('both target and goto chain specified')
        self.__match_list = []
        if match is not None:
            self.__match_list.append(match)
        if match_list:
            # In-place extend of our own list; the caller's list is
            # only read.
            self.__match_list += match_list
        self.__target = target
        self.__uses_goto = uses_goto
        if goto_chain is not None:
            # A goto chain implies both the target and the goto flag
            self.__target = ChainTarget(chain=goto_chain)
            self.__uses_goto = True
        self.__packet_count = 0
        self.__byte_count = 0
        self.__owner_chain = None
        self.__rulenum = 0
        # __iptables_line is not None if this rule is created from
        # iptables(8) output
        self.__iptables_line = None
        # The __parsed attribute will be True if the __iptables_line
        # is not None and has been successfully parsed.
        self.__parsed = False
        # Verify that there is at most 1 PacketMatch
        has_packet_match = False
        for mobj in self.__match_list:
            if isinstance(mobj, PacketMatch):
                if has_packet_match:
                    raise IptablesError('more than one PacketMatch in rule')
                has_packet_match = True

    def __str__(self):
        if self.parsing_failed():
            rule_str = f"UNPARSED: {self.__iptables_line}"
        else:
            rule_str = ' '.join(self.to_iptables_args())
        return f"ChainRule('{rule_str}')"

    def __iter__(self):
        """Iterator for the rule's matches
        """
        return iter(self.__match_list)

    def _set_stats(self, *, packet_count: int, byte_count: int):
        """Set the rule stats

        This method is only used by the parsing code.
        """
        self.__packet_count = packet_count
        self.__byte_count = byte_count

    def _set_iptables_line(self, line: str, parsed: bool) -> None:
        """Record the **iptables(8)** output line this rule was created
        from, together with a flag indicating if the line was
        successfully parsed.
        """
        self.__iptables_line = line
        self.__parsed = parsed

    def parsing_failed(self) -> bool:
        """Returns ``True`` if the rule has not been parsed successfully

        This can only be the case for a rule that has an iptables line;
        such a rule is considered unparsed if it was flagged as not
        parsed, or if its target is an :class:`UnparsedTarget`.
        """
        return (self.__iptables_line is not None and
                (not self.__parsed or
                    isinstance(self.__target, UnparsedTarget)))

    def get_iptables_line(self) -> Optional[str]:
        """Returns the iptables line if this rule was created from
        the output of **iptables(8)**, otherwise it returns ``None``.
        """
        return self.__iptables_line

    def get_packet_count(self) -> int:
        """Returns the packet count of the rule
        """
        return self.__packet_count

    def get_byte_count(self) -> int:
        """Returns the byte count of the rule
        """
        return self.__byte_count

    def get_chain(self) -> Optional['Chain']:
        """Returns the :class:`Chain` where this rule belongs (returns
        ``None`` if this rule is not in any chain)
        """
        return self.__owner_chain

    def get_rulenum(self) -> int:
        """Returns the rule number
        """
        return self.__rulenum

    def _set_chain(self, chain, rulenum: int) -> None:
        """Set the :class:`Chain` where this rule belongs; also sets
        the rule number.

        Raises an :exc:`IptablesError` if the rule already belongs
        to a chain.
        """
        if self.__owner_chain is not None:
            raise IptablesError("rule belongs to different chain")
        self.__owner_chain = chain
        self.__rulenum = rulenum

    def _inc_rulenum(self) -> None:
        """Increase rulenum by 1; this is used when a rule is inserted
        before this one.
        """
        self.__rulenum += 1

    def _dec_rulenum(self) -> None:
        """Decrease rulenum by 1; this is used when a rule after this
        one is deleted from a chain.
        """
        self.__rulenum -= 1

    def _deleted(self) -> None:
        """Invoked when the rule is deleted
        """
        self.__owner_chain = None
        self.__rulenum = 0

    def get_target(self) -> Optional[Target]:
        """Returns the rule target (a :class:`Target` object) or ``None``
        """
        return self.__target

    def uses_goto(self) -> bool:
        """Returns ``True`` if this rule 'goes' to its (chain) target
        instead of 'jumping' to it.
        """
        return self.__uses_goto

    def _set_target(self, target: Target) -> None:
        """Change the rule's target without checking if the
        rule belongs to a chain.

        If the rule was created from an iptables line and an
        :class:`UnparsedTarget` is replaced by a target that is not
        an :class:`UnparsedTarget`, the rule is marked as parsed.
        """
        # The old target must be inspected *before* it is overwritten,
        # otherwise the test below could never succeed.
        was_unparsed = isinstance(self.__target, UnparsedTarget)
        self.__target = target
        if (was_unparsed and
                not isinstance(target, UnparsedTarget) and
                self.__iptables_line is not None):
            self.__parsed = True

    def set_target(self, target: Target) -> None:
        """Set the rule target

        Raises an :exc:`IptablesError` if the rule is already part
        of a :class:`Chain`
        """
        if self.__owner_chain is not None:
            raise IptablesError('attempt to replace target of active rule')
        self.__target = target

    def iter_match_list(self) -> Iterator[Match]:
        """Returns an iterator for the matches of this rule.

        **This method is deprecated and will be removed at a future version.**
        """
        return iter(self.__match_list)

    def iter_matches(self,
                     lookfor: Optional[Match] = None) -> Iterator[Match]:
        """Returns an iterator for the matches of this rule.

        If ``lookfor`` is not ``None``, the iterator will return
        :class:`Match` instances with criteria that compare equal to those of
        the ``lookfor`` :class:`Match`; if ``lookfor`` has no criteria
        defined, the iterator will return :class:`Match` instances of
        the **same** type as the ``lookfor`` :class:`Match`.
        """
        if lookfor is None:
            return iter(self.__match_list)
        has_criteria = any(crit is not None and crit.is_set()
                           for crit in lookfor.get_criteria())
        if has_criteria:
            # Perform a match value comparison
            return filter(lambda m: m == lookfor, self.__match_list)
        # No criteria are set: select by exact type (an equal match
        # is also accepted)
        lookfor_klass = type(lookfor)
        return filter(lambda m: lookfor_klass is type(m) or m == lookfor,
                      self.__match_list)

    def get_match_count(self) -> int:
        """Returns the number of matches.
        """
        return len(self.__match_list)

    def get_match_list(self) -> List[Match]:
        """Returns the match list of this rule.

        The returned list is a copy; modifying it does not affect
        the rule.
        """
        return self.__match_list[:]

    def has_match(self, match: Match, is_only_match: bool = True) -> bool:
        """Returns ``True`` if the match list of this rule consists only
        of the specified match
        (when ``is_only_match`` is ``True``)
        or if the match list contains the specified match
        (when ``is_only_match`` is ``False``).

        An object of :class:`MatchNone` can be
        used to test for an empty match list.
        """
        if isinstance(match, MatchNone):
            return not self.__match_list
        if is_only_match:
            return (len(self.__match_list) == 1 and
                    match == self.__match_list[0])
        return any(match == existing_match
                   for existing_match in self.__match_list)

    def has_target(self, target: Target) -> bool:
        """Returns ``True`` if the rule has the specified target.

        An object of :class:`TargetNone` can be used to test for lack
        of target.
        """
        if isinstance(target, TargetNone):
            return self.__target is None
        return self.__target == target

    def targets_chain(self, chain: 'Chain') -> bool:
        """Returns ``True`` if the target of this rule is the specified chain

        :param chain: a :class:`Chain` object
        """
        # Must invoke the get_target_chain() method to force lazy resolution
        # of the Chain object
        target_chain = self.get_target_chain()
        return (target_chain is not None and
                target_chain.get_real_name() == chain.get_real_name())

    def get_target_chain(self) -> Optional['Chain']:
        """Returns the :class:`Chain` object that is
        the target of this rule, or ``None`` if this rule does not
        target a chain.

        Raises an :exc:`IptablesError` if the target chain needs to be
        resolved by name but the rule is not in a chain, or the chain
        is not in an :class:`IptablesPacketFilterTable`.
        """
        if not isinstance(self.__target, ChainTarget):
            return None
        target_chain = self.__target.get_chain()
        if target_chain is not None:
            return target_chain
        # The ChainTarget may only have the chain name, but not the
        # chain object; try to resolve it
        if self.__owner_chain is None:
            # Without an owner chain there is nothing to resolve against
            raise IptablesError('rule not in a chain')
        pft = self.__owner_chain.get_pft()
        if pft is None:
            raise IptablesError("rule is not in IptablesPacketFilterTable")
        return self.__target.resolve_chain(pft)

    def matches_all_packets(self) -> bool:
        """Returns ``True`` iff this rule matches all packets. This
        can be because the rule has no matches, or because the only
        matches are comments.
        """
        return all(isinstance(match, CommentMatch)
                   for match in self.__match_list)

    def to_iptables_args(self) -> List[str]:
        """Returns a list suitable to be used as an argument to
        the **iptables(8)** command

        Raises an :exc:`IptablesError` if this is an unparsed rule
        """
        if self.parsing_failed():
            raise IptablesError(
                f'unable to parse rule: {self.__iptables_line}')
        retval = []
        for match in self.__match_list:
            retval += match.to_iptables_args()
        if self.__target is not None:
            target_args = self.__target.to_iptables_args()
            # A target that produces no arguments contributes nothing,
            # not even the -j/-g option
            if target_args:
                retval.append('-g' if self.__uses_goto else '-j')
                retval += target_args
        return retval

    def jump_to(self, *, target: Optional[Target] = None,
                chain: Optional['Chain'] = None) -> 'ChainRule':
        """Add a jump to the specified target.

        The target is identified either via the ``target`` argument or
        via the ``chain`` argument.

        Raises an :exc:`IptablesError` if:
            - both ``target`` and ``chain`` arguments are not ``None``
            - the rule is already part of a :class:`Chain`

        :param target: optional :class:`Target` object
        :param chain: optional :class:`Chain` object
        :rtype: this :class:`ChainRule` object
        """
        if self.__owner_chain is not None:
            rcn = self.__owner_chain.get_real_name()
            raise IptablesError(f'rule already inserted in chain {rcn}')
        if target is not None and chain is not None:
            raise IptablesError('both target and chain specified')
        if chain is not None:
            target = ChainTarget(chain=chain)
        self.__target = target
        return self

    def go_to(self, *, chain: 'Chain') -> 'ChainRule':
        """Add a goto to the specified chain.

        Raises an :exc:`IptablesError` if the rule is already part
        of a :class:`Chain`

        :param chain: a :class:`Chain` object
        :rtype: this :class:`ChainRule` object
        """
        if self.__owner_chain is not None:
            rcn = self.__owner_chain.get_real_name()
            raise IptablesError(f'rule already inserted in chain {rcn}')
        self.__target = ChainTarget(chain=chain)
        self.__uses_goto = True
        return self

    def zero_counters(self) -> None:
        """Zero the packet and byte counters of this rule

        Raises an :exc:`IptablesError` if the rule is not in a chain,
        or if the chain has no packet filter table.
        """
        if self.__owner_chain is None:
            raise IptablesError('rule not in a chain')
        pft = self.__owner_chain.get_pft()
        if pft is None:
            raise IptablesError('rule belongs to chain that is not in kernel')
        pft.zero_counters(chain=self.__owner_chain, rulenum=self.__rulenum)

    # Protocol names that may appear in the 'prot' column and could be
    # confused with a chain name in the 'target' column
    __PROTO_NAMES = frozenset({'tcp', 'udp', 'udplite', 'icmp',
                               'esp', 'ah', 'sctp', 'all'})

    @classmethod
    def __parse_target_name(cls, field_iter: LookaheadIterator):
        """Returns the target_name if present, otherwise None

        When ``None`` is returned, the consumed field has been put back
        into ``field_iter``.
        """
        #
        # Sample lines:
        #  pkts bytes target prot opt in out source    destination
        #     0     0        !22  --  *  *   0.0.0.0/0 0.0.0.0/0
        #     0     0 foo    all  --  *  *   0.0.0.0/0 0.0.0.0/0
        #
        # The field_iter has already returned the pkts and bytes fields.
        #
        candidate = next(field_iter)
        # If all upper-case, assume it is a target
        if candidate.isupper():
            return candidate
        # If it starts with a '!' or is all digits, it is a protocol
        # (negated and/or numeric), so there is no target field
        if candidate[0] == '!' or candidate.isdigit():
            field_iter.put_back(candidate)
            return None
        if candidate not in cls.__PROTO_NAMES:
            return candidate
        # At this point we know that the candidate field matches a protocol
        # name, but it is possible that there is a chain named after a
        # protocol. So we test if the next field might be a protocol
        nextone = field_iter.peek()
        if nextone[0] == '!':
            nextone = nextone[1:]
        if nextone.isdigit() or nextone in cls.__PROTO_NAMES:
            return candidate
        field_iter.put_back(candidate)
        return None

    @classmethod
    def create_from_existing(cls,       # pylint: disable=too-many-locals
                             iptables_output_line: str,
                             pft: 'IptablesPacketFilterTable'
                             ) -> 'ChainRule':
        """Create a ChainRule from a line of ``iptables -xnv`` output

        Raises an :exc:`IptablesParsingError` if the line cannot be
        parsed (not enough fields, bad field values, or leftover fields).

        :param iptables_output_line: line of ``iptables -xnv`` output
        :param pft: an :class:`IptablesPacketFilterTable` object
        """
        fields = iptables_output_line.split()
        # Minimum lookahead is 3, in order to support put_back based
        # on peek'ed value
        field_iter = LookaheadIterator(fields, 3)
        try:
            packet_count = int(next(field_iter))
            byte_count = int(next(field_iter))
            target_name = cls.__parse_target_name(field_iter)
            # pylint: disable=protected-access
            packet_match = PacketMatch._parse(field_iter, ipv6=pft.is_ipv6())
            # pylint: enable=protected-access
            # Check for '[goto]'
            uses_goto = False
            if field_iter.peek() == '[goto]':
                _ = next(field_iter)
                uses_goto = True
            parser = MatchParser(field_iter, ipv6=pft.is_ipv6())
            match_list = parser.parse_matches()
            new_field_iter = RuleFieldIterator(field_iter, 3)
            parser = TargetParser(target_name, new_field_iter,
                                  ipv6=pft.is_ipv6())
            target = parser.parse_target(uses_goto)
            # Any fields still stored after target parsing were not
            # understood
            stored_fields = new_field_iter.get_stored_fields()
            if stored_fields:
                raise IptablesParsingError(
                        f"unparsed fields: {' '.join(stored_fields)}")
        except IptablesParsingError as parserr:
            parserr.set_line(iptables_output_line)
            raise
        except StopIteration as stopit:
            raise IptablesParsingError('insufficient number of fields',
                                       line=iptables_output_line) from stopit
        except ValueError as valerr:
            raise IptablesParsingError('bad field value',
                                       line=iptables_output_line) from valerr
        rule = ChainRule(match=packet_match, match_list=match_list,
                         target=target, uses_goto=uses_goto)
        rule._set_stats(packet_count=packet_count, byte_count=byte_count)
        if isinstance(target, (ChainTarget, UnparsedTarget)):
            # An UnparsedTarget may really be a ChainTarget
            # We currently assume that targets with all-upper-case names
            # are target extensions, but this may not always be the case.
            # So we will try to resolve the target name as a chain name
            # after we have collected all the chain names.
            pft._add_unresolved_rule(rule)  # pylint: disable=protected-access
        rule._set_iptables_line(iptables_output_line, True)
        return rule

    @classmethod
    def _create_unparsed_rule(cls, iptables_line: str) -> 'ChainRule':
        """Create an unparsed rule

        The returned rule has no matches and no target; it only
        carries the iptables line, flagged as not parsed.
        """
        rule = ChainRule()
        rule._set_iptables_line(iptables_line, False)
        return rule