sebastiandaschner news
Welcome to my newsletter #2!
I’m sitting in a coffee shop in New York, preparing for the Oracle Code conference tomorrow. After interesting days in Canada I had a few days of client work in Germany before crossing the Atlantic again, and now I’m in New York for the first time.
What’s new
Integrating Apache Kafka with Java EE
Apache Kafka can be used as a reliable, distributed event store and integrated into Java applications using the Java client API. The API makes for a straightforward integration into (enterprise) applications.
First, let’s have a look at how to produce messages.
import java.util.Properties;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@ApplicationScoped
public class EventProducer {

    private Producer<String, CoffeeEvent> producer;

    @Inject
    Properties kafkaProperties;

    @Inject
    Logger logger;

    @PostConstruct
    private void init() {
        producer = new KafkaProducer<>(kafkaProperties);
    }

    public void publish(CoffeeEvent event) {
        ProducerRecord<String, CoffeeEvent> record = new ProducerRecord<>("beans", event);
        logger.info("publishing = " + record);
        producer.send(record);
        producer.flush();
    }

    @PreDestroy
    public void close() {
        producer.close();
    }
}
The event producer CDI managed bean contains a KafkaProducer<String, CoffeeEvent> to publish messages serialized from CoffeeEvent to the topic beans. Here the events are sent and flushed in every call, so that the caller of publish(CoffeeEvent) can rely upon the message being sent. On application shutdown the producer should be closed so that it doesn't leak resources; we ensure this with the @PreDestroy annotation.
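The injected Kafka Properties aren’t shown here. A minimal sketch of how such a CDI producer method could look, assuming a local broker, manual offset commits (matching the commitSync call in the consumer below) and hypothetical EventSerializer and EventDeserializer classes for CoffeeEvent (a possible serializer is sketched further below):

import java.util.Properties;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

@ApplicationScoped
public class KafkaConfigurator {

    @Produces
    public Properties kafkaProperties() {
        // assumption: broker address and (de)serializer classes are not part of
        // the newsletter and only illustrate a possible setup; producer and
        // consumer settings are combined since both sides inject the same bean
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.example.EventSerializer");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "com.example.EventDeserializer");
        // offsets are committed manually via commitSync in the consumer
        properties.put("enable.auto.commit", "false");
        return properties;
    }
}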
Our consumer is wrapped in an EventConsumer, as we want it to consume messages as long as the application is up.
import static java.util.Arrays.asList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class EventConsumer implements Runnable {

    private final KafkaConsumer<String, CoffeeEvent> consumer;
    private final Consumer<CoffeeEvent> eventConsumer;
    private final AtomicBoolean closed = new AtomicBoolean();

    public EventConsumer(Properties kafkaProperties,
            Consumer<CoffeeEvent> eventConsumer, String... topics) {
        this.eventConsumer = eventConsumer;
        consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(asList(topics));
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                consume();
            }
        } catch (WakeupException e) {
            // will wake up for closing
        } finally {
            consumer.close();
        }
    }

    private void consume() {
        ConsumerRecords<String, CoffeeEvent> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, CoffeeEvent> record : records) {
            eventConsumer.accept(record.value());
        }
        consumer.commitSync();
    }

    public void stop() {
        closed.set(true);
        consumer.wakeup();
    }
}
The consumer subscribes to one or more topics, and by calling poll(long) we wait until new messages arrive (or the timeout elapses). Each message is then handled by the Consumer<CoffeeEvent> provided by the caller. The commit functionality of Kafka enables reliable “only once” messages for our consumer group: once the consumer confirms the message consumption by calling commitSync, the same message won’t be delivered a second time to the configured consumer group. As we want this consumption to happen indefinitely, we wrap consume into a while loop that ends when the caller invokes stop. To avoid the consumer being stuck in polling for messages, calling wakeup causes a blocked poll to throw a WakeupException, which we catch in run to close the consumer cleanly.
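The CoffeeEvent (de)serialization isn’t covered in this newsletter either. One possible sketch of the hypothetical EventSerializer referenced in the properties above, assuming Jackson is used for the JSON mapping:

import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

// hypothetical serializer, not part of the original example
public class EventSerializer implements Serializer<CoffeeEvent> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, CoffeeEvent event) {
        try {
            return objectMapper.writeValueAsBytes(event);
        } catch (IOException e) {
            throw new SerializationException("Could not serialize event", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}

The matching EventDeserializer would mirror this with objectMapper.readValue(data, CoffeeEvent.class).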
Now another managed bean integrates this EventConsumer into our Java EE environment.
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;

@Startup
@Singleton
public class BeanUpdateConsumer {

    private EventConsumer eventConsumer;

    @Inject
    Event<CoffeeEvent> events;

    @Resource
    ManagedExecutorService mes;

    @Inject
    Properties kafkaProperties;

    @Inject
    Logger logger;

    @PostConstruct
    private void init() {
        kafkaProperties.put("group.id", "beans-consumer-" + UUID.randomUUID());
        eventConsumer = new EventConsumer(kafkaProperties, ev -> {
            logger.info("firing = " + ev);
            events.fire(ev);
        }, "beans");
        mes.execute(eventConsumer);
    }

    @PreDestroy
    public void close() {
        eventConsumer.stop();
    }
}
This managed bean fires CDI events that contain the actual message. Therefore it creates a new EventConsumer with the corresponding setup and a (JDK 8) Consumer that fires the actual event. As enterprise developers are discouraged from starting unmanaged threads themselves, we use the injected ManagedExecutorService to do so. Again, the container will stop the event consumer in the @PreDestroy-annotated method.
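Any other managed bean can then react to these messages with a plain CDI observer method. A hypothetical example (the class and its body are illustrative, not part of the original):

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class CoffeeEventHandler {

    // invoked by the container for every fired CoffeeEvent
    public void onEvent(@Observes CoffeeEvent event) {
        // e.g. update a read model or projection accordingly
    }
}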
This example is taken from the scalable coffee shop, discussed in my CQRS video course.
SummaryStatistics JDK classes
Recently, I learned (by accident) about the *SummaryStatistics classes shipped in the JDK since version 1.8: DoubleSummaryStatistics, IntSummaryStatistics and LongSummaryStatistics.
These classes implement the corresponding primitive Consumer interfaces (DoubleConsumer, IntConsumer and LongConsumer) and calculate the count, sum, minimum, maximum and average of the provided values. They are meant to be used within streams, as follows:
double averageAge = people.stream()
        .collect(Collectors.summarizingDouble(Person::getAge))
        .getAverage();
However, these classes can also be used standalone.
DoubleSummaryStatistics statistics = new DoubleSummaryStatistics();
statistics.accept(3);
statistics.accept(4);
statistics.accept(5);
assertThat(statistics.getAverage(), is(4d));
Just keep in mind that they’re not thread-safe. This is on purpose, as they were primarily designed for streams, and parallel streams already provide the necessary isolation.
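This design is also why the classes offer a combine method, which the summarizing collectors use to merge the partial results of parallel streams. Used directly, it could look as follows:

DoubleSummaryStatistics first = new DoubleSummaryStatistics();
first.accept(3);
first.accept(4);

DoubleSummaryStatistics second = new DoubleSummaryStatistics();
second.accept(5);

// merges the second statistics into the first one
first.combine(second);

assertThat(first.getCount(), is(3L));
assertThat(first.getAverage(), is(4d));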
IntelliJ navigation
This newsletter’s “making-developers-faster” tip is about IntelliJ’s navigation again:
By pressing Ctrl + U you navigate to the super declaration of a method; it is the counterpart to Ctrl + Alt + B.
Placing the cursor on toString of the String class and invoking the shortcut will, for instance, jump to Object#toString.
Thanks a lot for reading and greetings from Manhattan, New York!
Did you like the content? You can subscribe to the newsletter for free.
All opinions are my own and do not reflect those of my employer or colleagues.