INTERNALS
4 ARCHITECTURE
Trafilo processes 10,000 UDP datagrams per second. At that rate, a single global lock around state modifications would serialize every worker and destroy throughput. Trafilo therefore decouples ingestion from execution and partitions state by key to reduce contention. The architecture consists of a network listener, a bounded queue, a thread pool, and a sharded hashmap.
Ingestion and Backpressure
A dedicated listener thread executes listener_loop. It invokes recvfrom to capture each datagram and pushes the raw buffer onto the bounded_queue_t via bq_push. The queue enforces backpressure. When the worker pool saturates, the queue reaches capacity and the bq_push call blocks. The listener thread stops reading from the socket, the socket receive buffer fills, and the kernel drops further incoming UDP datagrams. This mechanism prevents memory exhaustion under extreme load, at the cost of dropped datagrams.
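The blocking behavior described above can be sketched with a mutex and two condition variables. This is a minimal illustration, not the Trafilo source: the struct layout, the bq_init helper, and the ring-buffer storage are assumptions. Only the names bounded_queue_t, bq_push, and bq_pop come from the text.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical layout; the real bounded_queue_t may differ. */
typedef struct {
    void          **items;     /* ring buffer of raw datagram buffers */
    size_t          capacity;  /* maximum number of queued items */
    size_t          head;      /* index of the next item to pop */
    size_t          count;     /* number of items currently queued */
    pthread_mutex_t lock;
    pthread_cond_t  not_full;  /* signalled when a slot frees up */
    pthread_cond_t  not_empty; /* signalled when an item arrives */
} bounded_queue_t;

/* Hypothetical initializer. Returns 0 on success, -1 on failure. */
int bq_init(bounded_queue_t *q, size_t capacity) {
    if (capacity == 0) return -1;
    q->items = malloc(capacity * sizeof *q->items);
    if (q->items == NULL) return -1;
    q->capacity = capacity;
    q->head = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    return 0;
}

/* Blocks while the queue is full. This is the backpressure point. */
void bq_push(bounded_queue_t *q, void *item) {
    pthread_mutex_lock(&q->lock);
    while (q->count == q->capacity)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % q->capacity] = item;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty. Worker threads call this. */
void *bq_pop(bounded_queue_t *q) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    void *item = q->items[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return item;
}
```

The capacity caps the number of buffers in flight, so worst-case memory is roughly capacity times max_line plus the buffers held by workers.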
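/* Excerpt from socket.c */
while (!listener->done) {
    /* Read at most max_line - 1 bytes to leave room for the terminating NUL. */
    ssize_t num_bytes = recvfrom(listener->sockfd, buf, listener->max_line - 1, 0, NULL, NULL);
    if (num_bytes > 0) {
        buf[num_bytes] = '\0';
        bq_push(listener->bounded_q, buf); /* Blocks while the queue is full */
        buf = malloc(listener->max_line);  /* Allocate next buffer */
        if (buf == NULL) {
            break; /* Out of memory: stop listening rather than pass NULL to recvfrom */
        }
    }
}
Event Dispatch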
The dispatcher_t manages a pool of worker threads. Each thread executes worker_loop. A thread invokes bq_pop to acquire a raw buffer. It invokes the user-provided parse_fn to instantiate an event_t. Parsing executes concurrently across the pool.
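A parse function of the kind a worker would call might look like the sketch below. The event_t fields, the "key:value" wire format, the parse_fn typedef, and the name parse_line are all assumptions made for illustration. The text only states that parse_fn turns a raw buffer into an event_t.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical event layout; the real event_t may carry other fields. */
typedef struct {
    char   key[64];  /* shard key used for state lookup */
    double value;    /* payload parsed from the datagram */
} event_t;

/* Hypothetical callback type for the user-provided parser. */
typedef event_t *(*parse_fn)(const char *raw);

/* Example parser for "key:value" lines.
 * Returns a heap-allocated event, or NULL on malformed input
 * or allocation failure. The caller frees the result. */
event_t *parse_line(const char *raw) {
    event_t *ev = malloc(sizeof *ev);
    if (ev == NULL) return NULL;
    /* %63[^:] reads up to 63 non-colon characters and NUL-terminates them. */
    if (sscanf(raw, "%63[^:]:%lf", ev->key, &ev->value) != 2) {
        free(ev);
        return NULL;
    }
    return ev;
}
```

Because the parser touches only its own arguments and a fresh allocation, any number of workers can run it at once without synchronization.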
State Synchronization
The framework shards state by key. The hashmap_t manages an array of buckets. Each bucket contains a dedicated POSIX mutex. A worker thread invokes hashmap_find_or_create. This function hashes the event key, locates the target bucket, and acquires its mutex. The function returns the locked bucket.
Operations on keys in different buckets execute in parallel. Operations on the same key, or on keys that hash to the same bucket, serialize. The framework invokes the user-provided handle_fn while holding the bucket lock, so the handler can modify user_state without any locking of its own. The worker invokes hashmap_unlock_bucket upon completion.
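The find-and-lock pattern can be sketched as follows. The function names and call shapes match the dispatcher excerpt; everything else is an assumption: the fixed bucket count, the chained node layout, the djb2 hash, and the hashmap_init helper are illustrative choices, not the Trafilo implementation.

```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define NUM_BUCKETS 64  /* hypothetical shard count */

typedef struct bucket_node {
    char               *key;
    void               *state;  /* user_state, guarded by the bucket mutex */
    struct bucket_node *next;   /* other keys that hash to the same bucket */
} bucket_node;

typedef struct {
    pthread_mutex_t lock;
    bucket_node    *head;
} bucket_t;

typedef struct {
    bucket_t buckets[NUM_BUCKETS];
} hashmap_t;

/* djb2 string hash reduced to a bucket index (an assumed hash choice). */
static size_t bucket_index(const char *key) {
    unsigned long h = 5381;
    for (; *key != '\0'; key++)
        h = h * 33 + (unsigned char)*key;
    return (size_t)(h % NUM_BUCKETS);
}

void hashmap_init(hashmap_t *m) {
    for (size_t i = 0; i < NUM_BUCKETS; i++) {
        pthread_mutex_init(&m->buckets[i].lock, NULL);
        m->buckets[i].head = NULL;
    }
}

/* Returns the node for key with its bucket mutex held.
 * On allocation failure the mutex is released and NULL is returned. */
bucket_node *hashmap_find_or_create(hashmap_t *m, const char *key) {
    bucket_t *b = &m->buckets[bucket_index(key)];
    pthread_mutex_lock(&b->lock);
    for (bucket_node *n = b->head; n != NULL; n = n->next) {
        if (strcmp(n->key, key) == 0)
            return n;  /* found: the lock stays held */
    }
    size_t len = strlen(key) + 1;
    bucket_node *n = malloc(sizeof *n);
    char *copy = malloc(len);
    if (n == NULL || copy == NULL) {
        free(n);
        free(copy);
        pthread_mutex_unlock(&b->lock);
        return NULL;
    }
    memcpy(copy, key, len);
    n->key = copy;
    n->state = NULL;
    n->next = b->head;
    b->head = n;
    return n;  /* created: the lock stays held */
}

/* Releases the mutex of the bucket that key hashes to. */
void hashmap_unlock_bucket(hashmap_t *m, const char *key) {
    pthread_mutex_unlock(&m->buckets[bucket_index(key)].lock);
}
```

In this sketch a thread must release one bucket before requesting another. Two keys can share a bucket, and a default POSIX mutex does not support relocking by the same thread.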
/* Excerpt from dispatcher.c */
bucket_node *b = hashmap_find_or_create(d->hash_m, ev->key);
/* The thread holds the bucket lock */
if (!b->state && d->config->state_init) {
    b->state = d->config->state_init(ev->key);
}
d->config->handle(ev, b->state);
hashmap_unlock_bucket(d->hash_m, ev->key);
Temporal Aggregation
The bucket_node stores a sliding_window_t. The window maintains a linked list of event timestamps. The framework invokes sliding_window_add to append the current event. The function traverses the list and evicts timestamps older than window_size_ms. Eviction executes under the bucket lock, so it needs no synchronization of its own. This design removes the need for a secondary garbage-collection thread.
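Append-then-evict can be sketched as below. The list layout, the sliding_window_init helper, and the sliding_window_add signature are assumptions. The sketch also assumes timestamps arrive in non-decreasing order, so expired entries always sit at the head of the list and eviction stops at the first live entry.

```c
#include <stdint.h>
#include <stdlib.h>

typedef struct ts_node {
    int64_t         ts_ms;
    struct ts_node *next;
} ts_node;

/* Hypothetical layout: a FIFO list with the oldest timestamp at head. */
typedef struct {
    ts_node *head;
    ts_node *tail;
    size_t   count;           /* number of timestamps inside the window */
    int64_t  window_size_ms;
} sliding_window_t;

void sliding_window_init(sliding_window_t *w, int64_t window_size_ms) {
    w->head = NULL;
    w->tail = NULL;
    w->count = 0;
    w->window_size_ms = window_size_ms;
}

/* Appends now_ms, then evicts entries older than the window.
 * Returns 0 on success, -1 if allocation fails.
 * The caller is expected to hold the bucket lock. */
int sliding_window_add(sliding_window_t *w, int64_t now_ms) {
    ts_node *n = malloc(sizeof *n);
    if (n == NULL) return -1;
    n->ts_ms = now_ms;
    n->next = NULL;
    if (w->tail != NULL)
        w->tail->next = n;
    else
        w->head = n;
    w->tail = n;
    w->count++;

    /* Drop every entry that falls before the window's lower bound. */
    int64_t cutoff = now_ms - w->window_size_ms;
    while (w->head != NULL && w->head->ts_ms < cutoff) {
        ts_node *old = w->head;
        w->head = old->next;
        free(old);
        w->count--;
    }
    if (w->head == NULL)
        w->tail = NULL;
    return 0;
}
```

Each timestamp is appended once and evicted once, so the amortized cost per event is constant even though a single call may free many nodes.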
Sink Output
The framework evaluates slide_interval_ms against the current clock during the bucket update. If the interval has elapsed, the worker invokes the user-provided sink_fn. The callback receives a window_result_t containing the boundary timestamps, the event count, and the associated user_state.
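The interval check can be sketched as follows. The window_result_t field names, the sink_fn signature, the emit_state_t bookkeeping struct, and the maybe_emit helper are assumptions. The text specifies only which pieces of information the callback receives. counting_sink is an example callback, not part of the framework.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result layout based on the fields named above. */
typedef struct {
    int64_t window_start_ms;
    int64_t window_end_ms;
    size_t  event_count;
    void   *user_state;
} window_result_t;

/* Hypothetical callback type for the user-provided sink. */
typedef void (*sink_fn)(const window_result_t *result);

/* Hypothetical per-bucket bookkeeping for the last emission time. */
typedef struct {
    int64_t last_emit_ms;
} emit_state_t;

/* Invokes sink once slide_interval_ms has elapsed since the last emission.
 * Returns 1 if the sink was invoked, 0 otherwise.
 * The caller is expected to hold the bucket lock. */
int maybe_emit(emit_state_t *es, int64_t now_ms, int64_t slide_interval_ms,
               int64_t window_size_ms, size_t event_count, void *user_state,
               sink_fn sink) {
    if (now_ms - es->last_emit_ms < slide_interval_ms)
        return 0;
    window_result_t r;
    r.window_start_ms = now_ms - window_size_ms;
    r.window_end_ms = now_ms;
    r.event_count = event_count;
    r.user_state = user_state;
    sink(&r);
    es->last_emit_ms = now_ms;
    return 1;
}

/* Example sink: records how often it ran and the last count it saw. */
static size_t sink_calls = 0;
static size_t last_event_count = 0;

void counting_sink(const window_result_t *result) {
    sink_calls++;
    last_event_count = result->event_count;
}
```

Because the check runs only during a bucket update, a key that stops receiving events produces no further sink calls in this sketch.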