mirror of
https://github.com/crazy-max/diun.git
synced 2024-12-22 19:38:28 +00:00
1829 lines
58 KiB
Go
1829 lines
58 KiB
Go
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
|
|
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package amqp091
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// 0 1 3 7 size+7 size+8
|
|
// +------+---------+-------------+ +------------+ +-----------+
|
|
// | type | channel | size | | payload | | frame-end |
|
|
// +------+---------+-------------+ +------------+ +-----------+
|
|
//
|
|
// octet short long size octets octet
|
|
const frameHeaderSize = 1 + 2 + 4 + 1
|
|
|
|
/*
|
|
Channel represents an AMQP channel. Used as a context for valid message
|
|
exchange. Errors on methods with this Channel as a receiver means this channel
|
|
should be discarded and a new channel established.
|
|
*/
|
|
type Channel struct {
|
|
destructor sync.Once
|
|
m sync.Mutex // struct field mutex
|
|
confirmM sync.Mutex // publisher confirms state mutex
|
|
notifyM sync.RWMutex
|
|
|
|
connection *Connection
|
|
|
|
rpc chan message
|
|
consumers *consumers
|
|
|
|
id uint16
|
|
|
|
// closed is set to 1 when the channel has been closed - see Channel.send()
|
|
closed int32
|
|
close chan struct{}
|
|
|
|
// true when we will never notify again
|
|
noNotify bool
|
|
|
|
// Channel and Connection exceptions will be broadcast on these listeners.
|
|
closes []chan *Error
|
|
|
|
// Listeners for active=true flow control. When true is sent to a listener,
|
|
// publishing should pause until false is sent to listeners.
|
|
flows []chan bool
|
|
|
|
// Listeners for returned publishings for unroutable messages on mandatory
|
|
// publishings or undeliverable messages on immediate publishings.
|
|
returns []chan Return
|
|
|
|
// Listeners for when the server notifies the client that
|
|
// a consumer has been cancelled.
|
|
cancels []chan string
|
|
|
|
// Allocated when in confirm mode in order to track publish counter and order confirms
|
|
confirms *confirms
|
|
confirming bool
|
|
|
|
// Selects on any errors from shutdown during RPC
|
|
errors chan *Error
|
|
|
|
// State machine that manages frame order, must only be mutated by the connection
|
|
recv func(*Channel, frame)
|
|
|
|
// Current state for frame re-assembly, only mutated from recv
|
|
message messageWithContent
|
|
header *headerFrame
|
|
body []byte
|
|
}
|
|
|
|
// Constructs a new channel with the given framing rules
|
|
func newChannel(c *Connection, id uint16) *Channel {
|
|
return &Channel{
|
|
connection: c,
|
|
id: id,
|
|
rpc: make(chan message),
|
|
consumers: makeConsumers(),
|
|
confirms: newConfirms(),
|
|
recv: (*Channel).recvMethod,
|
|
errors: make(chan *Error, 1),
|
|
close: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Signal that from now on, Channel.send() should call Channel.sendClosed()
|
|
func (ch *Channel) setClosed() {
|
|
atomic.StoreInt32(&ch.closed, 1)
|
|
}
|
|
|
|
// shutdown is called by Connection after the channel has been removed from the
|
|
// connection registry.
|
|
func (ch *Channel) shutdown(e *Error) {
|
|
ch.setClosed()
|
|
|
|
ch.destructor.Do(func() {
|
|
ch.m.Lock()
|
|
defer ch.m.Unlock()
|
|
|
|
// Grab an exclusive lock for the notify channels
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
// Broadcast abnormal shutdown
|
|
if e != nil {
|
|
for _, c := range ch.closes {
|
|
c <- e
|
|
}
|
|
// Notify RPC if we're selecting
|
|
ch.errors <- e
|
|
}
|
|
|
|
ch.consumers.close()
|
|
|
|
for _, c := range ch.closes {
|
|
close(c)
|
|
}
|
|
|
|
for _, c := range ch.flows {
|
|
close(c)
|
|
}
|
|
|
|
for _, c := range ch.returns {
|
|
close(c)
|
|
}
|
|
|
|
for _, c := range ch.cancels {
|
|
close(c)
|
|
}
|
|
|
|
// Set the slices to nil to prevent the dispatch() range from sending on
|
|
// the now closed channels after we release the notifyM mutex
|
|
ch.flows = nil
|
|
ch.closes = nil
|
|
ch.returns = nil
|
|
ch.cancels = nil
|
|
|
|
if ch.confirms != nil {
|
|
ch.confirms.Close()
|
|
}
|
|
|
|
close(ch.errors)
|
|
close(ch.close)
|
|
ch.noNotify = true
|
|
})
|
|
}
|
|
|
|
// send calls Channel.sendOpen() during normal operation.
|
|
//
|
|
// After the channel has been closed, send calls Channel.sendClosed(), ensuring
|
|
// only 'channel.close' is sent to the server.
|
|
func (ch *Channel) send(msg message) (err error) {
|
|
// If the channel is closed, use Channel.sendClosed()
|
|
if ch.IsClosed() {
|
|
return ch.sendClosed(msg)
|
|
}
|
|
|
|
return ch.sendOpen(msg)
|
|
}
|
|
|
|
func (ch *Channel) open() error {
|
|
return ch.call(&channelOpen{}, &channelOpenOk{})
|
|
}
|
|
|
|
// Performs a request/response call for when the message is not NoWait and is
|
|
// specified as Synchronous.
|
|
func (ch *Channel) call(req message, res ...message) error {
|
|
if err := ch.send(req); err != nil {
|
|
return err
|
|
}
|
|
|
|
if req.wait() {
|
|
select {
|
|
case e, ok := <-ch.errors:
|
|
if ok {
|
|
return e
|
|
}
|
|
return ErrClosed
|
|
|
|
case msg := <-ch.rpc:
|
|
if msg != nil {
|
|
for _, try := range res {
|
|
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
|
|
// *res = *msg
|
|
vres := reflect.ValueOf(try).Elem()
|
|
vmsg := reflect.ValueOf(msg).Elem()
|
|
vres.Set(vmsg)
|
|
return nil
|
|
}
|
|
}
|
|
return ErrCommandInvalid
|
|
}
|
|
// RPC channel has been closed without an error, likely due to a hard
|
|
// error on the Connection. This indicates we have already been
|
|
// shutdown and if were waiting, will have returned from the errors chan.
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ch *Channel) sendClosed(msg message) (err error) {
|
|
// After a 'channel.close' is sent or received the only valid response is
|
|
// channel.close-ok
|
|
if _, ok := msg.(*channelCloseOk); ok {
|
|
return ch.connection.send(&methodFrame{
|
|
ChannelId: ch.id,
|
|
Method: msg,
|
|
})
|
|
}
|
|
|
|
return ErrClosed
|
|
}
|
|
|
|
func (ch *Channel) sendOpen(msg message) (err error) {
|
|
if content, ok := msg.(messageWithContent); ok {
|
|
props, body := content.getContent()
|
|
class, _ := content.id()
|
|
|
|
// catch client max frame size==0 and server max frame size==0
|
|
// set size to length of what we're trying to publish
|
|
var size int
|
|
if ch.connection.Config.FrameSize > 0 {
|
|
size = ch.connection.Config.FrameSize - frameHeaderSize
|
|
} else {
|
|
size = len(body)
|
|
}
|
|
|
|
// If the channel is closed, use Channel.sendClosed()
|
|
if ch.IsClosed() {
|
|
return ch.sendClosed(msg)
|
|
}
|
|
|
|
// Flush the buffer only after all the Frames that comprise the Message
|
|
// have been written to maximise benefits of using a buffered writer.
|
|
defer func() {
|
|
if endError := ch.connection.endSendUnflushed(); endError != nil {
|
|
if err == nil {
|
|
err = endError
|
|
}
|
|
}
|
|
}()
|
|
|
|
// We use sendUnflushed() in this method as sending the message requires
|
|
// sending multiple Frames (methodFrame, headerFrame, N x bodyFrame).
|
|
// Flushing after each Frame is inefficient, as it negates much of the
|
|
// benefit of using a buffered writer and results in more syscalls than
|
|
// necessary. Flushing buffers after every frame can have a significant
|
|
// performance impact when sending (e.g. basicPublish) small messages,
|
|
// so sendUnflushed() performs an *Unflushed* write, but is otherwise
|
|
// equivalent to the send() method. We later use the separate flush
|
|
// method to explicitly flush the buffer after all Frames are written.
|
|
if err = ch.connection.sendUnflushed(&methodFrame{
|
|
ChannelId: ch.id,
|
|
Method: content,
|
|
}); err != nil {
|
|
return
|
|
}
|
|
|
|
if err = ch.connection.sendUnflushed(&headerFrame{
|
|
ChannelId: ch.id,
|
|
ClassId: class,
|
|
Size: uint64(len(body)),
|
|
Properties: props,
|
|
}); err != nil {
|
|
return
|
|
}
|
|
|
|
// chunk body into size (max frame size - frame header size)
|
|
for i, j := 0, size; i < len(body); i, j = j, j+size {
|
|
if j > len(body) {
|
|
j = len(body)
|
|
}
|
|
|
|
if err = ch.connection.sendUnflushed(&bodyFrame{
|
|
ChannelId: ch.id,
|
|
Body: body[i:j],
|
|
}); err != nil {
|
|
return
|
|
}
|
|
}
|
|
} else {
|
|
// If the channel is closed, use Channel.sendClosed()
|
|
if ch.IsClosed() {
|
|
return ch.sendClosed(msg)
|
|
}
|
|
|
|
err = ch.connection.send(&methodFrame{
|
|
ChannelId: ch.id,
|
|
Method: msg,
|
|
})
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Eventually called via the state machine from the connection's reader
|
|
// goroutine, so assumes serialized access.
|
|
func (ch *Channel) dispatch(msg message) {
|
|
switch m := msg.(type) {
|
|
case *channelClose:
|
|
// Note: channel state is set to closed immedately after the message is
|
|
// decoded by the Connection
|
|
|
|
// lock before sending connection.close-ok
|
|
// to avoid unexpected interleaving with basic.publish frames if
|
|
// publishing is happening concurrently
|
|
ch.m.Lock()
|
|
if err := ch.send(&channelCloseOk{}); err != nil {
|
|
Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", ch.id, err)
|
|
}
|
|
ch.m.Unlock()
|
|
ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))
|
|
|
|
case *channelFlow:
|
|
ch.notifyM.RLock()
|
|
for _, c := range ch.flows {
|
|
c <- m.Active
|
|
}
|
|
ch.notifyM.RUnlock()
|
|
if err := ch.send(&channelFlowOk{Active: m.Active}); err != nil {
|
|
Logger.Printf("error sending channelFlowOk, channel id: %d error: %+v", ch.id, err)
|
|
}
|
|
|
|
case *basicCancel:
|
|
ch.notifyM.RLock()
|
|
for _, c := range ch.cancels {
|
|
c <- m.ConsumerTag
|
|
}
|
|
ch.notifyM.RUnlock()
|
|
ch.consumers.cancel(m.ConsumerTag)
|
|
|
|
case *basicReturn:
|
|
ret := newReturn(*m)
|
|
ch.notifyM.RLock()
|
|
for _, c := range ch.returns {
|
|
c <- *ret
|
|
}
|
|
ch.notifyM.RUnlock()
|
|
|
|
case *basicAck:
|
|
if ch.confirming {
|
|
if m.Multiple {
|
|
ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
|
|
} else {
|
|
ch.confirms.One(Confirmation{m.DeliveryTag, true})
|
|
}
|
|
}
|
|
|
|
case *basicNack:
|
|
if ch.confirming {
|
|
if m.Multiple {
|
|
ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
|
|
} else {
|
|
ch.confirms.One(Confirmation{m.DeliveryTag, false})
|
|
}
|
|
}
|
|
|
|
case *basicDeliver:
|
|
ch.consumers.send(m.ConsumerTag, newDelivery(ch, m))
|
|
// TODO log failed consumer and close channel, this can happen when
|
|
// deliveries are in flight and a no-wait cancel has happened
|
|
|
|
default:
|
|
select {
|
|
case <-ch.close:
|
|
return
|
|
case ch.rpc <- msg:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ch *Channel) transition(f func(*Channel, frame)) {
|
|
ch.recv = f
|
|
}
|
|
|
|
func (ch *Channel) recvMethod(f frame) {
|
|
switch frame := f.(type) {
|
|
case *methodFrame:
|
|
if msg, ok := frame.Method.(messageWithContent); ok {
|
|
ch.body = make([]byte, 0)
|
|
ch.message = msg
|
|
ch.transition((*Channel).recvHeader)
|
|
return
|
|
}
|
|
|
|
ch.dispatch(frame.Method) // termination state
|
|
ch.transition((*Channel).recvMethod)
|
|
|
|
case *headerFrame:
|
|
// drop
|
|
ch.transition((*Channel).recvMethod)
|
|
|
|
case *bodyFrame:
|
|
// drop
|
|
ch.transition((*Channel).recvMethod)
|
|
|
|
default:
|
|
panic("unexpected frame type")
|
|
}
|
|
}
|
|
|
|
func (ch *Channel) recvHeader(f frame) {
|
|
switch frame := f.(type) {
|
|
case *methodFrame:
|
|
// interrupt content and handle method
|
|
ch.recvMethod(f)
|
|
|
|
case *headerFrame:
|
|
// start collecting if we expect body frames
|
|
ch.header = frame
|
|
|
|
if frame.Size == 0 {
|
|
ch.message.setContent(ch.header.Properties, ch.body)
|
|
ch.dispatch(ch.message) // termination state
|
|
ch.transition((*Channel).recvMethod)
|
|
return
|
|
}
|
|
ch.transition((*Channel).recvContent)
|
|
|
|
case *bodyFrame:
|
|
// drop and reset
|
|
ch.transition((*Channel).recvMethod)
|
|
|
|
default:
|
|
panic("unexpected frame type")
|
|
}
|
|
}
|
|
|
|
// state after method + header and before the length
|
|
// defined by the header has been reached
|
|
func (ch *Channel) recvContent(f frame) {
|
|
switch frame := f.(type) {
|
|
case *methodFrame:
|
|
// interrupt content and handle method
|
|
ch.recvMethod(f)
|
|
|
|
case *headerFrame:
|
|
// drop and reset
|
|
ch.transition((*Channel).recvMethod)
|
|
|
|
case *bodyFrame:
|
|
if cap(ch.body) == 0 {
|
|
ch.body = make([]byte, 0, ch.header.Size)
|
|
}
|
|
ch.body = append(ch.body, frame.Body...)
|
|
|
|
if uint64(len(ch.body)) >= ch.header.Size {
|
|
ch.message.setContent(ch.header.Properties, ch.body)
|
|
ch.dispatch(ch.message) // termination state
|
|
ch.transition((*Channel).recvMethod)
|
|
return
|
|
}
|
|
|
|
ch.transition((*Channel).recvContent)
|
|
|
|
default:
|
|
panic("unexpected frame type")
|
|
}
|
|
}
|
|
|
|
/*
|
|
Close initiate a clean channel closure by sending a close message with the error
|
|
code set to '200'.
|
|
|
|
It is safe to call this method multiple times.
|
|
*/
|
|
func (ch *Channel) Close() error {
|
|
if ch.IsClosed() {
|
|
return nil
|
|
}
|
|
|
|
defer ch.connection.closeChannel(ch, nil)
|
|
return ch.call(
|
|
&channelClose{ReplyCode: replySuccess},
|
|
&channelCloseOk{},
|
|
)
|
|
}
|
|
|
|
// IsClosed returns true if the channel is marked as closed, otherwise false
|
|
// is returned.
|
|
func (ch *Channel) IsClosed() bool {
|
|
return atomic.LoadInt32(&ch.closed) == 1
|
|
}
|
|
|
|
/*
|
|
NotifyClose registers a listener for when the server sends a channel or
|
|
connection exception in the form of a Connection.Close or Channel.Close method.
|
|
Connection exceptions will be broadcast to all open channels and all channels
|
|
will be closed, where channel exceptions will only be broadcast to listeners to
|
|
this channel.
|
|
|
|
The chan provided will be closed when the Channel is closed and on a
|
|
graceful close, no error will be sent.
|
|
|
|
In case of a non graceful close the error will be notified synchronously by the library
|
|
so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks
|
|
*/
|
|
func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
if ch.noNotify {
|
|
close(c)
|
|
} else {
|
|
ch.closes = append(ch.closes, c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
/*
|
|
NotifyFlow registers a listener for basic.flow methods sent by the server.
|
|
When `false` is sent on one of the listener channels, all publishers should
|
|
pause until a `true` is sent.
|
|
|
|
The server may ask the producer to pause or restart the flow of Publishings
|
|
sent by on a channel. This is a simple flow-control mechanism that a server can
|
|
use to avoid overflowing its queues or otherwise finding itself receiving more
|
|
messages than it can process. Note that this method is not intended for window
|
|
control. It does not affect contents returned by basic.get-ok methods.
|
|
|
|
When a new channel is opened, it is active (flow is active). Some
|
|
applications assume that channels are inactive until started. To emulate
|
|
this behavior a client MAY open the channel, then pause it.
|
|
|
|
Publishers should respond to a flow messages as rapidly as possible and the
|
|
server may disconnect over producing channels that do not respect these
|
|
messages.
|
|
|
|
basic.flow-ok methods will always be returned to the server regardless of
|
|
the number of listeners there are.
|
|
|
|
To control the flow of deliveries from the server, use the Channel.Flow()
|
|
method instead.
|
|
|
|
Note: RabbitMQ will rather use TCP pushback on the network connection instead
|
|
of sending basic.flow. This means that if a single channel is producing too
|
|
much on the same connection, all channels using that connection will suffer,
|
|
including acknowledgments from deliveries. Use different Connections if you
|
|
desire to interleave consumers and producers in the same process to avoid your
|
|
basic.ack messages from getting rate limited with your basic.publish messages.
|
|
*/
|
|
func (ch *Channel) NotifyFlow(c chan bool) chan bool {
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
if ch.noNotify {
|
|
close(c)
|
|
} else {
|
|
ch.flows = append(ch.flows, c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
/*
|
|
NotifyReturn registers a listener for basic.return methods. These can be sent
|
|
from the server when a publish is undeliverable either from the mandatory or
|
|
immediate flags.
|
|
|
|
A return struct has a copy of the Publishing along with some error
|
|
information about why the publishing failed.
|
|
*/
|
|
func (ch *Channel) NotifyReturn(c chan Return) chan Return {
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
if ch.noNotify {
|
|
close(c)
|
|
} else {
|
|
ch.returns = append(ch.returns, c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
/*
|
|
NotifyCancel registers a listener for basic.cancel methods. These can be sent
|
|
from the server when a queue is deleted or when consuming from a mirrored queue
|
|
where the master has just failed (and was moved to another node).
|
|
|
|
The subscription tag is returned to the listener.
|
|
*/
|
|
func (ch *Channel) NotifyCancel(c chan string) chan string {
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
if ch.noNotify {
|
|
close(c)
|
|
} else {
|
|
ch.cancels = append(ch.cancels, c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
/*
|
|
NotifyConfirm calls NotifyPublish and starts a goroutine sending
|
|
ordered Ack and Nack DeliveryTag to the respective channels.
|
|
|
|
For strict ordering, use NotifyPublish instead.
|
|
*/
|
|
func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
|
|
confirms := ch.NotifyPublish(make(chan Confirmation, cap(ack)+cap(nack)))
|
|
|
|
go func() {
|
|
for c := range confirms {
|
|
if c.Ack {
|
|
ack <- c.DeliveryTag
|
|
} else {
|
|
nack <- c.DeliveryTag
|
|
}
|
|
}
|
|
close(ack)
|
|
if nack != ack {
|
|
close(nack)
|
|
}
|
|
}()
|
|
|
|
return ack, nack
|
|
}
|
|
|
|
/*
|
|
NotifyPublish registers a listener for reliable publishing. Receives from this
|
|
chan for every publish after Channel.Confirm will be in order starting with
|
|
DeliveryTag 1.
|
|
|
|
There will be one and only one Confirmation Publishing starting with the
|
|
delivery tag of 1 and progressing sequentially until the total number of
|
|
Publishings have been seen by the server.
|
|
|
|
Acknowledgments will be received in the order of delivery from the
|
|
NotifyPublish channels even if the server acknowledges them out of order.
|
|
|
|
The listener chan will be closed when the Channel is closed.
|
|
|
|
The capacity of the chan Confirmation must be at least as large as the
|
|
number of outstanding publishings. Not having enough buffered chans will
|
|
create a deadlock if you attempt to perform other operations on the Connection
|
|
or Channel while confirms are in-flight.
|
|
|
|
It's advisable to wait for all Confirmations to arrive before calling
|
|
Channel.Close() or Connection.Close().
|
|
|
|
It is also advisable for the caller to consume from the channel returned till it is closed
|
|
to avoid possible deadlocks
|
|
*/
|
|
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
|
|
ch.notifyM.Lock()
|
|
defer ch.notifyM.Unlock()
|
|
|
|
if ch.noNotify {
|
|
close(confirm)
|
|
} else {
|
|
ch.confirms.Listen(confirm)
|
|
}
|
|
|
|
return confirm
|
|
}
|
|
|
|
/*
|
|
Qos controls how many messages or how many bytes the server will try to keep on
|
|
the network for consumers before receiving delivery acks. The intent of Qos is
|
|
to make sure the network buffers stay full between the server and client.
|
|
|
|
With a prefetch count greater than zero, the server will deliver that many
|
|
messages to consumers before acknowledgments are received. The server ignores
|
|
this option when consumers are started with noAck because no acknowledgments
|
|
are expected or sent.
|
|
|
|
With a prefetch size greater than zero, the server will try to keep at least
|
|
that many bytes of deliveries flushed to the network before receiving
|
|
acknowledgments from the consumers. This option is ignored when consumers are
|
|
started with noAck.
|
|
|
|
When global is true, these Qos settings apply to all existing and future
|
|
consumers on all channels on the same connection. When false, the Channel.Qos
|
|
settings will apply to all existing and future consumers on this channel.
|
|
|
|
Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
|
|
how the global flag is implemented in RabbitMQ, as it differs from the
|
|
AMQP 0.9.1 specification in that global Qos settings are limited in scope to
|
|
channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
|
|
|
|
To get round-robin behavior between consumers consuming from the same queue on
|
|
different connections, set the prefetch count to 1, and the next available
|
|
message on the server will be delivered to the next available consumer.
|
|
|
|
If your consumer work time is reasonably consistent and not much greater
|
|
than two times your network round trip time, you will see significant
|
|
throughput improvements starting with a prefetch count of 2 or slightly
|
|
greater as described by benchmarks on RabbitMQ.
|
|
|
|
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
|
|
*/
|
|
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
|
|
return ch.call(
|
|
&basicQos{
|
|
PrefetchCount: uint16(prefetchCount),
|
|
PrefetchSize: uint32(prefetchSize),
|
|
Global: global,
|
|
},
|
|
&basicQosOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
Cancel stops deliveries to the consumer chan established in Channel.Consume and
|
|
identified by consumer.
|
|
|
|
Only use this method to cleanly stop receiving deliveries from the server and
|
|
cleanly shut down the consumer chan identified by this tag. Using this method
|
|
and waiting for remaining messages to flush from the consumer chan will ensure
|
|
all messages received on the network will be delivered to the receiver of your
|
|
consumer chan.
|
|
|
|
Continue consuming from the chan Delivery provided by Channel.Consume until the
|
|
chan closes.
|
|
|
|
When noWait is true, do not wait for the server to acknowledge the cancel.
|
|
Only use this when you are certain there are no deliveries in flight that
|
|
require an acknowledgment, otherwise they will arrive and be dropped in the
|
|
client without an ack, and will not be redelivered to other consumers.
|
|
*/
|
|
func (ch *Channel) Cancel(consumer string, noWait bool) error {
|
|
req := &basicCancel{
|
|
ConsumerTag: consumer,
|
|
NoWait: noWait,
|
|
}
|
|
res := &basicCancelOk{}
|
|
|
|
if err := ch.call(req, res); err != nil {
|
|
return err
|
|
}
|
|
|
|
if req.wait() {
|
|
ch.consumers.cancel(res.ConsumerTag)
|
|
} else {
|
|
// Potentially could drop deliveries in flight
|
|
ch.consumers.cancel(consumer)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
QueueDeclare declares a queue to hold messages and deliver to consumers.
|
|
Declaring creates a queue if it doesn't already exist, or ensures that an
|
|
existing queue matches the same parameters.
|
|
|
|
Every queue declared gets a default binding to the empty exchange "" which has
|
|
the type "direct" with the routing key matching the queue's name. With this
|
|
default binding, it is possible to publish messages that route directly to
|
|
this queue by publishing to "" with the routing key of the queue name.
|
|
|
|
QueueDeclare("alerts", true, false, false, false, nil)
|
|
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
|
|
|
|
Delivery Exchange Key Queue
|
|
-----------------------------------------------
|
|
key: alerts -> "" -> alerts -> alerts
|
|
|
|
The queue name may be empty, in which case the server will generate a unique name
|
|
which will be returned in the Name field of Queue struct.
|
|
|
|
Durable and Non-Auto-Deleted queues will survive server restarts and remain
|
|
when there are no remaining consumers or bindings. Persistent publishings will
|
|
be restored in this queue on server restart. These queues are only able to be
|
|
bound to durable exchanges.
|
|
|
|
Non-Durable and Auto-Deleted queues will not be redeclared on server restart
|
|
and will be deleted by the server after a short time when the last consumer is
|
|
canceled or the last consumer's channel is closed. Queues with this lifetime
|
|
can also be deleted normally with QueueDelete. These durable queues can only
|
|
be bound to non-durable exchanges.
|
|
|
|
Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
|
|
server is running regardless of how many consumers. This lifetime is useful
|
|
for temporary topologies that may have long delays between consumer activity.
|
|
These queues can only be bound to non-durable exchanges.
|
|
|
|
Durable and Auto-Deleted queues will be restored on server restart, but without
|
|
active consumers will not survive and be removed. This Lifetime is unlikely
|
|
to be useful.
|
|
|
|
Exclusive queues are only accessible by the connection that declares them and
|
|
will be deleted when the connection closes. Channels on other connections
|
|
will receive an error when attempting to declare, bind, consume, purge or
|
|
delete a queue with the same name.
|
|
|
|
When noWait is true, the queue will assume to be declared on the server. A
|
|
channel exception will arrive if the conditions are met for existing queues
|
|
or attempting to modify an existing queue from a different connection.
|
|
|
|
When the error return value is not nil, you can assume the queue could not be
|
|
declared with these parameters, and the channel will be closed.
|
|
*/
|
|
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
|
|
if err := args.Validate(); err != nil {
|
|
return Queue{}, err
|
|
}
|
|
|
|
req := &queueDeclare{
|
|
Queue: name,
|
|
Passive: false,
|
|
Durable: durable,
|
|
AutoDelete: autoDelete,
|
|
Exclusive: exclusive,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
}
|
|
res := &queueDeclareOk{}
|
|
|
|
if err := ch.call(req, res); err != nil {
|
|
return Queue{}, err
|
|
}
|
|
|
|
if req.wait() {
|
|
return Queue{
|
|
Name: res.Queue,
|
|
Messages: int(res.MessageCount),
|
|
Consumers: int(res.ConsumerCount),
|
|
}, nil
|
|
}
|
|
|
|
return Queue{Name: name}, nil
|
|
}
|
|
|
|
/*
|
|
QueueDeclarePassive is functionally and parametrically equivalent to
|
|
QueueDeclare, except that it sets the "passive" attribute to true. A passive
|
|
queue is assumed by RabbitMQ to already exist, and attempting to connect to a
|
|
non-existent queue will cause RabbitMQ to throw an exception. This function
|
|
can be used to test for the existence of a queue.
|
|
*/
|
|
func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
|
|
if err := args.Validate(); err != nil {
|
|
return Queue{}, err
|
|
}
|
|
|
|
req := &queueDeclare{
|
|
Queue: name,
|
|
Passive: true,
|
|
Durable: durable,
|
|
AutoDelete: autoDelete,
|
|
Exclusive: exclusive,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
}
|
|
res := &queueDeclareOk{}
|
|
|
|
if err := ch.call(req, res); err != nil {
|
|
return Queue{}, err
|
|
}
|
|
|
|
if req.wait() {
|
|
return Queue{
|
|
Name: res.Queue,
|
|
Messages: int(res.MessageCount),
|
|
Consumers: int(res.ConsumerCount),
|
|
}, nil
|
|
}
|
|
|
|
return Queue{Name: name}, nil
|
|
}
|
|
|
|
/*
|
|
QueueInspect passively declares a queue by name to inspect the current message
|
|
count and consumer count.
|
|
|
|
Use this method to check how many messages ready for delivery reside in the queue,
|
|
how many consumers are receiving deliveries, and whether a queue by this
|
|
name already exists.
|
|
|
|
If the queue by this name exists, use Channel.QueueDeclare check if it is
|
|
declared with specific parameters.
|
|
|
|
If a queue by this name does not exist, an error will be returned and the
|
|
channel will be closed.
|
|
|
|
Deprecated: Use QueueDeclare with "Passive: true" instead.
|
|
*/
|
|
func (ch *Channel) QueueInspect(name string) (Queue, error) {
|
|
req := &queueDeclare{
|
|
Queue: name,
|
|
Passive: true,
|
|
}
|
|
res := &queueDeclareOk{}
|
|
|
|
err := ch.call(req, res)
|
|
|
|
state := Queue{
|
|
Name: name,
|
|
Messages: int(res.MessageCount),
|
|
Consumers: int(res.ConsumerCount),
|
|
}
|
|
|
|
return state, err
|
|
}
|
|
|
|
/*
|
|
QueueBind binds an exchange to a queue so that publishings to the exchange will
|
|
be routed to the queue when the publishing routing key matches the binding
|
|
routing key.
|
|
|
|
QueueBind("pagers", "alert", "log", false, nil)
|
|
QueueBind("emails", "info", "log", false, nil)
|
|
|
|
Delivery Exchange Key Queue
|
|
-----------------------------------------------
|
|
key: alert --> log ----> alert --> pagers
|
|
key: info ---> log ----> info ---> emails
|
|
key: debug --> log (none) (dropped)
|
|
|
|
If a binding with the same key and arguments already exists between the
|
|
exchange and queue, the attempt to rebind will be ignored and the existing
|
|
binding will be retained.
|
|
|
|
In the case that multiple bindings may cause the message to be routed to the
|
|
same queue, the server will only route the publishing once. This is possible
|
|
with topic exchanges.
|
|
|
|
QueueBind("pagers", "alert", "amq.topic", false, nil)
|
|
QueueBind("emails", "info", "amq.topic", false, nil)
|
|
QueueBind("emails", "#", "amq.topic", false, nil) // match everything
|
|
|
|
Delivery Exchange Key Queue
|
|
-----------------------------------------------
|
|
key: alert --> amq.topic ----> alert --> pagers
|
|
key: info ---> amq.topic ----> # ------> emails
|
|
\---> info ---/
|
|
key: debug --> amq.topic ----> # ------> emails
|
|
|
|
It is only possible to bind a durable queue to a durable exchange regardless of
|
|
whether the queue or exchange is auto-deleted. Bindings between durable queues
|
|
and exchanges will also be restored on server restart.
|
|
|
|
If the binding could not complete, an error will be returned and the channel
|
|
will be closed.
|
|
|
|
When noWait is false and the queue could not be bound, the channel will be
|
|
closed with an error.
|
|
*/
|
|
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&queueBind{
|
|
Queue: name,
|
|
Exchange: exchange,
|
|
RoutingKey: key,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
},
|
|
&queueBindOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
QueueUnbind removes a binding between an exchange and queue matching the key and
|
|
arguments.
|
|
*/
|
|
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&queueUnbind{
|
|
Queue: name,
|
|
Exchange: exchange,
|
|
RoutingKey: key,
|
|
Arguments: args,
|
|
},
|
|
&queueUnbindOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
QueuePurge removes all messages from the named queue which are not waiting to
|
|
be acknowledged. Messages that have been delivered but have not yet been
|
|
acknowledged will not be removed.
|
|
|
|
When successful, returns the number of messages purged.
|
|
|
|
If noWait is true, do not wait for the server response and the number of
|
|
messages purged will not be meaningful.
|
|
*/
|
|
func (ch *Channel) QueuePurge(name string, noWait bool) (int, error) {
|
|
req := &queuePurge{
|
|
Queue: name,
|
|
NoWait: noWait,
|
|
}
|
|
res := &queuePurgeOk{}
|
|
|
|
err := ch.call(req, res)
|
|
|
|
return int(res.MessageCount), err
|
|
}
|
|
|
|
/*
|
|
QueueDelete removes the queue from the server including all bindings then
|
|
purges the messages based on server configuration, returning the number of
|
|
messages purged.
|
|
|
|
When ifUnused is true, the queue will not be deleted if there are any
|
|
consumers on the queue. If there are consumers, an error will be returned and
|
|
the channel will be closed.
|
|
|
|
When ifEmpty is true, the queue will not be deleted if there are any messages
|
|
remaining on the queue. If there are messages, an error will be returned and
|
|
the channel will be closed.
|
|
|
|
When noWait is true, the queue will be deleted without waiting for a response
|
|
from the server. The purged message count will not be meaningful. If the queue
|
|
could not be deleted, a channel exception will be raised and the channel will
|
|
be closed.
|
|
*/
|
|
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
|
|
req := &queueDelete{
|
|
Queue: name,
|
|
IfUnused: ifUnused,
|
|
IfEmpty: ifEmpty,
|
|
NoWait: noWait,
|
|
}
|
|
res := &queueDeleteOk{}
|
|
|
|
err := ch.call(req, res)
|
|
|
|
return int(res.MessageCount), err
|
|
}
|
|
|
|
/*
|
|
Consume immediately starts delivering queued messages.
|
|
|
|
Begin receiving on the returned chan Delivery before any other operation on the
|
|
Connection or Channel.
|
|
|
|
Continues deliveries to the returned chan Delivery until Channel.Cancel,
|
|
Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must
|
|
range over the chan to ensure all deliveries are received. Unreceived
|
|
deliveries will block all methods on the same connection.
|
|
|
|
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
|
|
call Delivery.Ack after it has successfully processed the delivery. If the
|
|
consumer is cancelled or the channel or connection is closed any unacknowledged
|
|
deliveries will be requeued at the end of the same queue.
|
|
|
|
The consumer is identified by a string that is unique and scoped for all
|
|
consumers on this channel. If you wish to eventually cancel the consumer, use
|
|
the same non-empty identifier in Channel.Cancel. An empty string will cause
|
|
the library to generate a unique identity. The consumer identity will be
|
|
included in every Delivery in the ConsumerTag field
|
|
|
|
When autoAck (also known as noAck) is true, the server will acknowledge
|
|
deliveries to this consumer prior to writing the delivery to the network. When
|
|
autoAck is true, the consumer should not call Delivery.Ack. Automatically
|
|
acknowledging deliveries means that some deliveries may get lost if the
|
|
consumer is unable to process them after the server delivers them.
|
|
See http://www.rabbitmq.com/confirms.html for more details.
|
|
|
|
When exclusive is true, the server will ensure that this is the sole consumer
|
|
from this queue. When exclusive is false, the server will fairly distribute
|
|
deliveries across multiple consumers.
|
|
|
|
The noLocal flag is not supported by RabbitMQ.
|
|
|
|
It's advisable to use separate connections for
|
|
Channel.Publish and Channel.Consume so not to have TCP pushback on publishing
|
|
affect the ability to consume messages, so this parameter is here mostly for
|
|
completeness.
|
|
|
|
When noWait is true, do not wait for the server to confirm the request and
|
|
immediately begin deliveries. If it is not possible to consume, a channel
|
|
exception will be raised and the channel will be closed.
|
|
|
|
Optional arguments can be provided that have specific semantics for the queue
|
|
or server.
|
|
|
|
Inflight messages, limited by Channel.Qos will be buffered until received from
|
|
the returned chan.
|
|
|
|
When the Channel or Connection is closed, all buffered and inflight messages will
|
|
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
|
|
messages in this way won't be lost.
|
|
|
|
When the consumer tag is cancelled, all inflight messages will be delivered until
|
|
the returned chan is closed.
|
|
*/
|
|
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
|
|
// When we return from ch.call, there may be a delivery already for the
|
|
// consumer that hasn't been added to the consumer hash yet. Because of
|
|
// this, we never rely on the server picking a consumer tag for us.
|
|
|
|
if err := args.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if consumer == "" {
|
|
consumer = uniqueConsumerTag()
|
|
}
|
|
|
|
req := &basicConsume{
|
|
Queue: queue,
|
|
ConsumerTag: consumer,
|
|
NoLocal: noLocal,
|
|
NoAck: autoAck,
|
|
Exclusive: exclusive,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
}
|
|
res := &basicConsumeOk{}
|
|
|
|
deliveries := make(chan Delivery)
|
|
|
|
ch.consumers.add(consumer, deliveries)
|
|
|
|
if err := ch.call(req, res); err != nil {
|
|
ch.consumers.cancel(consumer)
|
|
return nil, err
|
|
}
|
|
|
|
return deliveries, nil
|
|
}
|
|
|
|
/*
|
|
ConsumeWithContext immediately starts delivering queued messages.
|
|
|
|
This function is similar to Channel.Consume, and accepts a context to control
|
|
consumer lifecycle. When the context passed to this function is canceled, the
|
|
consumer associated with the deliveries channel will be canceled too. When the
|
|
context passed to this function is cancelled, the deliveries channel will be closed.
|
|
|
|
An application is advised to keep on receiving messages from the delivery channel
|
|
until the channel is empty. This is specially important to avoid memory leaks from
|
|
unconsumed messages from the delivery channel.
|
|
|
|
Begin receiving on the returned chan Delivery before any other operation on the
|
|
Connection or Channel.
|
|
|
|
Continues deliveries to the returned chan Delivery until Channel.Cancel,
|
|
Connection.Close, Channel.Close, context is cancelled, or an AMQP exception
|
|
occurs. Consumers must range over the chan to ensure all deliveries are
|
|
received. Unreceived deliveries will block all methods on the same connection.
|
|
|
|
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
|
|
call Delivery.Ack after it has successfully processed the delivery. If the
|
|
consumer is cancelled or the channel or connection is closed any unacknowledged
|
|
deliveries will be requeued at the end of the same queue.
|
|
|
|
The consumer is identified by a string that is unique and scoped for all
|
|
consumers on this channel. If you wish to eventually cancel the consumer, use
|
|
the same non-empty identifier in Channel.Cancel. An empty string will cause
|
|
the library to generate a unique identity. The consumer identity will be
|
|
included in every Delivery in the ConsumerTag field
|
|
|
|
When autoAck (also known as noAck) is true, the server will acknowledge
|
|
deliveries to this consumer prior to writing the delivery to the network. When
|
|
autoAck is true, the consumer should not call Delivery.Ack. Automatically
|
|
acknowledging deliveries means that some deliveries may get lost if the
|
|
consumer is unable to process them after the server delivers them.
|
|
See http://www.rabbitmq.com/confirms.html for more details.
|
|
|
|
When exclusive is true, the server will ensure that this is the sole consumer
|
|
from this queue. When exclusive is false, the server will fairly distribute
|
|
deliveries across multiple consumers.
|
|
|
|
The noLocal flag is not supported by RabbitMQ.
|
|
|
|
It's advisable to use separate connections for Channel.Publish and
|
|
Channel.Consume so not to have TCP pushback on publishing affect the ability to
|
|
consume messages, so this parameter is here mostly for completeness.
|
|
|
|
When noWait is true, do not wait for the server to confirm the request and
|
|
immediately begin deliveries. If it is not possible to consume, a channel
|
|
exception will be raised and the channel will be closed.
|
|
|
|
Optional arguments can be provided that have specific semantics for the queue
|
|
or server.
|
|
|
|
Inflight messages, limited by Channel.Qos will be buffered until received from
|
|
the returned chan.
|
|
|
|
When the Channel or Connection is closed, all buffered and inflight messages will
|
|
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
|
|
messages in this way won't be lost.
|
|
*/
|
|
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
|
|
// When we return from ch.call, there may be a delivery already for the
|
|
// consumer that hasn't been added to the consumer hash yet. Because of
|
|
// this, we never rely on the server picking a consumer tag for us.
|
|
|
|
if err := args.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if consumer == "" {
|
|
consumer = uniqueConsumerTag()
|
|
}
|
|
|
|
req := &basicConsume{
|
|
Queue: queue,
|
|
ConsumerTag: consumer,
|
|
NoLocal: noLocal,
|
|
NoAck: autoAck,
|
|
Exclusive: exclusive,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
}
|
|
res := &basicConsumeOk{}
|
|
|
|
select {
|
|
default:
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
deliveries := make(chan Delivery)
|
|
|
|
ch.consumers.add(consumer, deliveries)
|
|
|
|
if err := ch.call(req, res); err != nil {
|
|
ch.consumers.cancel(consumer)
|
|
return nil, err
|
|
}
|
|
|
|
go func() {
|
|
select {
|
|
case <-ch.consumers.closed:
|
|
return
|
|
case <-ctx.Done():
|
|
if ch != nil {
|
|
_ = ch.Cancel(consumer, false)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return deliveries, nil
|
|
}
|
|
|
|
/*
|
|
ExchangeDeclare declares an exchange on the server. If the exchange does not
|
|
already exist, the server will create it. If the exchange exists, the server
|
|
verifies that it is of the provided type, durability and auto-delete flags.
|
|
|
|
Errors returned from this method will close the channel.
|
|
|
|
Exchange names starting with "amq." are reserved for pre-declared and
|
|
standardized exchanges. The client MAY declare an exchange starting with
|
|
"amq." if the passive option is set, or the exchange already exists. Names can
|
|
consist of a non-empty sequence of letters, digits, hyphen, underscore,
|
|
period, or colon.
|
|
|
|
Each exchange belongs to one of a set of exchange kinds/types implemented by
|
|
the server. The exchange types define the functionality of the exchange - i.e.
|
|
how messages are routed through it. Once an exchange is declared, its type
|
|
cannot be changed. The common types are "direct", "fanout", "topic" and
|
|
"headers".
|
|
|
|
Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
|
|
declared when there are no remaining bindings. This is the best lifetime for
|
|
long-lived exchange configurations like stable routes and default exchanges.
|
|
|
|
Non-Durable and Auto-Deleted exchanges will be deleted when there are no
|
|
remaining bindings and not restored on server restart. This lifetime is
|
|
useful for temporary topologies that should not pollute the virtual host on
|
|
failure or after the consumers have completed.
|
|
|
|
Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
|
|
running including when there are no remaining bindings. This is useful for
|
|
temporary topologies that may have long delays between bindings.
|
|
|
|
Durable and Auto-Deleted exchanges will survive server restarts and will be
|
|
removed before and after server restarts when there are no remaining bindings.
|
|
These exchanges are useful for robust temporary topologies or when you require
|
|
binding durable queues to auto-deleted exchanges.
|
|
|
|
Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
|
|
durable, so queues that bind to these pre-declared exchanges must also be
|
|
durable.
|
|
|
|
Exchanges declared as `internal` do not accept publishings. Internal
|
|
exchanges are useful when you wish to implement inter-exchange topologies
|
|
that should not be exposed to users of the broker.
|
|
|
|
When noWait is true, declare without waiting for a confirmation from the server.
|
|
The channel may be closed as a result of an error. Add a NotifyClose listener
|
|
to respond to any exceptions.
|
|
|
|
Optional amqp.Table of arguments that are specific to the server's implementation of
|
|
the exchange can be sent for exchange types that require extra parameters.
|
|
*/
|
|
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&exchangeDeclare{
|
|
Exchange: name,
|
|
Type: kind,
|
|
Passive: false,
|
|
Durable: durable,
|
|
AutoDelete: autoDelete,
|
|
Internal: internal,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
},
|
|
&exchangeDeclareOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
ExchangeDeclarePassive is functionally and parametrically equivalent to
|
|
ExchangeDeclare, except that it sets the "passive" attribute to true. A passive
|
|
exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
|
|
non-existent exchange will cause RabbitMQ to throw an exception. This function
|
|
can be used to detect the existence of an exchange.
|
|
*/
|
|
func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&exchangeDeclare{
|
|
Exchange: name,
|
|
Type: kind,
|
|
Passive: true,
|
|
Durable: durable,
|
|
AutoDelete: autoDelete,
|
|
Internal: internal,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
},
|
|
&exchangeDeclareOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
ExchangeDelete removes the named exchange from the server. When an exchange is
|
|
deleted all queue bindings on the exchange are also deleted. If this exchange
|
|
does not exist, the channel will be closed with an error.
|
|
|
|
When ifUnused is true, the server will only delete the exchange if it has no queue
|
|
bindings. If the exchange has queue bindings the server does not delete it
|
|
but close the channel with an exception instead. Set this to true if you are
|
|
not the sole owner of the exchange.
|
|
|
|
When noWait is true, do not wait for a server confirmation that the exchange has
|
|
been deleted. Failing to delete the channel could close the channel. Add a
|
|
NotifyClose listener to respond to these channel exceptions.
|
|
*/
|
|
func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error {
|
|
return ch.call(
|
|
&exchangeDelete{
|
|
Exchange: name,
|
|
IfUnused: ifUnused,
|
|
NoWait: noWait,
|
|
},
|
|
&exchangeDeleteOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
ExchangeBind binds an exchange to another exchange to create inter-exchange
|
|
routing topologies on the server. This can decouple the private topology and
|
|
routing exchanges from exchanges intended solely for publishing endpoints.
|
|
|
|
Binding two exchanges with identical arguments will not create duplicate
|
|
bindings.
|
|
|
|
Binding one exchange to another with multiple bindings will only deliver a
|
|
message once. For example if you bind your exchange to `amq.fanout` with two
|
|
different binding keys, only a single message will be delivered to your
|
|
exchange even though multiple bindings will match.
|
|
|
|
Given a message delivered to the source exchange, the message will be forwarded
|
|
to the destination exchange when the routing key is matched.
|
|
|
|
ExchangeBind("sell", "MSFT", "trade", false, nil)
|
|
ExchangeBind("buy", "AAPL", "trade", false, nil)
|
|
|
|
Delivery Source Key Destination
|
|
example exchange exchange
|
|
-----------------------------------------------
|
|
key: AAPL --> trade ----> MSFT sell
|
|
\---> AAPL --> buy
|
|
|
|
When noWait is true, do not wait for the server to confirm the binding. If any
|
|
error occurs the channel will be closed. Add a listener to NotifyClose to
|
|
handle these errors.
|
|
|
|
Optional arguments specific to the exchanges bound can also be specified.
|
|
*/
|
|
func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&exchangeBind{
|
|
Destination: destination,
|
|
Source: source,
|
|
RoutingKey: key,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
},
|
|
&exchangeBindOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
ExchangeUnbind unbinds the destination exchange from the source exchange on the
|
|
server by removing the routing key between them. This is the inverse of
|
|
ExchangeBind. If the binding does not currently exist, an error will be
|
|
returned.
|
|
|
|
When noWait is true, do not wait for the server to confirm the deletion of the
|
|
binding. If any error occurs the channel will be closed. Add a listener to
|
|
NotifyClose to handle these errors.
|
|
|
|
Optional arguments that are specific to the type of exchanges bound can also be
|
|
provided. These must match the same arguments specified in ExchangeBind to
|
|
identify the binding.
|
|
*/
|
|
func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error {
|
|
if err := args.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ch.call(
|
|
&exchangeUnbind{
|
|
Destination: destination,
|
|
Source: source,
|
|
RoutingKey: key,
|
|
NoWait: noWait,
|
|
Arguments: args,
|
|
},
|
|
&exchangeUnbindOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
Publish sends a Publishing from the client to an exchange on the server.
|
|
|
|
When you want a single message to be delivered to a single queue, you can
|
|
publish to the default exchange with the routingKey of the queue name. This is
|
|
because every declared queue gets an implicit route to the default exchange.
|
|
|
|
Since publishings are asynchronous, any undeliverable message will get returned
|
|
by the server. Add a listener with Channel.NotifyReturn to handle any
|
|
undeliverable message when calling publish with either the mandatory or
|
|
immediate parameters as true.
|
|
|
|
Publishings can be undeliverable when the mandatory flag is true and no queue is
|
|
bound that matches the routing key, or when the immediate flag is true and no
|
|
consumer on the matched queue is ready to accept the delivery.
|
|
|
|
This can return an error when the channel, connection or socket is closed. The
|
|
error or lack of an error does not indicate whether the server has received this
|
|
publishing.
|
|
|
|
It is possible for publishing to not reach the broker if the underlying socket
|
|
is shut down without pending publishing packets being flushed from the kernel
|
|
buffers. The easy way of making it probable that all publishings reach the
|
|
server is to always call Connection.Close before terminating your publishing
|
|
application. The way to ensure that all publishings reach the server is to add
|
|
a listener to Channel.NotifyPublish and put the channel in confirm mode with
|
|
Channel.Confirm. Publishing delivery tags and their corresponding
|
|
confirmations start at 1. Exit when all publishings are confirmed.
|
|
|
|
When Publish does not return an error and the channel is in confirm mode, the
|
|
internal counter for DeliveryTags with the first confirmation starts at 1.
|
|
*/
|
|
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
|
|
_, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
|
|
return err
|
|
}
|
|
|
|
/*
|
|
PublishWithContext sends a Publishing from the client to an exchange on the server.
|
|
|
|
NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
|
|
|
|
When you want a single message to be delivered to a single queue, you can
|
|
publish to the default exchange with the routingKey of the queue name. This is
|
|
because every declared queue gets an implicit route to the default exchange.
|
|
|
|
Since publishings are asynchronous, any undeliverable message will get returned
|
|
by the server. Add a listener with Channel.NotifyReturn to handle any
|
|
undeliverable message when calling publish with either the mandatory or
|
|
immediate parameters as true.
|
|
|
|
Publishings can be undeliverable when the mandatory flag is true and no queue is
|
|
bound that matches the routing key, or when the immediate flag is true and no
|
|
consumer on the matched queue is ready to accept the delivery.
|
|
|
|
This can return an error when the channel, connection or socket is closed. The
|
|
error or lack of an error does not indicate whether the server has received this
|
|
publishing.
|
|
|
|
It is possible for publishing to not reach the broker if the underlying socket
|
|
is shut down without pending publishing packets being flushed from the kernel
|
|
buffers. The easy way of making it probable that all publishings reach the
|
|
server is to always call Connection.Close before terminating your publishing
|
|
application. The way to ensure that all publishings reach the server is to add
|
|
a listener to Channel.NotifyPublish and put the channel in confirm mode with
|
|
Channel.Confirm. Publishing delivery tags and their corresponding
|
|
confirmations start at 1. Exit when all publishings are confirmed.
|
|
|
|
When Publish does not return an error and the channel is in confirm mode, the
|
|
internal counter for DeliveryTags with the first confirmation starts at 1.
|
|
*/
|
|
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
|
|
return ch.Publish(exchange, key, mandatory, immediate, msg)
|
|
}
|
|
|
|
/*
|
|
PublishWithDeferredConfirm behaves identically to Publish, but additionally
|
|
returns a DeferredConfirmation, allowing the caller to wait on the publisher
|
|
confirmation for this message. If the channel has not been put into confirm
|
|
mode, the DeferredConfirmation will be nil.
|
|
*/
|
|
func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
|
|
if err := msg.Headers.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch.m.Lock()
|
|
defer ch.m.Unlock()
|
|
|
|
var dc *DeferredConfirmation
|
|
if ch.confirming {
|
|
dc = ch.confirms.publish()
|
|
}
|
|
|
|
if err := ch.send(&basicPublish{
|
|
Exchange: exchange,
|
|
RoutingKey: key,
|
|
Mandatory: mandatory,
|
|
Immediate: immediate,
|
|
Body: msg.Body,
|
|
Properties: properties{
|
|
Headers: msg.Headers,
|
|
ContentType: msg.ContentType,
|
|
ContentEncoding: msg.ContentEncoding,
|
|
DeliveryMode: msg.DeliveryMode,
|
|
Priority: msg.Priority,
|
|
CorrelationId: msg.CorrelationId,
|
|
ReplyTo: msg.ReplyTo,
|
|
Expiration: msg.Expiration,
|
|
MessageId: msg.MessageId,
|
|
Timestamp: msg.Timestamp,
|
|
Type: msg.Type,
|
|
UserId: msg.UserId,
|
|
AppId: msg.AppId,
|
|
},
|
|
}); err != nil {
|
|
if ch.confirming {
|
|
ch.confirms.unpublish()
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return dc, nil
|
|
}
|
|
|
|
/*
|
|
PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a
|
|
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
|
|
for this message. If the channel has not been put into confirm mode,
|
|
the DeferredConfirmation will be nil.
|
|
|
|
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
|
|
to this function is not honoured.
|
|
*/
|
|
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
|
|
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
|
|
}
|
|
|
|
/*
|
|
Get synchronously receives a single Delivery from the head of a queue from the
|
|
server to the client. In almost all cases, using Channel.Consume will be
|
|
preferred.
|
|
|
|
If there was a delivery waiting on the queue and that delivery was received, the
|
|
second return value will be true. If there was no delivery waiting or an error
|
|
occurred, the ok bool will be false.
|
|
|
|
All deliveries must be acknowledged including those from Channel.Get. Call
|
|
Delivery.Ack on the returned delivery when you have fully processed this
|
|
delivery.
|
|
|
|
When autoAck is true, the server will automatically acknowledge this message so
|
|
you don't have to. But if you are unable to fully process this message before
|
|
the channel or connection is closed, the message will not get requeued.
|
|
*/
|
|
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) {
|
|
req := &basicGet{Queue: queue, NoAck: autoAck}
|
|
res := &basicGetOk{}
|
|
empty := &basicGetEmpty{}
|
|
|
|
if err := ch.call(req, res, empty); err != nil {
|
|
return Delivery{}, false, err
|
|
}
|
|
|
|
if res.DeliveryTag > 0 {
|
|
return *(newDelivery(ch, res)), true, nil
|
|
}
|
|
|
|
return Delivery{}, false, nil
|
|
}
|
|
|
|
/*
|
|
Tx puts the channel into transaction mode on the server. All publishings and
|
|
acknowledgments following this method will be atomically committed or rolled
|
|
back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to
|
|
leave a this transaction and immediately start a new transaction.
|
|
|
|
The atomicity across multiple queues is not defined as queue declarations and
|
|
bindings are not included in the transaction.
|
|
|
|
The behavior of publishings that are delivered as mandatory or immediate while
|
|
the channel is in a transaction is not defined.
|
|
|
|
Once a channel has been put into transaction mode, it cannot be taken out of
|
|
transaction mode. Use a different channel for non-transactional semantics.
|
|
*/
|
|
func (ch *Channel) Tx() error {
|
|
return ch.call(
|
|
&txSelect{},
|
|
&txSelectOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
TxCommit atomically commits all publishings and acknowledgments for a single
|
|
queue and immediately start a new transaction.
|
|
|
|
Calling this method without having called Channel.Tx is an error.
|
|
*/
|
|
func (ch *Channel) TxCommit() error {
|
|
return ch.call(
|
|
&txCommit{},
|
|
&txCommitOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
TxRollback atomically rolls back all publishings and acknowledgments for a
|
|
single queue and immediately start a new transaction.
|
|
|
|
Calling this method without having called Channel.Tx is an error.
|
|
*/
|
|
func (ch *Channel) TxRollback() error {
|
|
return ch.call(
|
|
&txRollback{},
|
|
&txRollbackOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
Flow pauses the delivery of messages to consumers on this channel. Channels
|
|
are opened with flow control active, to open a channel with paused
|
|
deliveries immediately call this method with `false` after calling
|
|
Connection.Channel.
|
|
|
|
When active is `false`, this method asks the server to temporarily pause deliveries
|
|
until called again with active as `true`.
|
|
|
|
Channel.Get methods will not be affected by flow control.
|
|
|
|
This method is not intended to act as window control. Use Channel.Qos to limit
|
|
the number of unacknowledged messages or bytes in flight instead.
|
|
|
|
The server may also send us flow methods to throttle our publishings. A well
|
|
behaving publishing client should add a listener with Channel.NotifyFlow and
|
|
pause its publishings when `false` is sent on that channel.
|
|
|
|
Note: RabbitMQ prefers to use TCP push back to control flow for all channels on
|
|
a connection, so under high volume scenarios, it's wise to open separate
|
|
Connections for publishings and deliveries.
|
|
*/
|
|
func (ch *Channel) Flow(active bool) error {
|
|
return ch.call(
|
|
&channelFlow{Active: active},
|
|
&channelFlowOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
Confirm puts this channel into confirm mode so that the client can ensure all
|
|
publishings have successfully been received by the server. After entering this
|
|
mode, the server will send a basic.ack or basic.nack message with the deliver
|
|
tag set to a 1 based incremental index corresponding to every publishing
|
|
received after the this method returns.
|
|
|
|
Add a listener to Channel.NotifyPublish to respond to the Confirmations. If
|
|
Channel.NotifyPublish is not called, the Confirmations will be silently
|
|
ignored.
|
|
|
|
The order of acknowledgments is not bound to the order of deliveries.
|
|
|
|
Ack and Nack confirmations will arrive at some point in the future.
|
|
|
|
Unroutable mandatory or immediate messages are acknowledged immediately after
|
|
any Channel.NotifyReturn listeners have been notified. Other messages are
|
|
acknowledged when all queues that should have the message routed to them have
|
|
either received acknowledgment of delivery or have enqueued the message,
|
|
persisting the message if necessary.
|
|
|
|
When noWait is true, the client will not wait for a response. A channel
|
|
exception could occur if the server does not support this method.
|
|
*/
|
|
func (ch *Channel) Confirm(noWait bool) error {
|
|
if err := ch.call(
|
|
&confirmSelect{Nowait: noWait},
|
|
&confirmSelectOk{},
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
ch.confirmM.Lock()
|
|
ch.confirming = true
|
|
ch.confirmM.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
Recover redelivers all unacknowledged deliveries on this channel.
|
|
|
|
When requeue is false, messages will be redelivered to the original consumer.
|
|
|
|
When requeue is true, messages will be redelivered to any available consumer,
|
|
potentially including the original.
|
|
|
|
If the deliveries cannot be recovered, an error will be returned and the channel
|
|
will be closed.
|
|
|
|
Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead
|
|
|
|
Deprecated: This method is deprecated in RabbitMQ. RabbitMQ used Recover(true)
|
|
as a mechanism for consumers to tell the broker that they were ready for more
|
|
deliveries, back in 2008-2009. Support for this will be removed from RabbitMQ in
|
|
a future release. Use Nack() with requeue=true instead.
|
|
*/
|
|
func (ch *Channel) Recover(requeue bool) error {
|
|
return ch.call(
|
|
&basicRecover{Requeue: requeue},
|
|
&basicRecoverOk{},
|
|
)
|
|
}
|
|
|
|
/*
|
|
Ack acknowledges a delivery by its delivery tag when having been consumed with
|
|
Channel.Consume or Channel.Get.
|
|
|
|
Ack acknowledges all message received prior to the delivery tag when multiple
|
|
is true.
|
|
|
|
See also Delivery.Ack
|
|
*/
|
|
func (ch *Channel) Ack(tag uint64, multiple bool) error {
|
|
ch.m.Lock()
|
|
defer ch.m.Unlock()
|
|
|
|
return ch.send(&basicAck{
|
|
DeliveryTag: tag,
|
|
Multiple: multiple,
|
|
})
|
|
}
|
|
|
|
/*
|
|
Nack negatively acknowledges a delivery by its delivery tag. Prefer this
|
|
method to notify the server that you were not able to process this delivery and
|
|
it must be redelivered or dropped.
|
|
|
|
See also Delivery.Nack
|
|
*/
|
|
func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
|
|
ch.m.Lock()
|
|
defer ch.m.Unlock()
|
|
|
|
return ch.send(&basicNack{
|
|
DeliveryTag: tag,
|
|
Multiple: multiple,
|
|
Requeue: requeue,
|
|
})
|
|
}
|
|
|
|
/*
|
|
Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack
|
|
over Reject when communicating with a RabbitMQ server because you can Nack
|
|
multiple messages, reducing the amount of protocol messages to exchange.
|
|
|
|
See also Delivery.Reject
|
|
*/
|
|
func (ch *Channel) Reject(tag uint64, requeue bool) error {
|
|
ch.m.Lock()
|
|
defer ch.m.Unlock()
|
|
|
|
return ch.send(&basicReject{
|
|
DeliveryTag: tag,
|
|
Requeue: requeue,
|
|
})
|
|
}
|
|
|
|
// GetNextPublishSeqNo returns the sequence number of the next message to be
|
|
// published, when in confirm mode.
|
|
func (ch *Channel) GetNextPublishSeqNo() uint64 {
|
|
ch.confirms.publishedMut.Lock()
|
|
defer ch.confirms.publishedMut.Unlock()
|
|
|
|
return ch.confirms.published + 1
|
|
}
|