Overview
A design document for adding support for message updates, deletes, and appends to the ably-go SDK, following the TypeScript implementation pattern while maintaining backwards compatibility.
Goals
- Enable message update, delete, and append operations
- Support both REST and Realtime channels
- Provide async variants for non-blocking AI token streaming
- Match TypeScript API behavior and wire format
- Zero breaking changes to existing API
Non-Goals
- Message annotations
- Annotation summaries
Architecture
┌─────────────────────────────────────────────────────────┐
│ Public API Layer │
│ - RESTChannel: PublishWithResult, UpdateMessage, etc. │
│ - RealtimeChannel: Same + Async variants │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Message Structure Layer │
│ - Message: Add Serial, Action, Version fields │
│ - PublishResult: New struct with Serial │
│ - UpdateResult: New struct with VersionSerial │
│ - MessageVersion: Version metadata │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Protocol/Wire Layer │
│ - Encode/decode new fields │
│ - Handle action enum (create/update/delete/append) │
│ - Parse server responses for serials │
└─────────────────────────────────────────────────────────┘
Data Structures
Message Extensions
type Message struct {
// Existing fields (unchanged)
ID string
ClientID string
ConnectionID string
Name string
Data interface{}
Encoding string
Timestamp int64
Extras map[string]interface{}
// NEW: Permanent message identifier
Serial string `json:"serial,omitempty" codec:"serial,omitempty"`
// NEW: Action type
Action MessageAction `json:"action,omitempty" codec:"action,omitempty"`
// NEW: Version metadata
Version *MessageVersion `json:"version,omitempty" codec:"version,omitempty"`
}
New Types
// MessageAction enum
type MessageAction string
const (
MessageActionCreate MessageAction = "message.create"
MessageActionUpdate MessageAction = "message.update"
MessageActionDelete MessageAction = "message.delete"
MessageActionAppend MessageAction = "message.append"
)
// MessageVersion - complete version info (some fields server-generated)
type MessageVersion struct {
Serial string `json:"serial,omitempty"` // Server-generated
Timestamp int64 `json:"timestamp,omitempty"` // Server-generated
ClientID string `json:"clientId,omitempty"` // User or auto-populated
Description string `json:"description,omitempty"` // User-provided
Metadata map[string]string `json:"metadata,omitempty"` // User-provided
}
// PublishResult - returned from PublishWithResult methods
type PublishResult struct {
Serial string // May be empty if message discarded by conflation
}
// UpdateResult - returned from Update/Delete/Append methods
type UpdateResult struct {
VersionSerial string // Serial of new version, may be empty if superseded
}
// UpdateOption - functional option for operations
type UpdateOption func(*updateOptions)
type updateOptions struct {
version *MessageVersion // unexported, built from options
}
Functional Options
// UpdateWithDescription sets operation description
func UpdateWithDescription(description string) UpdateOption {
return func(opts *updateOptions) {
if opts.version == nil {
opts.version = &MessageVersion{}
}
opts.version.Description = description
}
}
// UpdateWithClientID sets operation client ID
func UpdateWithClientID(clientID string) UpdateOption {
return func(opts *updateOptions) {
if opts.version == nil {
opts.version = &MessageVersion{}
}
opts.version.ClientID = clientID
}
}
// UpdateWithMetadata sets operation metadata
func UpdateWithMetadata(metadata map[string]string) UpdateOption {
return func(opts *updateOptions) {
if opts.version == nil {
opts.version = &MessageVersion{}
}
opts.version.Metadata = metadata
}
}
API Methods
REST Channel
// Existing methods - unchanged
func (c *RESTChannel) Publish(ctx, name, data, ...opts) error
func (c *RESTChannel) PublishMultiple(ctx, msgs, ...opts) error
// NEW: Publish with results
func (c *RESTChannel) PublishWithResult(
ctx context.Context,
name string,
data interface{},
options ...PublishMultipleOption,
) (*PublishResult, error)
func (c *RESTChannel) PublishMultipleWithResult(
ctx context.Context,
messages []*Message,
options ...PublishMultipleOption,
) ([]PublishResult, error)
// NEW: Operations
func (c *RESTChannel) UpdateMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
func (c *RESTChannel) DeleteMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
func (c *RESTChannel) AppendMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
// NEW: Query methods
func (c *RESTChannel) GetMessage(
ctx context.Context,
serial string,
) (*Message, error)
func (c *RESTChannel) GetMessageVersions(
ctx context.Context,
serial string,
params map[string]string,
) (*PaginatedResult, error)
Realtime Channel
// Existing methods - unchanged
func (c *RealtimeChannel) Publish(ctx, name, data) error
func (c *RealtimeChannel) PublishAsync(name, data, onAck func(error)) error
func (c *RealtimeChannel) PublishMultiple(ctx, msgs) error
func (c *RealtimeChannel) PublishMultipleAsync(msgs, onAck func(error)) error
// NEW: Blocking publish with results
func (c *RealtimeChannel) PublishWithResult(
ctx context.Context,
name string,
data interface{},
) (*PublishResult, error)
func (c *RealtimeChannel) PublishMultipleWithResult(
ctx context.Context,
messages []*Message,
) ([]PublishResult, error)
// NEW: Async publish with results
func (c *RealtimeChannel) PublishWithResultAsync(
name string,
data interface{},
onAck func(*PublishResult, error),
) error
func (c *RealtimeChannel) PublishMultipleWithResultAsync(
messages []*Message,
onAck func([]PublishResult, error),
) error
// NEW: Blocking operations
func (c *RealtimeChannel) UpdateMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
func (c *RealtimeChannel) DeleteMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
func (c *RealtimeChannel) AppendMessage(
ctx context.Context,
msg *Message,
options ...UpdateOption,
) (*UpdateResult, error)
// NEW: Async operations (critical for AI streaming)
func (c *RealtimeChannel) UpdateMessageAsync(
msg *Message,
onAck func(*UpdateResult, error),
options ...UpdateOption,
) error
func (c *RealtimeChannel) DeleteMessageAsync(
msg *Message,
onAck func(*UpdateResult, error),
options ...UpdateOption,
) error
func (c *RealtimeChannel) AppendMessageAsync(
msg *Message,
onAck func(*UpdateResult, error),
options ...UpdateOption,
) error
// NEW: Query methods (delegate to REST)
func (c *RealtimeChannel) GetMessage(
ctx context.Context,
serial string,
) (*Message, error)
func (c *RealtimeChannel) GetMessageVersions(
ctx context.Context,
serial string,
params map[string]string,
) (*PaginatedResult, error)
Differences from TypeScript API
While we aim for API parity with TypeScript, several Go-idiomatic design decisions differ:
1. No MessageOperation Type
TypeScript: Has separate MessageOperation type passed as parameter:
updateMessage(message: Message, operation?: MessageOperation)
Go: Uses functional options that build MessageVersion directly:
UpdateMessage(msg *Message, options ...UpdateOption)
// Called with: UpdateWithDescription("reason"), UpdateWithMetadata(...)
Rationale: Go's functional option pattern is more idiomatic and extensible than requiring users to construct intermediate structs.
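How a list of options could fold into a single MessageVersion can be sketched as follows. This is a minimal, self-contained illustration rather than the SDK's implementation: `ensureVersion` and `buildVersion` are hypothetical helpers that do not appear in this design, and the types are trimmed copies of the ones defined above.

```go
package main

import "fmt"

// MessageVersion holds only the user-settable fields from this document.
type MessageVersion struct {
	ClientID    string
	Description string
	Metadata    map[string]string
}

type updateOptions struct {
	version *MessageVersion
}

// UpdateOption mutates the options being assembled for one operation.
type UpdateOption func(*updateOptions)

// ensureVersion is a hypothetical helper that replaces the repeated nil check.
func (o *updateOptions) ensureVersion() *MessageVersion {
	if o.version == nil {
		o.version = &MessageVersion{}
	}
	return o.version
}

func UpdateWithDescription(description string) UpdateOption {
	return func(o *updateOptions) { o.ensureVersion().Description = description }
}

func UpdateWithClientID(clientID string) UpdateOption {
	return func(o *updateOptions) { o.ensureVersion().ClientID = clientID }
}

func UpdateWithMetadata(metadata map[string]string) UpdateOption {
	return func(o *updateOptions) { o.ensureVersion().Metadata = metadata }
}

// buildVersion (hypothetical) applies the options in order.
// It returns nil when the caller passed no options at all.
func buildVersion(options ...UpdateOption) *MessageVersion {
	var opts updateOptions
	for _, apply := range options {
		apply(&opts)
	}
	return opts.version
}

func main() {
	v := buildVersion(
		UpdateWithDescription("Fixed typo"),
		UpdateWithMetadata(map[string]string{"reason": "typo"}),
	)
	fmt.Println(v.Description, v.Metadata["reason"])
	fmt.Println(buildVersion() == nil)
}
```

Returning nil when no options are given lets the caller omit the version field from the encoded message entirely.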
2. Query Method Parameters
TypeScript: getMessage and getMessageVersions accept string | Message union type:
getMessage(serialOrMessage: string | Message): Promise<Message>
Go: Only accepts string serial:
GetMessage(ctx context.Context, serial string) (*Message, error)
Rationale: The Message-accepting variant was a convenience feature in TypeScript. In Go, users can easily extract the serial themselves (msg.Serial), so a union type would only add unnecessary complexity.
3. PublishResult Structure
TypeScript: Returns single result with array of serials:
interface PublishResult {
serials: (string | null)[]
}
Go: Return type matches method cardinality:
PublishWithResult() (*PublishResult, error) // Single result
PublishMultipleWithResult() ([]PublishResult, error) // Slice of results
Rationale: More idiomatic Go - each result corresponds to one message by index, avoiding the need to extract from an array field.
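One way to keep the two cardinalities consistent is to implement the single-message variant on top of the multi-message one. The sketch below assumes a hypothetical `sender` hook standing in for the transport. The serials it returns in `main` are fake values for illustration, and the function names are lowercase to make clear they are not the SDK methods.

```go
package main

import "fmt"

// PublishResult carries the serial of one published message.
type PublishResult struct {
	Serial string
}

// sender is a hypothetical transport hook that returns one serial per message.
type sender func(messages []string) ([]string, error)

// publishMultipleWithResult maps serials to results by index.
func publishMultipleWithResult(send sender, messages []string) ([]PublishResult, error) {
	serials, err := send(messages)
	if err != nil {
		return nil, err
	}
	if len(serials) != len(messages) {
		return nil, fmt.Errorf("got %d serials for %d messages", len(serials), len(messages))
	}
	results := make([]PublishResult, len(serials))
	for i, s := range serials {
		results[i] = PublishResult{Serial: s}
	}
	return results, nil
}

// publishWithResult reuses the multi-message path and unwraps the only result.
func publishWithResult(send sender, message string) (*PublishResult, error) {
	results, err := publishMultipleWithResult(send, []string{message})
	if err != nil {
		return nil, err
	}
	return &results[0], nil
}

func main() {
	// fakeSend fabricates serials locally; a real transport would get them from the server.
	fakeSend := func(messages []string) ([]string, error) {
		serials := make([]string, len(messages))
		for i := range messages {
			serials[i] = fmt.Sprintf("fake:%d", i)
		}
		return serials, nil
	}
	single, err := publishWithResult(fakeSend, "hello")
	fmt.Println(single.Serial, err)
	many, err := publishMultipleWithResult(fakeSend, []string{"a", "b"})
	fmt.Println(len(many), err)
}
```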
4. UpdateResult Naming
TypeScript: UpdateDeleteResult
Go: UpdateResult
Rationale: The result type is shared by update, delete, and append operations. A name that enumerates operations (UpdateDelete) would omit append, so Go uses the shorter UpdateResult for all three.
5. Operation Metadata as Options
TypeScript: Operation metadata passed as direct parameter:
updateMessage(message, operation?: MessageOperation)
Go: Operation metadata passed via functional options:
UpdateMessage(msg, UpdateWithDescription("reason"))
Rationale: Consistent with Go conventions and existing SDK patterns. More flexible for optional parameters.
Usage Examples
Basic Update
// Publish and get serial
result, err := channel.PublishWithResult(ctx, "event", "original data")
if err != nil {
	return err
}
serial := result.Serial
// Update with metadata (the returned UpdateResult is not needed here)
_, err = channel.UpdateMessage(ctx, &ably.Message{
	Serial: serial,
	Data:   "updated data",
}, ably.UpdateWithDescription("Fixed typo"))
AI Token Streaming (Non-blocking Appends)
// Publish initial message
result, err := channel.PublishWithResult(ctx, "", "Hello")
if err != nil {
	return err
}
// Rapid non-blocking appends (nil onAck: fire and forget)
channel.AppendMessageAsync(&ably.Message{Serial: result.Serial, Data: " world"}, nil)
channel.AppendMessageAsync(&ably.Message{Serial: result.Serial, Data: "!"}, nil)
Delete with Reason
// DeleteMessage returns (*UpdateResult, error); the result is discarded here
_, err = channel.DeleteMessage(ctx, &ably.Message{Serial: serial},
	ably.UpdateWithDescription("User requested deletion"),
	ably.UpdateWithMetadata(map[string]string{"reason": "spam"}))
Implementation Details
Protocol Version Strategy
Message operations (update, delete, append) require Ably protocol version 5 to enable the server to return message serials and version information. However, protocol v5 also includes changes to the stats response format that would require breaking API changes in the Go SDK.
Solution: The SDK uses protocol v5 by default for all requests, but explicitly overrides this to use protocol v2 by setting the X-Ably-Version HTTP header to 2 for stats requests. This provides:
- Message operations: Use protocol v5 to get serials and version data
- Stats API: Use protocol v2 to maintain compatibility with existing nested Stats structure
- Backwards compatibility: Zero breaking changes to existing SDK users
Stats API migration to the v3+ flattened format is planned for ably-go v2.0 when breaking changes can be introduced.
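The per-request override could look roughly like the sketch below. It relies only on the standard `net/http` package. `newVersionedRequest` is a hypothetical helper, and detecting stats requests by the `/stats` path is an assumption made for illustration. The `example.invalid` host is a placeholder.

```go
package main

import (
	"fmt"
	"net/http"
)

const (
	versionHeader   = "X-Ably-Version"
	defaultProtocol = "5" // message operations need serials and version data
	statsProtocol   = "2" // keeps the existing nested Stats structure
)

// newVersionedRequest is a hypothetical helper. Matching stats requests by
// the "/stats" path is an assumption made for this sketch.
func newVersionedRequest(method, url string) (*http.Request, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	version := defaultProtocol
	if req.URL.Path == "/stats" {
		version = statsProtocol
	}
	req.Header.Set(versionHeader, version)
	return req, nil
}

func main() {
	for _, url := range []string{
		"https://example.invalid/stats?limit=10",
		"https://example.invalid/channels/demo/messages",
	} {
		req, err := newVersionedRequest(http.MethodGet, url)
		if err != nil {
			panic(err)
		}
		fmt.Println(req.URL.Path, "->", req.Header.Get(versionHeader))
	}
}
```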
Message Action Encoding
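Actions are encoded as numeric codes on the wire, matching the TypeScript implementation:
// messageActions maps each wire code (the slice index) to its action.
var messageActions = []MessageAction{
	MessageActionCreate, // 0
	MessageActionUpdate, // 1
	MessageActionDelete, // 2
	"",                  // 3 (reserved for meta)
	"",                  // 4 (reserved for message.summary)
	MessageActionAppend, // 5
}
// encodeMessageAction returns the wire code for action, defaulting to create.
func encodeMessageAction(action MessageAction) int {
	// An empty action must not match the reserved "" slots (codes 3 and 4).
	if action == "" {
		return 0
	}
	for i, a := range messageActions {
		if a == action {
			return i
		}
	}
	return 0 // default to create
}
// decodeMessageAction returns the action for a wire code. Reserved codes
// decode to "", and out-of-range codes default to create.
func decodeMessageAction(action int) MessageAction {
	if action >= 0 && action < len(messageActions) {
		return messageActions[action]
	}
	return MessageActionCreate
}
Data Flow: REST Channel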
PublishWithResult(ctx, name, data)
↓
1. Create Message with data, name
2. Encode message (existing logic)
3. POST to /channels/{name}/messages
4. Parse response for serials array
5. Return PublishResult{Serial: serials[0]}
UpdateMessage(ctx, msg, options...)
↓
1. Validate msg.Serial is not empty
2. Apply UpdateOptions to build MessageVersion
3. Set msg.Action = MessageActionUpdate
4. Set msg.Version = version (user fields only)
5. Encode message
6. POST to /channels/{name}/messages
7. Parse response for version serial
8. Return UpdateResult{VersionSerial: serial}
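Steps 1 and 3 to 5 of the UpdateMessage flow can be sketched without any network code. The `wireMessage` struct, the `encodeUpdate` function, and the `errMissingSerial` value are hypothetical names used only here. The exact request body the server expects is not specified in this document, so the JSON shape is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type MessageVersion struct {
	Description string `json:"description,omitempty"`
}

type Message struct {
	Serial string
	Data   interface{}
}

// wireMessage is a hypothetical encoding shape with a numeric action code.
type wireMessage struct {
	Serial  string          `json:"serial"`
	Action  int             `json:"action"`
	Data    interface{}     `json:"data,omitempty"`
	Version *MessageVersion `json:"version,omitempty"`
}

// actionUpdate is the numeric code for message.update listed in this document.
const actionUpdate = 1

// errMissingSerial stands in for the SDK's error 40003.
var errMissingSerial = errors.New("message lacks a serial and cannot be updated")

// encodeUpdate validates the serial, attaches the action and the user-provided
// version fields, and encodes the result without mutating the caller's message.
func encodeUpdate(msg *Message, version *MessageVersion) ([]byte, error) {
	if msg == nil || msg.Serial == "" {
		return nil, errMissingSerial
	}
	return json.Marshal(wireMessage{
		Serial:  msg.Serial,
		Action:  actionUpdate,
		Data:    msg.Data,
		Version: version,
	})
}

func main() {
	body, err := encodeUpdate(
		&Message{Serial: "abc123:0", Data: "updated data"},
		&MessageVersion{Description: "Fixed typo"},
	)
	fmt.Println(string(body), err)

	_, err = encodeUpdate(&Message{Data: "no serial"}, nil)
	fmt.Println(err)
}
```

Building a separate wire value instead of setting fields on `msg` is a design choice of this sketch. It avoids surprising callers who reuse the same Message across operations.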
Data Flow: Realtime Channel (Async)
AppendMessageAsync(msg, onAck, options...)
↓
1. Validate msg.Serial
2. Apply UpdateOptions
3. Set msg.Action = MessageActionAppend
4. Set msg.Version from options
5. Send via protocol message (non-blocking)
6. Register onAck callback for when ACK arrives
7. Return immediately (error only if send fails)
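Step 6 of the async flow implies some registry that pairs outgoing messages with their callbacks until the ACK arrives. A minimal sketch of such a registry follows. `pendingAcks` and its methods are hypothetical, and keying callbacks by a locally assigned msgSerial is an assumption about how the pairing might work.

```go
package main

import (
	"fmt"
	"sync"
)

type UpdateResult struct {
	VersionSerial string
}

// pendingAcks is a hypothetical registry of callbacks keyed by msgSerial.
type pendingAcks struct {
	mu      sync.Mutex
	next    int64
	waiting map[int64]func(*UpdateResult, error)
}

func newPendingAcks() *pendingAcks {
	return &pendingAcks{waiting: make(map[int64]func(*UpdateResult, error))}
}

// register assigns the next msgSerial and remembers onAck for it.
// A nil onAck is allowed and simply means nobody is waiting.
func (p *pendingAcks) register(onAck func(*UpdateResult, error)) int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	serial := p.next
	p.next++
	if onAck != nil {
		p.waiting[serial] = onAck
	}
	return serial
}

// resolve runs the callback for msgSerial at most once. It is meant to be
// called when an ACK (err == nil) or a NACK (err != nil) arrives.
func (p *pendingAcks) resolve(msgSerial int64, versionSerial string, err error) {
	p.mu.Lock()
	onAck, ok := p.waiting[msgSerial]
	delete(p.waiting, msgSerial)
	p.mu.Unlock()
	if !ok {
		return
	}
	// The callback runs outside the lock so it may register new messages.
	if err != nil {
		onAck(nil, err)
		return
	}
	onAck(&UpdateResult{VersionSerial: versionSerial}, nil)
}

func main() {
	acks := newPendingAcks()
	id := acks.register(func(r *UpdateResult, err error) {
		fmt.Println("acked:", r.VersionSerial, err)
	})
	acks.register(nil) // fire-and-forget append
	acks.resolve(id, "abc123:1", nil)
}
```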
Server Response Parsing
The server returns serials in protocol message ACKs:
{
"action": 1,
"msgSerial": 42,
"count": 1,
"serials": ["abc123:0"]
}
Key Implementation Points
- Serial validation - All operations require non-empty msg.Serial, return error 40003 if missing
- Action defaults - If Action is empty on publish, default to MessageActionCreate
- Version mapping - User-provided fields populate msg.Version, server adds serial and timestamp
- Conflation handling - Serial may be empty in result if message was discarded
- TypeScript compatibility - MessageOperation fields map directly to MessageVersion fields
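Decoding the ACK shown under Server Response Parsing, together with the conflation rule above, might look like this. The `ack` struct mirrors only the fields in the sample. Treating a JSON `null` or a missing entry as an empty serial is an assumption based on the TypeScript `(string | null)[]` type, and `serialsFromAck` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ack mirrors only the fields shown in the sample ACK.
type ack struct {
	Action    int       `json:"action"`
	MsgSerial int64     `json:"msgSerial"`
	Count     int       `json:"count"`
	Serials   []*string `json:"serials"`
}

// serialsFromAck returns one serial per acknowledged message.
// A null or missing entry becomes "", which signals a discarded message.
func serialsFromAck(raw []byte) ([]string, error) {
	var a ack
	if err := json.Unmarshal(raw, &a); err != nil {
		return nil, err
	}
	if a.Count < 0 {
		return nil, errors.New("ack count must not be negative")
	}
	serials := make([]string, a.Count)
	for i := range serials {
		if i < len(a.Serials) && a.Serials[i] != nil {
			serials[i] = *a.Serials[i]
		}
	}
	return serials, nil
}

func main() {
	raw := []byte(`{"action": 1, "msgSerial": 42, "count": 2, "serials": ["abc123:0", null]}`)
	serials, err := serialsFromAck(raw)
	fmt.Printf("%q %v\n", serials, err)
}
```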
Error Handling
Validation Errors
// Missing serial (matches TypeScript exactly)
if msg.Serial == "" {
return nil, newError(40003, 400,
"This message lacks a serial and cannot be updated. Make sure you have enabled \"Message annotations, updates, and deletes\" in channel settings on your dashboard.")
}
// For query methods
if serial == "" {
return nil, newError(40003, 400,
"This message lacks a serial. Make sure you have enabled \"Message annotations, updates, and deletes\" in channel settings on your dashboard.")
}
Server Errors
The SDK passes through server errors for:
- Permissions - Insufficient capabilities (message-update-own/any, message-delete-own/any)
- Not found - Message with serial doesn't exist
- Channel config - Updates/deletes not enabled on channel
- Type mismatch - Append type doesn't match original message type
Edge Cases
- Empty serial in result - Not an error, indicates conflation
- Context cancellation - Respect ctx.Done() in blocking methods
- Async callback errors - Always invoke callback, even on send failure
- Nil message - Return clear validation error
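The context-cancellation and callback rules above suggest that blocking methods can be thin wrappers over the async ones. The sketch below shows one way to do that. `asyncOp`, `await`, and `awaitWithTimeout` are hypothetical names, and the simulated ACK in `main` replaces a real connection.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type UpdateResult struct {
	VersionSerial string
}

// asyncOp models an Async method: it returns an error if the send fails,
// otherwise it invokes onAck once when the ACK or NACK arrives.
type asyncOp func(onAck func(*UpdateResult, error)) error

// await turns an async operation into a blocking call that respects ctx.
func await(ctx context.Context, op asyncOp) (*UpdateResult, error) {
	type outcome struct {
		res *UpdateResult
		err error
	}
	// Buffered so an ACK arriving after cancellation never blocks the sender.
	done := make(chan outcome, 1)
	if err := op(func(r *UpdateResult, err error) { done <- outcome{r, err} }); err != nil {
		return nil, err
	}
	select {
	case o := <-done:
		return o.res, o.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// awaitWithTimeout is a convenience wrapper that derives the context itself.
func awaitWithTimeout(d time.Duration, op asyncOp) (*UpdateResult, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return await(ctx, op)
}

func main() {
	acked := func(onAck func(*UpdateResult, error)) error {
		go onAck(&UpdateResult{VersionSerial: "abc123:1"}, nil) // simulated ACK
		return nil
	}
	res, err := awaitWithTimeout(time.Second, acked)
	if err == nil {
		fmt.Println("version serial:", res.VersionSerial)
	}

	neverAcked := func(func(*UpdateResult, error)) error { return nil }
	_, err = awaitWithTimeout(20*time.Millisecond, neverAcked)
	fmt.Println("timed out:", errors.Is(err, context.DeadlineExceeded))
}
```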
Testing Strategy
Unit Tests
- Message struct field serialization/deserialization
- MessageAction encoding/decoding matches TypeScript wire format
- UpdateOption builders correctly populate MessageVersion
- Error message text matches TypeScript exactly
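The wire-format check in the list above could be written as a table-driven comparison. The sketch below restates the action table from this document so that it runs on its own. `wireFormatFailures` is a hypothetical helper, and a real unit test would use the `testing` package instead of returning strings.

```go
package main

import "fmt"

type MessageAction string

const (
	MessageActionCreate MessageAction = "message.create"
	MessageActionUpdate MessageAction = "message.update"
	MessageActionDelete MessageAction = "message.delete"
	MessageActionAppend MessageAction = "message.append"
)

// messageActions maps wire codes (slice indexes) to actions; 3 and 4 are reserved.
var messageActions = []MessageAction{
	MessageActionCreate, MessageActionUpdate, MessageActionDelete, "", "", MessageActionAppend,
}

func encodeMessageAction(action MessageAction) int {
	if action == "" {
		return 0 // an empty action defaults to create
	}
	for i, a := range messageActions {
		if a == action {
			return i
		}
	}
	return 0
}

func decodeMessageAction(code int) MessageAction {
	if code >= 0 && code < len(messageActions) {
		return messageActions[code]
	}
	return MessageActionCreate
}

// wireFormatFailures checks every documented action/code pair in both
// directions and returns a description of each mismatch.
func wireFormatFailures() []string {
	cases := []struct {
		action MessageAction
		code   int
	}{
		{MessageActionCreate, 0},
		{MessageActionUpdate, 1},
		{MessageActionDelete, 2},
		{MessageActionAppend, 5},
	}
	var failures []string
	for _, c := range cases {
		if got := encodeMessageAction(c.action); got != c.code {
			failures = append(failures, fmt.Sprintf("encode(%s) = %d, want %d", c.action, got, c.code))
		}
		if got := decodeMessageAction(c.code); got != c.action {
			failures = append(failures, fmt.Sprintf("decode(%d) = %s, want %s", c.code, got, c.action))
		}
	}
	if got := encodeMessageAction(""); got != 0 {
		failures = append(failures, fmt.Sprintf("encode(\"\") = %d, want 0", got))
	}
	if got := decodeMessageAction(99); got != MessageActionCreate {
		failures = append(failures, fmt.Sprintf("decode(99) = %s, want create", got))
	}
	return failures
}

func main() {
	fmt.Println("failures:", len(wireFormatFailures()))
}
```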
Integration Tests
- REST PublishWithResult returns serial
- REST UpdateMessage/DeleteMessage/AppendMessage work correctly
- Realtime blocking methods return results
- Realtime async methods invoke callbacks
- GetMessage and GetMessageVersions query methods
- Error validation (missing serial, non-existent message)
- AI streaming pattern (rapid non-blocking appends)
- Integration with real Ably sandbox environment
Test Coverage Goals
- All new struct fields serialize/deserialize correctly
- MessageAction encoding matches TypeScript wire format
- UpdateOption builders work correctly
- REST methods return correct results
- Realtime blocking methods work
- Realtime async methods work (callbacks invoked)
- GetMessage and GetMessageVersions query methods
- Error validation (missing serial, etc.)
- Integration with real Ably service (via sandbox)
- AI streaming pattern (rapid appends)