#!/bin/bash
#
# kafka-operation.sh - thin wrapper around the Kafka command-line tools.
#
# This file is a Jinja2 template: the endpoint values below are filled in
# from the inventory when the template is rendered. Keep template delimiters
# out of shell comments, because the template engine would try to parse them.
#
# Usage:
#   kafka-operation.sh {producer|consumer|consumer-begin|create|delete} <topic>
#   kafka-operation.sh {list|groups|election-leader}
#   kafka-operation.sh group <group-name>
#
# Environment:
#   KAFKA_HOME  read by producer/consumer/consumer-begin to locate
#               config/producer.properties and config/consumer.properties.
#
# The kafka-*.sh tools are invoked by bare name, so they must be on PATH.

# Endpoint on this host (port 9094) used by the console producer/consumer.
LOCAL_IP="{{ inventory_hostname }}:9094"

# Comma-separated "host:2181" entries for every host in the zookeeper group,
# with "/kafka" appended after the last host. Not referenced by any command
# below; kept so the rendered script still defines it.
# The loop is written inline so the rendered value never picks up stray
# whitespace, whatever the template engine's trimming settings are.
ZK_SERVER="{% for zk_host in groups.zookeeper %}{{ zk_host }}:2181{% if loop.last %}/kafka{% else %},{% endif %}{% endfor %}"

# Comma-separated "host:9092" entries for every host in the kafka group;
# passed as --bootstrap-server to the admin-style commands.
KAFKA_SERVER="{% for kafka_host in groups.kafka %}{{ kafka_host }}:9092{% if not loop.last %},{% endif %}{% endfor %}"

# New topics get one partition per host in the kafka group.
PARTITIONS="{{ groups.kafka | length }}"

# Print the help text to stdout.
usage() {
  echo 'Usage: kafka-operation.sh {producer|consumer|consumer-begin|create|delete} {topic-name}'
  echo 'Status: kafka-operation.sh {list|groups}'
  echo 'Status: kafka-operation.sh {group} {group name}'
  echo 'maintenance: kafka-operation.sh {election-leader}'
}

# Abort with the help text on stderr when a required argument is empty.
#   $1 - human-readable name of the argument (used in the error message)
#   $2 - the value supplied by the caller (may be empty)
require_arg() {
  if [[ -z "$2" ]]; then
    printf 'kafka-operation.sh: missing %s\n' "$1" >&2
    usage >&2
    exit 1
  fi
}

# Abort when KAFKA_HOME is unset or empty; otherwise the properties path
# would silently become "/config/...".
require_kafka_home() {
  : "${KAFKA_HOME:?KAFKA_HOME must be set to the Kafka installation directory}"
}

operation="${1:-}"
target="${2:-}"

case "$operation" in
  producer)
    require_arg 'topic name' "$target"
    require_kafka_home
    kafka-console-producer.sh \
      --producer.config "$KAFKA_HOME/config/producer.properties" \
      --broker-list "$LOCAL_IP" \
      --topic "$target"
    ;;
  consumer)
    require_arg 'topic name' "$target"
    require_kafka_home
    kafka-console-consumer.sh \
      --consumer.config "$KAFKA_HOME/config/consumer.properties" \
      --bootstrap-server "$LOCAL_IP" \
      --topic "$target"
    ;;
  consumer-begin)
    require_arg 'topic name' "$target"
    require_kafka_home
    kafka-console-consumer.sh \
      --consumer.config "$KAFKA_HOME/config/consumer.properties" \
      --from-beginning \
      --bootstrap-server "$LOCAL_IP" \
      --topic "$target"
    ;;
  create)
    require_arg 'topic name' "$target"
    kafka-topics.sh --create \
      --bootstrap-server "$KAFKA_SERVER" \
      --replication-factor 1 \
      --partitions "$PARTITIONS" \
      --topic "$target"
    ;;
  delete)
    require_arg 'topic name' "$target"
    kafka-topics.sh --delete \
      --bootstrap-server "$KAFKA_SERVER" \
      --topic "$target"
    ;;
  list)
    kafka-topics.sh --list --bootstrap-server "$KAFKA_SERVER"
    ;;
  groups)
    kafka-consumer-groups.sh --all-groups --all-topics --list \
      --bootstrap-server "$KAFKA_SERVER"
    ;;
  group)
    require_arg 'group name' "$target"
    kafka-consumer-groups.sh \
      --bootstrap-server "$KAFKA_SERVER" \
      --describe \
      --group "$target"
    ;;
  election-leader)
    kafka-leader-election.sh \
      --bootstrap-server "$KAFKA_SERVER" \
      --all-topic-partitions \
      --election-type PREFERRED
    ;;
  *)
    # No command or an unrecognised one: show the help text.
    usage
    ;;
esac