
Apache Kafka Producer For Beginners 2019 (repost)


原文:https://data-flair.training/blogs/kafka-producer/

In our last Kafka Tutorial, we discussed Kafka Cluster.

Today, we will discuss Kafka Producer with the example.

Moreover, we will see KafkaProducer API and Producer API.

Also, we will learn configurations settings in Kafka Producer.

At last, we will discuss simple producer application in Kafka Producer tutorial.

In order to publish messages to an Apache Kafka topic, we use Kafka Producer.

So, let’s explore Apache Kafka Producer in detail.

1. What is Kafka Producer?

Basically, an application that is the source of the data stream is what we call a producer.

In order to generate tokens or messages and further publish it to one or more topics in the Kafka cluster, we use Apache Kafka Producer.

Also, the Producer API from Kafka helps to pack the message or token and deliver it to Kafka Server.

Learn How to Create Kafka Client

Further, the picture below is showing the working of Apache Kafka Producer.

Kafka Producer – Apache Kafka Producer Working

There are some API’s available in Kafka Producer Client.

2. KafkaProducer API

The Kafka Producer API permits an application to publish a stream of records to one or more Kafka topics.

Moreover, its central part is KafkaProducer class.

Basically, this class offers an option to connect to a Kafka broker in its constructor, and it provides the following methods:

In order to send messages asynchronously to a topic, the KafkaProducer class provides a send method. So, the signature of send() is:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);

ProducerRecord − Generally, the producer manages a buffer of records waiting to be sent.

Callback − A user-supplied callback to execute when the record has been acknowledged by the server.

Note: Here, null indicates no callback.

Let’s discuss Kafka serialization and deserialization

Moreover, to ensure all previously sent messages have actually completed, the KafkaProducer class provides a flush method. So, the signature of the flush method is: public void flush()

Also, to get the partition metadata for a given topic, the KafkaProducer class provides the partitionsFor method; we can use it for custom partitioning. So, the signature of this method is: public List<PartitionInfo> partitionsFor(String topic)

In addition, the metrics method, public Map<MetricName, ? extends Metric> metrics(), returns the map of internal metrics maintained by the producer.

public void close() − It also offers a close method that blocks until all previously sent requests are completed.

3. Producer API

Producer class is the central part of the Kafka Producer API.

By the following methods, it offers an option to connect to the Kafka broker in its constructor.

a. Kafka Producer Class

Basically, to send messages to either single or multiple topics, the producer class offers a send method. The following are the signatures we can use for it.

public void send(KeyedMessage<K,V> message)

– sends the data to a single topic, partitioned by key, using either the sync or async producer.

public void send(List<KeyedMessage<K,V>> messages)

– sends data to multiple topics.

Have a look at Apache Kafka-Load Test with JMeter

Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

However, there are two types of producers: Sync and Async.

The same API configuration applies to the Sync producer as well.

The difference between them is that a Sync producer sends messages directly, while an Async producer sends messages in the background; we prefer the Async producer when we want higher throughput.

However, in previous releases like 0.8, an Async producer does not have a callback for send() to register error handlers. This is only available in the current release, 0.9.
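The sync-versus-async distinction can be illustrated with a plain-Java sketch. This is only an analogy using an ExecutorService, not Kafka's internal implementation; the class and method names and the delivered list are made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncVsAsyncSketch {
    // Records every "delivered" message, in delivery order
    static final List<String> delivered = new CopyOnWriteArrayList<>();

    // Sync style: the caller blocks until the message is delivered
    static void sendSync(String msg) {
        delivered.add(msg);
    }

    // Async style: delivery happens on a background thread; the caller
    // returns immediately and can batch more work, giving higher throughput
    static Future<?> sendAsync(ExecutorService pool, String msg) {
        return pool.submit(() -> delivered.add(msg));
    }

    public static void main(String[] args) throws Exception {
        sendSync("m1"); // delivered before this call returns
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> f = sendAsync(pool, "m2"); // returns immediately
        f.get(); // wait here only so we can observe the result deterministically
        pool.shutdown();
        System.out.println(delivered); // prints [m1, m2]
    }
}
```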

b. public void close()

In order to close the producer pool connections to all Kafka brokers, the producer class offers a public void close() method.

Read Kafka Use Cases and Applications

4. Configuration Settings For Kafka Producer API

Here, we are listing the Kafka Producer API’s main configuration settings:

a. client.id

It identifies the producer application.

b. producer.type

Either sync or async.

c. acks

Basically, it controls the criteria for producer requests that are considered complete.

d. retries

“retries” means that if the producer request somehow fails, it will automatically be retried the specified number of times.

e. bootstrap.servers

It is the bootstrap list of brokers.

f. linger.ms

Basically, we can set linger.ms to a value greater than 0 if we want to reduce the number of requests by letting the producer wait and batch records together.

g. key.serializer

The serializer class for the record key.

h. value.serializer

The serializer class for the record value.

i. batch.size

Simply, the batch buffer size in bytes.

j. buffer.memory

“buffer.memory” controls the total amount of memory available to the producer for buffering.
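The settings above are typically collected into a java.util.Properties object before constructing the producer. The following is a minimal sketch using only the standard library; the broker address localhost:9092 and the client id demo-producer are assumptions for illustration:

```java
import java.util.Properties;

public class ProducerConfigExample {
    // Build a Properties object holding the main producer settings listed above
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // bootstrap list of brokers (assumed local broker)
        props.put("client.id", "demo-producer");          // identifies this producer application (assumed name)
        props.put("acks", "all");                         // wait for full acknowledgement of requests
        props.put("retries", "0");                        // no automatic retries on failure
        props.put("batch.size", "16384");                 // batch buffer size in bytes
        props.put("linger.ms", "1");                      // wait up to 1 ms so requests can be batched
        props.put("buffer.memory", "33554432");           // total memory available for buffering
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig();
        System.out.println(props.getProperty("acks")); // prints all
    }
}
```

This Properties object would then be passed to the KafkaProducer constructor, as the SimpleProducer application below does inline.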

5. ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster; it offers the following constructors.

This ProducerRecord class constructor creates a record with partition, key and value:

public ProducerRecord(String topic, Integer partition, K key, V value)

Topic − user-defined topic name that will be appended to the record.
Partition − the partition to which the record should be sent.
Key − the key that will be included in the record.
Value − the record contents.

public ProducerRecord(String topic, K key, V value)

To create a record with key and value but without a partition, we use this ProducerRecord class constructor.

Topic − the topic the record is assigned to.
Key − the key for the record.
Value − the record contents.

public ProducerRecord(String topic, V value)

Moreover, this ProducerRecord class constructor creates a record without a partition or key.

Topic − the topic the record is assigned to.
Value − the record contents.

Now, here we are listing the ProducerRecord class methods −

1. public String topic()

Returns the topic to which the record will be appended.

2. public K key()

Returns the key that will be included in the record; if there is no such key, null is returned.

3. public V value()

Returns the record contents.

4. public Integer partition()

Returns the partition to which the record will be sent (null if unspecified).
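The shape of the class can be sketched in plain Java. The following is a simplified stand-in for illustration only, not the real Kafka ProducerRecord class: an immutable topic/partition/key/value tuple with accessors like those listed above.

```java
public class RecordSketch {
    // Simplified stand-in for Kafka's ProducerRecord: an immutable
    // key/value pair bound to a topic and an optional partition
    static final class Record<K, V> {
        private final String topic;
        private final Integer partition; // null means "let the producer pick"
        private final K key;             // null means "no key"
        private final V value;

        Record(String topic, Integer partition, K key, V value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        String topic() { return topic; }
        Integer partition() { return partition; }
        K key() { return key; }
        V value() { return value; }
    }

    public static void main(String[] args) {
        Record<String, String> r = new Record<>("my-topic", null, "k1", "v1");
        System.out.println(r.topic() + " " + r.key() + " " + r.value()); // prints my-topic k1 v1
    }
}
```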

6. Simple Kafka Producer Application

However, make sure to first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and proceed with the following code:

//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named "SimpleProducer"
public class SimpleProducer {
  public static void main(String[] args) throws Exception {
     // Check arguments length value
     if (args.length == 0) {
        System.out.println("Enter topic name");
        return;
     }
     //Assign topicName to string variable
     String topicName = args[0];
     // create instance for properties to access producer configs
     Properties props = new Properties();
     //Assign localhost id
     props.put("bootstrap.servers", "localhost:9092");
     //Set acknowledgements for producer requests.
     props.put("acks", "all");
     //If the request fails, the producer can automatically retry
     props.put("retries", 0);
     //Specify buffer size in config
     props.put("batch.size", 16384);
     //Wait up to 1 ms so that requests can be batched together
     props.put("linger.ms", 1);
     //buffer.memory controls the total amount of memory available to the producer for buffering
     props.put("buffer.memory", 33554432);
     props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
     Producer<String, String> producer = new KafkaProducer<String, String>(props);
     for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<String, String>(topicName,
           Integer.toString(i), Integer.toString(i)));
     }
     System.out.println("Message sent successfully");
     producer.close();
  }
}

a. Compilation

By using the following command, we can compile the application (the classpath matches the execution command below; adjust it to your Kafka install path):

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer.java

Let’s revise Kafka Commands

b. Execution

Further, using the following command, we can execute the application.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

c. Output

Message sent successfully

To check the above output, open a new terminal and type the consumer CLI command to receive messages:

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning

0

1

2

3

4

5

6

7

8

9

So, this was all about Apache Kafka Producer. Hope you like our explanation.

Read Apache Kafka Workflow | Kafka Pub-Sub Messaging

7. Summary: Kafka Producer

Hence, in this Kafka Tutorial, we have seen the concept of Kafka Producer along with the example.

Now, in the next tutorial, we will learn about the Kafka Consumer, in order to consume messages from the Kafka cluster.

Further, we have learned about the Producer API, the Producer class, and public void close(). Also, we discussed the configuration settings for the Kafka Producer API and the ProducerRecord API.

Finally, we saw SimpleProducer Application with the help of compilation, execution, and output.

Furthermore, if you have any doubt, feel free to ask in the comment section.
