Synadia Connect

Kafka

| Name | Runtime | Type | Version |
|---|---|---|---|
| kafka_franz | wombat | sink | latest |

A Kafka output using the Franz Kafka client library (https://github.com/twmb/franz-go).

Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input.

Configuration

| Name | Type | Optional | Secret | Description |
|---|---|---|---|---|
| seed_brokers | []string | no | no | A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas, it will be expanded into multiple addresses. |
| client_id | string | yes | no | An identifier for the client connection. The value defaults to "benthos". |
| tls | tls | no | no | Custom TLS settings can be used to override system defaults. |
| sasl | []sasl | no | no | Specify one or more methods of SASL authentication. SASL is tried in order; if the broker supports the first mechanism, all connections will use that mechanism. If the first mechanism fails, the client will pick the first supported mechanism. If the broker does not support any client mechanisms, connections will fail. |
| metadata_max_age | string | yes | no | The maximum age of metadata before it is refreshed. The value defaults to "5m". |
| topic | string | no | no | A topic to write messages to. |
| key | string | no | no | An optional key to populate for each message. |
| partition | string | no | no | An optional explicit partition to set for each message. This field is only relevant when the partitioner is set to manual. The provided interpolation string must be a valid integer. |
| metadata | metadata | no | no | Determine which (if any) metadata values should be added to messages as headers. |
| timestamp_ms | string | no | no | An optional timestamp to set for each message, expressed in milliseconds. When left empty, the current timestamp is used. |
| max_in_flight | int | yes | no | The maximum number of batches to send in parallel at any given time. The value defaults to 10. |
| batching | batching | no | no | Allows you to configure a batching policy. |
| partitioner | string | no | no | Override the default murmur2 hashing partitioner. |
| idempotent_write | bool | yes | no | Enable the idempotent write producer option. This requires the IDEMPOTENT_WRITE permission on CLUSTER and can be disabled if this permission is not available. The value defaults to true. |
| compression | string | no | no | Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not. The value should be one of lz4, snappy, gzip, none or zstd. |
| timeout | string | yes | no | The maximum period of time to wait for message sends before abandoning the request and retrying. The value defaults to "10s". |
| max_message_bytes | string | yes | no | The maximum space in bytes that an individual message may take; messages larger than this value will be rejected. This field corresponds to Kafka's max.message.bytes. The value defaults to "1MB". |
| broker_write_max_bytes | string | yes | no | The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's socket.request.max.bytes. The value defaults to "100MB". |
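
The comma expansion described for seed_brokers can be illustrated with a short Python sketch. This is not the connector's actual code: the function name `expand_seed_brokers` is hypothetical, and trimming whitespace around each address is an assumption the reference above does not state.

```python
def expand_seed_brokers(seed_brokers):
    """Expand comma-separated list items into individual broker addresses.

    Illustrative only: whitespace trimming and dropping empty parts are
    assumptions, not documented behaviour.
    """
    addresses = []
    for item in seed_brokers:
        for part in item.split(","):
            part = part.strip()
            if part:
                addresses.append(part)
    return addresses


if __name__ == "__main__":
    # One list item holding two addresses, plus a plain single-address item.
    print(expand_seed_brokers(["broker-a:9092,broker-b:9092", "broker-c:9092"]))
```

Under these assumptions, an item such as "broker-a:9092,broker-b:9092" is treated the same as two separate list entries.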

TLS Configuration

| Name | Type | Optional | Secret | Description |
|---|---|---|---|---|
| enabled | bool | yes | no | Whether custom TLS settings are enabled. The value defaults to false. |
| skip_cert_verify | bool | yes | no | Whether to skip server side certificate verification. The value defaults to false. |
| enable_renegotiation | bool | yes | no | Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message "local error: tls: no renegotiation". The value defaults to false. |
| root_cas | string | yes | yes | An optional root certificate authority to use. This is a string representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. The value defaults to "". |
| client_certs | []client_certs | yes | no | A list of client certificates to use. The value defaults to []. |

SASL

| Name | Type | Optional | Secret | Description |
|---|---|---|---|---|
| mechanism | string | no | no | The SASL mechanism to use. |
| username | string | yes | no | A username to provide for PLAIN or SCRAM-* authentication. The value defaults to "". |
| password | string | yes | yes | A password to provide for PLAIN or SCRAM-* authentication. The value defaults to "". |
| token | string | yes | no | The token to use for a single session's OAUTHBEARER authentication. The value defaults to "". |
| extensions | extensions | no | no | Key/value pairs to add to OAUTHBEARER authentication requests. |
| aws | aws | no | no | Contains AWS specific fields for when the mechanism is set to AWS_MSK_IAM. |
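
The ordering rule given for the sasl list (mechanisms are tried in order, and connections fail when the broker supports none of them) might be pictured roughly as follows. This is a simplified sketch under stated assumptions, not the client library's negotiation code: `pick_sasl_mechanism` is a hypothetical name, and the broker's supported mechanisms are assumed to be known up front as a plain list.

```python
def pick_sasl_mechanism(configured, broker_supported):
    """Return the first configured mechanism that the broker also supports.

    `configured` is the ordered list of mechanism names from the sasl field;
    `broker_supported` is a hypothetical list of names the broker advertises.
    """
    supported = set(broker_supported)
    for mechanism in configured:
        if mechanism in supported:
            return mechanism
    # No overlap between client and broker: the connection cannot authenticate.
    raise ConnectionError("broker supports none of the configured SASL mechanisms")


if __name__ == "__main__":
    print(pick_sasl_mechanism(["SCRAM-SHA-512", "PLAIN"], ["PLAIN", "OAUTHBEARER"]))
```

Listing the preferred mechanism first keeps it in use whenever the broker accepts it, with later entries acting as fallbacks.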

Metadata

| Name | Type | Optional | Secret | Description |
|---|---|---|---|---|
| include_prefixes | []string | yes | no | Provide a list of explicit metadata key prefixes to match against. The value defaults to []. |
| include_patterns | []string | yes | no | Provide a list of explicit metadata key regular expression (re2) patterns to match against. The value defaults to []. |
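
As a rough illustration of how the two metadata fields could combine, the sketch below keeps a metadata key when it matches any prefix or any pattern. Several things here are assumptions rather than documented behaviour: the function name `select_header_keys` is hypothetical, the "match either list" rule and "empty lists select nothing" rule are guesses, and Python's `re` module stands in for RE2 (the two agree for simple patterns like the one shown).

```python
import re


def select_header_keys(metadata, include_prefixes=(), include_patterns=()):
    """Return the metadata entries that would be forwarded as headers.

    Assumption: a key is kept if it starts with any prefix OR matches any
    pattern; with both lists empty, nothing is kept.
    """
    compiled = [re.compile(p) for p in include_patterns]
    selected = {}
    for key, value in metadata.items():
        by_prefix = any(key.startswith(prefix) for prefix in include_prefixes)
        by_pattern = any(rx.search(key) for rx in compiled)
        if by_prefix or by_pattern:
            selected[key] = value
    return selected


if __name__ == "__main__":
    meta = {"trace_id": "abc", "trace_span": "1", "tenant": "acme", "debug": "x"}
    print(select_header_keys(meta, include_prefixes=["trace_"], include_patterns=["^ten"]))
```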

Batching

| Name | Type | Optional | Secret | Description |
|---|---|---|---|---|
| count | int | yes | no | The number of messages at which the batch should be flushed. A value of 0 disables count-based batching. The value defaults to 0. |
| byte_size | int | yes | no | The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching. The value defaults to 0. |
| period | string | yes | no | A period after which an incomplete batch should be flushed regardless of its size. The value defaults to "". |
| check | string | yes | no | A Bloblang query that should return a boolean value indicating whether a message should end a batch. The value defaults to "". |
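
To make the flush conditions concrete, here is a small Python sketch of a batching policy using count, byte_size and period. It is an approximation under assumptions, not the runtime's implementation: `BatchPolicy` and `should_flush` are hypothetical names, the period is given as seconds instead of a duration string, the thresholds are treated as "reached or exceeded", and the Bloblang check field is left out entirely.

```python
from dataclasses import dataclass


@dataclass
class BatchPolicy:
    count: int = 0                # 0 disables count-based flushing
    byte_size: int = 0            # 0 disables size-based flushing
    period_seconds: float = 0.0   # hypothetical numeric stand-in for `period`; 0 disables it


def should_flush(messages, policy, elapsed_seconds):
    """Decide whether a pending batch of byte strings should be flushed."""
    if policy.count > 0 and len(messages) >= policy.count:
        return True
    if policy.byte_size > 0 and sum(len(m) for m in messages) >= policy.byte_size:
        return True
    # A period only matters when there is something waiting to be sent.
    if policy.period_seconds > 0 and messages and elapsed_seconds >= policy.period_seconds:
        return True
    return False


if __name__ == "__main__":
    policy = BatchPolicy(count=3, period_seconds=1.0)
    print(should_flush([b"a", b"b"], policy, elapsed_seconds=0.2))
    print(should_flush([b"a", b"b"], policy, elapsed_seconds=1.5))
```

With every field left at its default, none of these conditions ever triggers in the sketch, which mirrors the table's statement that 0 disables count-based and size-based batching.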