Pertanyaan Tidak dapat terhubung ke penjaga kebun binatang jarak jauh dari produsen Kafka


Saya sudah bermain dengan Apache Kafka selama beberapa hari, dan inilah masalah saya, Jika saya mengatur tes lokal yang dijelaskan di bagian "mulai cepat" di situs web, semuanya baik-baik saja, produsen kafka / konsumen, server penjaga kebun binatang dan broker kafka bekerja dengan sempurna.

Sekarang jika saya menjalankan pada server jauh (sebut saja simpul2):  - Zookeeper - port 2181  - Kafka Broker - port 9092  - konsumen kafka

Dan kemudian jika saya lari dari komputer lokal saya:  - produser kafka

Dengan asumsi tidak ada firewall di node2. Sambungan berakhir dengan batas waktu.

Berikut ini adalah log kesalahan:

/etc/java/jdk1.6.0_41/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/kevin/Documents/idea-IU-123.169/bin -Dfile.encoding=UTF-8 -classpath /etc/java/jdk1.6.0_41/lib/dt.jar:/etc/java/jdk1.6.0_41/lib/tools.jar:/etc/java/jdk1.6.0_41/lib/jconsole.jar:/etc/java/jdk1.6.0_41/lib/htmlconverter.jar:/etc/java/jdk1.6.0_41/lib/sa-jdi.jar:/home/kevin/Desktop/kafka-0.7.2/examples/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-compiler.jar:/home/kevin/Desktop/kafka-0.7.2/project/boot/scala-2.8.0/lib/scala-library.jar:/home/kevin/Desktop/kafka-0.7.2/core/target/scala_2.8.0/classes:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/home/kevin/Desktop/kafka-0.7.2/core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/home/kevin/Desktop/kafka-0.7.2/examples/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/home/kevin/Documents/idea-IU-123.169/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain kafka.examples.KafkaConsumerProducerDemo
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "Thread-0" java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:532)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:125)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:114)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
    at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:100)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
    at kafka.producer.ProducerPool.send(ProducerPool.scala:100)
    at kafka.producer.Producer.zkSend(Producer.scala:137)
    at kafka.producer.Producer.send(Producer.scala:99)
    at kafka.javaapi.producer.Producer.send(Producer.scala:103)
    at kafka.examples.Producer.run(Producer.java:53)

Process finished with exit code 0

Dan inilah kode Produser saya:

import java.util.Properties;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;


public class Producer extends Thread{

  private final kafka.javaapi.producer.Producer<String, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public Producer(String topic)
  {
    props.put("zk.connect", "node2:2181");
    props.put("connect.timeout.ms", "5000");
    props.put("socket.timeout.ms", "30000");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "sync");
    props.put("conpression.codec", "0");
    producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
    this.topic = topic;
  }

  public void run() {
      String messageStr = new String("Message_test");
      producer.send(new ProducerData<String, String>(topic, messageStr));
  }
}

** Jadi saya juga diuji untuk beralih

props.put("zk.connect", "node2:2181");

oleh

props.put("broker.list", "0:node2:9082");

Dan dalam hal ini saya dapat terhubung dengan sukses. **


4
2018-03-04 19:21


asal


Jawaban:


Lihat item # 3 di http://kafka.apache.org/faq.html

Solusinya adalah secara eksplisit mengatur properti hostname di server.properties dari Kafka

Anda dapat memverifikasi ini dengan menggunakan Zookeeper. Jika Anda menggunakan kafka 0.7 *, buka konsol ZkCli dan dapatkan / broker / id / 0 dan Anda harus mendapatkan semua metadata broker. Pastikan alamat IP / nama host di sini cocok dengan string koneksi Zk yang Anda gunakan dalam kode produser -

props.put("zk.connect", "node2:2181");

Dalam kasus saya, saya menggunakan produser yang berjalan di mesin lokal saya yang terhubung ke VM ubuntu (kotak yang sama, IP yang berbeda) dan solusi ini membantu.


3
2018-04-17 22:09