Writing Python modules
AxoSyslog has comprehensive support for implementing various logging components in Python. This chapter shows you how to use this functionality.
When to use Python
The Python bindings are useful if the facilities provided by the AxoSyslog configuration language are not sufficient, that is, when:
- AxoSyslog doesn't support a specific service (for example, API-based log sources, or information sources that you want to use for data enrichment purposes).
- The AxoSyslog configuration language does not support a specific transformation (in that case, please tell us about your use case).
- You want to work on complex data structures (for example, JSON), which is easier to do in a real programming language.
While Python is very powerful and you can produce clean, production-ready solutions with it, the usual drawback is performance: Python code is typically slower than the native functionality that AxoSyslog provides.
To offset this performance impact, it's a good strategy to process only a subset of the incoming log stream with Python code, and to use native configuration elements to select which subset traverses the Python code.
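For example, the following sketch (the program name, parser class, and file paths are illustrative, not part of AxoSyslog) uses a native `if` filter so that only matching messages traverse the Python parser:

```
log {
    source { tcp(port(2000)); };

    # only messages from the hypothetical "myapp" program go through the
    # Python parser; everything else stays on the fast, native path
    if (program("myapp")) {
        parser { python(class(myparsers.MyParser)); };
    };

    destination { file("/var/log/enriched"); };
};
```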
Creating and storing the Python code
You can embed Python code directly into `syslog-ng.conf`, or work with Python modules.
Embedding Python into AxoSyslog configuration
You can simply use a top-level `python {}` block to embed your Python code, like this:
@version: 4.0
python {
def template_function(msg):
    return b"Hello World from Python! Original message: " + msg['MSGHDR'] + msg['MESSAGE']
};
log {
    source { tcp(port(2000)); };
    destination { file("logfile" template("$ISODATE $(python template_function)")); };
};
Using Python modules
You can also put your code into a proper Python module and use it from there. AxoSyslog automatically adds `${sysconfdir}/python` (normally `/etc/syslog-ng/python`) to your `PYTHONPATH`. With that in mind, add the following code to `/etc/syslog-ng/python/mytemplate.py`:
def template_function(msg):
    return b"Hello World from Python! Original message: " + msg['MSGHDR'] + msg['MESSAGE']
The Python glue in AxoSyslog automatically imports modules when it encounters an identifier in dotted notation, so if you use this AxoSyslog config:
@version: 4.0
log {
    source { tcp(port(2000)); };
    destination { file("logfile" template("$ISODATE $(python mytemplate.template_function)")); };
};
AxoSyslog recognizes that `mytemplate.template_function` is a qualified name and attempts to import `mytemplate` as a module, and then looks up `template_function` within that module.
Note: Modules are only imported once, so you will need to restart AxoSyslog for a change to take effect.
AxoSyslog reload and Python
When you reload `syslog-ng` (with `syslog-ng-ctl reload` or `systemctl reload syslog-ng`), the `python` block in your configuration is reloaded with the rest of the configuration file. Any changes you make to Python code directly embedded in your configuration take effect after the reload. This also means that any global variables are reset, so you cannot store state across reloads in your `python {}` block.
Modules, however, are only imported once and are kept across reloads. This means that you can store global state in modules and it will be preserved, even as AxoSyslog reinitializes the configuration.
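For example, a module-level variable keeps its value across configuration reloads. The following is a minimal sketch; the module name, variable, and function are illustrative:

```python
# /etc/syslog-ng/python/mystate.py -- a hypothetical module
# The module is imported only once, so this dictionary survives
# configuration reloads (but not a full restart of AxoSyslog).
state = {"messages_seen": 0}

def count(msg):
    # usable as a template function: $(python mystate.count)
    state["messages_seen"] += 1
    return str(state["messages_seen"]).encode()
```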
In case you want to reload a module every time the AxoSyslog configuration is reinitialized, you need to do this explicitly, with code similar to this:
python {
import mymodule
import importlib
# reload mymodule every time syslog-ng reloads
importlib.reload(mymodule)
};
Destination driver
You can derive a destination driver in Python from the `LogDestination` class, as defined by the `syslogng` module, like in the following example:

`mydestination.py`:
from syslogng import LogDestination
class MyDestination(LogDestination):
    def send(self, msg):
        return True
The interface of the `LogDestination` class is documented in the `syslogng.dest` module, which is stored in the `modules/python-modules/syslogng/dest.py` file of the source tree.
Once all required methods are implemented, you can use the `python` destination in the AxoSyslog configuration language.
destination whatever {
    python(class(mydestination.MyDestination));
};
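As a slightly fuller, hedged sketch (the method set follows the `LogDestination` interface in `syslogng.dest`, but check that module for the authoritative signatures; the file path and option name are illustrative), a destination that appends messages to a file could look like this:

```python
from syslogng import LogDestination


class MyFileDestination(LogDestination):
    """Hypothetical destination that appends the MESSAGE part to a file."""

    def init(self, options):
        # values passed via options() in the configuration arrive here
        self.path = options.get("path", "/tmp/python-dest.log")
        self.outfile = None
        return True

    def open(self):
        self.outfile = open(self.path, "ab")
        return True

    def is_opened(self):
        return self.outfile is not None

    def close(self):
        if self.outfile:
            self.outfile.close()
            self.outfile = None

    def send(self, msg):
        # LogMessage values are bytes, as in the template function examples
        self.outfile.write(msg['MESSAGE'] + b"\n")
        self.outfile.flush()
        return True
```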
There's a more complete example destination in the `python_example()` destination plugin, located in the `modules/python-modules/syslogng/modules/example/` directory of the source tree, or installed under `${exec_prefix}/syslog-ng/python/syslogng/modules` in a production deployment.
Template function plugin
Template functions extend the AxoSyslog template language. They get a `LogMessage` object and return a string which gets embedded into the output of the template. You can have AxoSyslog call a Python function from the template language using the `$(python)` template function.
@version: 4.0
python {
def template_function(msg):
    return b"Hello World from Python! Original message: " + msg['MSGHDR'] + msg['MESSAGE']
};
...
destination d_file {
    file("/var/log/whatever" template("$(python template_function)"));
};
The Python function must be callable. It receives a `LogMessage` instance and returns a string (`str` or `bytes`). The message passed to the template function is read-only: if you try to change a name-value pair, you will receive an exception.
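For instance, a template function can read any number of name-value pairs, but writes fail. The following minimal sketch demonstrates this; catching the generic `Exception` is only for illustration:

```python
def annotate(msg):
    # reading name-value pairs works
    result = b"host=" + msg['HOST'] + b" msg=" + msg['MESSAGE']

    # writing does not: the message is read-only in template functions
    try:
        msg['tag'] = 'would-fail'
    except Exception:
        result += b" (message is read-only)"

    return result
```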
Parser plugin
You can derive parser plugins in Python from the `LogParser` class, as this example shows:
from syslogng import LogParser
class MyParser(LogParser):
    def parse(self, msg):
        msg['name'] = 'value'
        return True
In contrast to template functions, parsers receive a writable `LogMessage` object, so you can modify its contents.
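Such a parser can then be referenced from the configuration with the `python()` parser driver, analogously to the destination example above. A minimal sketch, with an illustrative module name:

```
parser p_myparser {
    python(class(myparser.MyParser));
};

log {
    source { tcp(port(2000)); };
    parser(p_myparser);
    destination { file("/var/log/parsed"); };
};
```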
Source plugins
There are two kinds of source plugins that can be implemented in Python:
- `LogFetcher`: `LogFetcher` provides a convenient interface for fetching messages from backend services via blocking APIs, but it is limited to performing the fetching operation in a sequential manner: you fetch a batch of messages, feed them to the AxoSyslog pipeline, then repeat.
- `LogSource`: `LogSource` is more low-level, but allows the use of an asynchronous framework (for example, `asyncio`) to perform message fetching along multiple threads of execution.
Both are defined by the `syslogng.source` module.
Source driver based on LogFetcher
`LogFetcher` provides a convenient interface for fetching messages from backend services via blocking APIs, but it is limited to performing the fetching operation in a sequential manner: you fetch a batch of messages, feed them to the AxoSyslog pipeline, then repeat.
For a `LogFetcher` class, you have to implement the `fetch()` method. This is the main entry point, which is automatically invoked by `syslog-ng` whenever it consumes incoming messages.
@version: 4.0
python {
from syslogng import LogFetcher
from syslogng import LogMessage
import time
class MyFetcher(LogFetcher):
    def fetch(self):
        time.sleep(1)
        msg = LogMessage.parse("<5>2022-02-02T10:23:45+02:00 HOST program[pid]: foobar", self.parse_options)
        return self.SUCCESS, msg
};
log {
    source { python-fetcher(class(MyFetcher)); };
    destination { file("messages"); };
};
This example generates one message every second, based on a literal string that is parsed as a syslog message.
The source is running in a dedicated thread, so it is free to block.
To limit the rate of generating messages, the `time.sleep(1)` call in the first line of the `fetch()` method sleeps for 1 second between invocations of the method. If that sleep wasn't there, the source would produce about 100-110k messages per second, depending on the speed of your CPU, the performance of the Python interpreter, and the `syslog-ng` core.
If the fetcher connects to an external API, the sleep is usually not needed, as the response time of the API is a limiting factor.
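As an illustration, a fetcher that polls a REST endpoint might look like the following sketch. The `requests` library, the URL, and the response format are assumptions, and error handling is omitted:

```python
from syslogng import LogFetcher, LogMessage
import requests  # assumed third-party dependency


class MyAPIFetcher(LogFetcher):
    """Hypothetical fetcher polling a REST endpoint for new log records."""

    URL = "https://api.example.com/logs"  # illustrative endpoint

    def fetch(self):
        # a blocking HTTP call; the response time of the API naturally
        # limits the fetch rate, so no explicit sleep is needed
        response = requests.get(self.URL, timeout=10)
        response.raise_for_status()

        # assume the endpoint returns a JSON list of syslog-formatted lines;
        # for simplicity, emit only the first one per fetch() call
        line = response.json()[0]
        msg = LogMessage.parse(line, self.parse_options)
        return self.SUCCESS, msg
```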
Adding persistent state
If you are fetching messages from an API, you need to keep track of which messages were already fetched. Storing the position in a variable is not a good solution, because the value of the variable is lost when `syslog-ng` is reloaded or restarted (depending on where you store that variable: in the `python {}` block or in a module).
Use the `Persist()` class, which relies on the persistent state handling functionality of AxoSyslog. This allows you to persist variables in the `${localstatedir}/syslog-ng.persist` file, along with the rest of the `syslog-ng` state.
class MyFetcher(LogFetcher):
    def init(self, options):
        self.persist = Persist("MyFetcher_persistent_data", defaults={"counter": 1})
        return True

    def fetch(self):
        time.sleep(1)
        counter = self.persist['counter']
        self.persist['counter'] += 1
        msg = LogMessage.parse("<5>2022-02-02T10:23:45+02:00 HOST program[pid]: foobar %d" % counter, self.parse_options)
        return self.SUCCESS, msg
Once initialized, a `Persist()` instance behaves as a dict where you can store Python values. Currently `str`, `bytes`, and `int` are supported. Anything you store in a persist instance is remembered even across restarts. The entries are backed up to disk immediately after changing them (using an `mmap()`-ed file), so you don't have to explicitly commit them to disk.
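For instance, each of the supported types can be stored directly. This is a minimal sketch, assuming that `Persist` can be imported from the `syslogng` package; the key names and values are illustrative:

```python
from syslogng import LogFetcher, LogMessage, Persist
import time


class TypedStateFetcher(LogFetcher):
    """Hypothetical fetcher demonstrating the value types Persist accepts."""

    def init(self, options):
        self.persist = Persist("typed_state", defaults={"counter": 0})
        return True

    def fetch(self):
        time.sleep(1)
        self.persist["counter"] = self.persist["counter"] + 1  # int
        self.persist["cursor"] = "opaque-cursor-token"         # str
        self.persist["raw_checkpoint"] = b"\x00\x2a"           # bytes
        msg = LogMessage.parse("<5>2022-02-02T10:23:45+02:00 HOST program[pid]: counter=%d"
                               % self.persist["counter"], self.parse_options)
        return self.SUCCESS, msg
```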
You can store position information in a `Persist()` entry, but it's not always the best choice. In AxoSyslog, producing messages is decoupled from their delivery: sometimes a message is in flight for a while before being delivered. This time can be significant if a destination consumes messages at a slow rate. In this case, if you store the position right after fetching, the message may still be sitting in a queue waiting to be delivered. If the queue is not backed by a disk-buffer, these messages would be lost once `syslog-ng` is restarted.
To handle this case, use bookmarks.
Bookmarks in a source
The bookmarking mechanism allows messages to carry individual markers that uniquely identify a message and its position in a source stream. For example, in a source file the bookmark would contain the position of the message within that file. An API may have a similar mechanism in place, in which the source API associates an opaque identifier with each message that signifies its position in the repository.
A specific example for bookmarks is systemd-journald, which has a “cursor” indicating the position of each journal record. The cursor can be used to restart the reading of the log stream.
Once you’ve identified what mechanism the source offers that maps to the bookmark concept, decide how you want to track these bookmarks. Which bookmark tracking strategy you should use depends on the API specifics.
- Some APIs are sequential in nature, thus you can only acknowledge the “last fetch position” in that sequence.
- Other APIs allow you to acknowledge messages individually.
AxoSyslog supports both methods.
The following Python example updates the current position in a source stream only when the AxoSyslog destination has acknowledged the messages in the sequence (that is, when the messages were properly sent).
class MyFetcher(LogFetcher):
    counter = 0

    def init(self, options):
        self.persist = Persist("MyFetcher_persistent_data", defaults={"position": 0})
        self.counter = self.persist['position']
        # pass self.message_acked method as ACK callback
        self.ack_tracker = ConsecutiveAckTracker(ack_callback=self.message_acked)
        return True

    def message_acked(self, acked_message_bookmark):
        # update current persisted position when syslog-ng delivered the
        # message, but only then.
        self.persist['position'] = acked_message_bookmark

    def fetch(self):
        time.sleep(1)
        self.counter += 1
        # depending on the speed of our consumer and the setting of
        # flags(flow-control), the current counter and the acked value may
        # differ in the messages generated.
        msg = LogMessage.parse("<5>2022-02-02T10:23:45+02:00 HOST program[pid]: foobar %d (acked so far %d)" % (self.counter, self.persist['position']), self.parse_options)
        # this is where we set the bookmark for the message
        msg.set_bookmark(self.counter)
        return self.SUCCESS, msg
Acknowledgement tracking strategies
Some APIs provide simple ways to track which messages have been processed, while others provide more complex ones. AxoSyslog provides the following strategies to cope with them.
- Instant tracking (`InstantAckTracker`): Messages are considered delivered as soon as the destination driver (or the disk-buffer) acknowledges them. Out-of-order deliveries are reported as they happen, so an earlier message may be acknowledged later than a message originally encountered later in the source stream.
- Consecutive tracking (`ConsecutiveAckTracker`): Messages are assumed to form a stream and the bookmark is a position in that stream. Unordered deliveries are properly handled by only acknowledging messages that were delivered in order. If unordered delivery happens, the tracker waits for the sequence to fill up, that is, it waits for all preceding messages to be delivered as well.
- Batched tracking (`BatchedAckTracker`): Messages are assumed to be independent, not forming a sequence of events. Each message is individually tracked, and the source driver has the means to get delivery notifications for each and every message independently. The acknowledgements are accumulated until a timeout happens, at which point they get reported as a single batch.
You can initialize your `ack_tracker` in the `init` method, like this:
class MyFetcher(LogFetcher):
    ...
    def init(self, options):
        # pass self.message_acked method as ACK callback
        self.ack_tracker = ConsecutiveAckTracker(ack_callback=self.message_acked)
        return True

    def message_acked(self, acked_message_bookmark):
        pass

    def fetch(self):
        ...
        msg.set_bookmark("whatever-bookmark-value-that-denotes-position")
        ...
The previous example uses `ConsecutiveAckTracker`, so you get acknowledgements in the order messages were generated. The argument of the `message_acked` callback is the “bookmark” value that you set using `set_bookmark()`.
Using `InstantAckTracker` is very similar: just replace `ConsecutiveAckTracker` with `InstantAckTracker`. In this case, you get a callback as soon as a message is delivered, without preserving the original ordering.
class MyFetcher(LogFetcher):
    ...
    def init(self, options):
        self.ack_tracker = InstantAckTracker(ack_callback=self.message_acked)
        return True

    def message_acked(self, acked_message_bookmark):
        pass
While `ConsecutiveAckTracker()` seems to provide a much more useful service, `InstantAckTracker()` performs better, as it does not have to track acknowledgements of individual messages.
The most complex scenario is implemented by `BatchedAckTracker`, which allows you to track the acknowledgements of individual messages as they happen, without enforcing any kind of ordering.
class MyFetcher(LogFetcher):
    ...
    def init(self, options):
        self.ack_tracker = BatchedAckTracker(timeout=500, batch_size=100,
                                             batched_ack_callback=self.messages_acked)
        return True

    def messages_acked(self, acked_message_bookmarks):
        pass
`BatchedAckTracker` calls your callback periodically, as set by the `timeout` argument (in milliseconds). `batch_size` specifies the number of outstanding messages at a time.
With this interface, it's quite easy to send acknowledgements back to source services where per-message acknowledgements are needed (for example, Google PubSub).
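For example, a fetcher could forward the delivery notifications to such a service in batches. In the following sketch, `connect_to_backend()` and the client's `ack()` method are hypothetical placeholders for the real API:

```python
class MyFetcher(LogFetcher):
    ...
    def init(self, options):
        self.backend_client = connect_to_backend()  # hypothetical API client
        # get notified about delivered messages in batches of up to 100,
        # or every 500 milliseconds
        self.ack_tracker = BatchedAckTracker(timeout=500, batch_size=100,
                                             batched_ack_callback=self.messages_acked)
        return True

    def messages_acked(self, acked_message_bookmarks):
        # each bookmark is the value set with msg.set_bookmark() in fetch();
        # forward the acknowledgements upstream in a single round
        for bookmark in acked_message_bookmarks:
            self.backend_client.ack(bookmark)
```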
Accessing the flags() option
The state of the `flags()` option is mapped to the `self.flags` variable, which is a `Dict[str, bool]`, for example:
{
    'parse': True,
    'check-hostname': False,
    'syslog-protocol': True,
    'assume-utf8': False,
    'validate-utf8': False,
    'sanitize-utf8': False,
    'multi-line': True,
    'store-legacy-msghdr': True,
    'store-raw-message': False,
    'expect-hostname': True,
    'guess-timezone': False,
    'header': True,
    'rfc3164-fallback': True,
}
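A driver can consult these flags to adjust its behavior. The following minimal sketch reuses the fetcher pattern from the earlier examples and only checks the `parse` flag:

```python
class MyFetcher(LogFetcher):
    ...
    def fetch(self):
        line = "<5>2022-02-02T10:23:45+02:00 HOST program[pid]: foobar"
        if self.flags['parse']:
            # parse the line as a syslog message
            msg = LogMessage.parse(line, self.parse_options)
        else:
            # hand over the raw line as the message payload
            msg = LogMessage(line)
        return self.SUCCESS, msg
```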
Source driver based on LogSource
`LogSource` allows the use of an asynchronous framework (for example, `asyncio`) to perform message fetching along multiple threads of execution.
The following example uses `asyncio` to generate two independent sequences of messages: the first is generated every second, the other every 1.5 seconds, running concurrently via an `asyncio` event loop.
It is also easy to create a source that implements an HTTP server, and which injects messages coming via HTTP to the AxoSyslog pipeline.
from syslogng import LogSource
from syslogng import LogMessage
import asyncio
class MySource(LogSource):
    cancelled = False

    def run(self):
        asyncio.run(self.main())

    async def main(self):
        await asyncio.gather(self.sequence1(), self.sequence2())

    async def sequence1(self):
        while not self.cancelled:
            await asyncio.sleep(1)
            self.post_message(LogMessage("message1"))

    async def sequence2(self):
        while not self.cancelled:
            await asyncio.sleep(1.5)
            self.post_message(LogMessage("message2"))

    def request_exit(self):
        self.cancelled = True
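Building on the note above about HTTP-based ingestion, the following sketch accepts log lines via HTTP POST and injects them into the pipeline. It assumes the third-party `aiohttp` package; the port and URL path are illustrative, and shutdown handling is kept as simple as in the previous example:

```python
from syslogng import LogSource
from syslogng import LogMessage
import asyncio
from aiohttp import web  # assumed third-party dependency


class MyHTTPSource(LogSource):
    cancelled = False

    def run(self):
        asyncio.run(self.main())

    async def main(self):
        app = web.Application()
        app.add_routes([web.post("/logs", self.handle_post)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", 8080)
        await site.start()

        # keep serving until syslog-ng asks the source to exit
        while not self.cancelled:
            await asyncio.sleep(1)
        await runner.cleanup()

    async def handle_post(self, request):
        body = await request.text()
        # each line of the request body becomes one log message
        for line in body.splitlines():
            self.post_message(LogMessage(line))
        return web.Response(text="OK")

    def request_exit(self):
        self.cancelled = True
```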
Acknowledgement mechanisms (`ConsecutiveAckTracker`, `BatchedAckTracker`) and the `flags()` mapping can be used similarly to how they were described for `LogFetcher`.
Making it more native config-wise
The examples so far used some form of the `python()` driver. For instance, this was the Python-based destination driver:
destination whatever {
    python(class(mydestination.MyDestination)
           options(option1 => value,
                   option2 => value));
};
While this works, the syntax doesn’t look like other parts of the configuration, and is also hard to read. The usual syntax for referencing regular drivers is something like this:
destination whatever {
    my-destination(option1(value) option2(value));
};
To make the syntax more native, you can use the block functionality to wrap your Python driver, hide that it's actually Python, and provide a more convenient syntax for your code.
block destination my-destination(option1(value)
                                 option2(value)) {
    python(class(mydestination.MyDestination)
           options(option1 => `option1`,
                   option2 => `option2`));
};
This block allows the use of the more AxoSyslog-native syntax, completely hiding the fact that the implementation is Python-based, so users can concentrate on functionality.
Add this wrapper to your Python module in an `scl` subdirectory, as a file with a `.conf` extension. AxoSyslog automatically includes these files along with the rest of the SCL.
Adding the code to syslog-ng
To add your Python-based modules to `syslog-ng`, complete the following steps.
- Create a Python package: add the `__init__.py` file and anything that the file references to the `modules/python-modules/<name-of-your-module>` directory of the syslog-ng repository.
- Add your files to the source tarball by listing them in the `EXTRA_DIST` variable of the `modules/python-modules/Makefile.am` file.
- Run `make install` to install your module along with the rest of the `syslog-ng` binaries.
- Open a pull request.
External dependencies
If your Python code depends on third-party libraries, those need to be installed on the system where your code is deployed. If your deployment mechanism is based on DEB or RPM packages, make sure that you add these OS-level dependencies to the packages generated.
- For DEB packages, add the dependency package to the `Depends` line.
- For RPM packages, add the dependency package as a `Requires` line to the `.spec` file.
If you want to use `pip`/`requirements.txt` to deploy dependencies, you can invoke `pip` at `make install` time, so that AxoSyslog's private Python directory contains all the dependencies that you require.
Adding Python code to the DEB package
To add your module to the `syslog-ng` DEB package, complete the following steps.
- Create a new file in `packaging/debian/` called `syslog-ng-mod-<yourmodule>.install`.
- Populate this file with wildcard patterns that capture the files of your package after installation. For example:

  usr/lib/syslog-ng/python/syslogng/modules/<yourmodule>/*

- Add an entry to `packaging/debian/control`:

  Package: syslog-ng-mod-<yourmodule>
  Architecture: any
  Multi-Arch: foreign
  Depends: ${shlibs:Depends}, ${misc:Depends}, syslog-ng-core (>= ${source:Version}), syslog-ng-core (<< ${source:Version}.1~), syslog-ng-mod-python
  Description: The short description of the package
   This is a longer description with dots separating paragraphs.
   .
   This package provides a collection of example plugins.

- Add your `.install` file to the tarball by adding it to the `EXTRA_DIST` in the `Makefile.am`.
Adding Python code to RPM packages
The RPM package is less modular than the Debian package and it automatically captures all Python modules in the `syslog-ng-python` package, without having to list them explicitly.
If you need to customize the installation, you can find the spec file in `packaging/rhel/syslog-ng.spec`, which is populated and copied to the root at tarball creation.