Source code for linuxnet.iptables.targets.nflogtarget
# Copyright (c) 2024, Panagiotis Tsirigotis
# This file is part of linuxnet-iptables.
#
# linuxnet-iptables is free software: you can redistribute it and/or
# modify it under the terms of version 3 of the GNU Affero General Public
# License as published by the Free Software Foundation.
#
# linuxnet-iptables is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General
# Public License along with linuxnet-iptables. If not, see
# <https://www.gnu.org/licenses/>.
"""
This module provides the NFLogTarget class which provides access to
the iptables NFLOG target.
"""
from typing import List, Optional
from ..deps import get_logger
from ..exceptions import IptablesParsingError
from .target import Target, TargetParser
# Module-level logger, used by NFLogTarget.parse to report ignored options.
# NOTE(review): the logger name ends in "logtarget" although this is the
# nflogtarget module -- possibly copied from the LOG target module; confirm
# whether sharing that logger name is intentional.
_logger = get_logger("linuxnet.iptables.targets.logtarget")
class NFLogTarget(Target):
    """This class provides access to the ``NFLOG`` target.

    All options are optional; an option that is ``None`` is not
    included in the generated **iptables(8)** arguments.
    """

    def __init__(self, *,               # pylint: disable=too-many-arguments
                 group: Optional[int] = None,
                 prefix: Optional[str] = None,
                 size: Optional[int] = None,
                 threshold: Optional[int] = None):
        """
        :param group: netlink group to send packets to
        :param prefix: prefix to include in every log message
        :param size: number of bytes of each packet to copy to userspace
        :param threshold: number of packets to queue in kernel before
            sending to userspace
        """
        super().__init__('NFLOG', terminates=False)
        self.__group = group
        self.__prefix = prefix
        self.__size = size
        self.__threshold = threshold

    def get_nflog_group(self) -> Optional[int]:
        """Returns the nflog group (``None`` if not set)
        """
        return self.__group

    def get_nflog_prefix(self) -> Optional[str]:
        """Returns the nflog prefix (``None`` if not set)
        """
        return self.__prefix

    def get_nflog_size(self) -> Optional[int]:
        """Returns the nflog size (``None`` if not set)
        """
        return self.__size

    def get_nflog_threshold(self) -> Optional[int]:
        """Returns the nflog threshold (``None`` if not set)
        """
        return self.__threshold

    def to_iptables_args(self) -> List[str]:
        """Returns a list of **iptables(8)** arguments

        The list starts with the arguments produced by the parent class,
        followed by one ``--nflog-*`` option/value pair for each option
        that has been set.
        """
        retval = super().to_iptables_args()
        # Numeric options are compared against None (not tested for
        # truthiness) so that an explicitly specified 0 is not dropped.
        if self.__group is not None:
            retval += ['--nflog-group', str(self.__group)]
        # An empty prefix is omitted, same as an unset one.
        if self.__prefix:
            retval += ['--nflog-prefix', self.__prefix]
        if self.__size is not None:
            retval += ['--nflog-size', str(self.__size)]
        if self.__threshold is not None:
            retval += ['--nflog-threshold', str(self.__threshold)]
        return retval

    @staticmethod
    def __ends_quote(field: str) -> bool:
        """Returns ``True`` if ``field`` ends with a double-quote that
        is not back-slash escaped.
        """
        if not field.endswith('"'):
            return False
        head = field[:-1]
        # The quote is escaped only if it is preceded by an odd number
        # of back-slashes; an even number is a run of escaped back-slashes.
        num_backslashes = len(head) - len(head.rstrip('\\'))
        return num_backslashes % 2 == 0

    @classmethod
    def __read_prefix(cls, field_iter, first_field: str) -> str:
        """Returns the prefix value that starts at ``first_field``,
        consuming more fields from ``field_iter`` if the value is
        double-quoted and spans multiple fields.

        The enclosing double-quotes are removed; fields are re-joined
        with a single space.

        :raises IptablesParsingError: if the closing double-quote is
            never found
        """
        if not first_field.startswith('"'):
            return first_field
        prefix = first_field
        # A field consisting of just '"' is only the opening quote;
        # otherwise the first field may already hold the closing quote.
        closed = len(first_field) > 1 and cls.__ends_quote(first_field)
        while not closed:
            try:
                field = next(field_iter)
            except StopIteration as stopiter:
                raise IptablesParsingError(
                    f'unterminated nflog-prefix: {prefix}') from stopiter
            prefix += ' ' + field
            closed = cls.__ends_quote(field)
        # truncate double-quotes
        return prefix[1:-1]

    @classmethod
    def parse(cls, parser: TargetParser) -> Target:
        """Parse the NFLOG target options

        The fields provided by ``parser`` are scanned from the start;
        each recognized option name is followed by its value.
        The ``nflog-range`` option is consumed but ignored (a warning
        is logged).

        :raises IptablesParsingError: if an unknown option is found or
            a quoted prefix is not terminated

        :meta private:
        """
        nflog_group = None
        nflog_prefix = None
        nflog_size = None
        nflog_threshold = None
        field_iter = parser.get_field_iter()
        field_iter.rewind()
        for val in field_iter:
            if val == 'nflog-group':
                nflog_group = int(field_iter.next_value(val))
            elif val == 'nflog-threshold':
                nflog_threshold = int(field_iter.next_value(val))
            elif val == 'nflog-size':
                nflog_size = int(field_iter.next_value(val))
            elif val == 'nflog-range':
                _logger.warning("ignoring nflog-range option")
                _ = field_iter.next_value(val)
            elif val == 'nflog-prefix':
                nflog_prefix = cls.__read_prefix(
                    field_iter, field_iter.next_value(val))
            else:
                raise IptablesParsingError(f'unknown target option: {val}')
        target = NFLogTarget(group=nflog_group, prefix=nflog_prefix,
                             size=nflog_size, threshold=nflog_threshold)
        return target
# Register NFLogTarget with TargetParser under the target name 'NFLOG'.
# NOTE(review): 'nflog-' together with prefix_match=True presumably tells the
# parser that options starting with 'nflog-' belong to this target -- confirm
# against TargetParser.register_target.
TargetParser.register_target('NFLOG', NFLogTarget, 'nflog-', prefix_match=True)