hooks

package
v0.0.0-...-beaa201 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 28, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package hooks provides composable hook implementations for the MQTT broker.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ACLConfig

type ACLConfig struct {
	// Rules defines the access control rules (evaluated in order).
	Rules []ACLRule

	// DenyByDefault denies access if no rule matches (default: false = allow).
	DenyByDefault bool
}

ACLConfig configures the ACL hook.

type ACLHook

type ACLHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

ACLHook provides topic-based access control.

func (*ACLHook) AddRule

func (h *ACLHook) AddRule(rule ACLRule)

AddRule adds a rule at runtime.

func (*ACLHook) CanRead

func (h *ACLHook) CanRead(ctx context.Context, client broker.ClientInfo, topicName string) bool

CanRead checks if a client can receive messages on a topic.

func (*ACLHook) ClearRules

func (h *ACLHook) ClearRules()

ClearRules removes all rules.

func (*ACLHook) ID

func (h *ACLHook) ID() string

func (*ACLHook) Init

func (h *ACLHook) Init(opts *broker.HookOptions, config any) error

Init is called when the hook is registered with the broker.

func (*ACLHook) OnPublish

func (h *ACLHook) OnPublish(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) error

OnPublish checks write permissions for the topic.

func (*ACLHook) OnSubscribe

func (h *ACLHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)

OnSubscribe filters subscriptions based on read permissions.

func (*ACLHook) Provides

func (h *ACLHook) Provides(event byte) bool

Provides indicates which events this hook handles.

type ACLRule

type ACLRule struct {
	// ClientID pattern (supports * wildcard, empty = any).
	ClientID string

	// Username pattern (supports * wildcard, empty = any).
	Username string

	// TopicFilter pattern (supports MQTT wildcards + and #).
	TopicFilter string

	// Access permissions.
	Read  bool
	Write bool
}

ACLRule defines an access control rule.
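The rule fields above can be read as a small matching procedure. The following is a minimal sketch, not the package's implementation: the `rule` type, `matchTopic`, and `canWrite` are hypothetical names, the first-match-wins policy is an assumption (the documentation only says rules are evaluated in order), and usernames are compared exactly rather than with the `*` wildcard to keep the example short.

```go
package main

import (
	"fmt"
	"strings"
)

// rule mirrors a subset of the documented ACLRule fields (hypothetical type).
type rule struct {
	Username    string
	TopicFilter string
	Read, Write bool
}

// matchTopic reports whether an MQTT topic name matches a filter that may
// contain the standard wildcards + (exactly one level) and # (all remaining
// levels). It assumes the filter is well formed, with # only in last position.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, part := range f {
		if part == "#" {
			return true // # also matches the parent level itself
		}
		if i >= len(t) {
			return false
		}
		if part != "+" && part != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// canWrite walks the rules in order. The first rule whose username and topic
// filter both match decides the outcome (an assumed policy). If nothing
// matches, the result follows the DenyByDefault semantics from ACLConfig.
func canWrite(rules []rule, denyByDefault bool, username, topic string) bool {
	for _, r := range rules {
		if r.Username != "" && r.Username != username {
			continue // empty Username means "any user"
		}
		if matchTopic(r.TopicFilter, topic) {
			return r.Write
		}
	}
	return !denyByDefault
}

func main() {
	rules := []rule{
		{Username: "sensor", TopicFilter: "sensors/+/temp", Write: true},
		{TopicFilter: "admin/#"}, // any user, no read or write access
	}
	fmt.Println(canWrite(rules, false, "sensor", "sensors/a/temp"))
	fmt.Println(canWrite(rules, false, "sensor", "admin/reload"))
	fmt.Println(canWrite(rules, true, "guest", "sensors/a/temp"))
}
```

With a first-match policy, rule order matters: a broad deny rule placed before a narrow allow rule would shadow it.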

type AuthConfig

type AuthConfig struct {
	// Credentials is a map of username -> password for simple auth.
	Credentials map[string]string

	// Validator is a custom authentication function.
	// If set, Credentials is ignored.
	Validator AuthValidator
}

AuthConfig configures the auth hook.

type AuthHook

type AuthHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

AuthHook provides simple username/password authentication.

func (*AuthHook) AddUser

func (h *AuthHook) AddUser(username, password string)

AddUser adds or updates a user credential.

func (*AuthHook) ID

func (h *AuthHook) ID() string

func (*AuthHook) Init

func (h *AuthHook) Init(opts *broker.HookOptions, config any) error

Init is called when the hook is registered with the broker.

func (*AuthHook) OnConnect

func (h *AuthHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error

OnConnect validates client credentials.

func (*AuthHook) Provides

func (h *AuthHook) Provides(event byte) bool

Provides indicates which events this hook handles.

func (*AuthHook) RemoveUser

func (h *AuthHook) RemoveUser(username string)

RemoveUser removes a user credential.

type AuthValidator

type AuthValidator func(ctx context.Context, username, password string) bool

AuthValidator is a custom authentication function.
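As an illustration of the function type above, here is a minimal sketch of a validator backed by an in-memory table. The local `AuthValidator` declaration mirrors the documented signature so the example compiles on its own; `newStaticValidator` and `checkLogin` are hypothetical helpers, and hashing before a constant-time comparison is a design choice of this sketch, not something the package is known to do.

```go
package main

import (
	"context"
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// AuthValidator mirrors the documented function type.
type AuthValidator func(ctx context.Context, username, password string) bool

// newStaticValidator (hypothetical) returns a validator that checks
// credentials against a fixed username -> password table.
func newStaticValidator(users map[string]string) AuthValidator {
	return func(ctx context.Context, username, password string) bool {
		if ctx.Err() != nil {
			return false // request already cancelled or timed out
		}
		want, ok := users[username]
		// Hash both values so the comparison runs over equal-length inputs
		// and does not leak the stored password's length.
		got := sha256.Sum256([]byte(password))
		exp := sha256.Sum256([]byte(want))
		match := subtle.ConstantTimeCompare(got[:], exp[:]) == 1
		return ok && match
	}
}

// checkLogin (hypothetical) shows how a caller might invoke the validator.
func checkLogin(v AuthValidator, username, password string) bool {
	return v(context.Background(), username, password)
}

func main() {
	v := newStaticValidator(map[string]string{"alice": "s3cret"})
	fmt.Println(checkLogin(v, "alice", "s3cret"))
	fmt.Println(checkLogin(v, "alice", "wrong"))
	fmt.Println(checkLogin(v, "mallory", ""))
}
```

A validator of this shape would be assigned to `AuthConfig.Validator`, in which case the documentation states that `Credentials` is ignored.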

type ClusterConfig

type ClusterConfig struct {
	// Addr is the Redis server address (default: "localhost:6379").
	// For Redis Cluster, use comma-separated addresses.
	Addr string

	// Addrs is a list of Redis addresses for cluster mode.
	Addrs []string

	// Password for Redis authentication (optional).
	Password string

	// DB is the Redis database number (default: 0).
	// Ignored in cluster mode.
	DB int

	// KeyPrefix is prepended to all Redis keys (default: "mqtt:").
	KeyPrefix string

	// NodeID uniquely identifies this broker node (required).
	NodeID string

	// HeartbeatInterval is how often to refresh the node TTL (default: 3s).
	HeartbeatInterval time.Duration

	// NodeTTL is how long before a node is considered dead (default: 10s).
	NodeTTL time.Duration

	// Client allows providing a pre-configured Redis client.
	// If set, Addr/Addrs/Password/DB are ignored.
	Client redis.UniversalClient
}

ClusterConfig configures the cluster hook.
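The documented defaults can be summarized in code. This is a sketch under stated assumptions: `clusterConfig` mirrors only a subset of the fields, `withDefaults` is a hypothetical helper, and the check that `NodeTTL` exceeds `HeartbeatInterval` is an extra sanity rule added here (a node whose TTL expires before its next heartbeat would look dead), not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// clusterConfig mirrors a subset of the documented ClusterConfig fields.
type clusterConfig struct {
	Addr              string
	KeyPrefix         string
	NodeID            string
	HeartbeatInterval time.Duration
	NodeTTL           time.Duration
}

// withDefaults (hypothetical) fills zero-valued fields with the defaults
// listed in the field comments and rejects obviously invalid settings.
func withDefaults(c clusterConfig) (clusterConfig, error) {
	if c.NodeID == "" {
		return c, errors.New("cluster: NodeID is required")
	}
	if c.Addr == "" {
		c.Addr = "localhost:6379"
	}
	if c.KeyPrefix == "" {
		c.KeyPrefix = "mqtt:"
	}
	if c.HeartbeatInterval <= 0 {
		c.HeartbeatInterval = 3 * time.Second
	}
	if c.NodeTTL <= 0 {
		c.NodeTTL = 10 * time.Second
	}
	// Assumed sanity rule: the TTL must outlive at least one heartbeat.
	if c.NodeTTL <= c.HeartbeatInterval {
		return c, errors.New("cluster: NodeTTL must exceed HeartbeatInterval")
	}
	return c, nil
}

func main() {
	cfg, err := withDefaults(clusterConfig{NodeID: "node-1"})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(cfg.Addr, cfg.KeyPrefix, cfg.HeartbeatInterval, cfg.NodeTTL)
}
```

With the default values, a node refreshes its TTL roughly three times before it would expire, which tolerates a couple of missed heartbeats.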

type ClusterHook

type ClusterHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

ClusterHook provides full distributed state using Redis/Valkey.

Features:

  • Node registry with heartbeat and auto-expiry
  • Session persistence across nodes
  • Subscription storage and route index for targeted fan-out
  • Offline message queue using Redis Streams
  • QoS 2 in-flight state tracking
  • Retained message storage with index for wildcard matching
  • Per-node pub/sub channels for efficient message routing

Key structure (prefix: "mqtt:"):

mqtt:node:{nodeID}         → STRING (with TTL) - node registry
mqtt:session:{clientID}    → HASH - session state
mqtt:subs:{clientID}       → HASH - client subscriptions
mqtt:route:{filter}        → SET - nodes with subscribers to filter
mqtt:queue:{clientID}      → STREAM - offline message queue
mqtt:inflight:{clientID}   → HASH - QoS 2 in-flight state
mqtt:retained:{topic}      → STRING - retained message (msgpack)
mqtt:retained:_idx         → SET - retained message index
mqtt:fanout:{nodeID}       → PUB/SUB channel - per-node messaging
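To make the key layout above concrete, the following sketch builds each key from a configurable prefix. The `keys` type and its methods are hypothetical; the sketch also assumes that `{nodeID}`, `{clientID}`, `{filter}`, and `{topic}` in the listing are placeholders to be substituted verbatim, with no escaping and no literal braces.

```go
package main

import "fmt"

// keys (hypothetical) derives Redis key names from a prefix such as "mqtt:".
type keys struct{ prefix string }

func (k keys) node(nodeID string) string       { return k.prefix + "node:" + nodeID }
func (k keys) session(clientID string) string  { return k.prefix + "session:" + clientID }
func (k keys) subs(clientID string) string     { return k.prefix + "subs:" + clientID }
func (k keys) route(filter string) string      { return k.prefix + "route:" + filter }
func (k keys) queue(clientID string) string    { return k.prefix + "queue:" + clientID }
func (k keys) inflight(clientID string) string { return k.prefix + "inflight:" + clientID }
func (k keys) retained(topic string) string    { return k.prefix + "retained:" + topic }
func (k keys) retainedIndex() string           { return k.prefix + "retained:_idx" }
func (k keys) fanout(nodeID string) string     { return k.prefix + "fanout:" + nodeID }

func main() {
	k := keys{prefix: "mqtt:"}
	fmt.Println(k.node("node-1"))
	fmt.Println(k.route("sensors/+/temp"))
	fmt.Println(k.retained("sensors/a/temp"))
	fmt.Println(k.retainedIndex())
}
```

Keeping key construction in one place makes it straightforward to honor a custom `KeyPrefix`, for example when several brokers share one Redis instance.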

func (*ClusterHook) AckInflight

func (h *ClusterHook) AckInflight(ctx context.Context, clientID string, packetID uint16) error

AckInflight removes QoS 2 state from Redis.

func (*ClusterHook) Client

func (h *ClusterHook) Client() redis.UniversalClient

Client returns the underlying Redis client for advanced usage.

func (*ClusterHook) GetActiveNodes

func (h *ClusterHook) GetActiveNodes(ctx context.Context) ([]string, error)

GetActiveNodes returns all active nodes in the cluster.

func (*ClusterHook) GetRetained

func (h *ClusterHook) GetRetained(ctx context.Context, filter string) ([]*packet.Publish, error)

GetRetained retrieves retained messages matching a topic filter.

func (*ClusterHook) ID

func (h *ClusterHook) ID() string

func (*ClusterHook) Init

func (h *ClusterHook) Init(opts *broker.HookOptions, config any) error

Init connects to Redis and starts background workers.

func (*ClusterHook) NodeID

func (h *ClusterHook) NodeID() string

NodeID returns this node's ID.

func (*ClusterHook) OnConnect

func (h *ClusterHook) OnConnect(ctx context.Context, client broker.ClientInfo, pkt *packet.Connect) error

OnConnect stores/updates session state.

func (*ClusterHook) OnConnected

func (h *ClusterHook) OnConnected(ctx context.Context, client broker.ClientInfo)

OnConnected is called after successful connection.

func (*ClusterHook) OnDisconnect

func (h *ClusterHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)

OnDisconnect updates session state and handles cleanup.

func (*ClusterHook) OnPublishReceived

func (h *ClusterHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishReceived routes messages to other nodes via targeted fan-out.

func (*ClusterHook) OnSessionCreated

func (h *ClusterHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)

OnSessionCreated initializes session in Redis.

func (*ClusterHook) OnSessionEnded

func (h *ClusterHook) OnSessionEnded(ctx context.Context, clientID string)

OnSessionEnded cleans up session data in Redis.

func (*ClusterHook) OnSessionResumed

func (h *ClusterHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)

OnSessionResumed updates session state in Redis.

func (*ClusterHook) OnSubscribe

func (h *ClusterHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)

OnSubscribe stores subscriptions and updates route index.

func (*ClusterHook) Provides

func (h *ClusterHook) Provides(event byte) bool

Provides indicates which events this hook handles.

func (*ClusterHook) QueueMessage

func (h *ClusterHook) QueueMessage(ctx context.Context, clientID string, pkt *packet.Publish) error

QueueMessage adds a message to the client's offline queue.

func (*ClusterHook) Stop

func (h *ClusterHook) Stop() error

Stop gracefully shuts down the cluster hook.

func (*ClusterHook) StoreRetained

func (h *ClusterHook) StoreRetained(ctx context.Context, topic string, pkt *packet.Publish) error

StoreRetained stores a retained message in Redis.

func (*ClusterHook) TrackInflight

func (h *ClusterHook) TrackInflight(ctx context.Context, clientID string, packetID uint16, state string, pkt *packet.Publish) error

TrackInflight stores QoS 2 state in Redis.

type LogLevel

type LogLevel int

LogLevel controls which events are logged.

const (
	// LogLevelConnection logs connect/disconnect events.
	LogLevelConnection LogLevel = 1 << iota
	// LogLevelSubscribe logs subscribe/unsubscribe events.
	LogLevelSubscribe
	// LogLevelPublish logs publish events.
	LogLevelPublish
	// LogLevelSession logs session lifecycle events.
	LogLevelSession
	// LogLevelAll logs all events.
	LogLevelAll = LogLevelConnection | LogLevelSubscribe | LogLevelPublish | LogLevelSession
)
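Because the constants are declared with `1 << iota`, each level occupies its own bit and levels can be combined with bitwise OR. The sketch below redeclares the constants locally so it compiles on its own; the `has` method is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// LogLevel and its constants mirror the documented declarations.
type LogLevel int

const (
	LogLevelConnection LogLevel = 1 << iota // 1
	LogLevelSubscribe                       // 2
	LogLevelPublish                         // 4
	LogLevelSession                         // 8
	LogLevelAll = LogLevelConnection | LogLevelSubscribe | LogLevelPublish | LogLevelSession
)

// has (hypothetical) reports whether the given flag bit is set in l.
func (l LogLevel) has(flag LogLevel) bool { return l&flag != 0 }

func main() {
	// Log only connection and session lifecycle events.
	level := LogLevelConnection | LogLevelSession
	fmt.Println(level.has(LogLevelSession)) // prints "true"
	fmt.Println(level.has(LogLevelPublish)) // prints "false"
	fmt.Println(int(LogLevelAll))           // prints "15"
}
```

A combined value such as `LogLevelConnection | LogLevelSession` could be passed as `LoggerConfig.Level` to restrict logging to those event groups.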

type LoggerConfig

type LoggerConfig struct {
	// Logger is the slog.Logger to use (default: slog.Default()).
	Logger *slog.Logger

	// Level controls which events are logged (default: LogLevelAll).
	Level LogLevel
}

LoggerConfig configures the logger hook.

type LoggerHook

type LoggerHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

LoggerHook logs broker events using slog.

func (*LoggerHook) ID

func (h *LoggerHook) ID() string

func (*LoggerHook) Init

func (h *LoggerHook) Init(opts *broker.HookOptions, config any) error

Init is called when the hook is registered with the broker.

func (*LoggerHook) OnConnected

func (h *LoggerHook) OnConnected(ctx context.Context, client broker.ClientInfo)

OnConnected logs client connections.

func (*LoggerHook) OnDisconnect

func (h *LoggerHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)

OnDisconnect logs client disconnections.

func (*LoggerHook) OnPublishDeliver

func (h *LoggerHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishDeliver logs delivered messages.

func (*LoggerHook) OnPublishReceived

func (h *LoggerHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishReceived logs received messages.

func (*LoggerHook) OnSessionCreated

func (h *LoggerHook) OnSessionCreated(ctx context.Context, client broker.ClientInfo)

OnSessionCreated logs session creation.

func (*LoggerHook) OnSessionEnded

func (h *LoggerHook) OnSessionEnded(ctx context.Context, clientID string)

OnSessionEnded logs session termination.

func (*LoggerHook) OnSessionResumed

func (h *LoggerHook) OnSessionResumed(ctx context.Context, client broker.ClientInfo)

OnSessionResumed logs session resumption.

func (*LoggerHook) OnSubscribe

func (h *LoggerHook) OnSubscribe(ctx context.Context, client broker.ClientInfo, subs []packet.Subscription) ([]packet.Subscription, error)

OnSubscribe logs subscriptions.

func (*LoggerHook) Provides

func (h *LoggerHook) Provides(event byte) bool

Provides indicates which events this hook handles.

type RateLimitConfig

type RateLimitConfig struct {
	// PublishRate is the max number of publishes per interval per client.
	PublishRate int

	// Interval is the rate limit window (default: 1s).
	Interval time.Duration

	// BurstSize is the max burst allowed (default: PublishRate * 2).
	BurstSize int
}

RateLimitConfig configures the rate limiter.
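The three fields map naturally onto a token bucket. The sketch below is one plausible reading, not the package's implementation: the `bucket` type, `newBucket`, `allow`, and `at` are hypothetical names, and starting with a full bucket is an assumption. Time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a token bucket holding up to burst tokens and gaining
// rate tokens per interval. Each permitted publish consumes one token.
type bucket struct {
	tokens   float64
	burst    float64
	rate     float64
	interval time.Duration
	last     time.Time
}

// newBucket applies the documented defaults: Interval 1s, BurstSize 2x rate.
func newBucket(rate, burst int, interval time.Duration, now time.Time) *bucket {
	if interval <= 0 {
		interval = time.Second
	}
	if burst <= 0 {
		burst = rate * 2
	}
	// Assumption: the bucket starts full.
	return &bucket{tokens: float64(burst), burst: float64(burst),
		rate: float64(rate), interval: interval, last: now}
}

// allow refills the bucket for the time elapsed since the last call,
// caps it at burst, and then tries to take one token.
func (b *bucket) allow(now time.Time) bool {
	if elapsed := now.Sub(b.last); elapsed > 0 {
		b.tokens += b.rate * float64(elapsed) / float64(b.interval)
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// at returns a fixed reference time plus ms milliseconds.
func at(ms int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(ms) * time.Millisecond)
}

func main() {
	b := newBucket(2, 0, 0, at(0)) // 2 publishes per second, burst of 4
	allowed := 0
	for i := 0; i < 6; i++ {
		if b.allow(at(0)) {
			allowed++
		}
	}
	fmt.Println(allowed)          // prints "4": the burst is used up
	fmt.Println(b.allow(at(500))) // prints "true": half a second refills one token
}
```

A per-client limiter would keep one such bucket per client ID and discard it when the client disconnects, which matches the documented purpose of `OnDisconnect`.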

type RateLimitHook

type RateLimitHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

RateLimitHook limits message rates per client.

func (*RateLimitHook) ID

func (h *RateLimitHook) ID() string

func (*RateLimitHook) Init

func (h *RateLimitHook) Init(opts *broker.HookOptions, config any) error

Init is called when the hook is registered with the broker.

func (*RateLimitHook) OnDisconnect

func (h *RateLimitHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)

OnDisconnect cleans up the client's bucket.

func (*RateLimitHook) OnPublishReceived

func (h *RateLimitHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishReceived checks the publish rate limit.

func (*RateLimitHook) Provides

func (h *RateLimitHook) Provides(event byte) bool

Provides indicates which events this hook handles.

func (*RateLimitHook) Stop

func (h *RateLimitHook) Stop() error

Stop is called when the broker shuts down.

type RedisConfig

type RedisConfig struct {
	// Addr is the Redis server address (default: "localhost:6379").
	Addr string

	// Password for Redis authentication (optional).
	Password string

	// DB is the Redis database number (default: 0).
	DB int

	// KeyPrefix is prepended to all Redis keys (default: "mqtt:").
	KeyPrefix string

	// NodeID uniquely identifies this broker node (required for clustering).
	// If empty, cross-node routing is disabled.
	NodeID string

	// Client allows providing a pre-configured Redis client.
	// If set, Addr/Password/DB are ignored.
	Client *redis.Client
}

RedisConfig configures the Redis hook.

type RedisHook

type RedisHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

RedisHook provides distributed state using Redis/Valkey. Features:

  • Retained message storage (survives broker restart)
  • Cross-node message routing via Redis pub/sub
  • Session persistence for cluster failover

func (*RedisHook) Client

func (h *RedisHook) Client() *redis.Client

Client returns the underlying Redis client for advanced usage.

func (*RedisHook) GetRetained

func (h *RedisHook) GetRetained(ctx context.Context, filter string) ([]*packet.Publish, error)

GetRetained retrieves retained messages matching a topic filter.

func (*RedisHook) ID

func (h *RedisHook) ID() string

func (*RedisHook) Init

func (h *RedisHook) Init(opts *broker.HookOptions, config any) error

Init connects to Redis and starts the cluster subscription listener.

func (*RedisHook) OnPublishReceived

func (h *RedisHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishReceived forwards messages to other nodes via Redis pub/sub.

func (*RedisHook) Provides

func (h *RedisHook) Provides(event byte) bool

Provides indicates which events this hook handles.

func (*RedisHook) Stop

func (h *RedisHook) Stop() error

Stop closes the Redis connection and stops the cluster listener.

func (*RedisHook) StoreRetained

func (h *RedisHook) StoreRetained(ctx context.Context, topic string, pkt *packet.Publish) error

StoreRetained stores a retained message in Redis.

type SysConfig

type SysConfig struct {
	// Interval is how often to publish metrics (default: 10s).
	Interval time.Duration

	// Version is the broker version string (default: "1.0.0").
	Version string
}

SysConfig configures the $SYS hook.

type SysHook

type SysHook struct {
	broker.HookBase
	// contains filtered or unexported fields
}

SysHook publishes broker metrics to $SYS topics, in the style of Mosquitto. Topics published:

  • $SYS/broker/version
  • $SYS/broker/uptime
  • $SYS/broker/clients/connected
  • $SYS/broker/clients/total
  • $SYS/broker/messages/received
  • $SYS/broker/messages/sent
  • $SYS/broker/bytes/received
  • $SYS/broker/bytes/sent
  • $SYS/broker/subscriptions/count

func (*SysHook) DecrementSubscriptions

func (h *SysHook) DecrementSubscriptions(n int64)

DecrementSubscriptions decrements the subscription count.

func (*SysHook) ID

func (h *SysHook) ID() string

func (*SysHook) IncrementSubscriptions

func (h *SysHook) IncrementSubscriptions(n int64)

IncrementSubscriptions increments the subscription count.

func (*SysHook) Init

func (h *SysHook) Init(opts *broker.HookOptions, config any) error

Init is called when the hook is registered with the broker.

func (*SysHook) Metrics

func (h *SysHook) Metrics() SysMetrics

Metrics provides direct access to current metrics.

func (*SysHook) OnConnected

func (h *SysHook) OnConnected(ctx context.Context, client broker.ClientInfo)

OnConnected tracks connected clients.

func (*SysHook) OnDisconnect

func (h *SysHook) OnDisconnect(ctx context.Context, client broker.ClientInfo, err error)

OnDisconnect tracks disconnected clients.

func (*SysHook) OnPublishDeliver

func (h *SysHook) OnPublishDeliver(ctx context.Context, subscriber broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishDeliver tracks sent messages.

func (*SysHook) OnPublishReceived

func (h *SysHook) OnPublishReceived(ctx context.Context, client broker.ClientInfo, pkt *packet.Publish) (*packet.Publish, error)

OnPublishReceived tracks received messages.

func (*SysHook) Provides

func (h *SysHook) Provides(event byte) bool

Provides indicates which events this hook handles.

func (*SysHook) Stop

func (h *SysHook) Stop() error

Stop is called when the broker shuts down.

type SysMetrics

type SysMetrics struct {
	Uptime           time.Duration
	ClientsConnected int64
	ClientsTotal     int64
	MessagesReceived int64
	MessagesSent     int64
	BytesReceived    int64
	BytesSent        int64
	Subscriptions    int64
}

SysMetrics holds current broker metrics.
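The relationship between these fields and the $SYS topics listed for SysHook can be sketched as a simple mapping. The payload format is an assumption of this example (decimal integers, uptime in whole seconds); `sysTopics` and `sampleMetrics` are hypothetical helpers, and the struct is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// SysMetrics mirrors the documented struct.
type SysMetrics struct {
	Uptime           time.Duration
	ClientsConnected int64
	ClientsTotal     int64
	MessagesReceived int64
	MessagesSent     int64
	BytesReceived    int64
	BytesSent        int64
	Subscriptions    int64
}

// sysTopics (hypothetical) renders a metrics snapshot as topic -> payload.
// Payloads are decimal strings; uptime is reported in whole seconds.
func sysTopics(version string, m SysMetrics) map[string]string {
	n := func(v int64) string { return strconv.FormatInt(v, 10) }
	return map[string]string{
		"$SYS/broker/version":             version,
		"$SYS/broker/uptime":              n(int64(m.Uptime / time.Second)),
		"$SYS/broker/clients/connected":   n(m.ClientsConnected),
		"$SYS/broker/clients/total":       n(m.ClientsTotal),
		"$SYS/broker/messages/received":   n(m.MessagesReceived),
		"$SYS/broker/messages/sent":       n(m.MessagesSent),
		"$SYS/broker/bytes/received":      n(m.BytesReceived),
		"$SYS/broker/bytes/sent":          n(m.BytesSent),
		"$SYS/broker/subscriptions/count": n(m.Subscriptions),
	}
}

// sampleMetrics returns fixed values for demonstration.
func sampleMetrics() SysMetrics {
	return SysMetrics{
		Uptime: 90 * time.Second, ClientsConnected: 3, ClientsTotal: 5,
		MessagesReceived: 120, MessagesSent: 340,
		BytesReceived: 4096, BytesSent: 8192, Subscriptions: 7,
	}
}

func main() {
	topics := sysTopics("1.0.0", sampleMetrics())
	fmt.Println(topics["$SYS/broker/uptime"])            // prints "90"
	fmt.Println(topics["$SYS/broker/clients/connected"]) // prints "3"
}
```

In a running broker, the snapshot would presumably come from `SysHook.Metrics()` rather than from fixed sample values.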
