1. Java Producer
public class ProducerDemo {
    // SLF4J idiom: key the logger by class, not a hand-built name string.
    private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

    /**
     * Sends a single "hello world" record to the {@code demo_java} topic
     * on a local broker and exits.
     */
    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        // Producer configuration: broker address plus key/value serializers.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees close() runs even if send() throws
        // (the original leaked the producer on an exception); close() also
        // flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("demo_java", "hello world");

            // send() is asynchronous — the record is buffered and sent in the background.
            producer.send(producerRecord);

            // flush() blocks until all buffered records are acknowledged.
            producer.flush();
        }
    }
}
2. Java Producer Callbacks
// The callback runs once per record, after the broker acknowledges it or the
// send fails. Callback is a functional interface, so a lambda replaces the
// anonymous class; parameterized logging replaces string concatenation so the
// arguments are not formatted when the log level is disabled.
producer.send(producerRecord, (metadata, e) -> {
    if (e == null) {
        // the record was successfully sent
        log.info("Received new metadata. \nTopic: {}\nPartition: {}\nOffset: {}\nTimestamp: {}",
                metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());
    } else {
        log.error("Error while producing", e);
    }
});
粘性分区(Sticky Partitioning):当消息未显式指定分区、且发送速度足够快时,多条消息会作为同一批次发送到同一个分区,以减少请求次数、提高吞吐。
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
DefaultPartitioner 在初始化时默认采用 StickyPartitionCache 来实现这种粘性分区行为。
3. Java Consumer
// Consumer configuration (fix: local variable was misspelled "boostrapServers").
String bootstrapServers = "127.0.0.1:9092";
String groupId = "my-second-application";
String topic = "demo_java";

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// "earliest" = read the topic from the beginning when the group has no committed offset.
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));

// poll for new data; poll() blocks for up to 1 second waiting for records
while (true) {
    log.info("Polling");
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // Parameterized logging instead of string concatenation.
        log.info("Key: {}, Value: {}", record.key(), record.value());
        log.info("Partition: {}, Offset: {}", record.partition(), record.offset());
    }
}
4. Kafka Consumer - Graceful shutdown
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// get a reference to the current thread
final Thread mainThread = Thread.currentThread();
// adding the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// 会让下面的poll() throw exception
// join the main thread to allow the execution of the code in the main thread
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));
// poll for new data
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
log.info("Wake up exception!");
// we ignore this as this is an expected exception when closing a consumer
} catch (Exception e){
log.error("Unexpected exception");
} finally {
consumer.close(); // this will also commit the offsets if need be
log.info("The consumer is now gracefully closed");
}
5. 自动提交-至多一次
// At-most-once semantics: offsets are auto-committed every 1s, possibly before
// the polled records are actually processed, so a crash can skip records.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// poll(long) is deprecated since Kafka 2.0 — use the Duration overload,
// consistent with the other consumer examples in this file.
consumer.poll(Duration.ofMillis(100));
6. 手动提交-至少一次
// At-least-once semantics: auto-commit disabled; commit manually after processing.
props.put("enable.auto.commit", "false");
// commitSync() blocks until the offsets returned by the last poll() are committed.
consumer.commitSync();
7. 手动指定消费分区和消费位置
指定消费分区
// Manually assign specific partitions; assign() bypasses group management,
// so no rebalancing occurs. (Spacing normalized to the file's convention.)
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
指定消费位置:调用 seek(TopicPartition, long) 将消费者定位到指定分区的指定偏移量;调用前需已通过 assign() 或订阅后的分区分配获得该分区。