Synadia Connect
Kafka
NAME | RUNTIME | TYPE | VERSION |
---|---|---|---|
kafka_franz | wombat | sink | latest |
A Kafka output using the Franz Kafka client library (https://github.com/twmb/franz-go).
Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input.
Configuration
Name | Type | Optional | Secret | Description |
---|---|---|---|---|
seed_brokers | []string | no | no | A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses. |
client_id | string | yes | no | An identifier for the client connection. The value defaults to "benthos". |
tls | tls | no | no | Custom TLS settings can be used to override system defaults. |
sasl | []sasl | no | no | Specify one or more methods of SASL authentication. SASL is tried in order; if the broker supports the first mechanism, all connections will use that mechanism. If the first mechanism fails, the client will pick the first supported mechanism. If the broker does not support any client mechanisms, connections will fail. |
metadata_max_age | string | yes | no | The maximum age of metadata before it is refreshed. The value defaults to "5m". |
topic | string | no | no | A topic to write messages to. |
key | string | no | no | An optional key to populate for each message. |
partition | string | no | no | An optional explicit partition to set for each message. This field is only relevant when the partitioner is set to manual. The provided interpolation string must resolve to a valid integer. |
metadata | metadata | no | no | Determine which (if any) metadata values should be added to messages as headers. |
timestamp_ms | string | no | no | An optional timestamp to set for each message expressed in milliseconds. When left empty, the current timestamp is used. |
max_in_flight | int | yes | no | The maximum number of batches to send in parallel at any given time. The value defaults to 10. |
batching | batching | no | no | Allows you to configure a batching policy. |
partitioner | string | no | no | Override the default murmur2 hashing partitioner. |
idempotent_write | bool | yes | no | Enable the idempotent write producer option. This requires the IDEMPOTENT_WRITE permission on CLUSTER and can be disabled if this permission is not available. The value defaults to true. |
compression | string | no | no | Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not. The value should be one of lz4, snappy, gzip, none or zstd. |
timeout | string | yes | no | The maximum period of time to wait for message sends before abandoning the request and retrying. The value defaults to "10s". |
max_message_bytes | string | yes | no | The maximum size in bytes that an individual message may take; messages larger than this value are rejected. This field corresponds to Kafka's max.message.bytes. The value defaults to "1MB". |
broker_write_max_bytes | string | yes | no | The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's socket.request.max.bytes. The value defaults to "100MB". |
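
As an illustration of how the fields above fit together, here is a minimal sketch of a configuration fragment. It assumes the fields are supplied as a flat YAML mapping in the connector's configuration block; the enclosing structure is not shown because it is not described on this page. The broker addresses, topic name, and the `order_id` field are hypothetical, and the key is assumed to accept a Bloblang interpolation string.

```yaml
# Hypothetical values throughout; only the field names come from the table above.
seed_brokers:
  - broker-1.example.internal:9092
  - broker-2.example.internal:9092
topic: orders
# Assumed: key accepts an interpolation string evaluated per message.
key: '${! json("order_id") }'
compression: zstd
max_in_flight: 10
timeout: 10s
max_message_bytes: 1MB
```

Fields left out of the fragment, such as client_id and metadata_max_age, fall back to the defaults listed in the table.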
TLS Configuration
Name | Type | Optional | Secret | Description |
---|---|---|---|---|
enabled | bool | yes | no | Whether custom TLS settings are enabled. The value defaults to false. |
skip_cert_verify | bool | yes | no | Whether to skip server side certificate verification. The value defaults to false. |
enable_renegotiation | bool | yes | no | Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message "local error: tls: no renegotiation". The value defaults to false. |
root_cas | string | yes | yes | An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. The value defaults to "". |
client_certs | []client_certs | yes | no | A list of client certificates to use. The value defaults to []. |
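
The TLS fields above might be combined as in the following sketch. It assumes the block is nested under the `tls` field of the main configuration; the certificate body is a placeholder, not real data. The structure of client_certs entries is not documented on this page, so that field is left at its default.

```yaml
tls:
  enabled: true
  # Keep verification on unless you are testing against a self-signed broker.
  skip_cert_verify: false
  # Placeholder PEM chain; replace with your own root certificate authority.
  root_cas: |
    -----BEGIN CERTIFICATE-----
    (PEM body of your CA certificate goes here)
    -----END CERTIFICATE-----
```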
SASL
Name | Type | Optional | Secret | Description |
---|---|---|---|---|
mechanism | string | no | no | The SASL mechanism to use. |
username | string | yes | no | A username to provide for PLAIN or SCRAM-* authentication. The value defaults to "". |
password | string | yes | yes | A password to provide for PLAIN or SCRAM-* authentication. The value defaults to "". |
token | string | yes | no | The token to use for a single session's OAUTHBEARER authentication. The value defaults to "". |
extensions | extensions | no | no | Key/value pairs to add to OAUTHBEARER authentication requests. |
aws | aws | no | no | Contains AWS specific fields for when the mechanism is set to AWS_MSK_IAM. |
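
Because `sasl` is a list that is tried in order, a configuration can name a preferred mechanism first and a fallback second. The sketch below assumes SCRAM-SHA-512 as one concrete SCRAM-* mechanism name; the username and password values are hypothetical placeholders.

```yaml
sasl:
  # Tried first; used for all connections if the broker supports it.
  - mechanism: SCRAM-SHA-512
    username: connect-writer   # hypothetical account name
    password: change-me        # placeholder; supply the real secret securely
  # Candidate fallback if the first mechanism is not supported.
  - mechanism: PLAIN
    username: connect-writer
    password: change-me
```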
Metadata
Name | Type | Optional | Secret | Description |
---|---|---|---|---|
include_prefixes | []string | yes | no | Provide a list of explicit metadata key prefixes to match against. The value defaults to []. |
include_patterns | []string | yes | no | Provide a list of explicit metadata key regular expression (re2) patterns to match against. The value defaults to []. |
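
A short sketch of a metadata filter follows, assuming it is nested under the `metadata` field of the main configuration. The prefix and pattern are hypothetical examples; with the defaults (both lists empty), the filter names no keys explicitly.

```yaml
metadata:
  # Match metadata keys that start with "trace_" (hypothetical prefix).
  include_prefixes:
    - trace_
  # Match metadata keys such as "x-request-id" using an re2 pattern.
  include_patterns:
    - "^x-.*-id$"
```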
Batching
Name | Type | Optional | Secret | Description |
---|---|---|---|---|
count | int | yes | no | The number of messages at which the batch should be flushed. A value of 0 disables count-based batching. The value defaults to 0. |
byte_size | int | yes | no | The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching. The value defaults to 0. |
period | string | yes | no | A period after which an incomplete batch should be flushed regardless of its size. The value defaults to "". |
check | string | yes | no | A Bloblang query that should return a boolean value indicating whether a message should end a batch. The value defaults to "". |
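
The batching fields can be combined so that a batch is flushed by whichever condition is met first. The following sketch assumes the block is nested under the `batching` field of the main configuration; the thresholds are illustrative values, not recommendations.

```yaml
batching:
  # Flush once 100 messages have accumulated.
  count: 100
  # 0 disables size-based flushing.
  byte_size: 0
  # Flush an incomplete batch after one second.
  period: 1s
```

Setting a period alongside count keeps latency bounded when traffic is too low to fill a batch.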