refactor: move snapshot loop and initial prompt logic into PTYConversation#179
refactor: move snapshot loop and initial prompt logic into PTYConversation#179
Conversation
…nversation
Changes:
- Add InitialPrompt []MessagePart and OnSnapshot callback to PTYConversationConfig
- Remove initialPrompt string parameter from NewPTY function (now reads from config)
- Add initialPromptReady chan struct{} field for signaling readiness
- Add sendLocked() helper (same as Send but without lock)
- Add messagesLocked() helper that returns a copy of messages
- Update statusLocked() to return ConversationStatusChanging while initial prompt
is pending, fixing the status flip-flop issue (changing → stable → changing)
- Update Start() to use select with:
- Ticker for snapshots (calling OnSnapshot callback if set)
- initialPromptReady channel to send initial prompt when ready
This consolidates initial prompt logic inside PTYConversation.Start() instead
of requiring the server to manipulate internal fields directly. The server.go
changes to use this new API will be done in a separate commit.
…itialPrompt handling - Update NewServer to format InitialPrompt into []MessagePart via FormatMessage - Pass InitialPrompt and OnSnapshot callback in PTYConversationConfig - OnSnapshot callback emits status/messages/screen changes to EventEmitter - Remove initialPrompt string parameter from NewPTY call (now in config) - Simplify StartSnapshotLoop to just call s.conversation.Start(ctx) - Remove redundant goroutine, ticker, and initial prompt send logic The snapshot loop and initial prompt handling are now internalized in PTYConversation.Start(), which calls the OnSnapshot callback after each snapshot.
- Update all NewPTY calls to use new signature (config only, no initialPrompt param) - For tests needing initial prompt, use InitialPrompt config field with []MessagePart - Update tests to expect status 'changing' until InitialPromptSent is true (new behavior prevents status flip 'changing' -> 'stable' -> 'changing') - Remove direct manipulation of internal fields where possible, use Status() API - Keep minimal internal field access (InitialPromptSent) where needed for testing post-send behavior without running the Start() goroutine
sendLocked() was failing with ErrMessageValidationChanging because statusLocked() returns ConversationStatusChanging when InitialPromptSent is false. This is a chicken-and-egg problem: we need to send the initial prompt before we can set InitialPromptSent=true. Solution: Add skipStatusCheck parameter to sendLocked() to bypass the status check for the initial prompt case. The Start() goroutine passes true to skip the check, while external Send() calls pass false to preserve the existing validation behavior.
Remove the StartSnapshotLoop method which only delegated to s.conversation.Start(ctx), and add a Conversation() accessor method instead. This allows callers to invoke Start() directly on the conversation. Part of refactoring to move snapshot loop logic inside PTYConversation.
…versation Remove exported InitialPromptSent and ReadyForInitialPrompt boolean fields from PTYConversation struct: - InitialPromptSent → initialPromptSent (unexported) - ReadyForInitialPrompt boolean removed entirely The initialPromptReady channel now handles readiness signaling entirely. When the agent is ready (detected via cfg.ReadyForInitialPrompt callback), the channel is closed and set to nil to prevent double-close. This simplifies statusLocked() by removing the intermediate boolean state and using the channel's nil state to track whether readiness was already signaled. Note: Tests will need updates to verify behavior through Status() API rather than setting internal fields directly.
…napshotLoop The StartSnapshotLoop method was removed from Server in favor of exposing a Conversation() accessor that returns the PTYConversation, which has its own Start(ctx) method.
The InitialPromptSent field was unexported as initialPromptSent. Rework the test to verify the same behavior (normal status logic applies after initial prompt handling) by configuring no InitialPrompt instead of manually setting the field. When no InitialPrompt is configured, initialPromptSent defaults to true, which achieves the same testing outcome through the public API.
…rsation() accessor - Move the s.conversation.Start(ctx) call into NewServer(), just before the return statement, so the conversation starts immediately when the server is created. - Add nil check for config.Process to handle test scenarios where no process is configured. - Remove the Conversation() accessor method from Server since it is no longer needed externally. - Remove the external srv.Conversation().Start(ctx) call from cmd/server/server.go.
Remove the skipStatusCheck parameter from sendLocked and move the status check into Send() where it belongs. This simplifies the code since: - Start() always skipped the check (for initial prompt) - Send() always respected cfg.SkipSendMessageStatusCheck Now the check happens in Send() before calling sendLocked, and the initial prompt in Start() naturally bypasses it by calling sendLocked directly.
|
✅ Preview binaries are ready! To test with modules: |
- Expand comment for process nil check to explain: - Process is nil only for --print-openapi mode - Process is already running (termexec.StartProcess blocks) - Agent readiness is handled asynchronously via ReadyForInitialPrompt - Add comment for OnSnapshot callback explaining: - Callback pattern keeps screentracker decoupled from httpapi - Preserves clean package boundaries and avoids import cycles
Change Server.conversation from *st.PTYConversation to st.Conversation to program against the interface abstraction rather than the concrete type. This ensures the Conversation interface is a complete abstraction.
Use config.AgentType directly in the OnSnapshot closure instead of creating a redundant local variable.
Remove unnecessary channel creation for nil initialPromptReady. In Go's select statement, nil channel cases are simply skipped (never selected), so we don't need to create a new channel that blocks forever - the nil channel already has the desired behavior. Addresses PR review feedback.
| // OnSnapshot uses a callback rather than passing the emitter directly | ||
| // to keep the screentracker package decoupled from httpapi concerns. | ||
| // This preserves clean package boundaries and avoids import cycles. | ||
| OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) { | ||
| emitter.UpdateStatusAndEmitChanges(status, agentType) | ||
| emitter.UpdateMessagesAndEmitChanges(messages) | ||
| emitter.UpdateScreenAndEmitChanges(screen) | ||
| }, |
There was a problem hiding this comment.
Self-review: Could alternatively extract an Emitter interface and pass this in.
| mu sync.RWMutex | ||
| logger *slog.Logger | ||
| conversation *st.PTYConversation | ||
| conversation st.Conversation |
There was a problem hiding this comment.
self-review: this is the whole point of this PR
There was a problem hiding this comment.
Pull request overview
This PR refactors the PTYConversation implementation to fully encapsulate snapshot polling and initial prompt handling, removing direct field manipulation from the HTTP server layer and fixing a status transition bug.
Changes:
- Moved snapshot loop from
Server.StartSnapshotLoop()intoPTYConversation.Start() - Initial prompt configuration and sending logic now managed entirely within PTYConversation using a channel-based signaling mechanism
- Status logic updated to stay "changing" until initial prompt is sent, preventing the "changing" → "stable" → "changing" flip
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| lib/screentracker/pty_conversation.go | Added OnSnapshot callback, InitialPrompt config field, channel-based initial prompt signaling in Start(), and updated status logic to prevent premature "stable" transitions |
| lib/screentracker/pty_conversation_test.go | Updated tests to reflect new API (InitialPrompt moved to config) and new behavior (status stays "changing" until initial prompt sent) |
| lib/httpapi/server.go | Removed StartSnapshotLoop, integrated snapshot loop via conversation.Start(), added OnSnapshot callback to update emitter, changed conversation field from concrete type to interface |
| cmd/server/server.go | Removed StartSnapshotLoop call (now handled in NewServer) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The public InitialPrompt string field is no longer used after refactoring. The initial prompt is now stored in cfg.InitialPrompt (as []MessagePart) and managed internally. Removing this field avoids confusion and maintains clean encapsulation. Addresses PR review feedback.
mafredri
left a comment
There was a problem hiding this comment.
Nice work. I think the changes look good in general, although internal state management could use a bit of additional refactoring.
|
Thinking about the current implementation some more, I wonder why we even need initial prompt handling? Wouldn't it be nicer to just have a queue of messages where the initial prompt goes in first? I'd assume this would simplify the logic as you just wait until stable to send the message? |
Yeah, that's a nice idea actually. |
would help solve this too: #21 |
Replace initialPromptSent bool and initialPromptReady chan with:
- outboundQueue chan []MessagePart (buffered, size 1)
- agentReady chan struct{} (nil if no initial prompt)
The initial prompt is now enqueued in NewPTY() and sent via the
queue in Start(). This makes the code more extensible for future
queued message handling.
Start() now uses a two-phase loop:
- Phase 1: Wait for agentReady while still processing ticker snapshots
- Phase 2: Normal loop with ticker + outboundQueue select cases
No external API behavior changes.
Add no-op default functions for OnSnapshot and ReadyForInitialPrompt in NewPTY() constructor instead of checking for nil throughout the code. This removes: - Two nil checks for OnSnapshot in Start() phase 1 and phase 2 loops - One nil check for ReadyForInitialPrompt in statusLocked()
- Replace time.Now() with fixed time.Date(2025, 1, 1, ...) - Delete msgNoTime type and stripTimes function - Rewrite assertMessages to compare full ConversationMessage fields with flexible time checking (zero = assert non-zero) - Update all call sites to use []st.ConversationMessage with named fields
Extract a shared driveClockUntil helper that advances the mock clock one event at a time until a condition is met. Use it in both sendWithClockDrive and the initial prompt lifecycle test, replacing the ad-hoc 500-iteration loop. Also change sendWithClockDrive to return nothing (instead of error) since all call sites were wrapping it with require.NoError. The error check now happens inside via require.NoError.
| // Clear spinner on cancellation | ||
| fmt.Print("\r" + strings.Repeat(" ", 20) + "\r") | ||
| return | ||
| func runSpinner(ctx context.Context) <-chan struct{} { |
There was a problem hiding this comment.
review: turns out there was a race condition in our e2e echo agent 😂
| if err := json.NewEncoder(&sb).Encode(evt); err != nil { | ||
| t.Logf("Failed to encode event: %v", err) | ||
| } | ||
| t.Logf("Got event: %s", sb.String()) |
There was a problem hiding this comment.
review: improved logging
| // Re-apply the pre-send agent message from the screen captured before | ||
| // the write. While the lock was released during writeStabilize, the | ||
| // snapshot loop continued taking snapshots and calling | ||
| // updateLastAgentMessageLocked with whatever was on screen at each | ||
| // tick (typically echoed user input or intermediate terminal state). | ||
| // Those updates corrupt the agent message for this turn. Restoring it | ||
| // here ensures the conversation history is correct. The next line sets | ||
| // screenBeforeLastUserMessage so the *next* agent message will be | ||
| // diffed relative to the pre-send screen. | ||
| c.updateLastAgentMessageLocked(screenBeforeMessage, now) |
There was a problem hiding this comment.
review: I'm not really a fan of this. I'd much prefer to have the snapshot loop be paused while we're writing but this seems to cause deadlocks in tests.
| // Handle initial prompt readiness: report "changing" until the queue is drained | ||
| // to avoid the status flipping "changing" -> "stable" -> "changing" | ||
| if len(c.outboundQueue) > 0 { |
There was a problem hiding this comment.
review: this is a behavioural change
| func assertMessages(t *testing.T, c *st.PTYConversation, expected []st.ConversationMessage) { | ||
| t.Helper() | ||
| actual := c.Messages() | ||
| for i := range actual { | ||
| require.False(t, actual[i].Time.IsZero(), "message %d Time should be non-zero", i) | ||
| actual[i].Time = time.Time{} | ||
| } | ||
| require.Equal(t, expected, actual) | ||
| } |
There was a problem hiding this comment.
review: this is so we don't need to care about the times, just that they are non-zero.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return nil | ||
| } | ||
|
|
||
| interval = min(interval*2, maxInterval) |
There was a problem hiding this comment.
sleepTimer.Reset(interval) can be called while sleepTimer.C still has an unread tick (e.g., when InitialWait is false or condition() takes longer than the current interval). With Go-style timers this can cause the next wait to return immediately and can also starve the timeout case (because sleepTimer.C stays ready), potentially preventing WaitTimedOut from ever being observed. Consider stopping + draining the timer channel before Reset when needed (or recreating the timer) to ensure correct backoff/timeout behavior.
| interval = min(interval*2, maxInterval) | |
| interval = min(interval*2, maxInterval) | |
| if !sleepTimer.Stop() { | |
| select { | |
| case <-sleepTimer.C: | |
| default: | |
| } | |
| } |
| errCh := make(chan error, 1) | ||
| c.outboundQueue <- outboundMessage{parts: messageParts, errCh: errCh} | ||
| return <-errCh | ||
| } |
There was a problem hiding this comment.
Send enqueues into outboundQueue and then blocks waiting for <-errCh. If Start() was never called (or if the Start() context is canceled before the send loop drains the queue), this will block forever. Consider either (a) making Send non-blocking with a clear error when the conversation isn't running, or (b) ensuring shutdown drains outboundQueue and replies to all pending errChs with ctx.Err() (and/or selecting on a stored internal context).
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| // It's important that we hold the lock while reading the screen. | ||
| // There's a race condition that occurs without it: | ||
| // 1. The screen is read | ||
| // 2. Independently, Send is called and takes the lock. | ||
| // 3. snapshotLocked is called and waits on the lock. | ||
| // 4. Send modifies the terminal state, releases the lock | ||
| // 5. snapshotLocked adds a snapshot from a stale screen | ||
| c.lock.Lock() | ||
| screen := c.cfg.AgentIO.ReadScreen() | ||
| c.snapshotLocked(screen) | ||
| c.lock.Unlock() | ||
| case <-c.stableSignal: | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case msg := <-c.outboundQueue: | ||
| err := c.sendMessage(ctx, msg.parts...) | ||
| if msg.errCh != nil { | ||
| msg.errCh <- err | ||
| } | ||
| default: |
There was a problem hiding this comment.
The send-loop exits immediately on ctx.Done() without notifying any queued/pending outboundMessage.errCh waiters. If cancellation happens while a request is waiting inside Send(), that goroutine can hang indefinitely. Consider draining outboundQueue on shutdown and sending ctx.Err() into each non-nil errCh (or closing a dedicated 'stopped' channel that Send selects on).
| // Signal send loop if agent is ready and queue has items. | ||
| // We check readiness independently of statusLocked() because | ||
| // statusLocked() returns "changing" when queue has items. | ||
| isReady := false | ||
| select { | ||
| case <-c.initialPromptReady: | ||
| isReady = true | ||
| default: | ||
| } | ||
| if isReady && len(c.outboundQueue) > 0 && c.isScreenStableLocked() { | ||
| select { | ||
| case c.stableSignal <- struct{}{}: | ||
| default: | ||
| // Signal already pending | ||
| } | ||
| } |
There was a problem hiding this comment.
Send() only checks statusLocked() == stable before enqueueing, but the snapshot loop only signals the send loop when initialPromptReady has been closed (isReady == true). This mismatch means Send() can accept a message and then block waiting for readiness, while Status() may still report stable (since readiness is not part of statusLocked()). Consider making readiness part of statusLocked()/Send() validation (or relaxing the isReady gate when it should not apply) so callers never block unexpectedly when Status() is stable.
| c.lock.Lock() | ||
| if c.statusLocked() != ConversationStatusStable { | ||
| c.lock.Unlock() | ||
| return ErrMessageValidationChanging | ||
| } | ||
| c.lock.Unlock() | ||
|
|
||
| errCh := make(chan error, 1) | ||
| c.outboundQueue <- outboundMessage{parts: messageParts, errCh: errCh} | ||
| return <-errCh |
There was a problem hiding this comment.
Two concurrent callers can both pass the statusLocked() == stable check and then race to send into outboundQueue (buffer size 1). The second goroutine will block on c.outboundQueue <- ..., potentially tying up an HTTP handler thread. Consider serializing Send calls (e.g., with a dedicated mutex) or using a larger queue + non-blocking enqueue that returns a clear error when already sending.
mafredri
left a comment
There was a problem hiding this comment.
A nit and a suggestion, but otherwise looks good. I don't need to re-review. 👍🏻
|
Oh, one thing I forgot to verify but worth taking a look: Is there a chance that queue is populated with entries out-of-order? It might make sense to use a slice backing for the queue rather than a channel because sends on that channel are not ordered if the channel is full. |
Add comments explaining why close(msg.errCh) is called at both sites. Add sendingMessage flag to prevent statusLocked() from returning 'stable' while the send loop is processing a dequeued message outside the lock.
I don't think that's an issue right now, as we hold a lock while we're processing. |
This PR refactors the snapshot loop and initial prompt handling to be fully encapsulated within
PTYConversation, removing direct field manipulation from the HTTP server layer.StartSnapshotLoopintoPTYConversation.Start()PTYConversation.Start()Also:
./e2e./e2e./lib/screentracker🤖 Created using Mux (Opus 4.5).