#!/bin/bash
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
###########################################################
export MSKSOURCE="SOURCE:9092,..."
export MSKTARGET="TARGET:9092,..."
export KAFKAVERSION="3.3.1"
export JMXAGENTVERSION="0.13.0"
export JAVAMEM="-Xms1g -Xmx5g"
###########################################################
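# Optional sanity check (sketch): warn early if the placeholder bootstrap strings above were not replaced.
if [ "${MSKSOURCE}" = "SOURCE:9092,..." ] || [ "${MSKTARGET}" = "TARGET:9092,..." ]; then
  echo "WARNING: MSKSOURCE/MSKTARGET still contain placeholder values" >&2
fi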
amazon-linux-extras install java-openjdk11 -y
yum update -y
yum install python3.7 -y
yum install nmap-ncat -y
yum install git -y
yum erase awscli -y
yum install jq -y
yum install maven -y
cd /home/ec2-user
wget https://bootstrap.pypa.io/get-pip.py
su -c "python3.7 get-pip.py --user" -s /bin/sh ec2-user
su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user
su -c "/home/ec2-user/.local/bin/pip3 install kafka-python --user" -s /bin/sh ec2-user
# Install AWS CLI v2 (symlinks land in /usr/local/bin/aws2, exposed below as /usr/local/bin/aws)
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
./aws/install -b /usr/local/bin/aws2
ln -s /usr/local/bin/aws2/aws /usr/local/bin/aws
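# Optional check: confirm the symlink resolves to AWS CLI v2.
/usr/local/bin/aws --version || echo "WARNING: AWS CLI v2 install appears to have failed" >&2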
# Create dirs, get Apache Kafka
su -c "mkdir -p kafka3" -s /bin/sh ec2-user
ln -s /home/ec2-user/kafka3 /home/ec2-user/kafka
cd kafka
su -c "wget https://downloads.apache.org/kafka/${KAFKAVERSION}/kafka_2.13-${KAFKAVERSION}.tgz" -s /bin/sh ec2-user
su -c "tar -xzf kafka_2.13-${KAFKAVERSION}.tgz --strip 1" -s /bin/sh ec2-user
# Setup prometheus agent
cd /home/ec2-user
su -c "mkdir prometheus" -s /bin/sh ec2-user
cd prometheus
su -c "wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${JMXAGENTVERSION}/jmx_prometheus_javaagent-${JMXAGENTVERSION}.jar" -s /bin/sh ec2-user
# Create config files
cd /home/ec2-user
su -c "mkdir -p kafka-connect-config" -s /bin/sh ec2-user
su -c "cp /home/ec2-user/kafka/config/connect-distributed.properties /home/ec2-user/kafka-connect-config/connect-distributed.properties" -s /bin/sh ec2-user
sed -i -e "s/bootstrap.servers=localhost:9092/bootstrap.servers=${MSKTARGET}/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/offset.storage.replication.factor=1/offset.storage.replication.factor=3/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/config.storage.replication.factor=1/config.storage.replication.factor=3/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/status.storage.replication.factor=1/status.storage.replication.factor=3/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/#listeners=HTTP:\/\/:8083/listeners=http:\/\/localhost:8083/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/group.id=connect-cluster/group.id=mm2-connect-cluster/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/#status.storage.partitions=5/status.storage.partitions=5/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
sed -i -e "s/#offset.storage.partitions=25/offset.storage.partitions=25/g" /home/ec2-user/kafka-connect-config/connect-distributed.properties
#MirrorSourceConnector
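# Note: the heredoc delimiter is quoted ('EOF') so the shell leaves ${MSKSOURCE}/${MSKTARGET} untouched
# inside the JSON; envsubst then substitutes them while writing the file.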
cat << 'EOF' | envsubst > /home/ec2-user/kafka-connect-config/mm2-msc-cust-repl-policy-no-auth.json
{
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"clusters": "source,target",
"source.cluster.alias": "source",
"target.cluster.alias": "target",
"target.cluster.bootstrap.servers": "${MSKTARGET}",
"source.cluster.bootstrap.servers": "${MSKSOURCE}",
"topics": ".*",
"topics.exclude": ".*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset",
"tasks.max": "10",
"groups.exclude": "console-consumer-.*,connect-.*,__.*",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"replication.factor": "3",
"emit.checkpoints.enabled":"true",
"refresh.topics.enabled":"true",
"refresh.groups.enabled":"true",
"producer.enable.idempotence":"true",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.acls.interval.seconds": "20",
"sync.topic.configs.interval.seconds": "20",
"refresh.topics.interval.seconds": "20",
"refresh.groups.interval.seconds": "20",
"consumer.group.id": "MirrorSourceConnector",
"producer.max.block.ms":"10000",
"producer.linger.ms": "500",
"producer.retry.backoff.ms":"1000",
"consumer.auto.offset.reset":"earliest",
"consumer.max.partition.fetch.bytes":"15728640",
"consumer.max.poll.records":"5000",
"consumer.receive.buffer.bytes":"15728640",
"consumer.send.buffer.bytes":"10485760",
"producer.batch.size":"10485760",
"producer.buffer.memory":"30331648",
"producer.max.request.size":"10485760",
"producer.message.max.bytes":"10485760"
}
EOF
#MirrorCheckpointConnector
cat << 'EOF' | envsubst > /home/ec2-user/kafka-connect-config/mm2-cpc-cust-repl-policy-no-auth.json
{
"name": "mm2-cpc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"clusters": "source,target",
"source.cluster.alias": "source",
"target.cluster.alias": "target",
"target.cluster.bootstrap.servers": "${MSKTARGET}",
"source.cluster.bootstrap.servers": "${MSKSOURCE}",
"tasks.max": "10",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"replication.factor": "3",
"checkpoints.topic.replication.factor": "3",
"emit.checkpoints.interval.seconds": "20",
"sync.group.offsets.enabled": "true",
"emit.checkpoints.enabled":"true",
"refresh.topics.enabled":"true",
"refresh.groups.enabled":"true",
"producer.enable.idempotence":"true",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.acls.interval.seconds": "20",
"sync.topic.configs.interval.seconds": "20",
"refresh.topics.interval.seconds": "20",
"refresh.groups.interval.seconds": "20",
"topics": ".*",
"topics.exclude": ".*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset",
"groups.exclude": "console-consumer-.*,connect-.*,__.*"
}
EOF
#MirrorHeartbeatConnector
cat << 'EOF' | envsubst > /home/ec2-user/kafka-connect-config/mm2-hbc-cust-repl-policy-no-auth.json
{
"name": "mm2-hbc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"clusters": "source,target",
"source.cluster.alias": "source",
"target.cluster.alias": "target",
"target.cluster.bootstrap.servers": "${MSKTARGET}",
"source.cluster.bootstrap.servers": "${MSKSOURCE}",
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.factor": "3",
"heartbeats.topic.replication.factor": "3",
"emit.heartbeats.interval.seconds": "5",
"emit.checkpoints.interval.seconds": "5",
"sync.group.offsets.enabled": "true",
"emit.checkpoints.enabled":"true",
"refresh.topics.enabled":"true",
"refresh.groups.enabled":"true",
"topics": ".*",
"topics.exclude": ".*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset",
"tasks.max": "10",
"groups.exclude": "console-consumer-.*,connect-.*,__.*",
"sync.group.offsets.interval.seconds":"5"
}
EOF
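# Optional check: validate that the three generated connector configs are well-formed JSON.
for f in /home/ec2-user/kafka-connect-config/mm2-*-cust-repl-policy-no-auth.json; do
  jq . "$f" > /dev/null || echo "WARNING: invalid JSON in $f" >&2
done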
cat << 'EOF' > /home/ec2-user/prometheus/kafka-connect.yml
lowercaseOutputName: true
rules:
  #kafka.connect:type=app-info,client-id="{clientid}"
  #kafka.consumer:type=app-info,client-id="{clientid}"
  #kafka.producer:type=app-info,client-id="{clientid}"
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
    name: kafka_$1_start_time_seconds
    labels:
      clientId: "$2"
    help: "Kafka $1 JMX metric start time seconds"
    type: GAUGE
    valueFactor: 0.001
  - pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
    name: kafka_$1_$3_info
    value: 1
    labels:
      clientId: "$2"
      $3: "$4"
    help: "Kafka $1 JMX metric info version and commit-id"
    type: GAUGE
  #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
    name: kafka_$2_$6
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}", partition="{partition}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      topic: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
  #kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
    name: kafka_$2_$5
    labels:
      clientId: "$3"
      nodeId: "$4"
    help: "Kafka $1 JMX metric type $2"
    type: UNTYPED
  #kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
  #kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
  #kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
  #kafka.consumer:type=consumer-metrics,client-id="{clientid}"
  - pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
    name: kafka_$2_$4
    labels:
      clientId: "$3"
    help: "Kafka $1 JMX metric type $2"
    type: GAUGE
  #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}<> status"
  - pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
    name: kafka_connect_connector_status
    value: 1
    labels:
      connector: "$1"
      task: "$2"
      status: "$3"
    help: "Kafka Connect JMX Connector status"
    type: GAUGE
  #kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
  #kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
  - pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
    name: kafka_connect_$1_$4
    labels:
      connector: "$2"
      task: "$3"
    help: "Kafka Connect JMX metric type $1"
    type: GAUGE
  #kafka.connect:type=connector-metrics,connector="{connector}"
  #kafka.connect:type=connect-worker-metrics,connector="{connector}"
  - pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
    name: kafka_connect_worker_$2
    labels:
      connector: "$1"
    help: "Kafka Connect JMX metric $1"
    type: GAUGE
  #kafka.connect:type=connect-worker-metrics
  - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
    name: kafka_connect_worker_$1
    help: "Kafka Connect JMX metric worker"
    type: GAUGE
  #kafka.connect:type=connect-worker-rebalance-metrics
  - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
    name: kafka_connect_worker_rebalance_$1
    help: "Kafka Connect JMX metric rebalance information"
    type: GAUGE
  #kafka.connect.mirror:type=MirrorSourceConnector
  - pattern: kafka.connect.mirror<type=MirrorSourceConnector, target=(.+), topic=(.+), partition=([0-9]+)><>([a-z-]+)
    name: kafka_connect_mirror_source_connector_$4
    help: Kafka Connect MM2 Source Connector Information
    labels:
      destination: "$1"
      topic: "$2"
      partition: "$3"
    type: GAUGE
  #kafka.connect.mirror:type=MirrorCheckpointConnector
  - pattern: kafka.connect.mirror<type=MirrorCheckpointConnector, source=(.+), target=(.+)><>([a-z-]+)
    name: kafka_connect_mirror_checkpoint_connector_$3
    help: Kafka Connect MM2 Checkpoint Connector Information
    labels:
      source: "$1"
      target: "$2"
EOF
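# The rules above drive the JMX exporter java agent; the systemd unit below attaches it on port 3600,
# so once the worker is running the metrics can be scraped from http://<host>:3600/metrics.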
sudo chown -R ec2-user:ec2-user /home/ec2-user/kafka-connect-config
# Setup unit in systemd for Kafka Connect
echo -n "
[Unit]
Description=Kafka Connect
After=network.target
[Service]
Type=simple
User=ec2-user
Environment='KAFKA_OPTS=-javaagent:/home/ec2-user/prometheus/jmx_prometheus_javaagent-${JMXAGENTVERSION}.jar=3600:/home/ec2-user/prometheus/kafka-connect.yml'
Environment='KAFKA_HEAP_OPTS=${JAVAMEM}'
ExecStart=/bin/sh -c '/home/ec2-user/kafka/bin/connect-distributed.sh /home/ec2-user/kafka-connect-config/connect-distributed.properties > /tmp/kafka-connect.log 2>&1'
Restart=on-abnormal
[Install]
WantedBy=multi-user.target" | envsubst > /etc/systemd/system/kafka-connect.service
systemctl daemon-reload
systemctl start kafka-connect
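# Optional (assumption: the worker should also start after a reboot); uncomment to enable on boot:
# systemctl enable kafka-connect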
sleep 10
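# Optional sketch: a fixed sleep may not be enough on slower instances; poll the Connect REST API
# (localhost:8083, as set in connect-distributed.properties above) until it responds.
for i in $(seq 1 30); do
  curl -sf http://localhost:8083/ > /dev/null && break
  sleep 5
done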
cd /home/ec2-user/kafka-connect-config/
curl -X PUT -H "Content-Type: application/json" --data @mm2-msc-cust-repl-policy-no-auth.json http://localhost:8083/connectors/mm2-msc/config | jq '.'
curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc-cust-repl-policy-no-auth.json http://localhost:8083/connectors/mm2-cpc/config | jq '.'
curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc-cust-repl-policy-no-auth.json http://localhost:8083/connectors/mm2-hbc/config | jq '.'
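# Optional: give the connectors a few seconds to initialize, then report their status via the Connect REST API.
sleep 10
for c in mm2-msc mm2-cpc mm2-hbc; do
  curl -s http://localhost:8083/connectors/${c}/status | jq '.'
done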