ここ数日RabbitMQとrabbitmq-delayed-message-exchangeとElixirを組み合わせたサンプルを書くということをしている
ようやっとサンプルが動いたのでブログを書く
やりたいこと
やりたいことは、このPluginを使って指定した秒数待ったあとにメッセージを送るというもの
ここのReadmeに使い方が書いてあるのだけど、初心者の僕には、Javaっぽい言語のソースコードが載っているだけでイマイチピンと来なかった。
この記事ではRabbitMQでdelayed-message-exchangeのpluginだけインストールした状態から、実際にElixirでメッセージ送信するためにやったことを紹介する。
Exchange設定
こんな感じにRabbitMQ managementからExchangeを設定する。
Elixirプログラム
受信側
defmodule Consumer do use GenServer use AMQP def start_link do GenServer.start_link(__MODULE__, [], []) end @exchange "my-exchange" @queue "gen_server_test_queue" @queue_error "#{@queue}_error" def init(_opts) do {:ok, conn} = Connection.open {:ok, chan} = Channel.open(conn) # Limit unacknowledged messages to 10 Basic.qos(chan, prefetch_count: 10) Queue.declare(chan, @queue_error, durable: true) # Messages that cannot be delivered to any consumer in the main queue will be routed to the error queue Queue.declare(chan, @queue, durable: true, arguments: [{"x-dead-letter-exchange", :longstr, ""}, {"x-dead-letter-routing-key", :longstr, @queue_error}]) Queue.bind(chan, @queue, @exchange) # Register the GenServer process as a consumer {:ok, _consumer_tag} = Basic.consume(chan, @queue) {:ok, chan} end # Confirmation sent by the broker after registering this process as a consumer def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do {:noreply, chan} end # Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion) def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do {:stop, :normal, chan} end # Confirmation sent by the broker to the consumer process after a Basic.cancel def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do {:noreply, chan} end def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do spawn fn -> consume(chan, tag, redelivered, payload) end {:noreply, chan} end defp consume(channel, tag, redelivered, payload) do number = String.to_integer(payload) if number <= 10 do Basic.ack channel, tag IO.puts "Consumed a #{number}." else Basic.reject channel, tag, requeue: false IO.puts "#{number} is too big and was rejected." end rescue # Requeue unless it's a redelivered message. # This means we will retry consuming a message once in case of exception # before we give up and have it moved to the error queue # # You might also want to catch :exit signal in production code. # Make sure you call ack, nack or reject otherwise comsumer will stop # receiving messages. exception -> Basic.reject channel, tag, requeue: not redelivered IO.puts "Error converting #{payload} to integer" end end
送信側
defmodule RabbitmqTutorials do @queue "gen_server_test_queue" @exchange "my-exchange" def hello do {:ok, connection} = AMQP.Connection.open {:ok, channel} = AMQP.Channel.open(connection) AMQP.Basic.publish(channel, @exchange, "", "5", headers: [{"x-delay", 5000}]) AMQP.Connection.close(connection) end end
あくまでサンプルなので、ここからもっと洗練する必要がある
今回の目的は「サンプルを書いて動かしてみる」なので以上
躓いた所
AMQP
RabbitMQではAMQPというプロトコルを使用するのですが、中途半端な理解なまま先に進んだためわけわからなくなった。
AMQPについてはGreeの技術記事がすごくわかりやすかった
AMQPによるメッセージング | GREE Engineers' Blog
AMQP.Exchange#declare/4
ElixirのAMQPクライアントには AMQP.Exchange#declare/4
という関数があって、これを使えばExchangeを登録できる
delayed-message-exchange
を使うには typeに x-delayed-message
を入れないといけない
AMQP.Exchange#declare/4
のtypeはAtomを入れるようになっていて x-delayed-message
をAtomで書きたいんだけど、 -
が付いているもんだからAtomが作れない
仕方がないからRabbitMQ managementで登録することにした
RabbitMQ management exchange登録
なんもわからずに登録しようとしたらこんなこと言われた
406 PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type
arguments
に x-delayed-type
入れろとのこと
感想
ちゃんと理解してから先に進むの大事だなと思った