As a software architect, designing robust, scalable, and adaptable distributed systems is a constant pursuit. When working with Apache Kafka, a common challenge arises: how do you send messages that, while adhering to a generic wrapper, can carry different types of payloads based on the specific event or context? In our previous discussion, we explored using Avro Union Types within a single topic. Now, let’s explore an equally powerful and often simpler alternative: leveraging dedicated Kafka topics for each specific data type.
This approach streamlines consumer logic and can provide clearer topic semantics, making it a strong contender for managing polymorphic data in your event-driven architectures.
The Polymorphism Predicament and Topic Specialization
While Avro Union Types elegantly solve polymorphism within a single message field, sometimes the natural separation of data aligns better with dedicated topics. For instance, ProfileUpdate events might belong on a user-profile-updates topic, ProductView events on a product-views topic, and CartAbandonment events on an e-commerce-cart-events topic.
This strategy offers:
- Simplified Consumer Logic: Each consumer listener can be directly typed to the specific message it expects, eliminating the need for instanceof checks.
- Clearer Topic Semantics: Topic names can clearly indicate the type of data they contain, improving discoverability and understanding across your organization.
- Easier Access Control: Kafka ACLs can be applied per topic, allowing more granular permissions for different data types.
Defining Our Specialized Avro Schemas
Instead of a single Message.avsc with a union, we will define separate Message schemas, each specialized for a particular data type. The Person, Product, and Order schemas remain the same.
Person.avsc (as before)
{
  "type": "record",
  "name": "Person",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "firstName", "type": "string"},
    {"name": "lastName", "type": "string"},
    {"name": "age", "type": ["int", "null"], "default": 0}
  ]
}
Product.avsc (as before)
{
  "type": "record",
  "name": "Product",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "productId", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"}
  ]
}
Order.avsc (as before)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "items", "type": {"type": "array", "items": "string"}}
  ]
}
Dedicated Message Wrappers (Message_Person.avsc, etc.)
Now, each Message schema will directly reference its specific data type.
Message_Person.avsc
{
  "type": "record",
  "name": "Message_Person",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Person"}
  ]
}
Message_Product.avsc (similar to Message_Person, replacing Person with Product)
{
  "type": "record",
  "name": "Message_Product",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Product"}
  ]
}
Message_Order.avsc (similar to Message_Person, replacing Person with Order)
{
  "type": "record",
  "name": "Message_Order",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Order"}
  ]
}
Automated Class Generation:
Ensure your Avro build plugin is configured to generate Java classes for Person, Product, Order, Message_Person, Message_Product, and Message_Order.
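If you are using Maven, a minimal sketch of such a configuration with the avro-maven-plugin might look like the following (the version number and directory paths are placeholders; the equivalent Gradle Avro plugin works just as well):
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- Compile every .avsc under src/main/avro into Java sources -->
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>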
The Spring Boot and Kafka Setup
The core Spring Kafka and Schema Registry setup remains largely unchanged.
Key Dependencies: spring-boot-starter-web, spring-kafka, io.confluent:kafka-avro-serializer, and org.apache.avro:avro.
application.yml Configuration Highlights:
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081 # Your Schema Registry URL
    consumer:
      bootstrap-servers: localhost:9092
      group-id: dedicated-topic-group # Unique consumer group ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true # CRUCIAL for deserializing to specific Avro classes
The specific.avro.reader: true property is still vital for the deserializer to return your generated Java classes.
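If you prefer Java-based configuration over application.yml, a minimal equivalent sketch might look like this (the class and bean names are illustrative, not part of the original setup); you would still wire this factory into a ConcurrentKafkaListenerContainerFactory for the listeners shown later:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> avroConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dedicated-topic-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        // Without this flag the deserializer returns GenericRecord instead of the generated classes
        props.put("specific.avro.reader", true);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}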
Producing Messages to Dedicated Topics
On the producer side, you will now send messages to different Kafka topics based on the type of data payload. This means you might have multiple KafkaTemplate instances (or a single one with a dynamic topic name), or distinct producer services.
import com.example.schemas.Message_Person;
import com.example.schemas.Message_Product;
import com.example.schemas.Message_Order;
import com.example.schemas.Person;
import com.example.schemas.Product;
import com.example.schemas.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class DedicatedTopicProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate; // Still Object for generic Avro types

    // Topic names match the values configured in application.yml below
    private static final String PERSON_TOPIC = "person-events";
    private static final String PRODUCT_TOPIC = "product-events";
    private static final String ORDER_TOPIC = "order-events";

    // Constructor injection
    public DedicatedTopicProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendPersonMessage(String key, Person personData) {
        Message_Person message = Message_Person.newBuilder()
                .setKey(key)
                .setUser(UUID.randomUUID().toString())
                .setData(personData)
                .build();

        kafkaTemplate.send(PERSON_TOPIC, key, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Sent Person message to " + PERSON_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                    } else {
                        System.err.println("Failed to send Person message: " + ex.getMessage());
                    }
                });
    }

    public void sendProductMessage(String key, Product productData) {
        Message_Product message = Message_Product.newBuilder()
                .setKey(key)
                .setUser(UUID.randomUUID().toString())
                .setData(productData)
                .build();

        kafkaTemplate.send(PRODUCT_TOPIC, key, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Sent Product message to " + PRODUCT_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                    } else {
                        System.err.println("Failed to send Product message: " + ex.getMessage());
                    }
                });
    }

    public void sendOrderMessage(String key, Order orderData) {
        Message_Order message = Message_Order.newBuilder()
                .setKey(key)
                .setUser(UUID.randomUUID().toString())
                .setData(orderData)
                .build();

        kafkaTemplate.send(ORDER_TOPIC, key, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("Sent Order message to " + ORDER_TOPIC + " at offset: " + result.getRecordMetadata().offset());
                    } else {
                        System.err.println("Failed to send Order message: " + ex.getMessage());
                    }
                });
    }
}
Note: In a real application, these sendXxxMessage methods would be called from various service layers or REST controllers.
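For illustration only, a hypothetical REST controller (none of the names below appear in the original example, and it assumes the controller lives in the same package as DedicatedTopicProducer) might drive the producer like this:
import com.example.schemas.Person;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/persons")
public class PersonController {

    private final DedicatedTopicProducer producer;

    public PersonController(DedicatedTopicProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public void publishPerson(@RequestBody PersonRequest request) {
        // Map the incoming JSON payload onto the generated Avro class
        Person person = Person.newBuilder()
                .setFirstName(request.firstName())
                .setLastName(request.lastName())
                .setAge(request.age())
                .build();
        producer.sendPersonMessage(request.firstName() + "-" + request.lastName(), person);
    }

    // Simple request payload used only for this sketch
    public record PersonRequest(String firstName, String lastName, Integer age) {}
}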
Consuming from Dedicated Topics
The consumer side becomes much cleaner. Each @KafkaListener can subscribe to a specific topic and directly receive the strongly typed Message_Person, Message_Product, or Message_Order object. The instanceof checks are no longer necessary, because the type is implied by the topic itself.
import com.example.schemas.Message_Person;
import com.example.schemas.Message_Product;
import com.example.schemas.Message_Order;
import com.example.schemas.Person;
import com.example.schemas.Product;
import com.example.schemas.Order;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class DedicatedTopicConsumer {

    // Listener for Person messages
    @KafkaListener(topics = "${person-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenPersonMessages(Message_Person message) {
        System.out.println("Received Person Message Key: " + message.getKey() + ", User: " + message.getUser());
        Person personData = message.getData();
        if (personData != null) {
            System.out.println(" -> Person Details: " + personData.getFirstName() + " " + personData.getLastName() + ", Age: " + personData.getAge());
        }
        System.out.println("---");
    }

    // Listener for Product messages
    @KafkaListener(topics = "${product-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenProductMessages(Message_Product message) {
        System.out.println("Received Product Message Key: " + message.getKey() + ", User: " + message.getUser());
        Product productData = message.getData();
        if (productData != null) {
            System.out.println(" -> Product Details: ID=" + productData.getProductId() + ", Name=" + productData.getName() + ", Price=" + productData.getPrice());
        }
        System.out.println("---");
    }

    // Listener for Order messages
    @KafkaListener(topics = "${order-data-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenOrderMessages(Message_Order message) {
        System.out.println("Received Order Message Key: " + message.getKey() + ", User: " + message.getUser());
        Order orderData = message.getData();
        if (orderData != null) {
            System.out.println(" -> Order Details: ID=" + orderData.getOrderId() + ", Customer=" + orderData.getCustomerId() + ", Total=" + orderData.getTotalAmount() + ", Items=" + orderData.getItems());
        }
        System.out.println("---");
    }
}
Note: You would also need to define these topic names in your application.yml so that the ${...} placeholders in the @KafkaListener annotations resolve:
# ... (existing kafka config)
person-data-topic: person-events
product-data-topic: product-events
order-data-topic: order-events
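If these topics are not provisioned elsewhere, one optional addition not shown in the original setup is to declare them as NewTopic beans so that Spring's auto-configured KafkaAdmin creates them on startup; the partition and replica counts below are placeholders:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic personTopic() {
        // Partition/replica counts are illustrative values for a local, single-broker setup
        return TopicBuilder.name("person-events").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic productTopic() {
        return TopicBuilder.name("product-events").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-events").partitions(3).replicas(1).build();
    }
}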
Navigating Schema Evolution with Dedicated Topics
Schema evolution is still handled by Avro and the Schema Registry, but the rules apply to the specific message schema for each topic (e.g., Message_Person.avsc
evolution independently of Message_Product.avsc
).
- Adding Fields: You can add new fields to any of your individual schemas (e.g., Person.avsc or Message_Person.avsc) as long as they have a default value, which includes nullable unions such as ["null", "string"] with a default of null. This remains backward compatible.
- Removing Fields: As with unions, removing fields is generally not backward compatible without careful planning.
- Introducing New Data Types: When a new data type is introduced, you simply define its Avro schema, create a new dedicated Message_NewType.avsc wrapper, and set up a new Kafka topic and corresponding producer/consumer (a sketch follows below). This cleanly isolates the new type without impacting existing topics.
This strategy often simplifies schema evolution management since changes to one data type’s schema do not directly impact the serialization of other, unrelated data types within a union.
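For example, if a hypothetical Payment type were introduced later (the name is purely illustrative), you would add a Payment.avsc, a wrapper like the one below, and a new payment topic with its own producer and listener:
{
  "type": "record",
  "name": "Message_Payment",
  "namespace": "com.example.schemas",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "user", "type": {"type": "string", "logicalType": "uuid"}},
    {"name": "data", "type": "com.example.schemas.Payment"}
  ]
}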
Key Architectural Considerations
Choosing between Avro Union Types and dedicated topics depends on your specific use case and architectural preferences:
- Topic Proliferation: This strategy leads to more Kafka topics. While Kafka handles a large number of topics efficiently, it can increase operational overhead for monitoring, alerting, and security management if not well-managed.
- Simplicity vs. Flexibility: Dedicated topics simplify consumer logic, as each listener focuses on a single message type. Union types offer extreme flexibility within a single topic but require more complex consumer routing.
- Data Locality/Ordering: If the order of all polymorphic events is crucial (e.g., a complex business process where Person updates, Product views, and Order creations must be processed in a strict global order), a single topic with union types might be preferred, as Kafka only guarantees order within a partition.
- Schema Management Discipline: Regardless of the approach, disciplined schema definition, versioning, and compatibility testing with the Schema Registry remain paramount.
Conclusion
For scenarios where different data types naturally align with distinct processing pipelines or require clear logical separation, adopting dedicated Kafka topics with specific Avro schemas for each data type offers a clean, maintainable, and type-safe solution. This strategy, alongside the power of Spring Kafka and Confluent Schema Registry, provides software architects with yet another robust tool to design highly flexible and resilient event-driven systems. By understanding the trade-offs, you can select the approach that best fits your system’s unique requirements.