読者です 読者をやめる 読者になる 読者になる

A Log In The Life.

May The Code Be With You !

Tick Tuples の Clojure DSL での書き方

clojure storm

どうも、Storm 初心者のまさおりです。Storm のトポロジーを書いていて「10秒間隔で Bolt からデータ流したいなー」とか思ったものの、よくわからなかったので調べてみましたー。

Tick Tuples

It’s common to require a bolt to “do something” at a fixed interval, like flush writes to a database. Many people have been using variants of a ClockSpout to send these ticks. The problem with a ClockSpout is that you can’t internalize the need for ticks within your bolt, so if you forget to set up your bolt correctly within your topology it won’t work correctly. 0.8.0 introduces a new “tick tuple” config that lets you specify the frequency at which you want to receive tick tuples via the “topology.tick.tuple.freq.secs” component- specific config, and then your bolt will receive a tuple from the system component and tick stream at that frequency.

Tick Tuples in Clojure DSL

フムフム、Tick Tuple 受け取る側で “topology.tick.tuple.freq.secs” を config に設定すればいいのね。

で、それ Clojure DSL でどうやって書くの?と探すと、ML に流れてました。

ML によると defbolt のオプションに :conf {"topology.tick.tuple.freq.secs", 2} みたいに渡せば良さそうです。

(defbolt mybolt ["myfield"] {:prepare true :conf {"topology.tick.tuple.freq.secs", 2}} [conf tuple collector]

Tick?

実際に使うには 『Tick Tuple が来たときだけ、次の Bolt に Tuple 渡す』みたいな処理も必要で、受け取った Tuple が Tick Tuple が判別する必要があります。

(defn tick? [tuple]
  (= (.getSourceStreamId tuple) (Constants/SYSTEM_TICK_STREAM_ID)))

※ これデフォルトで用意してありそうな気がするんですよー。ないんですかねー?詳しい方教えて下さい:D

Example

まとめるとこんな感じです。

(ns example.bolts
  (:require [backtype.storm [clojure :refer [emit-bolt! defbolt ack! bolt]]])
  (:import [backtype.storm Constants]))

(defn tick? [tuple]
  (= (.getSourceStreamId tuple) (Constants/SYSTEM_TICK_STREAM_ID)))

(defbolt tick-tack-bolt ["tack"]  {:prepare true :conf {"topology.tick.tuple.freq.secs", 10}} [conf tuple collector]
  (let [tack (atom 0)]
    (bolt
      (execute [tuple]
                    (if (tick?  tuple)
                      (do
                        (emit-bolt! collector [@tack] :anchor tuple)
                        (reset! tack 0))
                      (do
                         (swap! tack inc)
                         (ack! collector tuple)))))))
  • tupletick tuple の場合だけ emit-bolt! して、そうじゃない場合は単に ack! してるだけですー

その他