
Mirroring Kafka Clusters

Ahmed Abdul Hamid edited this page Jul 15, 2020 · 44 revisions


Overview

In this use case, we set up two independent Kafka brokers locally, and use Brooklin to selectively mirror a subset of topics between them.

Prerequisites

Install Java Development Kit 8 or higher if you do not already have it.
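One quick way to confirm your JDK meets this requirement is to extract the major version from the `java -version` string. The `java_major` helper below is an illustrative name of our own, and it assumes the two common version-string formats (legacy `1.8.x` and modern `11.x`):

```shell
# java_major (illustrative helper): extract the major Java version from a
# version string, handling both legacy "1.8.0_292" and modern "11.0.11" forms.
java_major() {
  echo "$1" | sed -E 's/^1\.([0-9]+).*/\1/; s/^([0-9]+)[.+_].*/\1/'
}

java_major "1.8.0_292"   # -> 8
java_major "11.0.11"     # -> 11

# In practice, feed it the quoted version from `java -version`, e.g.:
# java -version 2>&1 | awk -F'"' '/version/ {print $2}'
```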

Instructions

You can skip the initial setup steps if you use the Vagrant file we authored to automate setting up Brooklin and Kafka for this demo.

If you would like to do so:

  1. Refer to https://www.vagrantup.com/intro/getting-started/ for instructions on how to install and use Vagrant with one of its virtualization providers (e.g. VirtualBox).

  2. Download this Vagrant file to a convenient location on your computer and run the command below to provision a local image with Brooklin and Kafka up and running:

    vagrant up
  3. SSH into the provisioned host:

    vagrant ssh
  4. After the Vagrant image is successfully set up, you can skip to the "Create a datastream" step below.

Set up two independent Kafka servers

  1. Download the latest Kafka tarball and untar it.

    tar -xzf kafka_2.12-2.2.0.tgz
    cd kafka_2.12-2.2.0
  2. Create two different server.properties files for the two different Kafka servers.

    cp config/server.properties config/server-src.properties
    cp config/server.properties config/server-dest.properties
  3. Edit these two config files to specify different values for the log.dirs, zookeeper.connect, and listeners config properties. You can do this manually or use the commands below.

    sed -ie 's/\/tmp\/kafka-logs/\/tmp\/kafka-logs\/src/; s/localhost:2181/localhost:2181\/src/' config/server-src.properties 
    echo listeners=PLAINTEXT://:9093 >> config/server-src.properties
    
    sed -ie 's/\/tmp\/kafka-logs/\/tmp\/kafka-logs\/dest/; s/localhost:2181/localhost:2181\/dest/' config/server-dest.properties
    echo listeners=PLAINTEXT://:9092 >> config/server-dest.properties
  4. Start a ZooKeeper server

    bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null & 
  5. Start two Kafka servers (we'll call them source and destination)

    bin/kafka-server-start.sh config/server-src.properties > /dev/null  &
    bin/kafka-server-start.sh config/server-dest.properties > /dev/null &
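If you want to preview what the substitutions in step 3 produce without editing your real configs, here is a self-contained sketch that applies the same sed expressions to a two-line stand-in for server.properties (the /tmp file paths and the stand-in contents are illustrative; a real server.properties has many more entries):

```shell
# A stand-in for config/server.properties containing just the two
# properties the sed commands rewrite (a real file has many more entries).
cat > /tmp/server-sample.properties <<'EOF'
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF

# Apply the same substitutions used above for the source server config
sed -e 's/\/tmp\/kafka-logs/\/tmp\/kafka-logs\/src/' \
    -e 's/localhost:2181/localhost:2181\/src/' \
    /tmp/server-sample.properties > /tmp/server-src-sample.properties
echo 'listeners=PLAINTEXT://:9093' >> /tmp/server-src-sample.properties

cat /tmp/server-src-sample.properties
# log.dirs=/tmp/kafka-logs/src
# zookeeper.connect=localhost:2181/src
# listeners=PLAINTEXT://:9093
```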

Create Kafka topics in the source server

  1. Create three topics in the source Kafka server

    bin/kafka-topics.sh --topic first-topic --bootstrap-server localhost:9093 --create --replication-factor 1 --partitions 1 
    bin/kafka-topics.sh --topic second-topic --bootstrap-server localhost:9093 --create --replication-factor 1 --partitions 1  
    bin/kafka-topics.sh --topic third-topic --bootstrap-server localhost:9093 --create --replication-factor 1 --partitions 1
  2. Populate the topics you created with some data

    # We use the LICENSE and NOTICE files packaged in the Kafka tarball
    
     cat LICENSE | bin/kafka-console-producer.sh --topic first-topic --broker-list localhost:9093
     cat NOTICE | bin/kafka-console-producer.sh --topic second-topic --broker-list localhost:9093
     cat NOTICE | bin/kafka-console-producer.sh --topic third-topic --broker-list localhost:9093

Set up Brooklin

  1. Download the latest tarball (tgz) from Brooklin releases
  2. Untar the Brooklin tarball
    tar -xzf brooklin-1.0.0.tgz
    cd brooklin-1.0.0 
  3. Run Brooklin
    bin/brooklin-server-start.sh config/server.properties > /dev/null 2>&1 &

Create a datastream

  1. Create a datastream to mirror only the first two Kafka topics you created, first-topic and second-topic, from the source to the destination Kafka server.

    Notice how we use a regex (-s option in the command below) to select the topics we are interested in. The pattern we specify intentionally excludes third-topic.

    cd brooklin-1.0.0
    bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n first-mirroring-stream -s "kafka://localhost:9093/^(first|second)-topic$" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}' 2>/dev/null

    Here are the options we used to create this datastream:

    -o CREATE                                         The operation is datastream creation
    -u http://localhost:32311/                        Datastream Management Service URI
    -n first-mirroring-stream                         Datastream name
    -s kafka://localhost:9093/^(first|second)-topic$  Datastream source URI
    -c kafkaMirroringConnector                        Connector name ("kafkaMirroringConnector" refers to KafkaMirrorConnector)
    -t kafkaTransportProvider                         Transport provider name ("kafkaTransportProvider" refers to KafkaTransportProvider)
    -m '{"owner":"test-user",
         "system.reuseExistingDestination": "false"}' Datastream metadata
    
    • For the datastream source (-s) option, you must specify a URI that starts with kafka:// or kafkassl://.

    • For the datastream metadata (-m) option

      • Specifying an owner is mandatory
      • Setting system.reuseExistingDestination to false keeps Brooklin from reusing an existing Kafka topic (if any) in the destination Kafka server

    Check the KafkaMirrorConnector wiki page to learn more about its various configuration options.

  2. Verify the datastream creation by requesting all datastream metadata from Brooklin.

    bin/brooklin-rest-client.sh -o READALL -u http://localhost:32311/ 2>/dev/null

    Notice the connectionString values under source and destination

  3. Additionally, you can view some more information about the different Datastreams and DatastreamTasks by querying the diagnostics REST endpoint.

    curl -s "http://localhost:32311/diag?q=status&type=connector&scope=kafkaMirroringConnector&content=datastream_state?datastream=first-mirroring-stream"
  4. If you're using our Vagrant file, you may pipe the output of the above command to jq for better JSON formatting:

    curl -s "http://localhost:32311/diag?q=status&type=connector&scope=kafkaMirroringConnector&content=datastream_state?datastream=first-mirroring-stream" | jq .
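The topic-selection regex in the -s option can be sanity-checked locally before creating the datastream. This sketch (assuming grep -E is available) runs the three topic names through the same pattern:

```shell
# Print only the topic names the datastream's regex would select.
# third-topic is filtered out, matching the mirroring behavior we expect.
printf '%s\n' first-topic second-topic third-topic \
  | grep -E '^(first|second)-topic$'
# first-topic
# second-topic
```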

Verify topic mirroring

  1. Verify that only first-topic and second-topic were created in the destination Kafka server by running:

    cd kafka_2.12-2.2.0
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  2. Verify the created topics have the right contents by running:

    bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092 --from-beginning
    bin/kafka-console-consumer.sh --topic second-topic --bootstrap-server localhost:9092 --from-beginning

See mirroring in action!

  1. Use the Kafka console consumer to read from the Kafka topic, first-topic, that Brooklin created in the destination server.

    bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092 --from-beginning
  2. Open another terminal window, and launch the Kafka console producer, configuring it to write to first-topic in the source server.

    bin/kafka-console-producer.sh --topic first-topic --broker-list localhost:9093 
  3. Start typing text in the Kafka producer terminal. Hit Enter, then observe the Kafka consumer terminal you launched in step 1. You should see the message you typed mirrored to the destination server.

Pause and resume

  • You can stop mirroring temporarily by pausing the datastream

    cd brooklin-1.0.0
    bin/brooklin-rest-client.sh -o PAUSE -n first-mirroring-stream -u http://localhost:32311/ 2>/dev/null
  • Similarly, you can re-enable mirroring by resuming the datastream

    bin/brooklin-rest-client.sh -o RESUME -n first-mirroring-stream -u http://localhost:32311/ 2>/dev/null

Stop Brooklin, Kafka, and ZooKeeper

When you are done, run the following commands to stop all running apps.

cd brooklin-1.0.0 
bin/brooklin-server-stop.sh

cd kafka_2.12-2.2.0
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh