Source code for linuxnet.iptables.matches.statisticmatch
# Copyright (c) 2023, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides matching against IP sets defined by ipset(8)
"""
from typing import Iterable
from ..deps import get_logger
from .match import Match, Criterion, MatchParser
from .util import GenericCriterion, GenericPositiveCriterion
_logger = get_logger('linuxnet.iptables.matches.statisticmatch')
[docs]class StatisticMatch(Match):
"""Match packets based on some statistic condition
"""
def __init__(self):
self.__mode_crit = None
self.__probability_crit = None
self.__every_crit = None
self.__packet_crit = None
[docs] @staticmethod
def get_match_name():
"""Returns the **iptables(8)** match extension name
"""
return 'statistic'
[docs] def get_criteria(self) -> Iterable[Criterion]:
"""Returns the set match criteria.
"""
return (self.__mode_crit, self.__probability_crit,
self.__every_crit, self.__packet_crit)
[docs] def mode(self) -> GenericPositiveCriterion:
"""Returns the criterion that identifies the matching mode
"""
if self.__mode_crit is None:
self.__mode_crit = GenericPositiveCriterion(self, "--mode")
return self.__mode_crit
[docs] def probability(self) -> GenericCriterion:
"""Set the probability for ``random`` mode; the criterion value
is a floating-point number.
"""
if self.__probability_crit is None:
self.__probability_crit = GenericCriterion(self, "--probability")
return self.__probability_crit
[docs] def every(self) -> GenericCriterion:
"""Identify the packet to match for the ``nth`` mode; the criterion
value is an integer.
"""
if self.__every_crit is None:
self.__every_crit = GenericCriterion(self, "--every")
return self.__every_crit
[docs] def packet(self) -> GenericPositiveCriterion:
"""Set the initial counter value for the ``nth`` mode; the criterion
value is an integer.
"""
if self.__packet_crit is None:
self.__packet_crit = GenericPositiveCriterion(self, "--packet")
return self.__packet_crit
@classmethod
def parse(cls, # pylint: disable=too-many-branches
parser: MatchParser) -> Match:
"""Possible output::
statistic mode random probability 0.50000000000
statistic mode random ! probability 0.10000000009
statistic mode nth every 100 packet 10
statistic mode nth ! every 100 packet 10
:meta private:
"""
criteria_iter = parser.get_iter()
parser.skip_field('mode')
match = StatisticMatch()
match.mode().equals(next(criteria_iter))
for val in criteria_iter:
positive_match = True
if val == '!':
positive_match = False
val = next(criteria_iter)
if val == 'probability':
match.probability().compare(positive_match,
float(next(criteria_iter)))
elif val == 'every':
match.every().compare(positive_match, int(next(criteria_iter)))
elif val == 'packet':
match.packet().compare(positive_match, int(next(criteria_iter)))
else:
criteria_iter.put_back(val)
if not positive_match:
criteria_iter.rewind()
break
return match
MatchParser.register_match('statistic', StatisticMatch)