ここ数日やっていること

ここ数日RabbitMQとrabbitmq-delayed-message-exchangeとElixirを組み合わせたサンプルを書くということをしている

ようやっとサンプルが動いたのでブログを書く

やりたいこと

github.com

やりたいことは、このPluginを使って指定した秒数待ったあとにメッセージを送るというもの

ここのReadmeに使い方が書いてあるのだけど、初心者の僕には、Javaっぽい言語のソースコードが載っているだけでイマイチピンと来なかった。

この記事ではRabbitMQでdelayed-message-exchangeのpluginだけインストールした状態から、実際にElixirでメッセージ送信するためにやったことを紹介する。

Exchange設定

f:id:kytiken:20171114215537p:plain

こんな感じにRabbitMQ managementからExchangeを設定する。

Elixirプログラム

受信側

defmodule Consumer do
  use GenServer
  use AMQP

  def start_link do
    GenServer.start_link(__MODULE__, [], [])
  end

  @exchange    "my-exchange"
  @queue       "gen_server_test_queue"
  @queue_error "#{@queue}_error"

  def init(_opts) do
    {:ok, conn} = Connection.open
    {:ok, chan} = Channel.open(conn)
    # Limit unacknowledged messages to 10
    Basic.qos(chan, prefetch_count: 10)
    Queue.declare(chan, @queue_error, durable: true)
    # Messages that cannot be delivered to any consumer in the main queue will be routed to the error queue
    Queue.declare(chan, @queue, durable: true,
                                arguments: [{"x-dead-letter-exchange", :longstr, ""},
                                            {"x-dead-letter-routing-key", :longstr, @queue_error}])
    Queue.bind(chan, @queue, @exchange)
    # Register the GenServer process as a consumer
    {:ok, _consumer_tag} = Basic.consume(chan, @queue)
    {:ok, chan}
  end

  # Confirmation sent by the broker after registering this process as a consumer
  def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do
    {:noreply, chan}
  end

  # Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
  def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do
    {:stop, :normal, chan}
  end

  # Confirmation sent by the broker to the consumer process after a Basic.cancel
  def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do
    {:noreply, chan}
  end

  def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
    spawn fn -> consume(chan, tag, redelivered, payload) end
    {:noreply, chan}
  end

  defp consume(channel, tag, redelivered, payload) do
    number = String.to_integer(payload)
    if number <= 10 do
      Basic.ack channel, tag
      IO.puts "Consumed a #{number}."
    else
      Basic.reject channel, tag, requeue: false
      IO.puts "#{number} is too big and was rejected."
    end

  rescue
    # Requeue unless it's a redelivered message.
    # This means we will retry consuming a message once in case of exception
    # before we give up and have it moved to the error queue
    #
    # You might also want to catch :exit signal in production code.
    # Make sure you call ack, nack or reject otherwise comsumer will stop
    # receiving messages.
    exception ->
      Basic.reject channel, tag, requeue: not redelivered
      IO.puts "Error converting #{payload} to integer"
  end
end

送信側

defmodule RabbitmqTutorials do
  @queue       "gen_server_test_queue"
  @exchange    "my-exchange"
  def hello do
    {:ok, connection} = AMQP.Connection.open
    {:ok, channel} = AMQP.Channel.open(connection)
    AMQP.Basic.publish(channel, @exchange, "", "5", headers: [{"x-delay", 5000}])
    AMQP.Connection.close(connection)
  end
end

あくまでサンプルなので、ここからもっと洗練する必要がある

今回の目的は「サンプルを書いて動かしてみる」なので以上

躓いた所

AMQP

RabbitMQではAMQPというプロトコルを使用するのですが、中途半端な理解なまま先に進んだためわけわからなくなった。

AMQPについてはGreeの技術記事がすごくわかりやすかった

AMQPによるメッセージング | GREE Engineers' Blog

AMQP.Exchange#declare/4

ElixirのAMQPクライアントには AMQP.Exchange#declare/4 という関数があって、これを使えばExchangeを登録できる

AMQP.Exchange – amqp v0.3.0

delayed-message-exchange を使うには typeに x-delayed-message を入れないといけない AMQP.Exchange#declare/4 のtypeはAtomを入れるようになっていて x-delayed-messageAtomで書きたいんだけど、 - が付いているもんだからAtomが作れない

仕方がないからRabbitMQ managementで登録することにした

RabbitMQ management exchange登録

なんもわからずに登録しようとしたらこんなこと言われた

f:id:kytiken:20171114222539p:plain

406 PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type 

argumentsx-delayed-type 入れろとのこと

感想

ちゃんと理解してから先に進むの大事だなと思った