Kafka Producer

Overview

You can use the Kafka Producer Snap to send messages to a Kafka topic.



Known issues

Recent changes in Kafka to enable idempotent writes by default may result in authorization errors when using the Kafka Snaps if your Kafka cluster version is lower than 2.8.

To fix this issue, perform one of the following actions:

  • Upgrade your Kafka cluster to at least version 2.8.
  • Use Kafka’s ACL management to ensure that IdempotentWrite is allowed for the cluster.

Snap views

View Description Examples of upstream and downstream Snaps
Input A key/value pair for the Kafka message.
Output
Not required: The Consumer Snap is decoupled from the Producer Snap. Kafka message records. Example:
[{"kafka_offset":"7139","kafka_partition":"0","kafka_topic":"PC_Test","original":{"msgID":"id1100","msgVal":"msg1100"}}]
N/A
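
The sample output record above is ordinary JSON, so a downstream consumer of the document can read it directly. The following Python sketch parses that sample. In the sample, the offset and partition arrive as strings, so the sketch converts them to integers before use; the variable names are illustrative only.

```python
import json

# Sample output document copied from the Output view example above.
sample = (
    '[{"kafka_offset":"7139","kafka_partition":"0","kafka_topic":"PC_Test",'
    '"original":{"msgID":"id1100","msgVal":"msg1100"}}]'
)

records = json.loads(sample)
for record in records:
    # Offset and partition are strings in the sample; convert before numeric use.
    offset = int(record["kafka_offset"])
    partition = int(record["kafka_partition"])
    # The input document that produced this record is kept under "original".
    print(record["kafka_topic"], partition, offset, record["original"]["msgID"])
```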

Snap settings

Legend:
  • Expression icon: Allows using pipeline parameters to set field values dynamically (if enabled). SnapLogic Expressions are not supported. If disabled, you can provide a static value.
  • SnapGPT: Generates SnapLogic Expressions based on natural language using SnapGPT. Learn more.
  • Suggestion icon: Populates a list of values dynamically based on your Snap configuration. You can select only one attribute at a time using the icon. Type into the field if it supports a comma-separated list of values.
  • Upload icon: Uploads files. Learn more.
Learn more about the icons in the Snap settings dialog.
Field / Field set Type Description
Label String Required. Specify the name for the Snap. You can modify the default name to be specific and meaningful, especially if you have more than one of the same Snaps in your pipeline.

Default value: Kafka Producer

Example: Producer
Client ID String/Expression/ Suggestion Specify the Client ID to be used in the Kafka message. If you do not specify a Client ID, a random string is used. This Client ID is used for correlating the requests sent to the brokers with each client.

Default value: N/A

Example: testclient001
Topic String/Expression/Suggestion Specify the topic to which the message is published. If the topic does not exist, a new topic and its partitions are created based on the configuration.

Default value: N/A

Example: t1
Partition Number String/Expression/Suggestion Specify the number of the partition to which the messages should be published. If you do not specify the partition number, the server publishes the messages to the next partition chosen based on the configuration.

Default value: N/A

Example: 0
Message Key String/Expression Enter the key for the message to be sent out.

Default value: N/A

Example: customerID
Message Value String/Expression Required. Provide the value for the corresponding message key. The messages are sent with a value and, optionally, a key.

Default value: N/A

Example: JSON.stringify($)
Timestamp String/Expression Specify a timestamp for the message in number, string, or date format. The timestamp represents milliseconds elapsed since midnight, January 1, 1970 UTC.

Default value: N/A

Example: Date.parse($timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
Note: Each record's timestamp must be greater than or equal to the timestamp of the preceding record in the same partition; that is, the records must be in chronological order within each partition.
Note: If you enter the timestamp in number or Epoch format, the value must be non-negative (that is, not earlier than 01-01-1970); otherwise, an error message is displayed. If you do not set a timestamp or if the timestamp resolves to null or a blank string, the record's timestamp defaults to the current time.

The Timestamp field is relevant only if the Kafka topic is configured with message.timestamp.type = CreateTime (which is the default). For more information, see the official Kafka documentation.
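
The timestamp rules above can be sketched in Python as follows. This is only an illustration of the documented behavior, not the Snap's implementation: the helper name to_epoch_millis is hypothetical, and treating a date without a time zone as UTC is an assumption.

```python
import time
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(value=None):
    """Hypothetical helper: resolve a Timestamp setting to epoch milliseconds."""
    # Unset, null, or blank: the record's timestamp defaults to the current time.
    if value is None or (isinstance(value, str) and not value.strip()):
        return time.time_ns() // 1_000_000
    if isinstance(value, (int, float)):
        millis = int(value)
    else:
        if isinstance(value, str):
            # Matches the pattern yyyy-MM-dd'T'HH:mm:ss.SSS'Z' from the field example.
            value = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
        if value.tzinfo is None:
            # Assumption: dates without a time zone are interpreted as UTC.
            value = value.replace(tzinfo=timezone.utc)
        millis = (value - EPOCH) // timedelta(milliseconds=1)
    if millis < 0:
        raise ValueError("timestamp must not be earlier than 1970-01-01 UTC")
    return millis

print(to_epoch_millis("1970-01-01T00:00:01.500Z"))  # 1500
```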

Key Serializer Dropdown list Select the target data type for the Key Serializer of each record. The available options are:
  • String
  • JSON (arbitrary complex data, schema-less)
  • JSON_SR (requires schema registry)
  • Avro (arbitrary complex data, schema-centric; requires schema registry)
  • ByteArray (binary content, such as images)
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)
Note:
  • If you select Avro or JSON_SR as the Key Serializer value, ensure that your Confluent Kafka Account's configuration includes the schema registry.
  • The Avro and JSON_SR formats depend on the schema registered in the Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
  • The Kafka Producer Snap looks up the Avro schema by name using Kafka’s standard naming convention: “topic-key” for a key schema, or “topic-value” for a value schema. For example, if the topic is “order”, then the schema name for the key schema would be “order-key”.
Default value: String

Example: JSON
Note: The Key Deserializer field's value in the corresponding Consumer Snap should also be of the same data type.
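
To illustrate why the serializer and deserializer types must match, the following Python sketch encodes values the way Apache Kafka's built-in serializers do (UTF-8 for strings, big-endian binary for numbers). Whether the Snap uses exactly these encodings is an assumption; this page does not state it. The serialize function and the ENCODERS table are hypothetical names.

```python
import struct

# Assumed wire formats, modeled on Apache Kafka's built-in serializers.
ENCODERS = {
    "String": lambda v: str(v).encode("utf-8"),
    "Int16": lambda v: struct.pack(">h", v),    # 2 bytes, big-endian
    "Int32": lambda v: struct.pack(">i", v),    # 4 bytes, big-endian
    "Int64": lambda v: struct.pack(">q", v),    # 8 bytes, big-endian
    "Float32": lambda v: struct.pack(">f", v),  # IEEE 754 single precision
    "Float64": lambda v: struct.pack(">d", v),  # IEEE 754 double precision
}

def serialize(value, serializer="String"):
    """Hypothetical helper: encode a key or value with the chosen serializer."""
    return ENCODERS[serializer](value)

key_bytes = serialize(42, "Int32")
print(key_bytes.hex())                     # 0000002a
print(struct.unpack(">i", key_bytes)[0])   # 42
```

The same four bytes decoded as a String would not yield the text "42", which is why the Consumer Snap's deserializer should use the same data type.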
Value Serializer Dropdown list Choose the target data type for the Value Serializer of each record. The available options are:
  • String
  • JSON
  • JSON_SR (requires schema registry)
  • Avro (requires schema registry)
    • The Avro format depends on an Avro schema registered in Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
    • The Kafka Producer Snap looks up the Avro schema by name using Kafka’s standard naming convention: “topic-key” for a key schema, or “topic-value” for a value schema. For example, if the topic is “order”, then the schema name for the key schema would be “order-key”.
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)

Default value: String

Example: ByteArray
Note: The Avro and JSON_SR formats depend on the schema registered in the Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
Note: The Value Deserializer field's value in the corresponding Consumer Snap should also be of the same data type.
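
The schema naming convention described above for the Key Serializer and Value Serializer can be expressed as a one-line rule. The following Python sketch is illustrative only; the function name schema_subject is hypothetical.

```python
def schema_subject(topic: str, is_key: bool) -> str:
    """Build the schema name: "<topic>-key" for keys, "<topic>-value" for values."""
    return f"{topic}-{'key' if is_key else 'value'}"

print(schema_subject("order", is_key=True))   # order-key
print(schema_subject("order", is_key=False))  # order-value
```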
Headers Use this field set to define an optional set of headers for each record. Click the add icon to add a new row for record headers and define key, value, and serializer values.
Note: The Kafka Producer Snap supports the addition of multiple headers with the same key. When converting a Kafka record to an output document, multiple headers with the same key are combined into a single header whose value is a list or an array.
Key String/Expression Specify a name to use for the header. You can add multiple headers with the same key in either of the following ways:
  • Configure multiple rows in the Headers table with the same key.
  • Configure a row in the Headers table with a value that evaluates to a list or an array.

Default value: N/A

Example: City
Value String/Expression Specify a value for the header.

Default value: N/A

Example: SFO
Serializer Dropdown list Select a Serializer for the header to convert the value from a specific data type such as string or Int32.

Default value: String

Example: ByteArray
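
The Note in the Headers field set describes how repeated header keys are combined when a record is converted to an output document. The following Python sketch shows one way that grouping could work. The function name combine_headers is hypothetical, and leaving a header that occurs only once as a single value (rather than a one-element list) is an assumption.

```python
from collections import defaultdict

def combine_headers(headers):
    """Group (key, value) header pairs; a repeated key becomes a list of values."""
    grouped = defaultdict(list)
    for key, value in headers:
        grouped[key].append(value)
    # Assumption: a key that occurs once keeps its single value.
    return {k: (v[0] if len(v) == 1 else v) for k, v in grouped.items()}

print(combine_headers([("City", "SFO"), ("City", "SJC"), ("Region", "West")]))
# {'City': ['SFO', 'SJC'], 'Region': 'West'}
```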
Acknowledgements Dropdown list Select the type of acknowledgment for the message, which determines the number of acknowledgments that the Producer requires the server to have received before considering a request to be complete.

The available options are:

  • 0 - Send without any acknowledgment: The Producer does not wait for any acknowledgment from the server.
  • 1 - Send with leader acknowledgment: The Producer waits for the replication leader to acknowledge the message received by the server.
  • all - Send with full acknowledgment: The Producer waits for all replicas to acknowledge the message received by the server.

Default value: 1 - Send with leader acknowledgment

Example: all - Send with full acknowledgment
Batch Size (bytes) Integer/Expression Specify the number of bytes that are received before messages are sent to the server.

Default value: 16384

Example: 15000
Linger Time (milliseconds) Integer/Expression Specify the linger time for the messages to be sent. Linger time is the time in milliseconds that the Producer Snap waits before sending data. During this period, the Producer accumulates messages into a batch. A batch is sent when either the Batch Size or the Linger Time limit is reached, whichever occurs first.

Default value: 0

Example: 1
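
The interaction between Batch Size and Linger Time can be illustrated with a small simulation. The following Python sketch is a simplified model, not the actual producer logic: it sends a batch as soon as the accumulated bytes reach the batch size, or when the linger time has elapsed since the first message in the batch arrived. The function name plan_batches and the input format are hypothetical.

```python
def plan_batches(records, batch_size, linger_ms):
    """records: list of (arrival_ms, size_bytes), sorted by arrival time.
    Returns a list of (send_time_ms, [sizes]) in send order."""
    batches = []
    current, current_bytes, opened_at = [], 0, None
    for arrival, size in records:
        # Linger time expired before this record arrived: send what accumulated.
        if current and arrival - opened_at >= linger_ms:
            batches.append((opened_at + linger_ms, current))
            current, current_bytes, opened_at = [], 0, None
        if not current:
            opened_at = arrival
        current.append(size)
        current_bytes += size
        # Batch is full: send immediately without waiting for the linger time.
        if current_bytes >= batch_size:
            batches.append((arrival, current))
            current, current_bytes, opened_at = [], 0, None
    if current:
        # Remaining records go out when their linger time runs out.
        batches.append((opened_at + linger_ms, current))
    return batches

records = [(0, 6000), (1, 6000), (2, 6000), (50, 1000)]
print(plan_batches(records, batch_size=16384, linger_ms=5))
# [(2, [6000, 6000, 6000]), (55, [1000])]
```

In this run, the first batch is sent at 2 ms because it reached the batch size, while the last message is sent at 55 ms because its linger time expired first.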
Retries Integer/Expression Set the total number of times to attempt a message delivery in case of failure.

Default value: 0

Example: 3
Note: Whenever the Snap needs to connect to an endpoint and the connection object is either null or closed, the Snap opens the connection and retries up to three times before aborting.
Preserve Message Order With Retry Checkbox Select this checkbox to ensure that messages that are resent after a failure keep their original order.

Default status: Deselected

Note: This field is applicable when Retries is more than 0. In that case, retried messages may be delivered out of their original order; selecting this checkbox ensures that the message order is preserved.
Compression Type Dropdown list Choose the compression type before sending messages. The available options are:
  • NONE
  • GZIP
  • LZ4
  • SNAPPY
  • ZSTD

Default value: NONE

Example: GZIP

Learn more: Optimizing for Throughput — Confluent Documentation

Send Synchronously Checkbox Select this checkbox to send each message synchronously. The Producer blocks after sending each message until it receives the message's metadata from the broker.

Default status: Deselected

Output Records Checkbox Select this checkbox to include the record’s data and metadata (key, value, headers, and timestamp) in each output document. If deselected, each document includes only the basic metadata (topic, partition, and offset) for each record, along with the original input document. A copy of each input document is stored in the output document under the key original.

Default status: Deselected

Message Publish Timeout (milliseconds) Integer/Expression Specify the timeout value in milliseconds to publish a message. If the Snap fails to publish the message within the specified time, a timeout error appears.

Default value: 60000

Example: 60000
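
For readers familiar with the Apache Kafka producer client, several of the settings above resemble standard producer configuration properties. The following Python sketch shows a possible correspondence. The mapping is an assumption based on matching names and default values; this page does not state how the Snap configures its client. In particular, limiting in-flight requests to one is the common Kafka technique for preserving order with retries and is only assumed here. The function name to_producer_config is hypothetical.

```python
# Assumed correspondence between Snap fields and Apache Kafka producer properties.
FIELD_TO_PROPERTY = {
    "Client ID": "client.id",
    "Acknowledgements": "acks",
    "Batch Size (bytes)": "batch.size",
    "Linger Time (milliseconds)": "linger.ms",
    "Retries": "retries",
    "Compression Type": "compression.type",
}

def to_producer_config(snap_settings):
    """Hypothetical helper: translate Snap field values into producer properties."""
    config = {}
    for field, value in snap_settings.items():
        if field not in FIELD_TO_PROPERTY:
            continue
        if field == "Acknowledgements":
            # "1 - Send with leader acknowledgment" -> "1"
            value = value.split(" - ")[0]
        elif field == "Compression Type":
            # Kafka expects lowercase names such as "gzip" or "none".
            value = value.lower()
        config[FIELD_TO_PROPERTY[field]] = value
    # Assumption: order is preserved by allowing one in-flight request at a time.
    if snap_settings.get("Preserve Message Order With Retry"):
        config["max.in.flight.requests.per.connection"] = 1
    return config

defaults = {
    "Acknowledgements": "1 - Send with leader acknowledgment",
    "Batch Size (bytes)": 16384,
    "Linger Time (milliseconds)": 0,
    "Retries": 0,
    "Compression Type": "NONE",
}
print(to_producer_config(defaults))
```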
Snap execution Dropdown list Select one of the three modes in which the Snap executes. The available options are:
  • Validate & Execute: Performs limited execution of the Snap and generates a data preview during Pipeline validation. Then performs full execution during Pipeline runtime.
  • Execute only: Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled: Disables the Snap and all downstream Snaps.

Default value: Execute only

Example: Validate & Execute