Asynchronous Webhook Handling with WebFlux, Kotlin Coroutines, and Kafka
Webhooks offer robust integration with third-party services, enabling complex and time-consuming processes to run asynchronously. Yet they carry a real risk of missed events when latency spikes or performance issues slow your endpoint down. In this blog post, I’ll guide you through a reliable and scalable solution for handling webhooks. The toolkit consists of Spring Boot, WebFlux, Kotlin coroutines, and Apache Kafka. I’ll show you how to build a system that efficiently enqueues incoming requests into an internal queue for background processing. So let’s delve into it!
Table of Contents
- Acknowledge and queue up!
- Seeing is believing
- Anatomy of a webhook request
- Asynchronous Processing with WebFlux and Kotlin Coroutines
- Kafka As An Internal Queue
- Transforming An HTTP Request Into An Event
- Don’t Block Threads
- Divide And Conquer
- Summary
Acknowledge and queue up!
Handling webhooks in an efficient and resilient manner requires proactive planning for various failure scenarios. This becomes essential when integrating with major platforms such as Zoom for video conferencing or Stripe for payment automation, as they have strict expectations for response times. Timeouts can easily lead to lost events and negatively impact your business, especially when your integration partners allow only a limited number of retry attempts.
An asynchronous approach to handling incoming webhook requests ensures resilience and significantly reduces the risk of lost events due to timeouts. Fundamentally, incoming webhook requests should be treated in a non-blocking manner: queue up each incoming request in your internal message queue and immediately respond with a success HTTP status code in the 2xx range, indicating to the sender that the request has been successfully received.
This approach saves time and resources all around, resulting in more efficient webhook processing, and it keeps your system responsive even under heavy load or during intermittent network issues.
Seeing is believing
So much for the theory. Let’s walk through an example that highlights the points made about sluggish response times and background processing. The source code is available on GitHub.
The example comprises a small number of key components:
- A webhook handler. First and foremost, there is an endpoint that intercepts webhook requests. This is the API you want to communicate to your integration partners. Your job is to ensure that this endpoint remains reachable and responsive, regardless of performance issues or increased load on your system.
- A message queue. Each incoming request is dispatched, almost “as is”, into an internal queue. In our example, we’ll use a designated Kafka topic, along with a message producer and a consumer.
- A request verifier. Your endpoint is publicly accessible, which makes it a target for malicious traffic. Each request needs to be verified before we enqueue it for further processing.
- A request processor. At some point, we need to do the actual work and process the request.
Anatomy of a webhook request
It’s important to validate every incoming request for authenticity. In our example, each request is signed, and the signature is transmitted through a custom header, accompanied by a timestamp. We’re using the following headers:
- X-CUSTOM-SIGNATURE. This header holds an HMAC SHA-256 hash of the request’s payload, created using a shared secret token.
- X-CUSTOM-REQUEST-TIMESTAMP. The client timestamp of when the request was made, used as an additional input to the validation process (for example, to reject stale requests).
Usually, a webhook request leverages the HTTP POST method so that the caller can conveniently send a (JSON) payload. Our example is no exception to this rule.
Here is how to make a test call to our webhook endpoint:
### Send a webhook request
< {%
    const signature = crypto.hmac.sha256()
        .withTextSecret(request.environment.get("secret"))
        .updateWithText(request.body.tryGetSubstituted())
        .digest().toHex();
    const timestamp = new Date().getTime();
    request.variables.set("signature", signature);
    request.variables.set("timestamp", timestamp.toString());
%}
POST {{baseUrl}}/webhook
Content-Type: application/json
X-CUSTOM-REQUEST-TIMESTAMP: {{timestamp}}
X-CUSTOM-SIGNATURE: {{signature}}

{
  "id": "{{$random.uuid}}",
  "message": "Hello world!"
}
If you want to give it a spin, check out the full example, follow the instructions to start the server, and have fun.
Asynchronous Processing with WebFlux and Kotlin Coroutines
Spring WebFlux is built on non-blocking I/O (running on a server such as Netty rather than relying on the traditional blocking Servlet API), which means the capability to handle more traffic with fewer resources. The only downside of using WebFlux is learning the ropes of Mono and Flux. That’s where Kotlin’s coroutines step in and remove this additional layer of complexity. If you’re curious to learn more, this post has more details on the benefits of using Kotlin’s asynchronous programming.
The implementation relies primarily on these dependencies (Gradle Kotlin syntax). Note that Spring’s coroutine support for WebFlux requires the kotlinx-coroutines-reactor artifact:
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinx_version")
Our server exposes an endpoint that receives webhook requests:
import org.springframework.http.ResponseEntity
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestHeader
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/webhook")
class WebhookController {

    @PostMapping
    suspend fun handleWebhook(
        request: ServerHttpRequest,
        @RequestHeader("X-CUSTOM-REQUEST-TIMESTAMP") timestampHeader: String,
        @RequestHeader("X-CUSTOM-SIGNATURE") sigHeader: String,
    ): ResponseEntity<Unit> {
        // TODO Handle the request in the background
        return ResponseEntity.ok().build()
    }
}
The endpoint has all the features required to successfully handle an incoming request.
- Scalability. Note that it’s a suspending function designed not to block the underlying event-loop threads. For this to work, all underlying calls must be non-blocking as well. More on that later.
- Access to the request object. WebFlux doesn’t rely on the traditional blocking Servlet API, but luckily it’s still possible to get full access to the request via ServerHttpRequest.
- Access to custom headers. Spring lets us cherry-pick the headers we are interested in.
- Swift response. In order to respond as quickly as possible, any subsequent processing happens asynchronously. The endpoint instantly responds with HTTP 200 OK.
Kafka As An Internal Queue
Reliably queuing up unprocessed requests is another important piece of the puzzle. Apache Kafka is known for its resilience and reliability. On top of that, Spring provides an excellent integration.
The configuration is quite involved, but in a nutshell we define:
- A Producer which publishes all incoming requests into a designated topic.
- A Consumer which is fed with events in the topic and dispatches them further down the processing pipeline.
- A Topic Listener that intercepts new Kafka events and feeds them into the consumer. Frankly speaking, the listener could be part of the consumer, but I chose to keep them separate for better visibility.
Key configuration:
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.KafkaTemplate

// The producer
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
    return KafkaTemplate(producerFactory())
}

// The consumer
@Bean
fun kafkaListenerConsumerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory()
    return factory
}
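The snippet above references producerFactory() and consumerFactory(), which are omitted for brevity. Here is a minimal sketch of what they could look like; the bootstrapServers property is an assumption for illustration, and the actual values live in the repository’s configuration:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.ProducerFactory

// bootstrapServers is assumed to be injected, e.g. via
// @Value("\${spring.kafka.bootstrap-servers}")
@Bean
fun producerFactory(): ProducerFactory<String, String> =
    DefaultKafkaProducerFactory(
        mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        )
    )

@Bean
fun consumerFactory(): ConsumerFactory<String, String> =
    DefaultKafkaConsumerFactory(
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG to "kotlin101",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
    )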
Explore the full config in GitHub.
Transforming An HTTP Request Into An Event
An HTTP request, in its raw form, is too complex to be directly enqueued. It requires an additional transformation to extract only the necessary details. Along with the JSON payload, it’s important to include the signature and timestamp headers, as they are necessary for the validation step.
The JSON payload is represented as an Event. Remember to include all of the fields; skipping some might result in errors or bugs in your application. Refer to your integration partner’s documentation to fully understand the schema behind the request payload.
data class Event(val id: String, val message: String)
The payload itself isn’t enough. Let’s wrap the event to include the validation headers.
data class EventWrapper(val timestamp: Long, val signature: String, val event: Event)
Our HTTP endpoint captures the incoming request, serializes the request body as a string, captures the headers, and transforms all of this into an EventWrapper. Next, the Kafka producer publishes the wrapped events to the designated topic.
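As a rough sketch of how the pieces fit together (the objectMapper, kafkaTemplate, and topic fields are assumed to be injected into the controller; see the repository for the actual implementation):
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.core.io.buffer.DataBufferUtils
import java.nio.charset.StandardCharsets

@PostMapping
suspend fun handleWebhook(
    request: ServerHttpRequest,
    @RequestHeader("X-CUSTOM-REQUEST-TIMESTAMP") timestampHeader: String,
    @RequestHeader("X-CUSTOM-SIGNATURE") sigHeader: String,
): ResponseEntity<Unit> {
    // Join the body's DataBuffers and decode them into a single string,
    // suspending instead of blocking while the body arrives
    val payload = DataBufferUtils.join(request.body)
        .map { buffer ->
            val bytes = ByteArray(buffer.readableByteCount())
            buffer.read(bytes)
            DataBufferUtils.release(buffer)
            String(bytes, StandardCharsets.UTF_8)
        }
        .awaitSingle()

    // Wrap the payload together with the validation headers
    val event = objectMapper.readValue(payload, Event::class.java)
    val wrapper = EventWrapper(timestampHeader.toLong(), sigHeader, event)

    // Enqueue and respond immediately; KafkaTemplate.send() is asynchronous
    kafkaTemplate.send(topic, objectMapper.writeValueAsString(wrapper))

    return ResponseEntity.ok().build()
}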
Explore the full implementation of the endpoint on GitHub.
Don’t Block Threads
On an important side note, make sure you don’t accidentally block threads when using third-party libraries. WebFlux sets you on the right path of non-blocking I/O, but not all moving parts at play are non-blocking. Take the Kafka listener as an example:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.springframework.kafka.annotation.KafkaListener

@KafkaListener(topics = ["\${spring.kafka.topic}"], groupId = "kotlin101")
fun listen(message: String) = runBlocking {
    logger.debug("Received message: {}", message)
    val eventWrapper = objectMapper.readValue(message, EventWrapper::class.java)
    launch { consume(eventWrapper) }
}

private suspend fun consume(eventWrapper: EventWrapper) {
    withContext(Dispatchers.IO) {
        eventConsumer.consume(eventWrapper)
    }
}
The listen method blocks the current thread, and with runBlocking there’s no way around it: the coroutines launched inside it must complete before listen returns. That’s acceptable here, since we’re on a dedicated Kafka listener thread rather than one of WebFlux’s event-loop threads. Luckily, our consumer supports coroutines, so the actual work runs on a coroutine dispatcher, properly scoped via withContext(..). I encourage you to read Roman Elizarov’s post that explains in-depth why using a global scope is sub-optimal.
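For comparison, here’s a sketch of an alternative that doesn’t hold the listener thread while events are processed (an illustration, not what the example repository does): give the listener a long-lived CoroutineScope and hand work off to it.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

// A scope that outlives individual listener invocations; SupervisorJob
// prevents one failed event from cancelling the others
private val listenerScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

@KafkaListener(topics = ["\${spring.kafka.topic}"], groupId = "kotlin101")
fun listen(message: String) {
    val eventWrapper = objectMapper.readValue(message, EventWrapper::class.java)
    // Returns immediately; processing continues in the background
    listenerScope.launch { eventConsumer.consume(eventWrapper) }
}
The trade-off: once listen returns, the container may commit the offset while processing is still in flight, so a crash could drop events. Blocking the listener thread, as the example above does, keeps at-least-once semantics simple.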
Divide And Conquer
There’s one more detail to unpack. Here’s the implementation of our event consumer:
override suspend fun consume(eventWrapper: EventWrapper): Unit =
    withContext(dispatcher + MDCContext()) {
        if (eventValidator.validate(eventWrapper)) {
            logger.debug("Consuming event: {}", eventWrapper)
            eventHandler.handle(eventWrapper.event)
        } else {
            logger.debug("Ignoring invalid event: {}", eventWrapper)
        }
    }
It’s an appropriately scoped suspending call. More importantly, note how we pass the wrapped event around and only unwrap the actual event payload after successful validation. It might feel like a subtle detail, but paying attention to details like this one allows you to streamline your processing pipeline.
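For completeness, here’s a minimal sketch of what eventValidator could look like. Two assumptions for illustration: the shared secret is available to the application, and the signature is an HMAC SHA-256 of the JSON payload, matching the test request shown earlier. The class and property names are hypothetical, so consult the repository for the real implementation.
import com.fasterxml.jackson.databind.ObjectMapper
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

class EventValidator(
    private val secret: String,
    private val objectMapper: ObjectMapper,
) {
    fun validate(eventWrapper: EventWrapper): Boolean {
        // Reject stale requests to limit replay attacks
        // (the five-minute tolerance is an arbitrary choice)
        val age = Duration.between(Instant.ofEpochMilli(eventWrapper.timestamp), Instant.now())
        if (age.abs() > Duration.ofMinutes(5)) return false

        // Recompute the HMAC SHA-256 of the payload with the shared secret.
        // Caveat: re-serializing the event may not be byte-identical to the
        // original request body; in production, validate against the raw bytes.
        val payload = objectMapper.writeValueAsString(eventWrapper.event)
        val mac = Mac.getInstance("HmacSHA256")
        mac.init(SecretKeySpec(secret.toByteArray(), "HmacSHA256"))
        val expected = mac.doFinal(payload.toByteArray())
            .joinToString("") { "%02x".format(it) }

        // Constant-time comparison to avoid timing attacks
        return MessageDigest.isEqual(
            expected.toByteArray(),
            eventWrapper.signature.toByteArray(),
        )
    }
}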
Summary
In this post, we discussed how to implement webhook handling effectively, with a focus on efficiency and scalability. Among the key points to remember is the strict requirement for fast response times. By swiftly acknowledging incoming requests, you enhance your integration partners’ experience. Adhering to best practices and choosing the right technology results in a robust and future-proof solution.