Kafka Producer
Overview
You can use the Kafka Producer Snap to send messages to a Kafka topic.

Known issues
Recent changes in Kafka to enable idempotent writes by default may result in authorization errors when using the Kafka Snaps if your Kafka cluster version is lower than 2.8.
To fix this issue, perform one of the following actions:
- Upgrade your Kafka cluster to at least version 2.8.
- Use Kafka’s ACL management to ensure that IdempotentWrite is allowed for the cluster.
Snap views
| View | Description | Examples of upstream and downstream Snaps |
|---|---|---|
| Input | A key/value pair for the Kafka message. | |
| Output |
Not required: The Consumer Snap is decoupled from the Producer
Snap. Kafka message records. Example:
|
N/A |
Snap settings
- Expression icon (
): Allows using pipeline parameters to set field values dynamically (if enabled). SnapLogic Expressions are not supported. If disabled, you can provide a static value.
- SnapGPT (
): Generates SnapLogic Expressions based on natural language using SnapGPT. Learn more.
- Suggestion icon (
): Populates a list of values dynamically based on your Snap configuration. You can select only one attribute at a time using the icon. Type into the field if it supports a comma-separated list of values.
- Upload
: Uploads files. Learn more.
| Field / Field set | Type | Description |
|---|---|---|
| Label | String | Required. Specify the name for the Snap. You can modify the default name to be specific and meaningful, especially if you have more than one of the same Snaps in your pipeline.
Default value: Kafka Producer Example: Producer |
| Client ID | String/Expression/ Suggestion | Specify the Client ID to be used in the Kafka message. If you do
not specify a Client ID, a random string is used. This Client ID is
used for correlating the requests sent to the brokers with each
client. Default value: N/A Example: testclient001 |
| Topic | String/Expression/ Suggestion | Specify the topic to publish the message. If the topic does not
exist, a new topic and its partitions are created based on the
configuration. Default value: N/A Example: t1 |
| Partition Number | String/Expression/ Suggestion | Specify the partition number in which the messages should be published. If you do not specify the partition number, the server publishes the messages to the next partition chosen based on the configuration.
Default value: N/A Example: 0 |
| Message Key | String/Expression | Enter the key for the message to be sent out.
Default value: N/A Example: customerID |
| Message Value | String/Expression | Required. Provide the value for the corresponding message key. The messages are sent with a value and, optionally, a key.
Default value: N/A Example: JSON.stringify($) |
| Timestamp | String/Expression | Specify a timestamp for the message in number, string, or date
format. The timestamp represents milliseconds elapsed since
midnight, January 1, 1970 UTC. Example: Date.parse($timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") Default: Note: Each record's timestamp
must be greater than or equal to the timestamp of the preceding
record in the same partition; that is, the records must be in
chronological order within each partition.
Note:
If you enter the timestamp in number or Epoch format, the value must be non-negative, (for example, 01-01-1970), else an error message is displayed. If you do not set a timestamp or if the timestamp resolves to null or a blank string, the record's timestamp defaults to the current time. The Timestamp field is relevant only if the Kafka
topic is configured with
|
| Key Serializer | Dropdown list | Select the target data type for the Key Serializer of each
record. The available options are:
Note:
Default value: String Example: JSONNote: The Key Deserializer
field's value in the corresponding Consumer Snap should also be
of the same data type. |
| Value Serializer | Dropdown list | Choose the target data type for the Value Serializer of each
record. The available options are:
Default value: String Example: ByteArrayNote: The Avro and
JSON_SR formats depend on the schema registered in the Kafka
Schema Registry. The Kafka account associated with the Snap must
have the Schema Registry configured. Note: The Value
Deserializer field's value in the corresponding Consumer Snap
should also be of the same data type. |
| Headers | Use this field set to define an
optional set of headers for each record. Click the add icon ( Note: The
Kafka Producer Snap supports the addition of multiple headers
with the same key. When converting a Kafka record to an output
document, multiple headers with the same key are combined into a
single header whose value is a list or an array. |
|
| Key | String/Expression | Specify a name to use for the header. You can add multiple
headers with the same key in either of the following ways:
Default value: N/A Example: City |
| Value | String/Expression | Specify a value for the header.
Default value: N/A Example: SFO |
| Serializer | Dropdown list | Select a Serializer for the header to convert the value from a
specific data type such as string or Int32. Default value: String Example: ByteArray |
| Acknowledgements | Dropdown list | Select the type of acknowledgment for the message, which
determines the number of acknowledgments that the Producer requires
the server to have received before considering a request to be
complete. The available options are:
Default value: 1 - Send with leader acknowledgment Example: all - Send with full acknowledgment |
| Batch Size (bytes) | Integer/Expression | Specify the number of bytes that are received before messages are sent to the server.
Default value: 16384 Example: 15000 |
| Linger Time (milliseconds) | Integer/Expression | Specify the linger time for the messages to be sent. Linger time is the time in milliseconds that the Producer Snap waits before sending data. During this period, the Producer continues sending data in batches. The end of the sending process is triggered by both the Batch Size and Linger Time settings, depending on which condition occurs first.
Default value: 0 Example: 1 |
| Retries | Integer/Expression | Set the total number of times to attempt a message delivery in case of failure.
Default value: 0 Example: 3Note: Whenever the Snap needs to connect to an endpoint and the connection object is either null or closed, the Snap opens the connection and retries up to three times before aborting.
|
| Preserve Message Order With Retry | Checkbox | Select this checkbox to resend messages in the same order each
time. Default status: Deselected Note: This field is applicable when Retires is
more than 0, in which case, the order of messages may get out of
the set order; activating this check box ensures the messages
order is preserved. |
| Compression Type | Dropdown list | Choose the compression type before sending messages. The
available options are:
Default value: None Example: GZIPLearn more: Optimizing for Throughput — Confluent Documentation |
| Send Synchronously | Checkbox | Select this checkbox to send each message synchronously. The
consumer blocks the message until metadata from the broker is
received. Default status: Deselected |
| Output Records | Checkbox | Select this check box to include the record’s data and metadata
in each output document, that is, key, value, headers, and
timestamp. If deselected, each document includes only the basic
metadata (topic, partition, offset) for each record, including the
original input document. A copy of each input document is stored in
the output document under the key original.Default status: Deselected |
| Message Publish Timeout (milliseconds) | Integer/Expression | Specify the timeout value in milliseconds to publish a message. If the Snap fails to publish the message within the specified time, a timeout error appears.
Default value: 60000 Example: 60000 |
| Snap execution | Dropdown list | Select one of the three modes in which the Snap executes. The
available options are:
Default value: Execute only Example: Validate & Execute |