Options of the kafka() destination's C implementation
The C implementation of the kafka() destination of AxoSyslog can directly publish log messages to the Apache Kafka message bus, where subscribers can access them. It has the following options.
Required options:
The following options are required: bootstrap-servers(), topic(). Note that to use the C implementation of the kafka() destination, you must add the following line to the beginning of your AxoSyslog configuration:
@define kafka-implementation kafka-c
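As a minimal sketch of how the required options fit together, the following fragment defines a destination with only bootstrap-servers() and topic() set. The broker address, the topic name, and the destination name d_kafka are assumptions for illustration; the version header, sources, and log paths of a complete configuration are omitted.

```
@define kafka-implementation kafka-c

destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address and port
        topic("syslog-ng")                    # hypothetical topic name
    );
};
```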
batch-lines()
Type: | number |
Default: | 1 |
Description: Specifies how many lines are flushed to a destination in one batch. The AxoSyslog application waits for this number of lines to accumulate and sends them off in a single batch. Increasing this number increases throughput as more messages are sent in a single batch, but also increases message latency.
For example, if you set batch-lines() to 100, AxoSyslog waits for 100 messages.
If the batch-timeout() option is disabled, the AxoSyslog application flushes the messages if it has sent batch-lines() number of messages, or the queue became empty. If you stop or reload AxoSyslog, or, in case of network sources, the connection with the client is closed, AxoSyslog automatically sends the unsent messages to the destination.
Note that if the batch-timeout() option is enabled and the queue becomes empty, AxoSyslog flushes the messages only if batch-timeout() expires, or the batch reaches the limit set in batch-lines().
For optimal performance, make sure that the AxoSyslog source that feeds messages to this destination is configured properly: the value of the log-iw-size() option of the source must be higher than the batch-lines() * workers() of the destination. Otherwise, the size of the batches cannot reach the batch-lines() limit.
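The sizing rule above can be illustrated with a short sketch. With batch-lines(100) and workers(4), the source window has to be larger than 100 * 4 = 400 messages. The port, addresses, names, and the log-iw-size() value are assumptions chosen for illustration; the value is deliberately generous because some source drivers may share the window among client connections.

```
source s_net {
    network(
        transport("tcp")
        port(6514)              # assumed listening port
        log-iw-size(10000)      # must be higher than batch-lines() * workers() = 400
    );
};

destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        sync-send(yes)
        batch-lines(100)
        workers(4)
    );
};

log {
    source(s_net);
    destination(d_kafka);
};
```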
batch-timeout()
Type: | time in milliseconds |
Default: | -1 (disabled) |
Description: Specifies the time AxoSyslog waits for lines to accumulate in the output buffer. The AxoSyslog application sends batches to the destinations evenly. The timer starts when the first message arrives in the buffer, so if only a few messages arrive, AxoSyslog sends messages to the destination at most once every batch-timeout() milliseconds.
Note: You can specify batch-lines() with sync-send() set to either "yes" or "no", but the option only takes effect if you set sync-send() to "yes".
Note: If you set sync-send() to "yes", the number you specify for batch-lines() affects how many messages AxoSyslog packs into one transaction.
When setting batch-timeout(), consider the value of the transaction.timeout.ms Kafka property. If, in case of a timeout (that is, if AxoSyslog does not receive batch-lines() amount of messages), the value of batch-timeout() exceeds the value of transaction.timeout.ms, AxoSyslog will not send out messages in time.
For more information about the default values of the transaction.timeout.ms Kafka property, see the librdkafka documentation.
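A minimal sketch of this relationship follows: batch-timeout() is kept well below transaction.timeout.ms, which is set explicitly through config() so that both values are visible in one place. All numbers, the broker address, and the topic name are illustrative assumptions, not recommended settings.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        sync-send(yes)
        batch-lines(100)
        batch-timeout(1000)                   # 1 second, in milliseconds
        config(
            # illustrative value; keep it larger than batch-timeout()
            "transaction.timeout.ms" => "60000"
        )
    );
};
```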
bootstrap-servers()
Type: | string |
Default: |
Description: Specifies the hostname or IP address of the Kafka server. When specifying an IP address, IPv4 (for example, 192.168.0.1) or IPv6 (for example, [::1]) can be used as well. Use a colon (:) after the address to specify the port number of the server. When specifying multiple addresses, use a comma to separate the addresses, for example, bootstrap-servers("127.0.0.1:2525,remote-server-hostname:6464").
client-lib-dir()
Type: | string |
Default: | The AxoSyslog module directory: /opt/syslog-ng/lib/syslog-ng/java-modules/ |
Description: The list of the paths where the required Java classes are located. For example, class-path("/opt/syslog-ng/lib/syslog-ng/java-modules/:/opt/my-java-libraries/libs/"). If you set this option multiple times in your AxoSyslog configuration (for example, because you have multiple Java-based destinations), AxoSyslog will merge all available paths into a single list.
For the kafka destination, include the path to the directory where you copied the required libraries (see Prerequisites), for example, client-lib-dir("/opt/syslog-ng/lib/syslog-ng/java-modules/KafkaDestination.jar:/usr/share/kafka/lib/*.jar").
Note: The client-lib-dir() option has no significant role in the C implementation of the kafka() destination. The configuration language accepts this option only for better compatibility.
config()
Description: You can use this option to expand or override the options of the properties-file().
The AxoSyslog kafka destination supports all properties of the official Kafka producer. For details, see the librdkafka documentation.
The syntax of the config() option is the following:
config(
    "key1" => "value1"
    "key2" => "value2"
)
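As a sketch of the syntax with concrete properties, the fragment below passes the acknowledgment and compression properties that this page also uses in the properties-file() example. The broker address and topic name are assumptions; check the librdkafka documentation for the properties your version accepts.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        config(
            "acks" => "all"
            "compression.type" => "snappy"
        )
    );
};
```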
disk-buffer()
Description: This option enables putting outgoing messages into the disk buffer of the destination to avoid message loss in case of a system failure on the destination side. It has the following options:
capacity-bytes()
Type: | number (bytes) |
---|---|
Default: | 1MiB |
Description: This is a required option. The maximum size of the disk-buffer in bytes. The minimum value is 1048576 bytes. If you set a smaller value, the minimum value will be used automatically. It replaces the old log-disk-fifo-size() option.
In AxoSyslog version 4.2 and earlier, this option was called disk-buf-size().
compaction()
Type: | yes/no |
---|---|
Default: | no |
Description: If set to yes, AxoSyslog prunes the unused space in the LogMessage representation, making the disk queue size smaller at the cost of some CPU time. Setting the compaction() argument to yes is recommended when numerous name-value pairs are unset during processing, or when the same names are set multiple times.
Note: Simply unsetting these name-value pairs with the unset() rewrite operation is not enough: for performance reasons (which help when AxoSyslog is CPU bound), the internal representation of a LogMessage does not release the memory associated with these name-value pairs. In some cases, however, the size of this overhead becomes significant (the raw message size can grow up to four times its original size), which unnecessarily increases the disk queue file size. For these cases, compaction drops the unset values, making the LogMessage representation smaller at the cost of some CPU time required to perform compaction.
dir()
Type: | string |
---|---|
Default: | N/A |
Description: Defines the folder where the disk-buffer files are stored.
When creating a new dir() option for a disk buffer, or modifying an existing one, make sure you delete the persist file.
AxoSyslog creates disk-buffer files based on the path recorded in the persist file. Therefore, if the persist file is not deleted after modifying the dir() option, then following a restart, AxoSyslog will look for or create disk-buffer files in their old location. To ensure that AxoSyslog uses the new dir() setting, the persist file must not contain any information about the destinations which the disk-buffer file in question belongs to.
Note: If the dir() path provided by the user does not exist, AxoSyslog creates the path with the same permission as the running instance.
flow-control-window-bytes()
Type: | number (bytes) |
---|---|
Default: | 163840000 |
Description: Use this option if the option reliable() is set to yes. This option contains the size of the messages in bytes that is used in the memory part of the disk buffer. It replaces the old log-fifo-size() option. It does not inherit the value of the global log-fifo-size() option, even if it is provided. Note that this option will be ignored if the option reliable() is set to no.
In AxoSyslog version 4.2 and earlier, this option was called mem-buf-size().
flow-control-window-size()
Type: | number(messages) |
---|---|
Default: | 10000 |
Description: Use this option if the option reliable() is set to no. This option contains the number of messages stored in the overflow queue. It replaces the old log-fifo-size() option. It inherits the value of the global log-fifo-size() option if provided. If it is not provided, the default value is 10000 messages. Note that this option will be ignored if the option reliable() is set to yes.
In AxoSyslog version 4.2 and earlier, this option was called mem-buf-length().
front-cache-size()
Type: | number(messages) |
---|---|
Default: | 1000 |
Description: The number of messages stored in the output buffer of the destination. Note that if you change the value of this option and the disk-buffer already exists, the change will take effect when the disk-buffer becomes empty.
Options reliable() and capacity-bytes() are required options.
In AxoSyslog version 4.2 and earlier, this option was called qout-size().
prealloc()
Type: | yes/no |
---|---|
Default: | no |
Description: By default, AxoSyslog doesn’t reserve the disk space for the disk-buffer file, since in a properly configured and sized environment the disk-buffer is practically empty, so a large preallocated disk-buffer file is just a waste of disk space. But a preallocated buffer can prevent other data from using the intended buffer space (and elicit a warning from the OS if disk space is low), preventing message loss if the buffer is actually needed. To avoid this problem, when using AxoSyslog 4.0 or later, you can preallocate the space for your disk-buffer files by setting prealloc(yes).
In addition to making sure that the required disk space is available when needed, preallocated disk-buffer files provide radically better (3-4x) performance as well: in case of an outage the amount of messages stored in the disk-buffer is continuously growing, and using large continuous files is faster than constantly waiting on a file to change its size.
If you are running AxoSyslog on a dedicated host (always recommended for any high-volume settings), use prealloc(yes).
Available in AxoSyslog 4.0 and later.
reliable()
Type: | yes/no |
---|---|
Default: | no |
Description: If set to yes, AxoSyslog cannot lose logs in case of reload/restart, unreachable destination or AxoSyslog crash. This solution provides a slower, but reliable disk-buffer option. It is created and initialized at startup and gradually grows as new messages arrive. If set to no, the normal disk-buffer will be used. This provides a faster, but less reliable disk-buffer option.
Warning: Hazard of data loss. If you change the value of the reliable() option when there are messages in the disk-buffer, the messages stored in the disk-buffer will be lost.
truncate-size-ratio()
Type: | number (between 0 and 1) |
---|---|
Default: | 1 (do not truncate) |
Description: Limits the truncation of the disk-buffer file. Truncating the disk-buffer file can slow down the disk IO operations, but it saves disk space. By default, AxoSyslog version 4.0 and later doesn’t truncate disk-buffer files (truncate-size-ratio(1)). Earlier versions freed the disk space when at least 10% of the disk-buffer file could be freed (truncate-size-ratio(0.1)).
AxoSyslog only truncates the file if the possible disk gain is more than truncate-size-ratio() times capacity-bytes().
- Smaller values free disk space quicker.
- Larger ratios result in better performance.
If you want to avoid performance fluctuations:
- use truncate-size-ratio(1) (never truncate), or
- use prealloc(yes) to reserve the entire size of the disk-buffer on disk.
Warning: Changing truncate-size-ratio() is not recommended. Only change its value if you understand the performance implications of doing so.
Example: Examples for using disk-buffer()
In the following case reliable disk-buffer() is used.
destination d_demo {
    network(
        "127.0.0.1"
        port(3333)
        disk-buffer(
            flow-control-window-bytes(10000)
            capacity-bytes(2000000)
            reliable(yes)
            dir("/tmp/disk-buffer")
        )
    );
};
In the following case normal disk-buffer() is used.
destination d_demo {
    network(
        "127.0.0.1"
        port(3333)
        disk-buffer(
            flow-control-window-size(10000)
            capacity-bytes(2000000)
            reliable(no)
            dir("/tmp/disk-buffer")
        )
    );
};
frac-digits()
Type: | number |
Default: | 0 |
Description: The AxoSyslog application can store fractions of a second in the timestamps according to the ISO8601 format. The frac-digits() parameter specifies the number of digits stored. The digits storing the fractions are padded by zeros if the original timestamp of the message specifies only seconds. Fractions can always be stored for the time the message was received.
Note: If the frac-digits() option is set to a value higher than 6, AxoSyslog will truncate the fraction seconds in the timestamps after 6 digits.
flush-timeout-on-reload()
Type: | integer in msec |
Default: | 1000 |
Description: When AxoSyslog reloads, the Kafka client will also reload. The flush-timeout-on-reload() option specifies the number of milliseconds AxoSyslog waits for the Kafka client to send the unsent messages. The unsent messages will be retained in syslog-ng’s own queue and AxoSyslog will continue sending them after reload. This works without disk-buffering, too.
flush-timeout-on-shutdown()
Type: | integer in msec |
Default: | 60000 |
Description: When AxoSyslog shuts down, the Kafka client will also shut down. The flush-timeout-on-shutdown() option specifies the number of milliseconds AxoSyslog waits for the Kafka client to send the unsent messages. Any messages not sent after the specified time will be lost. To avoid losing messages, we recommend you use the disk-buffer option.
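The recommendation above can be sketched as follows: the destination sets both flush timeouts explicitly and adds a reliable disk-buffer, so messages that the Kafka client cannot send before shutdown are not simply dropped. The timeout values repeat the documented defaults, the disk-buffer values are taken from the disk-buffer() examples on this page, and the broker address and topic name are assumptions.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        flush-timeout-on-reload(1000)         # milliseconds
        flush-timeout-on-shutdown(60000)      # milliseconds
        disk-buffer(
            reliable(yes)
            capacity-bytes(2000000)
            flow-control-window-bytes(10000)
            dir("/tmp/disk-buffer")
        )
    );
};
```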
hook-commands()
Description: This option makes it possible to execute external programs when the relevant driver is initialized or torn down. The hook-commands() option can be used with all source and destination drivers with the exception of the usertty() and internal() drivers.
Using hook-commands() when AxoSyslog starts or stops
To execute an external program when AxoSyslog starts or stops, use the following options:
startup()
Type: | string |
---|---|
Default: | N/A |
Description: Defines the external program that is executed as AxoSyslog starts.
shutdown()
Type: | string |
---|---|
Default: | N/A |
Description: Defines the external program that is executed as AxoSyslog stops.
Using hook-commands() when AxoSyslog reloads
To execute an external program when the AxoSyslog configuration is initiated or torn down, for example, on startup/shutdown or during an AxoSyslog reload, use the following options:
setup()
Type: | string |
---|---|
Default: | N/A |
Description: Defines an external program that is executed when the AxoSyslog configuration is initiated, for example, on startup or during an AxoSyslog reload.
teardown()
Type: | string |
---|---|
Default: | N/A |
Description: Defines an external program that is executed when the AxoSyslog configuration is stopped or torn down, for example, on shutdown or during an AxoSyslog reload.
Example: Using hook-commands() with a network source
In the following example, hook-commands() is used with the network() driver: it opens an iptables port when AxoSyslog starts and closes it when AxoSyslog stops.
The assumption in this example is that the LOGCHAIN chain is part of a larger ruleset that routes traffic to it. Whenever the rule created by AxoSyslog is there, packets can flow, otherwise the port is closed.
source {
    network(transport(udp)
        hook-commands(
            startup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT")
            shutdown("iptables -D LOGCHAIN 1")
        )
    );
};
key()
Type: | template |
Default: | empty string |
Description: The key of the partition under which the message is published. You can use templates to change the key dynamically based on the source or the content of the message, for example, key("${PROGRAM}").
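A short sketch of a destination that sets the key from a macro, as in the inline example above. The broker address and topic name are assumptions for illustration.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        key("${PROGRAM}")                     # key taken from the name of the sending program
    );
};
```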
log-fifo-size()
Type: | number |
Default: | Use global setting. |
Description: The number of messages that the output queue can store.
local-time-zone()
Type: | name of the timezone, or the timezone offset |
Default: | The local timezone. |
Description: Sets the timezone used when expanding filename and tablename templates.
The timezone can be specified by using the name (for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format (for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
on-error()
Type: | One of: drop-message, drop-property, fallback-to-string, silently-drop-message, silently-drop-property, silently-fallback-to-string |
---|---|
Default: | Use the global setting (which defaults to drop-message) |
Description: Controls what happens when type-casting fails and AxoSyslog cannot convert some data to the specified type. By default, AxoSyslog drops the entire message and logs the error. Currently the value-pairs() option uses the settings of on-error().
- drop-message: Drop the entire message and log an error message to the internal() source. This is the default behavior of AxoSyslog.
- drop-property: Omit the affected property (macro, template, or message-field) from the log message and log an error message to the internal() source.
- fallback-to-string: Convert the property to string and log an error message to the internal() source.
- silently-drop-message: Drop the entire message silently, without logging the error.
- silently-drop-property: Omit the affected property (macro, template, or message-field) silently, without logging the error.
- silently-fallback-to-string: Convert the property to string silently, without logging the error.
persist-name()
Type: | string |
Default: | N/A |
Description: If you receive the following error message during AxoSyslog startup, set the persist-name() option of the duplicate drivers:
Error checking the uniqueness of the persist names, please override it with persist-name option. Shutting down.
This error happens if you use identical drivers in multiple sources, for example, if you configure two file sources to read from the same file. In this case, set the persist-name() of the drivers to a custom string, for example, persist-name("example-persist-name1").
poll-timeout()
Type: | integer in msec |
Default: | 1000 |
Description: Specifies how often AxoSyslog queries the Kafka client about the number of messages sent since the last poll-timeout(). In case of multithreading, the first AxoSyslog worker is responsible for poll-timeout().
properties-file()
Type: | string (absolute path) |
Default: | N/A |
Description: The absolute path and filename of the Kafka properties file to load. For example, properties-file("/opt/syslog-ng/etc/kafka_dest.properties"). The AxoSyslog application reads this file and passes the properties to the Kafka Producer.
The AxoSyslog kafka destination supports all properties of the official Kafka producer. For details, see the librdkafka documentation.
The bootstrap-servers option is translated to the bootstrap.servers property.
For example, the following properties file defines the acknowledgment method and compression:
acks=all
compression.type=snappy
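As a sketch of how such a file might be referenced, the destination below loads the properties file from the path used in the example above and overrides one of its properties with config(), which this page describes as able to expand or override the options of the properties-file(). The broker address, the topic name, and the choice of gzip are assumptions for illustration.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        properties-file("/opt/syslog-ng/etc/kafka_dest.properties")
        config(
            # overrides compression.type=snappy from the properties file
            "compression.type" => "gzip"
        )
    );
};
```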
retries()
Type: | number (of attempts) |
Default: | 3 |
Description: If AxoSyslog cannot send a message, it will try again until the number of attempts reaches retries().
If the number of attempts reaches retries(), AxoSyslog waits for the time set in time-reopen(), then tries sending the message again.
send-time-zone()
Accepted values: | name of the timezone, or the timezone offset |
Default: | local timezone |
Description: Specifies the time zone associated with the messages sent by syslog-ng, if not specified otherwise in the message or in the destination driver. For details, see Timezones and daylight saving.
The timezone can be specified by using the name (for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format (for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
sync-send()
Type: | true or false |
Default: | false |
Description: When sync-send() is set to true, AxoSyslog sends the message reliably: it sends a message to the Kafka server, then waits for a reply. In case of failure, AxoSyslog repeats sending the message, as set in the retries() parameter. If sending the message fails for retries() times, AxoSyslog drops the message.
This method ensures reliable message transfer, but is very slow.
When sync-send() is set to false, AxoSyslog sends messages asynchronously, and receives the response asynchronously. In case of a problem, AxoSyslog cannot resend the messages.
This method is fast, but the transfer is not reliable. Several thousands of messages can be lost before AxoSyslog recognizes the error.
Note: If you use the sync-send() option set to "yes", Axoflow recommends that you use librdkafka version 1.4.0 or higher, and a Kafka server with version number 0.11.0 or higher.
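A minimal sketch of the reliable mode described above: sync-send() is enabled and retries() is raised from its default. The retry count, broker address, and topic name are illustrative assumptions.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        sync-send(yes)                        # send, then wait for the reply of the Kafka server
        retries(5)                            # illustrative value; the default is 3
    );
};
```

For the asynchronous mode, the same fragment with sync-send(no) applies; in that case retries() cannot help, because AxoSyslog cannot resend the messages.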
template()
Type: | template or template function |
Default: | $ISODATE $HOST $MSGHDR$MSG\n |
Description: The message as published to Apache Kafka. You can use templates and template functions (for example, format-json()) to format the message, for example, template("$(format-json --scope rfc5424 --exclude DATE --key ISODATE)").
For details on formatting messages in JSON format, see format-json.
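The following sketch places the format-json() template from the description into a complete destination block, so that each message is published to Kafka as a JSON document. The broker address and topic name are assumptions for illustration.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        template("$(format-json --scope rfc5424 --exclude DATE --key ISODATE)")
    );
};
```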
throttle()
Type: | number |
Default: | 0 |
Description: Sets the maximum number of messages sent to the destination per second. Use this output-rate-limiting functionality only when using disk-buffer as well to avoid the risk of losing messages. Specifying 0 or a lower value sets the output limit to unlimited.
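As a sketch of the advice above, the destination below limits output to an assumed 1000 messages per second and pairs the limit with a disk-buffer. The disk-buffer values are taken from the disk-buffer() examples on this page; the rate, broker address, and topic name are illustrative assumptions.

```
destination d_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")   # assumed broker address
        topic("syslog-ng")                    # hypothetical topic name
        throttle(1000)                        # at most 1000 messages per second
        disk-buffer(
            reliable(no)
            capacity-bytes(2000000)
            flow-control-window-size(10000)
            dir("/tmp/disk-buffer")
        )
    );
};
```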
time-zone()
Type: | name of the timezone, or the timezone offset |
Default: | unspecified |
Description: Convert timestamps to the timezone specified by this option. If this option is not set, then the original timezone information in the message is used. Converting the timezone changes the values of all date-related macros derived from the timestamp, for example, HOUR. For the complete list of such macros, see Date-related macros.
The timezone can be specified by using the name (for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format (for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
topic()
Type: | string |
Default: | N/A |
Description: The Kafka topic under which the message is published.
ts-format()
Type: | rfc3164, bsd, rfc3339, iso |
Default: | rfc3164 |
Description: Override the global timestamp format (set in the global ts-format() parameter) for the specific destination. For details, see ts-format().
Note: This option applies only to file and file-like destinations. Destinations that use specific protocols (for example, network(), or syslog()) ignore this option. For protocol-like destinations, use a template locally in the destination, or use the proto-template option.
workers()
Type: | integer |
Default: | 1 |
Description: Specifies the number of worker threads (at least 1) that AxoSyslog uses to send messages to the server. Increasing the number of worker threads can drastically improve the performance of the destination.
Warning: Hazard of data loss.
When you use more than one worker thread together with disk-based buffering, AxoSyslog creates a separate disk buffer for each worker thread. This means that decreasing the number of workers can result in losing data currently stored in the disk buffer files. Do not decrease the number of workers when the disk buffer files are in use.
Note: The Kafka client has its own threadpool, which is independent of the AxoSyslog settings. The workers() option has no effect on this threadpool.