Documentation
¶
Overview ¶
Package hooks provides composable hook implementations for the MQTT broker.
Index ¶
- type ACLConfig
- type ACLHook
- func (h *ACLHook) AddRule(rule ACLRule)
- func (h *ACLHook) CanRead(ctx context.Context, client broker.ClientInfo, topicName string) bool
- func (h *ACLHook) ClearRules()
- func (h *ACLHook) ID() string
- func (h *ACLHook) Init(opts *broker.HookOptions, config any) error
- func (h *ACLHook) OnPublish(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) error
- func (h *ACLHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
- func (h *ACLHook) Provides(event byte) bool
- type ACLRule
- type AuthConfig
- type AuthHook
- func (h *AuthHook) AddUser(username, password string)
- func (h *AuthHook) ID() string
- func (h *AuthHook) Init(opts *broker.HookOptions, config any) error
- func (h *AuthHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error
- func (h *AuthHook) Provides(event byte) bool
- func (h *AuthHook) RemoveUser(username string)
- type AuthValidator
- type ClusterConfig
- type ClusterHook
- func (h *ClusterHook) AckInflight(ctx context.Context, clientID string, packetID uint16) error
- func (h *ClusterHook) Client() redis.UniversalClient
- func (h *ClusterHook) GetActiveNodes(ctx context.Context) ([]string, error)
- func (h *ClusterHook) GetRetained(ctx context.Context, filter string) ([]*packet.Publish, error)
- func (h *ClusterHook) ID() string
- func (h *ClusterHook) Init(opts *broker.HookOptions, config any) error
- func (h *ClusterHook) NodeID() string
- func (h *ClusterHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error
- func (h *ClusterHook) OnConnected(ctx context.Context, client broker.ClientInfo)
- func (h *ClusterHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
- func (h *ClusterHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *ClusterHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)
- func (h *ClusterHook) OnSessionEnded(ctx context.Context, clientID string)
- func (h *ClusterHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)
- func (h *ClusterHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
- func (h *ClusterHook) Provides(event byte) bool
- func (h *ClusterHook) QueueMessage(ctx context.Context, clientID string, pkt *packet.Publish) error
- func (h *ClusterHook) Stop() error
- func (h *ClusterHook) StoreRetained(ctx context.Context, topic string, pkt *packet.Publish) error
- func (h *ClusterHook) TrackInflight(ctx context.Context, clientID string, packetID uint16, state string, ...) error
- type LogLevel
- type LoggerConfig
- type LoggerHook
- func (h *LoggerHook) ID() string
- func (h *LoggerHook) Init(opts *broker.HookOptions, config any) error
- func (h *LoggerHook) OnConnected(ctx context.Context, client broker.ClientInfo)
- func (h *LoggerHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
- func (h *LoggerHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *LoggerHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *LoggerHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)
- func (h *LoggerHook) OnSessionEnded(ctx context.Context, clientID string)
- func (h *LoggerHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)
- func (h *LoggerHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
- func (h *LoggerHook) Provides(event byte) bool
- type RateLimitConfig
- type RateLimitHook
- func (h *RateLimitHook) ID() string
- func (h *RateLimitHook) Init(opts *broker.HookOptions, config any) error
- func (h *RateLimitHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
- func (h *RateLimitHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *RateLimitHook) Provides(event byte) bool
- func (h *RateLimitHook) Stop() error
- type RedisConfig
- type RedisHook
- func (h *RedisHook) Client() *redis.Client
- func (h *RedisHook) GetRetained(ctx context.Context, filter string) ([]*packet.Publish, error)
- func (h *RedisHook) ID() string
- func (h *RedisHook) Init(opts *broker.HookOptions, config any) error
- func (h *RedisHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *RedisHook) Provides(event byte) bool
- func (h *RedisHook) Stop() error
- func (h *RedisHook) StoreRetained(ctx context.Context, topic string, pkt *packet.Publish) error
- type SysConfig
- type SysHook
- func (h *SysHook) DecrementSubscriptions(n int64)
- func (h *SysHook) ID() string
- func (h *SysHook) IncrementSubscriptions(n int64)
- func (h *SysHook) Init(opts *broker.HookOptions, config any) error
- func (h *SysHook) Metrics() SysMetrics
- func (h *SysHook) OnConnected(ctx context.Context, client broker.ClientInfo)
- func (h *SysHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
- func (h *SysHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *SysHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
- func (h *SysHook) Provides(event byte) bool
- func (h *SysHook) Stop() error
- type SysMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ACLConfig ¶
type ACLConfig struct {
// Rules defines the access control rules (evaluated in order).
Rules []ACLRule
// DenyByDefault denies access if no rule matches (default: false = allow).
DenyByDefault bool
}
ACLConfig configures the ACL hook.
type ACLHook ¶
ACLHook provides topic-based access control.
func (*ACLHook) Init ¶
func (h *ACLHook) Init(opts *broker.HookOptions, config any) error
Init is called when the hook is registered with the broker.
func (*ACLHook) OnPublish ¶
func (h *ACLHook) OnPublish(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) error
OnPublish checks write permissions for the topic.
func (*ACLHook) OnSubscribe ¶
func (h *ACLHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
OnSubscribe filters subscriptions based on read permissions.
type ACLRule ¶
type ACLRule struct {
// ClientID pattern (supports * wildcard, empty = any).
ClientID string
// Username pattern (supports * wildcard, empty = any).
Username string
// TopicFilter pattern (supports MQTT wildcards + and #).
TopicFilter string
// Access permissions.
Read bool
Write bool
}
ACLRule defines an access control rule.
type AuthConfig ¶
type AuthConfig struct {
// Credentials is a map of username -> password for simple auth.
Credentials map[string]string
// Validator is a custom authentication function.
// If set, Credentials is ignored.
Validator AuthValidator
}
AuthConfig configures the auth hook.
type AuthHook ¶
AuthHook provides simple username/password authentication.
func (*AuthHook) Init ¶
func (h *AuthHook) Init(opts *broker.HookOptions, config any) error
Init is called when the hook is registered with the broker.
func (*AuthHook) OnConnect ¶
func (h *AuthHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error
OnConnect validates client credentials.
func (*AuthHook) RemoveUser ¶
RemoveUser removes a user credential.
type AuthValidator ¶
AuthValidator is a custom authentication function.
type ClusterConfig ¶
type ClusterConfig struct {
// Addr is the Redis server address (default: "localhost:6379").
// For Redis Cluster, use comma-separated addresses.
Addr string
// Addrs is a list of Redis addresses for cluster mode.
Addrs []string
// Password for Redis authentication (optional).
Password string
// DB is the Redis database number (default: 0).
// Ignored in cluster mode.
DB int
// KeyPrefix is prepended to all Redis keys (default: "mqtt:").
KeyPrefix string
// NodeID uniquely identifies this broker node (required).
NodeID string
// HeartbeatInterval is how often to refresh the node TTL (default: 3s).
HeartbeatInterval time.Duration
// NodeTTL is how long before a node is considered dead (default: 10s).
NodeTTL time.Duration
// Client allows providing a pre-configured Redis client.
// If set, Addr/Addrs/Password/DB are ignored.
Client redis.UniversalClient
}
ClusterConfig configures the cluster hook.
type ClusterHook ¶
ClusterHook provides full distributed state using Redis/Valkey.
Features:
- Node registry with heartbeat and auto-expiry
- Session persistence across nodes
- Subscription storage and route index for targeted fan-out
- Offline message queue using Redis Streams
- QoS 2 in-flight state tracking
- Retained message storage with index for wildcard matching
- Per-node pub/sub channels for efficient message routing
Key structure (prefix: "mqtt:"):
mqtt:node:{nodeID} → STRING (with TTL) - node registry
mqtt:session:{clientID} → HASH - session state
mqtt:subs:{clientID} → HASH - client subscriptions
mqtt:route:{filter} → SET - nodes with subscribers to filter
mqtt:queue:{clientID} → STREAM - offline message queue
mqtt:inflight:{clientID} → HASH - QoS 2 in-flight state
mqtt:retained:{topic} → STRING - retained message (msgpack)
mqtt:retained:_idx → SET - retained message index
mqtt:fanout:{nodeID} → PUB/SUB channel - per-node messaging
func (*ClusterHook) AckInflight ¶
AckInflight removes QoS 2 state from Redis.
func (*ClusterHook) Client ¶
func (h *ClusterHook) Client() redis.UniversalClient
Client returns the underlying Redis client for advanced usage.
func (*ClusterHook) GetActiveNodes ¶
func (h *ClusterHook) GetActiveNodes(ctx context.Context) ([]string, error)
GetActiveNodes returns all active nodes in the cluster.
func (*ClusterHook) GetRetained ¶
GetRetained retrieves retained messages matching a topic filter.
func (*ClusterHook) ID ¶
func (h *ClusterHook) ID() string
func (*ClusterHook) Init ¶
func (h *ClusterHook) Init(opts *broker.HookOptions, config any) error
Init connects to Redis and starts background workers.
func (*ClusterHook) OnConnect ¶
func (h *ClusterHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error
OnConnect stores/updates session state.
func (*ClusterHook) OnConnected ¶
func (h *ClusterHook) OnConnected(ctx context.Context, client broker.ClientInfo)
OnConnected is called after successful connection.
func (*ClusterHook) OnDisconnect ¶
func (h *ClusterHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
OnDisconnect updates session state and handles cleanup.
func (*ClusterHook) OnPublishReceived ¶
func (h *ClusterHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishReceived routes messages to other nodes via targeted fan-out.
func (*ClusterHook) OnSessionCreated ¶
func (h *ClusterHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)
OnSessionCreated initializes session in Redis.
func (*ClusterHook) OnSessionEnded ¶
func (h *ClusterHook) OnSessionEnded(ctx context.Context, clientID string)
OnSessionEnded cleans up session data in Redis.
func (*ClusterHook) OnSessionResumed ¶
func (h *ClusterHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)
OnSessionResumed updates session state in Redis.
func (*ClusterHook) OnSubscribe ¶
func (h *ClusterHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
OnSubscribe stores subscriptions and updates route index.
func (*ClusterHook) Provides ¶
func (h *ClusterHook) Provides(event byte) bool
Provides indicates which events this hook handles.
func (*ClusterHook) QueueMessage ¶
QueueMessage adds a message to the client's offline queue.
func (*ClusterHook) Stop ¶
func (h *ClusterHook) Stop() error
Stop gracefully shuts down the cluster hook.
func (*ClusterHook) StoreRetained ¶
StoreRetained stores a retained message in Redis.
type LogLevel ¶
type LogLevel int
LogLevel controls which events are logged.
const ( // LogLevelConnection logs connect/disconnect events. LogLevelConnection LogLevel = 1 << iota // LogLevelSubscribe logs subscribe/unsubscribe events. LogLevelSubscribe // LogLevelPublish logs publish events. LogLevelPublish // LogLevelSession logs session lifecycle events. LogLevelSession // LogLevelAll logs all events. LogLevelAll = LogLevelConnection | LogLevelSubscribe | LogLevelPublish | LogLevelSession )
type LoggerConfig ¶
type LoggerConfig struct {
// Logger is the slog.Logger to use (default: slog.Default()).
Logger *slog.Logger
// Level controls which events are logged (default: LogLevelAll).
Level LogLevel
}
LoggerConfig configures the logger hook.
type LoggerHook ¶
LoggerHook logs broker events using slog.
func (*LoggerHook) ID ¶
func (h *LoggerHook) ID() string
func (*LoggerHook) Init ¶
func (h *LoggerHook) Init(opts *broker.HookOptions, config any) error
Init is called when the hook is registered with the broker.
func (*LoggerHook) OnConnected ¶
func (h *LoggerHook) OnConnected(ctx context.Context, client broker.ClientInfo)
OnConnected logs client connections.
func (*LoggerHook) OnDisconnect ¶
func (h *LoggerHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
OnDisconnect logs client disconnections.
func (*LoggerHook) OnPublishDeliver ¶
func (h *LoggerHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishDeliver logs delivered messages.
func (*LoggerHook) OnPublishReceived ¶
func (h *LoggerHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishReceived logs received messages.
func (*LoggerHook) OnSessionCreated ¶
func (h *LoggerHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)
OnSessionCreated logs session creation.
func (*LoggerHook) OnSessionEnded ¶
func (h *LoggerHook) OnSessionEnded(ctx context.Context, clientID string)
OnSessionEnded logs session termination.
func (*LoggerHook) OnSessionResumed ¶
func (h *LoggerHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)
OnSessionResumed logs session resumption.
func (*LoggerHook) OnSubscribe ¶
func (h *LoggerHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)
OnSubscribe logs subscriptions.
func (*LoggerHook) Provides ¶
func (h *LoggerHook) Provides(event byte) bool
Provides indicates which events this hook handles.
type RateLimitConfig ¶
type RateLimitConfig struct {
// PublishRate is the max number of publishes per interval per client.
PublishRate int
// Interval is the rate limit window (default: 1s).
Interval time.Duration
// BurstSize is the max burst allowed (default: PublishRate * 2).
BurstSize int
}
RateLimitConfig configures the rate limiter.
type RateLimitHook ¶
RateLimitHook limits message rates per client.
func (*RateLimitHook) ID ¶
func (h *RateLimitHook) ID() string
func (*RateLimitHook) Init ¶
func (h *RateLimitHook) Init(opts *broker.HookOptions, config any) error
Init is called when the hook is registered with the broker.
func (*RateLimitHook) OnDisconnect ¶
func (h *RateLimitHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)
OnDisconnect cleans up the client's bucket.
func (*RateLimitHook) OnPublishReceived ¶
func (h *RateLimitHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishReceived checks the publish rate limit.
func (*RateLimitHook) Provides ¶
func (h *RateLimitHook) Provides(event byte) bool
Provides indicates which events this hook handles.
func (*RateLimitHook) Stop ¶
func (h *RateLimitHook) Stop() error
Stop is called when the broker shuts down.
type RedisConfig ¶
type RedisConfig struct {
// Addr is the Redis server address (default: "localhost:6379").
Addr string
// Password for Redis authentication (optional).
Password string
// DB is the Redis database number (default: 0).
DB int
// KeyPrefix is prepended to all Redis keys (default: "mqtt:").
KeyPrefix string
// NodeID uniquely identifies this broker node (required for clustering).
// If empty, cross-node routing is disabled.
NodeID string
// Client allows providing a pre-configured Redis client.
// If set, Addr/Password/DB are ignored.
Client *redis.Client
}
RedisConfig configures the Redis hook.
type RedisHook ¶
RedisHook provides distributed state using Redis/Valkey. Features:
- Retained message storage (survives broker restart)
- Cross-node message routing via Redis pub/sub
- Session persistence for cluster failover
func (*RedisHook) GetRetained ¶
GetRetained retrieves retained messages matching a topic filter.
func (*RedisHook) Init ¶
func (h *RedisHook) Init(opts *broker.HookOptions, config any) error
Init connects to Redis and starts the cluster subscription listener.
func (*RedisHook) OnPublishReceived ¶
func (h *RedisHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishReceived forwards messages to other nodes via Redis pub/sub.
type SysConfig ¶
type SysConfig struct {
// Interval is how often to publish metrics (default: 10s).
Interval time.Duration
// Version is the broker version string (default: "1.0.0").
Version string
}
SysConfig configures the $SYS hook.
type SysHook ¶
SysHook publishes broker metrics to $SYS topics like Mosquitto. Topics published:
- $SYS/broker/version
- $SYS/broker/uptime
- $SYS/broker/clients/connected
- $SYS/broker/clients/total
- $SYS/broker/messages/received
- $SYS/broker/messages/sent
- $SYS/broker/bytes/received
- $SYS/broker/bytes/sent
- $SYS/broker/subscriptions/count
func (*SysHook) DecrementSubscriptions ¶
DecrementSubscriptions decrements the subscription count.
func (*SysHook) IncrementSubscriptions ¶
IncrementSubscriptions increments the subscription count.
func (*SysHook) Init ¶
func (h *SysHook) Init(opts *broker.HookOptions, config any) error
Init is called when the hook is registered with the broker.
func (*SysHook) Metrics ¶
func (h *SysHook) Metrics() SysMetrics
Metrics provides direct access to current metrics.
func (*SysHook) OnConnected ¶
func (h *SysHook) OnConnected(ctx context.Context, client broker.ClientInfo)
OnConnected tracks connected clients.
func (*SysHook) OnDisconnect ¶
OnDisconnect tracks disconnected clients.
func (*SysHook) OnPublishDeliver ¶
func (h *SysHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishDeliver tracks sent messages.
func (*SysHook) OnPublishReceived ¶
func (h *SysHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)
OnPublishReceived tracks received messages.