rmq

package module
v0.0.28 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2025 License: MIT Imports: 10 Imported by: 0

README

go-rmq

RabbitMQ Wrappers for amqp091-go

Features:

  • Uses the builder pattern to make it much easier to work with exchanges, queues, consumers, etc.
  • KeepAlive functionality to keep the connection alive across temporary failures

Install

go get -u github.com/aliforever/go-rmq

Usage:

To initialize the connection with a retry count of 5 and a 10-second delay between attempts:

r := rmq.New(address)

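errChan, err := r.Connect(5, 10*time.Second, func(err error) {
    // Called with the error passed to onRetryError (see the Connect signature below).
    log.Println(err)
})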
if err != nil {
    return nil, err
}

panic(<-errChan)

Documentation

Index

Constants

View Source
const (
	DataTypeBytes dataType = iota
	DataTypeJSON
)

Variables

View Source
var (
	ConnectionNotSetError                = errors.New("connection_is_not_set")
	DataIsNotBytesError                  = errors.New("data_is_not_of_bytes_type")
	ResponseMapNotSetError               = errors.New("response_map_not_set")
	CorrelationIdNotSetError             = errors.New("correlation_id_not_set")
	PublishResponseInvalidReplyToIdError = errors.New("publish_response_invalid_reply_to_id")
)
View Source
var ConnectionClosedError = fmt.Errorf("connection_closed")
View Source
var (
	ErrConnectionClosed = fmt.Errorf("connection closed")
)

Functions

func OnUpdate added in v0.0.26

func OnUpdate[T any](
	client *RMQ,
	queueName string,
	handler func(*T) error,
	opts *OnUpdateOptions,
) error

Types

type Channel added in v0.0.10

type Channel struct {
	// contains filtered or unexported fields
}

func (*Channel) Close added in v0.0.21

func (c *Channel) Close() error

func (*Channel) CloseChan added in v0.0.10

func (c *Channel) CloseChan() <-chan error

func (*Channel) ConsumerBuilder added in v0.0.10

func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder

func (*Channel) DirectExchangeBuilder added in v0.0.11

func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) ExchangeBuilder added in v0.0.11

func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) FanoutExchangeBuilder added in v0.0.11

func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) IsHealthy added in v0.0.25

func (c *Channel) IsHealthy() bool

func (*Channel) PublisherBuilder added in v0.0.10

func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder

func (*Channel) QueueBuilder added in v0.0.10

func (c *Channel) QueueBuilder() *QueueBuilder

func (*Channel) TopicExchangeBuilder added in v0.0.11

func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder

type ChannelImpl added in v0.0.16

type ChannelImpl interface {
	PublisherBuilder(exchange string, routingKey string) PublisherBuilderImpl
	ConsumerBuilder(name, queue string) ConsumerBuilderImpl
	QueueBuilder() QueueBuilderImpl
	ExchangeBuilder(name string) ExchangeBuilderImpl
	FanoutExchangeBuilder(name string) ExchangeBuilderImpl
	DirectExchangeBuilder(name string) ExchangeBuilderImpl
	TopicExchangeBuilder(name string) ExchangeBuilderImpl
	CloseChan() <-chan error
	Close() error
	IsHealthy() bool
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Cancel added in v0.0.5

func (c *Consumer) Cancel() error

func (*Consumer) Close added in v0.0.25

func (c *Consumer) Close() error

func (*Consumer) ErrorChan added in v0.0.10

func (c *Consumer) ErrorChan() <-chan error

func (*Consumer) Messages added in v0.0.10

func (c *Consumer) Messages() <-chan amqp091.Delivery

type ConsumerBuilder added in v0.0.10

type ConsumerBuilder struct {
	// contains filtered or unexported fields
}

func (*ConsumerBuilder) AddArg added in v0.0.10

func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder

func (*ConsumerBuilder) Build added in v0.0.10

func (c *ConsumerBuilder) Build() (*Consumer, error)

func (*ConsumerBuilder) SetAutoAck added in v0.0.10

func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder

func (*ConsumerBuilder) SetExclusive added in v0.0.10

func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder

func (*ConsumerBuilder) SetNoLocal added in v0.0.10

func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder

func (*ConsumerBuilder) SetNoWait added in v0.0.10

func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder

func (*ConsumerBuilder) SetPrefetch added in v0.0.12

func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder

type ConsumerBuilderImpl added in v0.0.16

type ConsumerBuilderImpl interface {
	SetAutoAck() ConsumerBuilderImpl
	SetExclusive() ConsumerBuilderImpl
	SetNoLocal() ConsumerBuilderImpl
	SetNoWait() ConsumerBuilderImpl
	SetPrefetch(prefetch int) ConsumerBuilderImpl
	AddArg(key string, val interface{}) ConsumerBuilderImpl
	Build() (ConsumerImpl, error)
}

type ConsumerImpl added in v0.0.16

type ConsumerImpl interface {
	Messages() <-chan amqp091.Delivery
	ErrorChan() <-chan error
	Cancel() error
	Close() error
}

type Delivery

type Delivery struct {
	*amqp091.Delivery
	// contains filtered or unexported fields
}

func (*Delivery) Ack

func (d *Delivery) Ack(multiple bool) error

func (*Delivery) Nack added in v0.0.25

func (d *Delivery) Nack(multiple, requeue bool) error

func (*Delivery) Reject added in v0.0.25

func (d *Delivery) Reject(requeue bool) error

type DeliveryImpl added in v0.0.25

type DeliveryImpl interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
	Reject(requeue bool) error
}

type ExchangeBuilder added in v0.0.10

type ExchangeBuilder struct {
	// contains filtered or unexported fields
}

func (*ExchangeBuilder) AddArg added in v0.0.10

func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder

func (*ExchangeBuilder) Declare added in v0.0.10

func (e *ExchangeBuilder) Declare() error

func (*ExchangeBuilder) DeleteOnDeclare added in v0.0.10

func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder

func (*ExchangeBuilder) SetAutoDelete added in v0.0.10

func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder

func (*ExchangeBuilder) SetDurable added in v0.0.10

func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder

func (*ExchangeBuilder) SetInternal added in v0.0.10

func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder

func (*ExchangeBuilder) SetNoWait added in v0.0.10

func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder

type ExchangeBuilderImpl added in v0.0.16

type ExchangeBuilderImpl interface {
	DeleteOnDeclare(ifUnused, noWait bool) ExchangeBuilderImpl
	SetDurable() ExchangeBuilderImpl
	SetAutoDelete() ExchangeBuilderImpl
	SetInternal() ExchangeBuilderImpl
	SetNoWait() ExchangeBuilderImpl
	AddArg(key string, val interface{}) ExchangeBuilderImpl
	Declare() error
}

type OnUpdateOptions added in v0.0.26

type OnUpdateOptions struct {
	// contains filtered or unexported fields
}

func NewOnUpdateOptions added in v0.0.26

func NewOnUpdateOptions() *OnUpdateOptions

func (*OnUpdateOptions) SetCreateQueue added in v0.0.26

func (oup *OnUpdateOptions) SetCreateQueue(createQueue, isDurable bool) *OnUpdateOptions

func (*OnUpdateOptions) SetLogger added in v0.0.26

func (oup *OnUpdateOptions) SetLogger(logger *slog.Logger) *OnUpdateOptions

func (*OnUpdateOptions) SetPrefetchCount added in v0.0.26

func (oup *OnUpdateOptions) SetPrefetchCount(count int) *OnUpdateOptions

func (*OnUpdateOptions) SetReturnOnHandlerError added in v0.0.26

func (oup *OnUpdateOptions) SetReturnOnHandlerError(returnOnHandlerError bool) *OnUpdateOptions

func (*OnUpdateOptions) SetReturnOnUnmarshalError added in v0.0.26

func (oup *OnUpdateOptions) SetReturnOnUnmarshalError(returnOnUnmarshalError bool) *OnUpdateOptions

type PublishFields added in v0.0.3

type PublishFields struct {
	// contains filtered or unexported fields
}

func NewPublishFields added in v0.0.3

func NewPublishFields() *PublishFields

func (*PublishFields) AddHeader added in v0.0.3

func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields

func (*PublishFields) DeliveryModePersistent added in v0.0.3

func (p *PublishFields) DeliveryModePersistent() *PublishFields

func (*PublishFields) DeliveryModeTransient added in v0.0.3

func (p *PublishFields) DeliveryModeTransient() *PublishFields

func (*PublishFields) SetContentType added in v0.0.3

func (p *PublishFields) SetContentType(contentType string) *PublishFields

func (*PublishFields) SetCorrelationID added in v0.0.3

func (p *PublishFields) SetCorrelationID(id string) *PublishFields

func (*PublishFields) SetDataTypeBytes added in v0.0.3

func (p *PublishFields) SetDataTypeBytes() *PublishFields

func (*PublishFields) SetDataTypeJSON added in v0.0.3

func (p *PublishFields) SetDataTypeJSON() *PublishFields

func (*PublishFields) SetExpiration added in v0.0.3

func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields

func (*PublishFields) SetImmediate added in v0.0.3

func (p *PublishFields) SetImmediate() *PublishFields

func (*PublishFields) SetMandatory added in v0.0.3

func (p *PublishFields) SetMandatory() *PublishFields

func (*PublishFields) SetReplyToID added in v0.0.3

func (p *PublishFields) SetReplyToID(id string) *PublishFields

type PublishFieldsImpl added in v0.0.16

type PublishFieldsImpl interface {
	SetDataTypeBytes() PublisherBuilderImpl
	SetDataTypeJSON() PublisherBuilderImpl
	SetContentType(contentType string) PublisherBuilderImpl
	DeliveryModePersistent() PublisherBuilderImpl
	DeliveryModeTransient() PublisherBuilderImpl
	AddHeader(key string, val interface{}) PublisherBuilderImpl
	SetCorrelationID(id string) PublisherBuilderImpl
	SetReplyToID(id string) PublisherBuilderImpl
	SetExpiration(dur time.Duration) PublisherBuilderImpl
	SetMandatory() PublisherBuilderImpl
	SetImmediate() PublisherBuilderImpl
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (*Publisher) Fields added in v0.0.10

func (p *Publisher) Fields() *PublishFields

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, data interface{}) error

func (*Publisher) PublishAwaitResponse

func (p *Publisher) PublishAwaitResponse(
	ctx context.Context,
	data interface{},
	responseMap *genericSync.Map[chan amqp091.Delivery],
) (amqp091.Delivery, error)

func (*Publisher) PublishWithConfirmation

func (p *Publisher) PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)

func (*Publisher) WithFields added in v0.0.3

func (p *Publisher) WithFields(fields *PublishFields) *Publisher

type PublisherBuilder added in v0.0.10

type PublisherBuilder struct {
	// contains filtered or unexported fields
}

func (*PublisherBuilder) New added in v0.0.10

func (p *PublisherBuilder) New() *Publisher

func (*PublisherBuilder) NewWithDefaultFields added in v0.0.10

func (p *PublisherBuilder) NewWithDefaultFields() *Publisher

func (*PublisherBuilder) WithFields added in v0.0.10

func (p *PublisherBuilder) WithFields(fields *PublishFields) *PublisherBuilder

type PublisherBuilderImpl added in v0.0.16

type PublisherBuilderImpl interface {
	WithFields(fields *PublishFields) PublisherBuilderImpl
	New() PublisherImpl
	NewWithDefaultFields() PublisherImpl
}

type PublisherImpl added in v0.0.16

type PublisherImpl interface {
	WithFields(fields *PublishFields) PublisherImpl
	Fields() *PublishFields
	Publish(ctx context.Context, data interface{}) error
	PublishAwaitResponse(
		ctx context.Context,
		data interface{},
		responseMap *genericSync.Map[chan amqp091.Delivery],
	) (amqp091.Delivery, error)
	PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) BindToExchange

func (q *Queue) BindToExchange(
	exchange string,
	routingKey string,
	noWait bool,
	args map[string]interface{},
) error

func (*Queue) ConsumerCount added in v0.0.18

func (q *Queue) ConsumerCount() int

func (*Queue) Name added in v0.0.10

func (q *Queue) Name() string

type QueueBuilder

type QueueBuilder struct {
	// contains filtered or unexported fields
}

func (*QueueBuilder) AddArg

func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder

func (*QueueBuilder) Declare

func (q *QueueBuilder) Declare() (*Queue, error)

func (*QueueBuilder) SetAutoDelete

func (q *QueueBuilder) SetAutoDelete() *QueueBuilder

func (*QueueBuilder) SetDurable

func (q *QueueBuilder) SetDurable() *QueueBuilder

func (*QueueBuilder) SetExclusive

func (q *QueueBuilder) SetExclusive() *QueueBuilder

func (*QueueBuilder) SetName

func (q *QueueBuilder) SetName(name string) *QueueBuilder

func (*QueueBuilder) SetNoWait

func (q *QueueBuilder) SetNoWait() *QueueBuilder

func (*QueueBuilder) SetPassive added in v0.0.19

func (q *QueueBuilder) SetPassive() *QueueBuilder

type QueueBuilderImpl added in v0.0.16

type QueueBuilderImpl interface {
	SetName(name string) QueueBuilderImpl
	SetDurable() QueueBuilderImpl
	SetAutoDelete() QueueBuilderImpl
	SetExclusive() QueueBuilderImpl
	SetNoWait() QueueBuilderImpl
	AddArg(key string, val interface{}) QueueBuilderImpl
	Declare() (QueueImpl, error)
}

type QueueImpl added in v0.0.16

type QueueImpl interface {
	Name() string
	BindToExchange(
		exchange string,
		routingKey string,
		noWait bool,
		args map[string]interface{},
	) error
}

type RMQ added in v0.0.10

type RMQ struct {
	// contains filtered or unexported fields
}

func New added in v0.0.10

func New(address string) *RMQ

func (*RMQ) Close added in v0.0.10

func (r *RMQ) Close() error

func (*RMQ) Connect added in v0.0.10

func (r *RMQ) Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)

func (*RMQ) IsConnected added in v0.0.25

func (r *RMQ) IsConnected() bool

func (*RMQ) IsHealthy added in v0.0.25

func (r *RMQ) IsHealthy() bool

func (*RMQ) NewChannel added in v0.0.10

func (r *RMQ) NewChannel() (*Channel, error)

func (*RMQ) NewChannelWithConfirm added in v0.0.10

func (r *RMQ) NewChannelWithConfirm() (*Channel, error)

func (*RMQ) SetOnError added in v0.0.10

func (r *RMQ) SetOnError(onError func(err error))

type RmqImpl added in v0.0.16

type RmqImpl interface {
	SetOnError(onError func(err error))
	Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)
	Close() error
	NewChannel() (ChannelImpl, error)
	NewChannelWithConfirm() (ChannelImpl, error)
	IsConnected() bool
	IsHealthy() bool
}

type RmqOptions added in v0.0.10

type RmqOptions struct {
	// contains filtered or unexported fields
}

func NewRmqOptions added in v0.0.10

func NewRmqOptions() *RmqOptions

func (*RmqOptions) SetReconnectDelay added in v0.0.10

func (r *RmqOptions) SetReconnectDelay(delay int) *RmqOptions

func (*RmqOptions) SetReconnectTries added in v0.0.10

func (r *RmqOptions) SetReconnectTries(tries int) *RmqOptions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL