
✨ add timeout option to batched event handler#641

Open
rparcus wants to merge 5 commits into commanded:master from Spectral-Finance:rp/add_batch_timeout

Conversation


@rparcus rparcus commented Nov 15, 2025

Adds a batch_timeout option to event handlers.

Comment on lines 881 to 885

    if batch_timeout_provided? do
      raise ArgumentError,
            inspect(module) <>
              " :batch_timeout requires :batch_size. Remove the timeout or configure batching."
    end

I think this check makes more sense with the checks up above like "both :concurrency and :batch_size are specified..." at the beginning of this function.

A bonus of moving the checks is that I think you would no longer need the logic surrounding __no_batch_timeout__.

Comment on lines 1193 to 1195

    # No batch_size configured - process immediately
    is_nil(batch_size) ->
      handle_batch(events, %{flush_reason: :immediate}, state)
@TylerPachal TylerPachal Feb 2, 2026


Is this case actually possible? If we are in :batch callback mode doesn't there have to be a batch_size configured upstream from here? Enforced by this code: https://github.com/commanded/commanded/blob/master/lib/commanded/event/handler.ex#L782-L784

If this case is not possible, and there are actually only two possible cases here, I think this code would be a little easier to follow if you moved things up into the handle_info above and had all three code paths directly in the case statement:

try do
  events = Upcast.upcast_event_stream(events, additional_metadata: %{application: application})

  state =
    case {callback, state.batch_timeout} do
      {:event, _} ->
        # Non-batched: process immediately
        Enum.reduce(events, state, &handle_event/2)

      # Batched with no timeout configured
      {:batch, nil} ->
        handle_batch(events, state)

      # Batched with timeout
      {:batch, batch_timeout} ->
        buffer_and_maybe_flush(events, state)
    end
  end

# ...

defp buffer_and_maybe_flush(events, %Handler{} = state) do
  %Handler{
    batch_buffer: buffer,
    batch_size: batch_size,
    batch_timeout: batch_timeout
  } = state

  current_buffer = buffer || []
  new_buffer = current_buffer ++ events
  state = %Handler{state | batch_buffer: new_buffer}

  # Start timer if this is first event in batch
  state = maybe_start_batch_timer(state)

  # Check if we should flush based on size
  if length(new_buffer) >= batch_size do
    state
    |> cancel_batch_timer()
    |> flush_batch_buffer(:size)
  else
    state
  end
end

I think this makes more sense because the buffer_and_maybe_flush function is now clearly only on the code path with the timer. It was a little confusing why all :batch paths went through the buffer + flush mechanism.

    } = state

    # Upcast events before buffering
    events = Upcast.upcast_event_stream(events, additional_metadata: %{application: application})
@TylerPachal TylerPachal Feb 2, 2026


If this is being called here, and in the :event codepath of handle_info({:events, events}, state) here, then I think it's simpler for it to reside only in the handle_info({:events, events}, state) function (at the top of the try block).

- Move "batch_timeout requires batch_size" validation earlier
- Remove __no_batch_timeout__ sentinel pattern
- Restructure handle_info with clear three-way code paths
- Move upcasting to single location before branching
- Simplify buffer_and_maybe_flush (remove dead code paths)
- Fix unused default argument warning in handle_batch/3
@rparcus rparcus force-pushed the rp/add_batch_timeout branch from 1cadc59 to 66ab766 on February 2, 2026 at 08:36
@drteeth drteeth left a comment

I really like the idea of this change. Thanks for putting the time in. There are a few things I'm unclear about still, but we'll get there.

    {batch_timeout, config} = Keyword.pop(config, :batch_timeout, :infinity)

    # Validate batch_size
    unless is_nil(batch_size) or (is_integer(batch_size) and batch_size > 0) do

unless was deprecated in Elixir 1.18, can you refactor this to use if please?
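A minimal sketch of that refactor, using an illustrative validator (the function name and error message here are hypothetical, not from the PR):

```elixir
defmodule BatchConfig do
  # `unless expr do ... end` is soft-deprecated since Elixir 1.18;
  # the direct replacement is `if` with a negated condition.
  def validate_batch_size!(batch_size) do
    if not (is_nil(batch_size) or (is_integer(batch_size) and batch_size > 0)) do
      raise ArgumentError, ":batch_size must be a positive integer"
    end

    batch_size
  end
end
```

Wrapping the original condition in not/1 keeps the diff minimal; the condition itself could also be rewritten in positive form.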

    end

    # Validate batch_timeout
    unless batch_timeout == :infinity or (is_integer(batch_timeout) and batch_timeout > 0) do

unless was deprecated in Elixir 1.18, can you refactor this to use if please?


    # Clear buffer and timer BEFORE processing to prevent race condition
    # If timer fires during batch processing, it will see empty buffer
    state = %Handler{state | batch_buffer: [], batch_timer_ref: nil}
@drteeth drteeth Feb 2, 2026


Setting batch_timer_ref to nil here can't prevent a race condition, can it? You are already cancelling the timer, draining any flush messages that made it through, and setting the timer ref to nil in cancel_batch_timer/1 before this call.

      {:batch, _timeout} ->
        # Batched with timeout: buffer events and flush on size or timeout
        buffer_and_maybe_flush(events, state)
    end

Love this.


defp drain_flush_batch_timeout_message do
  receive do
    :flush_batch_timeout -> :ok

What happens if the next message isn't a flush message? I think we'll crash here, no? And if you do manage to change the pattern match, then we've taken the message out of the mailbox and we have to process it, whatever it is?

Am I wrong?

@rparcus (Author)

The after 0 clause below makes this a "non-blocking selective receive", meaning that it only takes :flush_batch_timeout from the mailbox if one is already there, otherwise returns immediately.

It never blocks and never consumes other messages. This pattern is used for draining a specific message after cancelling a timer (Process.cancel_timer/1 returns false when the timer already fired, meaning the message may be in the mailbox).
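That drain-after-cancel pattern can be sketched in isolation like this (the module name is illustrative; the message atom matches the PR):

```elixir
defmodule TimerDrain do
  # Cancel a batch timer; if it already fired, its message may be
  # queued, so drain it without blocking or touching other messages.
  def cancel_and_drain(timer_ref) do
    case Process.cancel_timer(timer_ref) do
      # `false` means the timer already fired (or never existed), so a
      # :flush_batch_timeout message may already be in the mailbox.
      false -> drain_flush_message()
      _remaining_ms -> :ok
    end
  end

  defp drain_flush_message do
    receive do
      :flush_batch_timeout -> :ok
    after
      # `after 0` makes the receive non-blocking: return immediately
      # when no :flush_batch_timeout is queued.
      0 -> :ok
    end
  end
end
```

Because the receive only matches :flush_batch_timeout, unrelated messages already in the mailbox are never consumed, and `after 0` guarantees the call returns immediately either way.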

@TylerPachal
Contributor

@rparcus Could you explain a little bit what behavior you are trying to add/change with this PR?

From what I can see, if I set batch_size: 100 and dispatch a command, my event handler sees that event immediately (i.e. it does not wait for another 99 events to "fill" the batch).

Are you seeing different behavior than that?

batch_size controls the EventStore subscription's in-flight buffer,
not handler-level accumulation. batch_timeout adds opt-in time-based
buffering at the handler level for use cases like batching projection
writes during steady-state operation.

Made-with: Cursor
@rparcus
Copy link
Author

rparcus commented Feb 27, 2026

> @rparcus Could you explain a little bit what behavior you are trying to add/change with this PR?
>
> From what I can see, if I set batch_size: 100 and dispatch a command, my event handler sees that event immediately (i.e. it does not wait for another 99 events to "fill" the batch).
>
> Are you seeing different behavior than that?

You're right, batch_size alone does not cause accumulation or hold-up of events. I tested this again against the real PostgreSQL EventStore: with batch_size: 100, dispatching a single command results in handle_batch/1 being called immediately with that one event. batch_size only sets the subscription's buffer_size (the in-flight window), so real batches only form during catch-up replay or under back-pressure. I also found the old issue describing this: in most cases handle_batch is nothing more than a normal handler wrapped in a list, since it only "does its thing" during catch-up.

With this change, the batch_timeout option adds something that doesn't exist in commanded today: time-based buffering at the handler level. When configured, events are accumulated in the handler process and flushed when either batch_size events have collected OR batch_timeout milliseconds have elapsed. This enables use cases like batching projection writes during steady-state live operation, not just during catch-up. I have updated the module docs to make this distinction more evident.
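As a sketch of how the proposed option might be used (module names and the handle_batch/1 shape are assumptions based on this thread, not verified against the final API):

```elixir
defmodule MyApp.AccountsProjector do
  use Commanded.Event.Handler,
    application: MyApp.Application,
    name: __MODULE__,
    # Flush once 100 events have buffered...
    batch_size: 100,
    # ...or 250 ms after the first buffered event, whichever comes first.
    batch_timeout: 250

  # Per this thread, the batch callback receives the whole flushed
  # batch at once, enabling a single projection write per flush.
  def handle_batch(events) do
    MyApp.Projections.write_batch(events)
    :ok
  end
end
```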
