Module: Karafka::Helpers::Async

Included in:
Connection::Listener, Processing::Worker
Defined in:
lib/karafka/helpers/async.rb

Overview

Note:

The code running in the thread needs to manage its own exceptions. The thread is started with abort_on_exception enabled, so any exception that leaks out of it is re-raised in the main thread.

Allows a given class to run asynchronously in a separate thread. Also provides a few methods for controlling the underlying thread.
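The following is a minimal sketch of how an including class might be used, with the exception handling kept inside #call as the note above asks. So that it runs without the gem loaded, it uses a stand-in module (AsyncSketch) that mirrors the source listed on this page. The SafeJob class and its error reader are hypothetical and not part of Karafka.

```ruby
require 'forwardable'

# Stand-in mirroring the source shown on this page, so the sketch is
# self-contained. With Karafka loaded, the real module would be included.
module AsyncSketch
  MUTEX = Mutex.new

  def self.included(base)
    base.extend ::Forwardable
    base.def_delegators :@thread, :join, :terminate, :name
  end

  def alive?
    MUTEX.synchronize do
      return false unless @thread

      @thread.alive?
    end
  end

  def async_call(thread_name)
    MUTEX.synchronize do
      return if @thread&.alive?

      @thread = Thread.new do
        Thread.current.name = thread_name
        Thread.current.abort_on_exception = true
        call
      end
    end
  end
end

# Hypothetical class used only for illustration
class SafeJob
  include AsyncSketch

  attr_reader :error

  # The work that runs in the separate thread
  def call
    raise ArgumentError, 'boom'
  rescue StandardError => e
    # Handle the error here so it never leaves the thread
    @error = e
  end
end

job = SafeJob.new
job.async_call('safe-job')
job.join # delegated to the underlying thread
```

Because the error is rescued inside #call, the thread finishes normally and nothing is re-raised in the main thread. The error stays available on the object for later inspection.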

Class Method Details

.included(base) ⇒ Object

Extends the including class with Forwardable and delegates the thread control methods (join, terminate and name) to the underlying thread that runs the async operations.

Parameters:

  • base (Class)

    class we’re including this module in



# File 'lib/karafka/helpers/async.rb', line 22

def included(base)
  base.extend ::Forwardable

  base.def_delegators :@thread, :join, :terminate, :name
end
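The delegation set up by this hook can be sketched in isolation with plain Forwardable. The Runner class below is hypothetical and creates its thread in the constructor only to keep the example short. In the module itself the thread is created by #async_call.

```ruby
require 'forwardable'

# Hypothetical class used only to illustrate the delegation
class Runner
  extend Forwardable

  # The same delegation the hook defines on the including class
  def_delegators :@thread, :join, :terminate, :name

  def initialize
    @thread = Thread.new { sleep } # sleeps until terminated
    @thread.name = 'runner'
  end
end

runner = Runner.new
thread_name = runner.name # forwarded to Thread#name
runner.terminate          # forwarded to Thread#terminate
runner.join               # forwarded to Thread#join, returns once the thread has stopped
```

Since the delegators forward to @thread, calling them before a thread has been assigned forwards to nil and raises NoMethodError.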

Instance Method Details

#alive? ⇒ Boolean

Returns true if thread is present and is running, false otherwise.

Returns:

  • (Boolean)

    true if thread is present and is running, false otherwise



# File 'lib/karafka/helpers/async.rb', line 30

def alive?
  MUTEX.synchronize do
    return false unless @thread

    @thread.alive?
  end
end

#async_call(thread_name) ⇒ Object

Runs the #call method in a new thread. Does nothing if a previously started thread is still alive.

Parameters:

  • thread_name (String)

    name that we want to assign to the thread when we start it



# File 'lib/karafka/helpers/async.rb', line 40

def async_call(thread_name)
  MUTEX.synchronize do
    return if @thread&.alive?

    @thread = Thread.new do
      Thread.current.name = thread_name

      Thread.current.abort_on_exception = true

      call
    end
  end
end
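The guard at the top of #async_call makes repeated calls safe while the thread is alive. The sketch below reproduces only that guard in a hypothetical GuardedStarter class. The names start, finish and starts, and the Queue used as a gate, are assumptions made for illustration and are not part of the module.

```ruby
# Hypothetical class reproducing only the start guard from #async_call
class GuardedStarter
  MUTEX = Mutex.new

  attr_reader :starts

  def initialize
    @gate = Queue.new
    @starts = 0
  end

  # Starts a thread unless one is already alive.
  # Returns the new thread, or nil when the guard returns early.
  def start(thread_name)
    MUTEX.synchronize do
      return if @thread&.alive?

      @starts += 1
      @thread = Thread.new do
        Thread.current.name = thread_name
        @gate.pop # block until released
      end
    end
  end

  # Releases the blocked thread and waits for it to finish
  def finish
    @gate << :go
    @thread.join
  end
end

starter = GuardedStarter.new
first = starter.start('worker')   # starts a thread
second = starter.start('worker')  # thread is alive, so the guard returns early
starter.finish
third = starter.start('worker')   # previous thread is dead, so a new one starts
starter.finish
```

Holding the mutex around both the check and the assignment keeps two concurrent callers from each seeing no live thread and both starting one.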