Build Resilient Unit Tests for Kafka Streams
Discussions on unit testing typically center on coverage, specifically statement coverage. Do the unit tests cover every line of code? Even the most simple of methods or functions can have 100% statement coverage and still create serious production problems.
Test engineers (and application developers) tend to default to example-based tests, where one input scenario is tested. These example-based tests are useful, but the “universe of possible inputs” could generate nondeterministic outputs — or, even worse, a runtime error. Property-based testing is a useful addition where a test suite can execute hundreds or thousands of test inputs, using every conceivable input that could break the method under test — empty collections, negative numbers, long strings, special characters, etc…
If the code passes a property-based test suite, you’ll have a greater degree of confidence in the resiliency of that method in the production environment.
Applying the Paradigm to Data Pipelines
The asynchronous, distributed nature of stream processing codebases could greatly benefit from this type of resiliency testing. Why? In most cases there’s not a problem until there’s a PROBLEM! Are these data pipelines processing events from multiple locales in several languages? What about data formats? Special characters? Null/empty values?
Let’s apply the principles of property-based testing to Apache Kafka-based stream processing applications and topologies. We’ll start with Kafka Streams, then attempt the same with Flink SQL. This is a Java example project. Therefore, we’ll use the jqwik framework to generate arbitrary objects as input events.
Tooling
I’ve always looked for libraries to provide the tools I found useful in functional paradigm languages with respect to Java shops. In my Scala/Finatra/Spark days, I was impressed by libraries like scalacheck — particularly how simple it was to generate random inputs and run many scenarios with a short feedback loop. There were simple bindings to test libraries like scalatest and specs2, making it easy to incorporate property-based tests into the existing test pipeline.
Jqwik seems to check those boxes, integrating seamlessly with junit to execute test scenarios. Jqwik is far from the pioneer in the realm of property-based testing — not even on the JVM. Clojure now has test.check — earlier there was test.generative. All of these frameworks are inspired in some way by Haskell QuickCheck. For more information on property-based testing and frameworks, check out this page in the jqwik documentation.
My Soapbox
I believe there is a welcomed side effect — particularly for Java codebases — when it comes to property-based testing. I find the style of my code to become much more functional and concise in nature. Object-oriented programming can easily lead us towards behavior such as methods with side effects, mutability, and overuse of instance fields. In distributed, event-driven systems, problems often caused by these concepts can have quite the blast radius.
When we begin thinking more functional, our code becomes much easier to read, debug, and refactor. Our applications are now composed of well-tested, highly resilient functions and methods.
Examples with Kafka Streams
The test-utils module of Kafka Streams provides the tools we need to execute JUnit test cases of a given topology. The TopologyTestDriver can be used to pipe data to input topics and interrogate output topics, applying basic assertions on the output data.
Using the provided test-utils along with jqwik, let’s create a simple topology with property-based test suites. Later, we’ll explore a more “realistic” topology — specifically a join of 2 streams.
Simple Example
The ShoutStreams topology is a very simple use case to start our property-based test discussion. This topology simply performs a [1] toUpperCase() on any String input event, piping the resulting String to the [2] designed output topic.
public Topology buildTopology(final Properties props) {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);
inputStream.peek((k, v) -> LOG.debug("input event -> key: {}, value {}", k, v))
.mapValues(v -> v.toUpperCase()) // [1]
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); // [2]
return builder.build(props);
}
With this in mind, a property-based test could simply create a random alphanumeric String as the value of the inbound Kafka event. Then the test evaluates the output topic, comparing the all-caps version of the input to the actual output event(s).
@Property // [1]
void testShout(@ForAll("alphaNumericStrings") String input) { //[2]
// for later comparison, force the input to lower case
final String lowerInput = input.toLowerCase();
Topology topology = new ShoutStreams().buildTopology(props);
try(TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
TestInputTopic<String, String> testInputTopic = testDriver
.createInputTopic(INPUT_TOPIC,
Serdes.String().serializer(),
Serdes.String().serializer());
TestOutputTopic<String, String> testOutputTopic = testDriver
.createOutputTopic(OUTPUT_TOPIC,
Serdes.String().deserializer(),
Serdes.String().deserializer());
// with a randomized key, send the lowerInput to the input topic
testInputTopic.pipeInput(UUID.randomUUID().toString(), lowerInput);
final String actualOutput = testOutputTopic.readValue();
// the topology should output a string identical to the UPPER CASE of our input value.
assertEquals(lowerInput.toUpperCase(), actualOutput);
}
}
@Provide
public Arbitrary<String> alphaNumericStrings() {
return Arbitraries.strings().alpha()
.numeric()
.ofMaxLength(1)
.ofMaxLength(100);
}
Let’s explain some concepts here. The junit-savvy user will immediately notice the [1] @Propertry
annotation versus the typical @Test annotation. This annotation tells junit to execute this test with the jqwik runner.
Next is the [2] @ForAll
annotation on a parameter in the test method. This tells jqwik to generate some input with “interesting” values. In this case, we explicitly define an Arbitrary supplier to use for generating input — a method annotated to @Provide
an Artibrary<String>
. As the name implies, an Arbitrary
is just a randomly generated value.
Our arbitrary supplier uses one of the canned Arbitraries
methods to define how we’d like our String
objects to be generated. In this case the result is an alphanumeric string of some length between 1 and 100 characters.
Executing this simple test will generate many (in this case 1000) test executions:
timestamp = 2024-06-17T11:09:23.554582, ShoutStreamsTest:testShout =
|-----------------------jqwik-----------------------
tries = 1000 | # of calls to property
checks = 1000 | # of not rejected calls
generation = RANDOMIZED | parameters are randomly generated
after-failure = SAMPLE_FIRST | try previously failed sample, then previous seed
when-fixed-seed = ALLOW | fixing the random seed is allowed
edge-cases#mode = MIXIN | edge cases are mixed in
edge-cases#total = 7 | # of all combined edge cases
edge-cases#tried = 7 | # of edge cases tried in current run
seed = -8907747202341246742 | random seed to reproduce generated values
Now you see a tangible benefit: we ran 1000 tests on this method in a matter of seconds. ONE THOUSAND TEST SCENARIOS! It’s virtually impossible to code this number of example-based tests.
So you’re likely thinking: “This simple example is nice. Maybe over-simplified. My Kafka Streams topologies have REAL work to do.”
Join Example
Let’s create an example topology that joins 2 streams of data. One stream contains User
events, which have an id
attribute. The other stream consists of Device
events, which have a correlating userId
attribute. Kafka Streams is more than capable of joining these 2 streams — outputting the details of a Device
enriched with information about the User
of that device as a UserDeviceDetails
object to an output topic.
public Topology buildTopology(final Properties props) {
final StreamsBuilder builder = new StreamsBuilder();
// Rekey the input Device stream by the userId attribute.
KStream<String, Device> devicesByUser = builder.stream(DEVICES_TOPIC,
Consumed.with(Serdes.String(), deviceSerde))
.peek((k, v) -> LOG.warn("device: key = {}, value = {}", k, v))
.map((k,v) -> new KeyValue<>(v.getUserId(), v));
devicesByUser.to("rekeyed-devices", Produced.with(Serdes.String(), deviceSerde));
KTable<String, User> userTable = builder.table(USERS_TOPIC,
Consumed.with(Serdes.String(), new JsonSerdes<>(User.class)));
// With both inputs keyed by userId, we can join them using our ValueJoiner implementation.
devicesByUser.join(userTable, new DeviceUserValueJoiner(),
Joined.with(Serdes.String(), new JsonSerdes<>(Device.class), userSerde))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), userDeviceDetailsSerde));
return builder.build(props);
}
There are 2 use cases to test in this topology — the “match” and the “miss” of the defined join. But first we need a way to generate instances of both the Device
and User
input objects.
/**
* Arbitrary generator for User objects.
*/
@Provide
public Arbitrary<User> userArbitrary() {
// some alphanumeric string of length 20
Arbitrary<String> idArb = Arbitraries.strings().alpha().numeric().ofLength(20);
// some alphanumeric string of length 10
Arbitrary<String> nameArb = Arbitraries.strings().alpha().ofLength(10);
// jqwik includes some canned Atribraries, like email.
EmailArbitrary emailArb = Web.emails();
return Combinators.combine(idArb, nameArb, emailArb).as((id, name, email) ->
User.builder()
.id(id)
.name(name)
.email(email)
.build());
}
/**
* list of available device types
*/
private static final List<String> MOBILE_DEVICES = Arrays.asList(
"iPhone",
"Galaxy",
"Pixel",
"OnePlus",
"Xperia",
"Nokia",
"Huawei",
"Motorola"
);
/**
* Arbitrary generator for Device objects.
*/
@Provide
public Arbitrary<Device> deviceArbitrary() {
Arbitrary<String> idArb = Arbitraries.strings().alpha().numeric().ofLength(20);
// pick one of the mobile devices from the available list above.
Arbitrary<String> typeArb = Arbitraries.of(MOBILE_DEVICES);
Arbitrary<String> uidArb = Arbitraries.strings().alpha().numeric().ofLength(20);
return Combinators.combine(idArb, typeArb, uidArb).as((id, type, uid) ->
Device.builder()
.id(id)
.type(type)
.userId(uid)
.build()
);
}
With these Arbitrary
suppliers, we can get into our test cases. In the “match” use case, we’ll manipulate the input objects to simulate an expected match from the topology:
// [1]
@Property(tries = 50, edgeCases = EdgeCasesMode.FIRST, shrinking = ShrinkingMode.BOUNDED)
void testMatch(@ForAll("userArbitrary") User user, @ForAll("deviceArbitrary") Device device) {
// generate a user id to use in both objects
final String matchingUserId = UUID.randomUUID().toString();
User inputUser = user.toBuilder()
.id(matchingUserId)
.build();
Device inputDevice = device.toBuilder()
.userId(matchingUserId)
.build();
final Function<TestOutputTopic<String, UserDeviceDetails>, List<UserDeviceDetails>> outputFunction = topic ->
topic.readValuesToList()
.stream()
.filter(Objects::nonNull)
.filter(ud -> ud.userId().equals(matchingUserId))
.collect(Collectors.toUnmodifiableList());
// [2]
List<UserDeviceDetails> output = executeTopology(inputUser, inputDevice, outputFunction);
// there should ALWAYS be a matching UserDeviceDetails record from the topology because we matched the user id values.
assertEquals(1, output.size());
}
There are a couple of new concepts here. Given the complexity of this topology, running 1000 test cases doesn’t give us the “quick feedback loop” that unit tests are designed to satisfy. There is quite a bit of Kafka Streams framework instantiation needed to execute the topology. With that in mind, we use the parameters of the [1] @Property annotation to limit the number of test executions (tries) and execute “edge cases” first. Property-based testing uses a concept known as “shrinking” such that when failures occur, the framework will output a minimal example case. This example case could be used to create an example-based test for debugging the method under test.
I took the liberty here to make the code executing the topology reusable to the “miss”-use case. The [2] executeTopology()
method takes the input objects ( User
and Device
) and a Function
dictating how to filter the output topic, returning a List
used in our test assertions. In the “match” use case, we expect there to be a value in the returned list.
The output of the testMatch
execution would look something like this:
timestamp = 2024-06-18T14:13:52.089763, DeviceUserEnricherTest:testMatch =
|-----------------------jqwik-----------------------
tries = 50 | # of calls to property
checks = 50 | # of not rejected calls
generation = RANDOMIZED | parameters are randomly generated
after-failure = SAMPLE_FIRST | try previously failed sample, then previous seed
when-fixed-seed = ALLOW | fixing the random seed is allowed
edge-cases#mode = FIRST | edge cases are generated first
edge-cases#total = 50 | # of all combined edge cases
edge-cases#tried = 50 | # of edge cases tried in current run
seed = -8018840044203219213 | random seed to reproduce generated values
The “miss” use case test will force the user ID values of the input objects to NOT be equal. The join does not match the User
to the Device
— this our executeTopology()
method returns an empty List
from the output topic.
@Property(tries = 50, edgeCases = EdgeCasesMode.FIRST, shrinking = ShrinkingMode.BOUNDED)
void testMiss(@ForAll("userArbitrary") User user, @ForAll("deviceArbitrary") Device device) {
// generate a user ID
final String userId = UUID.randomUUID().toString();
// set that user ID here
User inputUser = user.toBuilder()
.id(userId)
.build();
// force a different user ID onto the device
Device inputDevice = device.toBuilder()
.userId(new StringBuilder(userId).reverse().toString())
.build();
final Function<TestOutputTopic<String, UserDeviceDetails>, List<UserDeviceDetails>> outputFunction = topic ->
topic.readValuesToList()
.stream()
.filter(Objects::nonNull)
.filter(ud -> ud.userId().equals(userId))
.collect(Collectors.toUnmodifiableList());
List<UserDeviceDetails> output = executeTopology(inputUser, inputDevice, outputFunction);
assertTrue(output.isEmpty());
}
The output of the testMiss execution would look something like this:
timestamp = 2024-06-18T14:13:59.606335, DeviceUserEnricherTest:testMiss =
|-----------------------jqwik-----------------------
tries = 50 | # of calls to property
checks = 50 | # of not rejected calls
generation = RANDOMIZED | parameters are randomly generated
after-failure = SAMPLE_FIRST | try previously failed sample, then previous seed
when-fixed-seed = ALLOW | fixing the random seed is allowed
edge-cases#mode = FIRST | edge cases are generated first
edge-cases#total = 50 | # of all combined edge cases
edge-cases#tried = 50 | # of edge cases tried in current run
seed = -1411052433841211412 | random seed to reproduce generated values
What about other Kafka-based stream processing?
Great question. The examples here are limited to Kafka Streams. I’ll update this repo with Flink SQL examples, along with the Flink Table API in Java.
Watch this space…
In Summary
The examples we’ve discussed can be found in GitHub. As mentioned above, as I expand on this topic I’ll post other examples to this repo.
As developers and test engineers, it’s beyond our reach to create example-based tests to cover the universe of possible inputs we could encounter. Property-based testing is a powerful way to gauge and improve the resiliency of any function. Murphy’s law applies to production event stream data, so why not use tools to help mitigate the outcomes we have yet to imagine?
Libraries like jqwik are here to facilitate just that. There are other JVM libraries and frameworks, as well as implementations for other languages you may use in your stream processing applications. In the end, the goal is quality through resiliency. I think you’ll find property-based testing to be a useful part of your toolbelt.
Wanna Learn More?
Confluent Developer is a great resource for stream processing enthusiasts of all levels. There you’ll find articles, materials and courses on all manner of event streaming topics — including a library of tutorials for subjects like Kafka Streams and Flink SQL.