A simple Kafka usage example

Produce (send) messages on the host:

/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost.slave1:9092 --topic T2
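The console producer can also be tuned through a properties file passed with its --producer.config option. A minimal sketch of such a file (the specific values below are illustrative assumptions, not required settings):

```
# producer.properties — pass to kafka-console-producer.sh via --producer.config
# (illustrative values; adjust for your cluster)
acks=all                  # wait for the full ISR to acknowledge each message
compression.type=snappy   # compress batches before sending
retries=3                 # retry transient send failures
```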

 

Consume (receive) messages:

/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-console-consumer.sh --zookeeper localhost.slave1:2181 --topic T2 --from-beginning

The console consumer is a tool that reads data from Kafka and outputs it to standard output. Its options:

--blacklist <String: blacklist>
    Blacklist of topics to exclude from consumption.
--bootstrap-server <String: server to connect to>
    REQUIRED (unless old consumer is used): The server to connect to.
--consumer-property <String: consumer_prop>
    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>
    Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--csv-reporter-enabled
    If set, the CSV metrics reporter will be enabled.
--delete-consumer-offsets
    If specified, the consumer path in zookeeper is deleted when starting up.
--enable-systest-events
    Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>
    The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning
    If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--isolation-level <String>
    Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>
    The maximum number of messages to consume before exiting. If not set, consumption is continual.
--metrics-dir <String: metrics directory>
    If csv-reporter-enabled is set, and this parameter is set, the csv metrics will be output here.
--new-consumer
    Use the new consumer implementation. This is the default.
--offset <String: consume offset>
    The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end. (default: latest)
--partition <Integer: partition>
    The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>
    The properties to initialize the message formatter.
--skip-message-on-error
    If there is an error when processing a message, skip it instead of halting.
--timeout-ms <Integer: timeout_ms>
    If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>
    The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>
    Whitelist of topics to include for consumption.
--zookeeper <String: urls>
    REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
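The --consumer.config option above takes a standard consumer properties file. A minimal sketch of what such a file might contain (the group id and values below are illustrative assumptions):

```
# consumer.properties — pass to kafka-console-consumer.sh via --consumer.config
# (illustrative values; --consumer-property key=value overrides entries here)
group.id=console-test          # hypothetical consumer group name
auto.offset.reset=earliest     # start from the oldest record when no offset exists
session.timeout.ms=10000       # time before the group coordinator marks the consumer dead
```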

 

 

If a Java program you developed yourself cannot receive the producer's messages, check the logs first, and verify that the configured value of

offsets.topic.replication.factor

is consistent with the actual number of brokers in your cluster.
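For example, on a single-broker test cluster the internal __consumer_offsets topic cannot be given more than one replica; a broker-side server.properties sketch for that case (the value shown assumes a one-broker setup):

```
# server.properties — the offsets topic's replication factor
# must not exceed the number of live brokers
offsets.topic.replication.factor=1
```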

