As a software architect, designing robust, scalable, and adaptable distributed systems is a constant pursuit. When working with Apache Kafka, a common challenge arises: how do you send messages that, while adhering to a generic wrapper, can carry different types of payloads based on the specific event or context? In our previous discussion, we explored using Avro Union Types within a single topic. Now, let’s explore an equally powerful and often simpler alternative: leveraging dedicated Kafka topics for each specific data type.

This approach streamlines consumer logic and can provide clearer topic semantics, making it a strong contender for managing polymorphic data in your event-driven architectures.

The Polymorphism Predicament and Topic Specialization

While Avro Union Types elegantly solve polymorphism within a single message field, sometimes the natural separation of data aligns better with dedicated topics. For instance, ProfileUpdate events might belong on a user-profile-updates topic, ProductView events on a product-views topic, and CartAbandonment events on an e-commerce-cart-events topic.

This strategy offers:

  • Simplified Consumer Logic: Each consumer listener can be directly typed to the specific message it expects, eliminating the need for instanceof checks.
  • Clearer Topic Semantics: Topic names can clearly indicate the type of data they contain, improving discoverability and understanding across your organization.
  • Easier Access Control: Kafka ACLs can be applied per topic, allowing more granular permissions for different data types.
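
For example, granting a hypothetical analytics consumer read access to a single topic might look like the following sketch using Kafka's standard kafka-acls tool (the principal, topic, and group names are purely illustrative):

# Allow the analytics-service principal to read one topic and use its consumer group
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:analytics-service \
  --operation Read \
  --topic person-events \
  --group dedicated-topic-group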

Defining Our Specialized Avro Schemas

Instead of a single Message.avsc with a union, we will define separate Message schemas, each specialized for a particular data type. The Person, Product, and Order schemas remain the same.

Person.avsc (as before)

{
  "type": "record",
  "name": "Person",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"},
    {"name": "age", "type": ["int", "null"], "default": 0}
  ]
}

Product.avsc (as before)

{
  "type": "record",
  "name": "Product",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "productId", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"}
  ]
}

Order.avsc (as before)

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "items", "type": {"type": "array", "items": "string"}}
  ]
}

Dedicated Message Wrappers (Message_Person.avsc, etc.)

Now, each Message schema will directly reference its specific data type.

Message_Person.avsc

{
  "type": "record",
  "name": "Message_Person",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Person"}
  ]
}

Message_Product.avsc (Similar for Product, replacing Person with Product)

{
  "type": "record",
  "name": "Message_Product",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Product"}
  ]
}

Message_Order.avsc (Similar for Order, replacing Person with Order)

{
  "type": "record",
  "name": "Message_Order",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Order"}
  ]
}

Automated Class Generation:

Ensure your Avro build plugin is configured to generate Java classes for Person, Product, Order, Message_Person, Message_Product, and Message_Order.
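
As a sketch, a Maven setup using the avro-maven-plugin might look like this (the version and paths are assumptions; because the Message_* wrappers reference Person, Product, and Order by name, those schemas are listed as imports so they are parsed first, which may or may not be required depending on your plugin version and file layout):

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version> <!-- placeholder version -->
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
        <imports>
          <import>${project.basedir}/src/main/avro/Person.avsc</import>
          <import>${project.basedir}/src/main/avro/Product.avsc</import>
          <import>${project.basedir}/src/main/avro/Order.avsc</import>
        </imports>
      </configuration>
    </execution>
  </executions>
</plugin>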

The Spring Boot and Kafka Setup

The core Spring Kafka and Schema Registry setup remains largely unchanged.

Key Dependencies: spring-boot-starter-web, spring-kafka, io.confluent:kafka-avro-serializer, and org.apache.avro:avro.
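
If you use Maven, the relevant part of the pom.xml might look roughly like this (versions are placeholders; the Spring artifacts are typically version-managed by the Spring Boot parent, and kafka-avro-serializer is served from the Confluent Maven repository, declared here as well):

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.0</version> <!-- placeholder: align with your Schema Registry version -->
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version> <!-- placeholder -->
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>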

application.yml Configuration Highlights:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081 # Your Schema Registry URL
    consumer:
      bootstrap-servers: localhost:9092
      group-id: dedicated-topic-group # Unique consumer group ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true # CRUCIAL for deserializing to specific Avro classes

The specific.avro.reader: true property is still vital for the deserializer to return your generated Java classes.

Producing Messages to Dedicated Topics

On the producer side, you now send messages to different Kafka topics based on the type of data payload. In practice, a single KafkaTemplate<String, Object> with the topic name supplied per send (as in the example below) is usually enough, though you could also split the logic into distinct producer services.

import com.example.schemas.Message_Person;
import com.example.schemas.Message_Product;
import com.example.schemas.Message_Order;
import com.example.schemas.Person;
import com.example.schemas.Product;
import com.example.schemas.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class DedicatedTopicProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate; // Still Object for generic Avro types
    // Topic names; these match the values the consumer listeners resolve from application.yml
    private static final String PERSON_TOPIC = "person-events";
    private static final String PRODUCT_TOPIC = "product-events";
    private static final String ORDER_TOPIC = "order-events";

    // Constructor injection
    public DedicatedTopicProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendPersonMessage(String key, Person personData) {
        Message_Person message = Message_Person.newBuilder()
            .setKey(key)
            .setUser(UUID.randomUUID().toString())
            .setData(personData)
            .build();
        kafkaTemplate.send(PERSON_TOPIC, key, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Sent Person message to " + PERSON_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send Person message: " + ex.getMessage());
                }
            });
    }

    public void sendProductMessage(String key, Product productData) {
        Message_Product message = Message_Product.newBuilder()
            .setKey(key)
            .setUser(UUID.randomUUID().toString())
            .setData(productData)
            .build();
        kafkaTemplate.send(PRODUCT_TOPIC, key, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Sent Product message to " + PRODUCT_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send Product message: " + ex.getMessage());
                }
            });
    }

    public void sendOrderMessage(String key, Order orderData) {
        Message_Order message = Message_Order.newBuilder()
            .setKey(key)
            .setUser(UUID.randomUUID().toString())
            .setData(orderData)
            .build();
        kafkaTemplate.send(ORDER_TOPIC, key, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Sent Order message to " + ORDER_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send Order message: " + ex.getMessage());
                }
            });
    }
}

Note: In a real application, these sendXxxMessage methods would be called from various service layers or REST controllers.
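
For illustration, a minimal REST controller delegating to the producer might look like this sketch (the request mapping and the PersonDto payload type are hypothetical; they are not part of the original example):

import com.example.schemas.Person;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PersonEventController {

    private final DedicatedTopicProducer producer;

    public PersonEventController(DedicatedTopicProducer producer) {
        this.producer = producer;
    }

    // Accepts a simple JSON payload and forwards it to the Person topic via the producer service
    @PostMapping("/persons/{key}")
    public void publishPerson(@PathVariable String key, @RequestBody PersonDto dto) {
        Person person = Person.newBuilder()
            .setFirstName(dto.firstName())
            .setLastName(dto.lastName())
            .setAge(dto.age())
            .build();
        producer.sendPersonMessage(key, person);
    }

    // Hypothetical request payload; in a real application this would live in its own file
    public record PersonDto(String firstName, String lastName, Integer age) {}
}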

Consuming from Dedicated Topics

The consumer side becomes much cleaner. Each @KafkaListener can subscribe to a specific topic and directly receive the strongly typed Message_Person, Message_Product, or Message_Order object. The instanceof checks are no longer necessary, as the type is known by the topic itself.

import com.example.schemas.Message_Person;
import com.example.schemas.Message_Product;
import com.example.schemas.Message_Order;
import com.example.schemas.Person;
import com.example.schemas.Product;
import com.example.schemas.Order;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DedicatedTopicConsumer {

    // Listener for Person messages
    @KafkaListener(topics = "${person-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenPersonMessages(Message_Person message) {
        System.out.println("Received Person Message Key: " + message.getKey() + ", User: " + message.getUser());
        Person personData = message.getData();
        if (personData != null) {
            System.out.println("  -> Person Details: " + personData.getFirstName() + " " + personData.getLastName() + ", Age: " + personData.getAge());
        }
        System.out.println("---");
    }

    // Listener for Product messages
    @KafkaListener(topics = "${product-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenProductMessages(Message_Product message) {
        System.out.println("Received Product Message Key: " + message.getKey() + ", User: " + message.getUser());
        Product productData = message.getData();
        if (productData != null) {
            System.out.println("  -> Product Details: ID=" + productData.getProductId() + ", Name=" + productData.getName() + ", Price=" + productData.getPrice());
        }
        System.out.println("---");
    }

    // Listener for Order messages
    @KafkaListener(topics = "${order-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenOrderMessages(Message_Order message) {
        System.out.println("Received Order Message Key: " + message.getKey() + ", User: " + message.getUser());
        Order orderData = message.getData();
        if (orderData != null) {
            System.out.println("  -> Order Details: ID=" + orderData.getOrderId() + ", Customer=" + orderData.getCustomerId() + ", Total=" + orderData.getTotalAmount() + ", Items=" + orderData.getItems());
        }
        System.out.println("---");
    }
}

Note: You also need to define these topic names in your application.yml so the ${...} property placeholders in the @KafkaListener annotations resolve:

# ... (existing kafka config)
person-data-topic: person-events
product-data-topic: product-events
order-data-topic: order-events

Navigating Schema Evolution with Dedicated Topics

Schema evolution is still handled by Avro and the Schema Registry, but the compatibility rules now apply per topic: Message_Person.avsc can evolve independently of Message_Product.avsc.

  • Adding Fields: You can add new fields with a default value, or as nullable unions (e.g., ["null", "string"] with a default of null), to any of your individual schemas (e.g., Person.avsc or Message_Person.avsc). This remains backward compatible (see the sketch after this list).
  • Removing Fields: As with unions, removing fields is generally not backward compatible without careful planning.
  • Introducing New Data Types: When a new data type is introduced, you simply define its Avro schema, create a new dedicated Message_NewType.avsc wrapper, and set up a new Kafka topic and corresponding producer/consumer. This cleanly isolates the new type without impacting existing topics.
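
To illustrate the "Adding Fields" point, a backward-compatible revision of Person.avsc might introduce an optional field like this (the email field is just an example):

{
  "type": "record",
  "name": "Person",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"},
    {"name": "age", "type": ["int", "null"], "default": 0},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}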

This strategy often simplifies schema evolution management: a change to one data type’s schema cannot affect the serialization of other, unrelated data types, as it could when all types share a single union.

Key Architectural Considerations

Choosing between Avro Union Types and dedicated topics depends on your specific use case and architectural preferences:

  • Topic Proliferation: This strategy leads to more Kafka topics. While Kafka handles a large number of topics efficiently, it can increase operational overhead for monitoring, alerting, and security management if not well-managed.
  • Simplicity vs. Flexibility: Dedicated topics simplify consumer logic, as each listener focuses on a single message type. Union types offer extreme flexibility within a single topic but require more complex consumer routing.
  • Data Locality/Ordering: If the order of all polymorphic events is crucial (e.g., a complex business process where Person updates, Product views, and Order creations must be processed in a strict global order), a single topic with union types might be preferred, as Kafka only guarantees order within a partition.
  • Schema Management Discipline: Regardless of the approach, disciplined schema definition, versioning, and compatibility testing with the Schema Registry remain paramount.

Conclusion

For scenarios where different data types naturally align with distinct processing pipelines or require clear logical separation, adopting dedicated Kafka topics with specific Avro schemas for each data type offers a clean, maintainable, and type-safe solution. This strategy, alongside the power of Spring Kafka and Confluent Schema Registry, provides software architects with yet another robust tool to design highly flexible and resilient event-driven systems. By understanding the trade-offs, you can select the approach that best fits your system’s unique requirements.


