Class: Karafka::Web::Pro::Commanding::Listener
- Inherits:
-
Object
- Object
- Karafka::Web::Pro::Commanding::Listener
- Defined in:
- lib/karafka/web/pro/commanding/listener.rb
Overview
Wrapper around the Pro Iterator that yields messages with commands when needed for further processing.
This iterator supports error handling, basically on errors it will be reported and ignored as long as they are not critical. Critical errors will cause back-off and reconnection.
Instance Method Summary collapse
-
#each {|Karafka::Messages::Message| ... } ⇒ Object
Runs iterator and keeps it running until not needed.
-
#stop ⇒ Object
Triggers stop of the listener.
Instance Method Details
#each {|Karafka::Messages::Message| ... } ⇒ Object
Runs iterator and keeps it running until not needed.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/karafka/web/pro/commanding/listener.rb', line 32 def each c_config = ::Karafka::Web.config.commanding t_config = Karafka::Web.config.topics iterator = Karafka::Pro::Iterator.new( { t_config.consumers.commands => true }, settings: c_config.kafka, yield_nil: true, max_wait_time: c_config.max_wait_time ) iterator.each do || iterator.stop if @stop next if @stop next unless yield() rescue StandardError => e report_error(e) sleep(c_config.pause_timeout / 1_000) next end rescue StandardError => e report_error(e) return if done? sleep(c_config.pause_timeout / 1_000) retry end |
#stop ⇒ Object
Triggers stop of the listener. Does not stop the listener but requests it to stop.
67 68 69 |
# File 'lib/karafka/web/pro/commanding/listener.rb', line 67 def stop @stop = true end |