Source code for linuxnet.iptables.targets.settarget
# Copyright (c) 2025, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides the SetTarget class which provides access to
the iptables SET target.
"""
from typing import List, Optional
from ..deps import get_logger
from ..exceptions import IptablesError, IptablesParsingError
from .target import Target, TargetParser
_logger = get_logger("linuxnet.iptables.targets.settarget")
[docs]class SetTarget(Target):
"""This class provides access to the ``SET`` target
"""
#: Add operation
ADD_SET = 1
#: Delete operation
DEL_SET = 2
_op_map = {
ADD_SET : "--add-set",
DEL_SET : "--del-set",
}
def __init__(self,
operation: int,
ipset_name: str,
ipset_flags: List[str],
*,
timeout: Optional[int] =None,
exist: Optional[bool] = False):
"""
:param operation: identifies what ipset operation to perform;
possible values include ``SetTarget.ADD_SET`` or
``SetTarget.DEL_SET``
:param ipset_name: name of ipset
:param ipset_flags: list of flags identifying packet data; at
least one flag must be present, otherwise an
:exc:`IptablesError` exception will be raised
:param timeout: timeout value for added entries
:param exist: if ``True``, reset timeout when adding an entry
that exists already
"""
if operation not in (self.ADD_SET, self.DEL_SET):
raise IptablesError(
f"unknown operation for SET target: {operation}")
super().__init__('SET', terminates=False)
self.__operation = operation
self.__ipset_name = ipset_name
self.__ipset_flags = ipset_flags
self.__timeout = timeout
self.__exist = exist
[docs] def get_ipset_name(self) -> str:
"""Returns the ipset name
"""
return self.__ipset_name
[docs] def get_ipset_flags(self) -> List[str]:
"""Returns the ipset flags
"""
return self.__ipset_flags
[docs] def get_operation(self) -> int:
"""Returns the operation
"""
return self.__operation
[docs] def is_updating_existing(self) -> bool:
"""Returns ``True`` if the ``--exist`` option is set.
"""
return self.__exist
[docs] def update_existing(self) -> Target:
"""Set the ``--exist`` option.
"""
self.__exist = True
return self
[docs] def get_timeout(self) -> Optional[int]:
"""Returns the timeout, or ``None``
"""
return self.__timeout
[docs] def set_timeout(self, timeout: Optional[int]) -> Target:
"""Set the timeout value for entries added to the ipset.
"""
self.__timeout = timeout
return self
[docs] def to_iptables_args(self) -> List[str]:
"""Returns a list of **iptables(8)** arguments
"""
retval = super().to_iptables_args()
retval.append(self._op_map[self.__operation])
retval.append(self.__ipset_name)
retval.append(",".join(self.__ipset_flags))
if self.__timeout:
retval.extend(["--timeout", str(self.__timeout)])
if self.__exist:
retval.append("--exist")
return retval
@classmethod
def parse(cls, parser: TargetParser) -> Target:
"""Parse the SET target options
The target part of the rule looks like this:
add-set testchain_set src,dst exist timeout 100
:meta private:
"""
timeout = None
exist = False
operation = 0
field_iter = parser.get_field_iter()
field_iter.rewind()
opstr = next(field_iter)
if opstr == 'add-set':
operation = cls.ADD_SET
elif opstr == 'del-set':
operation = cls.DEL_SET
else:
raise IptablesParsingError(f'unknown SET target operation: {opstr}')
ipset_name = next(field_iter)
flagstr = next(field_iter)
for val in field_iter:
if val == 'exist':
exist = True
elif val == 'timeout':
timeout = int(field_iter.next_value(val))
else:
raise IptablesParsingError(f'unknown target option: {val}')
target = SetTarget(operation, ipset_name, flagstr.split(','),
timeout=timeout, exist=exist)
return target
TargetParser.register_target('SET', SetTarget, ('add-set', 'del-set'))