Gunnar Morling

Random Musings on All Things Software Engineering

Testing Kafka Connectors

Posted at Aug 25, 2022

Kafka Connect is a key factor for the widespread adoption of Apache Kafka: a framework and runtime environment for connectors, it makes the task of getting data either into or out of Kafka solely a matter of configuration, rather than a bespoke programming job. There are dozens, if not hundreds, of ready-made source and sink connectors, allowing you to create no-code data pipelines between all kinds of databases, APIs, and other systems.

There may be situations though where there is no existing connector matching your requirements, in which case you can implement your own custom connector using the Kafka Connect framework. Naturally, this raises the question of how to test such a Kafka connector, making sure it propagates the data between the connected external system and Kafka correctly and completely. In this blog post I’d like to focus on testing approaches for Kafka Connect source connectors, i.e. connectors like Debezium, which ingest data from an external system into Kafka. Very similar strategies can be employed for testing sink connectors, though.

Unit Tests

A first obvious approach is implementing good old unit tests: simply instantiate the class under test (typically, your SourceConnector or SourceTask implementation), invoke its methods (for instance, SourceConnector::taskConfigs(), or SourceTask::poll()), and assert the return values.

Here’s an example for such a test from kc-etcd, a simple source connector for etcd, which is a distributed key/value store, most prominently used by Kubernetes as its metadata storage. Note that kc-etcd isn’t meant to be a production-ready connector; I have written it primarily for learning and teaching purposes.

This test verifies that the connector produces the correct task configurations, given a maximum of two tasks:

public class EtcdSourceConnectorTest {

  @Test
  public void shouldCreateConfigurationForTasks() throws Exception {
    EtcdSourceConnector connector = new EtcdSourceConnector();
    Map<String, String> config = new HashMap<>();
    config.put(
      "clusters",
      "etcd-a=http://etcd-a-1:2379,http://etcd-a-2:2379,http://etcd-a-3:2379;etcd-b=http://etcd-b-1:2379;etcd-c=http://etcd-c-1:2379"
    ); (1)
    connector.start(config);

    List<Map<String, String>> taskConfigs = connector.taskConfigs(2); (2)
    assertThat(taskConfigs).hasSize(2);

    (3)
    Map<String, String> taskConfig = taskConfigs.get(0);
    assertThat(taskConfig).containsEntry("clusters",
        "etcd-a=http://etcd-a-1:2379,http://etcd-a-2:2379,http://etcd-a-3:2379;etcd-b=http://etcd-b-1:2379");

    (4)
    taskConfig = taskConfigs.get(1);
    assertThat(taskConfig).containsEntry("clusters",
        "etcd-c=http://etcd-c-1:2379");
  }
}
1 Configure the connector with three etcd clusters
2 Request the configuration for two tasks
3 The first connector task should handle the first two clusters
4 The second task should handle the remaining third cluster
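
For context, the splitting behavior asserted here can be implemented on top of Kafka Connect’s built-in ConnectorUtils helper, which distributes a list of work items evenly across the requested number of tasks. Here’s a sketch of what such a taskConfigs() implementation could look like (the clustersConfig field is an assumption; this is not necessarily the exact kc-etcd code):

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    // the "clusters" value as passed via the configuration, e.g.
    // "etcd-a=...;etcd-b=...;etcd-c=..." (clustersConfig is an assumed field)
    List<String> clusters = Arrays.asList(clustersConfig.split(";"));

    // org.apache.kafka.connect.util.ConnectorUtils splits the three clusters
    // over two tasks as groups of sizes two and one, as asserted above
    return ConnectorUtils.groupPartitions(clusters, Math.min(clusters.size(), maxTasks))
        .stream()
        .map(group -> Map.of("clusters", String.join(";", group)))
        .collect(Collectors.toList());
  }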

Things look similar when testing the actual polling loop of the connector’s task class. As this is about testing a source connector, we first need to do some data changes in the configured etcd cluster(s), before we can assert the corresponding SourceRecords that are emitted by the task. As part of kc-etcd, I’ve implemented a very basic testing harness named kcute ("Kafka Connect Unit Testing") which simplifies that process a bit. Here’s an example test from kc-etcd, based on kcute:

public class EtcdSourceTaskTest {

  @RegisterExtension (1)
  public static final EtcdClusterExtension etcd = EtcdClusterExtension.builder()
      .withNodes(1)
      .build();

  @RegisterExtension (2)
  public TaskRunner taskRunner = TaskRunner.forSourceTask(EtcdSourceConnectorTask.class)
      .with("clusters", "test-etcd=" + etcd.clientEndpoints().get(0))
      .build();

  @Test
  public void shouldHandleAllTypesOfEvents() throws Exception {
    Client client = Client.builder() (3)
        .keepaliveWithoutCalls(false)
        .endpoints(etcd.clientEndpoints())
        .build();

    KV kvClient = client.getKVClient();
    long currentRevision = getCurrentRevision(kvClient);

    // insert
    ByteSequence key = ByteSequence.from("key-1".getBytes());
    ByteSequence value = ByteSequence.from("value-1".getBytes());
    kvClient.put(key, value).get();

    // update
    key = ByteSequence.from("key-1".getBytes());
    value = ByteSequence.from("value-1a".getBytes());
    kvClient.put(key, value).get();

    // delete
    key = ByteSequence.from("key-1".getBytes());
    kvClient.delete(key).get();

    (4)
    List<SourceRecord> records = taskRunner.take("test-etcd", 3);

    (5)
    // insert
    SourceRecord record = records.get(0);
    assertThat(record.sourcePartition()).isEqualTo(Collections.singletonMap("name", "test-etcd"));
    assertThat(record.sourceOffset()).isEqualTo(Collections.singletonMap("revision", ++currentRevision));
    assertThat(record.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
    assertThat(record.key()).isEqualTo("key-1");
    assertThat(record.valueSchema()).isEqualTo(Schema.STRING_SCHEMA);
    assertThat(record.value()).isEqualTo("value-1");

    // update
    record = records.get(1);
    assertThat(record.sourceOffset()).isEqualTo(Collections.singletonMap("revision", ++currentRevision));
    assertThat(record.key()).isEqualTo("key-1");
    assertThat(record.value()).isEqualTo("value-1a");

    // delete
    record = records.get(2);
    assertThat(record.sourceOffset()).isEqualTo(Collections.singletonMap("revision", ++currentRevision));
    assertThat(record.key()).isEqualTo("key-1");
    assertThat(record.value()).isNull();
  }
}
1 Set up an etcd cluster using the JUnit extension provided by the jetcd client project
2 Set up the task under test using kcute
3 Obtain a client for etcd and do some data changes
4 Retrieve three records for the specified topic via kcute
5 Assert the emitted SourceRecords corresponding to the data changes done before in etcd
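
Under the hood, a harness like kcute essentially has to do what the Connect runtime would otherwise do: instantiate the task, initialize and start it with the given configuration, and drive its polling loop. Conceptually, it boils down to something like this simplified sketch (not the actual kcute implementation; error handling and offset management are omitted):

    // stub org.apache.kafka.connect.source.SourceTaskContext,
    // e.g. returning an empty offset storage reader
    SourceTaskContext context = ...;

    EtcdSourceConnectorTask task = new EtcdSourceConnectorTask();
    task.initialize(context);
    task.start(config); // the connector configuration given to the test

    List<SourceRecord> records = new ArrayList<>();
    while (records.size() < 3) {
      // poll() may return null or an empty list if no data is available yet
      List<SourceRecord> polled = task.poll();
      if (polled != null) {
        records.addAll(polled);
      }
    }
    task.stop();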

Now one could argue about whether this test is a true unit test or not, given that it launches and relies on an instance of an external system in the form of etcd. My personal take on these matters is to be pragmatic here, as a) there’s a difference to true end-to-end integration tests as discussed in the next section, and b) approaches to mock external systems usually are not worth the effort or, worse, result in poor tests, due to incorrect assumptions when implementing the mocked behavior.

This testing approach works very well in general; in particular, it requires neither Apache Kafka (and ZooKeeper) nor Kafka Connect to be started, resulting in very fast test execution times and a great dev experience when creating and running these tests in your IDE.

But there are some limitations, too. Essentially, we end up emulating the behavior of the actual Kafka Connect runtime in our testing harness. This can become tedious when more advanced Connect features are required for a given test, for instance retrying/restart logic, the dynamic reconfiguration of connector tasks while the connector is running, etc. Ideally, there’d be a testing harness with all these capabilities provided as part of Kafka Connect itself (similar in spirit to the TopologyTestDriver of Kafka Streams), but in the absence of that, we may be better off for certain tests by deploying our source connector into an actual Kafka Connect instance and running assertions against the topic(s) it writes to.
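
To illustrate the kind of developer experience such a harness could provide, here’s a minimal sketch of how the TopologyTestDriver is typically used in Kafka Streams; the topology variable stands for whatever topology is under test:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never connected to

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
      TestInputTopic<String, String> input = driver.createInputTopic(
          "input-topic", new StringSerializer(), new StringSerializer());
      TestOutputTopic<String, String> output = driver.createOutputTopic(
          "output-topic", new StringDeserializer(), new StringDeserializer());

      // pipe a record through the topology and assert the result, all in-process,
      // without any broker
      input.pipeInput("key-1", "value-1");
      assertThat(output.readKeyValue()).isEqualTo(KeyValue.pair("key-1", "VALUE-1"));
    }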

Integration Tests

When it comes to setting up the required infrastructure for integration tests in Java, the go-to solution these days is the excellent Testcontainers project. So let’s see what it takes to test a custom Kafka connector using Testcontainers.

As far as Kafka itself is concerned, there’s a module for that coming with Testcontainers, based on Confluent Platform. Alternatively, you could use the Testcontainers module from the Strimzi project, which provides you with plain upstream Apache Kafka container images. For Kafka Connect, we provide a Testcontainers integration as part of the Debezium project, offering an API for registering connectors and controlling their lifecycle.

Now, unfortunately, the application-server-like deployment model of Kafka Connect poses a challenge when it comes to testing a connector which is built as part of the current project itself. For each connector plug-in, Connect expects a directory on its plug-in path which contains all the JARs of the connector itself and its dependencies. I’m not aware of any kind of "exploded mode", where you could point Connect to a directory which contains a connector’s class files and its dependencies in JAR form.

This means that the connector must be packaged into a JAR file as part of the test preparation. In order to keep integration tests friendly towards being run from within an IDE, this should happen programmatically within the test itself. That way, any code changes to the connector will be picked up automatically the next time the test is run, without having to run the project’s Maven build. The entire code for doing this is a bit too long (and boring) to share in this blog post, but you can find it in the kc-etcd repository on GitHub.
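
In essence, though, the idea is to walk the compiled classes under target/classes and stream them into a JAR file within the mounted plug-in directory. Here’s a condensed sketch of that approach (the actual implementation in the repository handles a few more details):

  private static void createConnectorJar() throws IOException {
    Path classes = Paths.get("target", "classes");
    Path pluginDir = Paths.get("target", "kcetcd-connector");
    Files.createDirectories(pluginDir);

    try (JarOutputStream out = new JarOutputStream(
        Files.newOutputStream(pluginDir.resolve("kcetcd-connector.jar")))) {
      Files.walk(classes)
          .filter(Files::isRegularFile)
          .forEach(file -> {
            try {
              // JAR entry names must be relative to the class root and use '/'
              String name = classes.relativize(file).toString().replace('\\', '/');
              out.putNextEntry(new JarEntry(name));
              Files.copy(file, out);
              out.closeEntry();
            }
            catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          });
    }
  }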

Here are the key parts of an integration test based on that approach:

public class EtcdConnectorIT {

  private static Network network = Network.newNetwork();

  (1)
  private static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.0"))
      .withNetwork(network);

  (2)
  public static DebeziumContainer connectContainer = new DebeziumContainer("debezium/connect-base:1.9.5.Final")
      .withFileSystemBind("target/kcetcd-connector", "/kafka/connect/kcetcd-connector")
      .withNetwork(network)
      .withKafka(kafkaContainer)
      .dependsOn(kafkaContainer);

  (3)
  public static EtcdContainer etcdContainer = new EtcdContainer("gcr.io/etcd-development/etcd:v3.5.4",
      "etcd-a", Arrays.asList("etcd-a"))
          .withNetworkAliases("etcd")
          .withNetwork(network);

  @BeforeAll
  public static void startContainers() throws Exception {
    createConnectorJar(); (4)

    Startables.deepStart(Stream.of(
            kafkaContainer, etcdContainer, connectContainer))
            .join();
  }

  @Test
  public void shouldHandleAllTypesOfEvents() throws Exception {
    Client client = Client.builder()
        .endpoints(etcdContainer.clientEndpoint()).build();

    (5)
    ConnectorConfiguration connector = ConnectorConfiguration.create()
        .with("connector.class", "dev.morling.kcetcd.source.EtcdSourceConnector")
        .with("clusters", "test-etcd=http://etcd:2379")
        .with("tasks.max", "2")
        .with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
        .with("value.converter", "org.apache.kafka.connect.storage.StringConverter");

    (6)
    connectContainer.registerConnector("my-connector", connector);
    connectContainer.ensureConnectorTaskState("my-connector", 0, State.RUNNING);

    KV kvClient = client.getKVClient();

    (7)
    // insert
    ByteSequence key = ByteSequence.from("key-1".getBytes());
    ByteSequence value = ByteSequence.from("value-1".getBytes());
    kvClient.put(key, value).get();

    // update
    key = ByteSequence.from("key-1".getBytes());
    value = ByteSequence.from("value-1a".getBytes());
    kvClient.put(key, value).get();

    // delete
    key = ByteSequence.from("key-2".getBytes());
    kvClient.delete(key).get();

    (8)
    List<ConsumerRecord<String, String>> records = drain(getConsumer(kafkaContainer), 3);

    // insert
    ConsumerRecord<String, String> record = records.get(0);
    assertThat(record.key()).isEqualTo("key-1");
    assertThat(record.value()).isEqualTo("value-1");

    // update
    record = records.get(1);
    assertThat(record.key()).isEqualTo("key-1");
    assertThat(record.value()).isEqualTo("value-1a");

    // delete
    record = records.get(2);
    assertThat(record.key()).isEqualTo("key-2");
    assertThat(record.value()).isNull();
  }
}
1 Set up Apache Kafka in a container using the Testcontainers Kafka module
2 Set up Kafka Connect in a container, mounting the target/kcetcd-connector directory onto the plug-in path; as part of the project’s Maven build, all the dependencies of the kc-etcd connector are copied into that directory
3 Set up etcd in a container
4 Package the connector classes from the target/classes directory into a JAR and add that JAR to the mounted plug-in directory; the complete source code for this can be found here
5 Configure an instance of the etcd source connector, using String as the key and value format
6 Register the connector, then block until its tasks have reached the RUNNING state
7 Do some changes in the source etcd cluster
8 Using a regular Kafka consumer, read three records from the corresponding Kafka topic and assert the keys and values (complete code here)
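
The two helpers referenced in the last step are plain Kafka consumer code; a condensed sketch could look like this (the complete version is in the repository):

  private KafkaConsumer<String, String> getConsumer(KafkaContainer kafkaContainer) {
    return new KafkaConsumer<>(
        Map.<String, Object>of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
        new StringDeserializer(),
        new StringDeserializer());
  }

  private List<ConsumerRecord<String, String>> drain(
      KafkaConsumer<String, String> consumer, int expectedRecordCount) {
    consumer.subscribe(List.of("test-etcd"));

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
    // keep polling until the expected number of records has arrived
    while (allRecords.size() < expectedRecordCount) {
      consumer.poll(Duration.ofMillis(50)).forEach(allRecords::add);
    }
    return allRecords;
  }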

And that’s all there is to it: we now have a test which packages our source connector, deploys it into Kafka Connect, and asserts the messages it sends to Kafka. While this is definitely more time-consuming to run than the simple test harness shown above, this true end-to-end approach tests the connector in the actual runtime environment, verifying its behavior when executed via Kafka Connect, just as will be the case when running the connector in production later on.

Wrap-Up

In this post, we’ve discussed two approaches for testing Kafka Connect source connectors: plain unit tests, "manually" invoking the methods of the connector/task classes under test, and integration tests, deploying a connector into Kafka Connect and verifying its behavior there via Testcontainers.

The former approach provides you with faster turnaround times and shorter feedback cycles, whereas the latter approach gives you the confidence of testing a connector within the actual Kafka Connect runtime environment, at the cost of a more complex infrastructure set-up and longer test execution times. While we’ve focused on testing source connectors in this post, both approaches could equally be applied to sink connectors, with the only difference being that you’d feed records to the connector (either directly or by writing to a Kafka topic) and then observe and assert the expected state changes of the sink system in question.
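
To make that concrete, a unit test for the direct variant could look roughly like this; MySinkTask is a hypothetical sink task implementation, and the SinkRecord is built just like the Connect runtime would hand it to the task:

    // MySinkTask is a hypothetical sink task under test
    MySinkTask task = new MySinkTask();
    task.start(config);

    SinkRecord record = new SinkRecord("my-topic", 0,
        Schema.STRING_SCHEMA, "key-1",
        Schema.STRING_SCHEMA, "value-1",
        1L); // offset of the record within the topic partition
    task.put(List.of(record));
    task.flush(Collections.emptyMap());

    // ...now observe and assert the expected state change in the sink system...

    task.stop();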

You can find the complete source code for this post, including some parts omitted here for brevity, in the kc-etcd repository on GitHub. If you think that having a test harness like kcute for unit testing connectors is a good idea and it’s something you’d like to contribute to, then please let me know. Ultimately, this could be extracted into its own project, independent from kc-etcd, or even be upstreamed to the Apache Kafka project proper, reusing as much as possible the actual Connect code, sans the bits for "deploying" connectors via a separate process.

Many thanks to Hans-Peter Grahsl and Kate Stanley for their feedback while writing this blog post!