Documentation
¶
Index ¶
- Constants
- Variables
- func OnUpdate[T any](client *RMQ, queueName string, handler func(*T) error, opts *OnUpdateOptions) error
- type Channel
- func (c *Channel) Close() error
- func (c *Channel) CloseChan() <-chan error
- func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder
- func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder
- func (c *Channel) IsHealthy() bool
- func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder
- func (c *Channel) QueueBuilder() *QueueBuilder
- func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder
- type ChannelImpl
- type Consumer
- type ConsumerBuilder
- func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder
- func (c *ConsumerBuilder) Build() (*Consumer, error)
- func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder
- func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder
- func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder
- func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder
- func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder
- type ConsumerBuilderImpl
- type ConsumerImpl
- type Delivery
- type DeliveryImpl
- type ExchangeBuilder
- func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder
- func (e *ExchangeBuilder) Declare() error
- func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder
- func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder
- func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder
- func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder
- func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder
- type ExchangeBuilderImpl
- type OnUpdateOptions
- func (oup *OnUpdateOptions) SetCreateQueue(createQueue, isDurable bool) *OnUpdateOptions
- func (oup *OnUpdateOptions) SetLogger(logger *slog.Logger) *OnUpdateOptions
- func (oup *OnUpdateOptions) SetPrefetchCount(count int) *OnUpdateOptions
- func (oup *OnUpdateOptions) SetReturnOnHandlerError(returnOnHandlerError bool) *OnUpdateOptions
- func (oup *OnUpdateOptions) SetReturnOnUnmarshalError(returnOnUnmarshalError bool) *OnUpdateOptions
- type PublishFields
- func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields
- func (p *PublishFields) DeliveryModePersistent() *PublishFields
- func (p *PublishFields) DeliveryModeTransient() *PublishFields
- func (p *PublishFields) SetContentType(contentType string) *PublishFields
- func (p *PublishFields) SetCorrelationID(id string) *PublishFields
- func (p *PublishFields) SetDataTypeBytes() *PublishFields
- func (p *PublishFields) SetDataTypeJSON() *PublishFields
- func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields
- func (p *PublishFields) SetImmediate() *PublishFields
- func (p *PublishFields) SetMandatory() *PublishFields
- func (p *PublishFields) SetReplyToID(id string) *PublishFields
- type PublishFieldsImpl
- type Publisher
- func (p *Publisher) Fields() *PublishFields
- func (p *Publisher) Publish(ctx context.Context, data interface{}) error
- func (p *Publisher) PublishAwaitResponse(ctx context.Context, data interface{}, ...) (amqp091.Delivery, error)
- func (p *Publisher) PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
- func (p *Publisher) WithFields(fields *PublishFields) *Publisher
- type PublisherBuilder
- type PublisherBuilderImpl
- type PublisherImpl
- type Queue
- type QueueBuilder
- func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder
- func (q *QueueBuilder) Declare() (*Queue, error)
- func (q *QueueBuilder) SetAutoDelete() *QueueBuilder
- func (q *QueueBuilder) SetDurable() *QueueBuilder
- func (q *QueueBuilder) SetExclusive() *QueueBuilder
- func (q *QueueBuilder) SetName(name string) *QueueBuilder
- func (q *QueueBuilder) SetNoWait() *QueueBuilder
- func (q *QueueBuilder) SetPassive() *QueueBuilder
- type QueueBuilderImpl
- type QueueImpl
- type RMQ
- func (r *RMQ) Close() error
- func (r *RMQ) Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)
- func (r *RMQ) IsConnected() bool
- func (r *RMQ) IsHealthy() bool
- func (r *RMQ) NewChannel() (*Channel, error)
- func (r *RMQ) NewChannelWithConfirm() (*Channel, error)
- func (r *RMQ) SetOnError(onError func(err error))
- type RmqImpl
- type RmqOptions
Constants ¶
View Source
const ( DataTypeBytes dataType = iota DataTypeJSON )
Variables ¶
View Source
var ( ConnectionNotSetError = errors.New("connection_is_not_set") DataIsNotBytesError = errors.New("data_is_not_of_bytes_type") ResponseMapNotSetError = errors.New("response_map_not_set") CorrelationIdNotSetError = errors.New("correlation_id_not_set") PublishResponseInvalidReplyToIdError = errors.New("publish_response_invalid_reply_to_id") )
View Source
var ConnectionClosedError = fmt.Errorf("connection_closed")
View Source
var (
ErrConnectionClosed = fmt.Errorf("connection closed")
)
Functions ¶
Types ¶
type Channel ¶ added in v0.0.10
type Channel struct {
// contains filtered or unexported fields
}
func (*Channel) ConsumerBuilder ¶ added in v0.0.10
func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder
func (*Channel) DirectExchangeBuilder ¶ added in v0.0.11
func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) ExchangeBuilder ¶ added in v0.0.11
func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) FanoutExchangeBuilder ¶ added in v0.0.11
func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder
func (*Channel) PublisherBuilder ¶ added in v0.0.10
func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder
func (*Channel) QueueBuilder ¶ added in v0.0.10
func (c *Channel) QueueBuilder() *QueueBuilder
func (*Channel) TopicExchangeBuilder ¶ added in v0.0.11
func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder
type ChannelImpl ¶ added in v0.0.16
type ChannelImpl interface {
PublisherBuilder(exchange string, routingKey string) PublisherBuilderImpl
ConsumerBuilder(name, queue string) ConsumerBuilderImpl
QueueBuilder() QueueBuilderImpl
ExchangeBuilder(name string) ExchangeBuilderImpl
FanoutExchangeBuilder(name string) ExchangeBuilderImpl
DirectExchangeBuilder(name string) ExchangeBuilderImpl
TopicExchangeBuilder(name string) ExchangeBuilderImpl
CloseChan() <-chan error
Close() error
IsHealthy() bool
}
type ConsumerBuilder ¶ added in v0.0.10
type ConsumerBuilder struct {
// contains filtered or unexported fields
}
func (*ConsumerBuilder) AddArg ¶ added in v0.0.10
func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder
func (*ConsumerBuilder) Build ¶ added in v0.0.10
func (c *ConsumerBuilder) Build() (*Consumer, error)
func (*ConsumerBuilder) SetAutoAck ¶ added in v0.0.10
func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder
func (*ConsumerBuilder) SetExclusive ¶ added in v0.0.10
func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder
func (*ConsumerBuilder) SetNoLocal ¶ added in v0.0.10
func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder
func (*ConsumerBuilder) SetNoWait ¶ added in v0.0.10
func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder
func (*ConsumerBuilder) SetPrefetch ¶ added in v0.0.12
func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder
type ConsumerBuilderImpl ¶ added in v0.0.16
type ConsumerBuilderImpl interface {
SetAutoAck() ConsumerBuilderImpl
SetExclusive() ConsumerBuilderImpl
SetNoLocal() ConsumerBuilderImpl
SetNoWait() ConsumerBuilderImpl
SetPrefetch(prefetch int) ConsumerBuilderImpl
AddArg(key string, val interface{}) ConsumerBuilderImpl
Build() (ConsumerImpl, error)
}
type ConsumerImpl ¶ added in v0.0.16
type DeliveryImpl ¶ added in v0.0.25
type ExchangeBuilder ¶ added in v0.0.10
type ExchangeBuilder struct {
// contains filtered or unexported fields
}
func (*ExchangeBuilder) AddArg ¶ added in v0.0.10
func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder
func (*ExchangeBuilder) Declare ¶ added in v0.0.10
func (e *ExchangeBuilder) Declare() error
func (*ExchangeBuilder) DeleteOnDeclare ¶ added in v0.0.10
func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder
func (*ExchangeBuilder) SetAutoDelete ¶ added in v0.0.10
func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder
func (*ExchangeBuilder) SetDurable ¶ added in v0.0.10
func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder
func (*ExchangeBuilder) SetInternal ¶ added in v0.0.10
func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder
func (*ExchangeBuilder) SetNoWait ¶ added in v0.0.10
func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder
type ExchangeBuilderImpl ¶ added in v0.0.16
type ExchangeBuilderImpl interface {
DeleteOnDeclare(ifUnused, noWait bool) ExchangeBuilderImpl
SetDurable() ExchangeBuilderImpl
SetAutoDelete() ExchangeBuilderImpl
SetInternal() ExchangeBuilderImpl
SetNoWait() ExchangeBuilderImpl
AddArg(key string, val interface{}) ExchangeBuilderImpl
Declare() error
}
type OnUpdateOptions ¶ added in v0.0.26
type OnUpdateOptions struct {
// contains filtered or unexported fields
}
func NewOnUpdateOptions ¶ added in v0.0.26
func NewOnUpdateOptions() *OnUpdateOptions
func (*OnUpdateOptions) SetCreateQueue ¶ added in v0.0.26
func (oup *OnUpdateOptions) SetCreateQueue(createQueue, isDurable bool) *OnUpdateOptions
func (*OnUpdateOptions) SetLogger ¶ added in v0.0.26
func (oup *OnUpdateOptions) SetLogger(logger *slog.Logger) *OnUpdateOptions
func (*OnUpdateOptions) SetPrefetchCount ¶ added in v0.0.26
func (oup *OnUpdateOptions) SetPrefetchCount(count int) *OnUpdateOptions
func (*OnUpdateOptions) SetReturnOnHandlerError ¶ added in v0.0.26
func (oup *OnUpdateOptions) SetReturnOnHandlerError(returnOnHandlerError bool) *OnUpdateOptions
func (*OnUpdateOptions) SetReturnOnUnmarshalError ¶ added in v0.0.26
func (oup *OnUpdateOptions) SetReturnOnUnmarshalError(returnOnUnmarshalError bool) *OnUpdateOptions
type PublishFields ¶ added in v0.0.3
type PublishFields struct {
// contains filtered or unexported fields
}
func NewPublishFields ¶ added in v0.0.3
func NewPublishFields() *PublishFields
func (*PublishFields) AddHeader ¶ added in v0.0.3
func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields
func (*PublishFields) DeliveryModePersistent ¶ added in v0.0.3
func (p *PublishFields) DeliveryModePersistent() *PublishFields
func (*PublishFields) DeliveryModeTransient ¶ added in v0.0.3
func (p *PublishFields) DeliveryModeTransient() *PublishFields
func (*PublishFields) SetContentType ¶ added in v0.0.3
func (p *PublishFields) SetContentType(contentType string) *PublishFields
func (*PublishFields) SetCorrelationID ¶ added in v0.0.3
func (p *PublishFields) SetCorrelationID(id string) *PublishFields
func (*PublishFields) SetDataTypeBytes ¶ added in v0.0.3
func (p *PublishFields) SetDataTypeBytes() *PublishFields
func (*PublishFields) SetDataTypeJSON ¶ added in v0.0.3
func (p *PublishFields) SetDataTypeJSON() *PublishFields
func (*PublishFields) SetExpiration ¶ added in v0.0.3
func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields
func (*PublishFields) SetImmediate ¶ added in v0.0.3
func (p *PublishFields) SetImmediate() *PublishFields
func (*PublishFields) SetMandatory ¶ added in v0.0.3
func (p *PublishFields) SetMandatory() *PublishFields
func (*PublishFields) SetReplyToID ¶ added in v0.0.3
func (p *PublishFields) SetReplyToID(id string) *PublishFields
type PublishFieldsImpl ¶ added in v0.0.16
type PublishFieldsImpl interface {
SetDataTypeBytes() PublisherBuilderImpl
SetDataTypeJSON() PublisherBuilderImpl
SetContentType(contentType string) PublisherBuilderImpl
DeliveryModePersistent() PublisherBuilderImpl
DeliveryModeTransient() PublisherBuilderImpl
AddHeader(key string, val interface{}) PublisherBuilderImpl
SetCorrelationID(id string) PublisherBuilderImpl
SetReplyToID(id string) PublisherBuilderImpl
SetExpiration(dur time.Duration) PublisherBuilderImpl
SetMandatory() PublisherBuilderImpl
SetImmediate() PublisherBuilderImpl
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func (*Publisher) Fields ¶ added in v0.0.10
func (p *Publisher) Fields() *PublishFields
func (*Publisher) PublishAwaitResponse ¶
func (*Publisher) PublishWithConfirmation ¶
func (*Publisher) WithFields ¶ added in v0.0.3
func (p *Publisher) WithFields(fields *PublishFields) *Publisher
type PublisherBuilder ¶ added in v0.0.10
type PublisherBuilder struct {
// contains filtered or unexported fields
}
func (*PublisherBuilder) New ¶ added in v0.0.10
func (p *PublisherBuilder) New() *Publisher
func (*PublisherBuilder) NewWithDefaultFields ¶ added in v0.0.10
func (p *PublisherBuilder) NewWithDefaultFields() *Publisher
func (*PublisherBuilder) WithFields ¶ added in v0.0.10
func (p *PublisherBuilder) WithFields(fields *PublishFields) *PublisherBuilder
type PublisherBuilderImpl ¶ added in v0.0.16
type PublisherBuilderImpl interface {
WithFields(fields *PublishFields) PublisherBuilderImpl
New() PublisherImpl
NewWithDefaultFields() PublisherImpl
}
type PublisherImpl ¶ added in v0.0.16
type PublisherImpl interface {
WithFields(fields *PublishFields) PublisherImpl
Fields() *PublishFields
Publish(ctx context.Context, data interface{}) error
PublishAwaitResponse(
ctx context.Context,
data interface{},
responseMap *genericSync.Map[chan amqp091.Delivery],
) (amqp091.Delivery, error)
PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
}
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func (*Queue) BindToExchange ¶
func (*Queue) ConsumerCount ¶ added in v0.0.18
type QueueBuilder ¶
type QueueBuilder struct {
// contains filtered or unexported fields
}
func (*QueueBuilder) AddArg ¶
func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder
func (*QueueBuilder) Declare ¶
func (q *QueueBuilder) Declare() (*Queue, error)
func (*QueueBuilder) SetAutoDelete ¶
func (q *QueueBuilder) SetAutoDelete() *QueueBuilder
func (*QueueBuilder) SetDurable ¶
func (q *QueueBuilder) SetDurable() *QueueBuilder
func (*QueueBuilder) SetExclusive ¶
func (q *QueueBuilder) SetExclusive() *QueueBuilder
func (*QueueBuilder) SetName ¶
func (q *QueueBuilder) SetName(name string) *QueueBuilder
func (*QueueBuilder) SetNoWait ¶
func (q *QueueBuilder) SetNoWait() *QueueBuilder
func (*QueueBuilder) SetPassive ¶ added in v0.0.19
func (q *QueueBuilder) SetPassive() *QueueBuilder
type QueueBuilderImpl ¶ added in v0.0.16
type QueueBuilderImpl interface {
SetName(name string) QueueBuilderImpl
SetDurable() QueueBuilderImpl
SetAutoDelete() QueueBuilderImpl
SetExclusive() QueueBuilderImpl
SetNoWait() QueueBuilderImpl
AddArg(key string, val interface{}) QueueBuilderImpl
Declare() (QueueImpl, error)
}
type RMQ ¶ added in v0.0.10
type RMQ struct {
// contains filtered or unexported fields
}
func (*RMQ) IsConnected ¶ added in v0.0.25
func (*RMQ) NewChannel ¶ added in v0.0.10
func (*RMQ) NewChannelWithConfirm ¶ added in v0.0.10
func (*RMQ) SetOnError ¶ added in v0.0.10
type RmqOptions ¶ added in v0.0.10
type RmqOptions struct {
// contains filtered or unexported fields
}
func NewRmqOptions ¶ added in v0.0.10
func NewRmqOptions() *RmqOptions
func (*RmqOptions) SetReconnectDelay ¶ added in v0.0.10
func (r *RmqOptions) SetReconnectDelay(delay int) *RmqOptions
func (*RmqOptions) SetReconnectTries ¶ added in v0.0.10
func (r *RmqOptions) SetReconnectTries(tries int) *RmqOptions
Source Files
¶
Click to show internal directories.
Click to hide internal directories.