module Fluent::PluginHelper::Thread

Constants

THREAD_DEFAULT_WAIT_SECONDS

Attributes

_threads[R]

Lifecycle hooks — stop: mark callback threads as stopped; shutdown: [-] (nothing to do); close: collect (join) stopped threads; terminate: kill all remaining threads.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/thread.rb, line 104
# Prepares the per-plugin thread bookkeeping after running the superclass
# initializer:
# * @_thread_wait_seconds - join timeout used by #close, seeded from
#   THREAD_DEFAULT_WAIT_SECONDS
# * @_threads             - registry of created threads, keyed by object_id
# * @_threads_mutex       - lock guarding modifications of the registry
def initialize
  super
  @_thread_wait_seconds = THREAD_DEFAULT_WAIT_SECONDS
  @_threads = {}
  @_threads_mutex = Mutex.new
end

Public Instance Methods

after_shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/thread.rb, line 127
# Wakes every registered thread that is currently sleeping, after the
# superclass hook has run.
#
# The sleeping threads are picked while holding the registry lock; the
# wakeups themselves happen outside of it, re-checking liveness first
# because a thread may have finished in between.
def after_shutdown
  super
  sleepers = @_threads_mutex.synchronize do
    @_threads.values.select { |th| th.alive? && th.status == "sleep" }
  end
  sleepers.each { |th| th.wakeup if th.alive? }
end
close() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/thread.rb, line 140
# Collects finished threads: each registered thread is joined for at most
# @_thread_wait_seconds and dropped from the registry once it has ended.
# Ids whose thread is already gone are dropped as well. Threads that are
# still running after the timeout stay registered. The superclass hook
# runs last.
def close
  tracked_ids = @_threads_mutex.synchronize { @_threads.keys }
  tracked_ids.each do |id|
    th = @_threads[id]
    # join returns nil on timeout: the thread is still alive, keep tracking it
    next if th && !th.join(@_thread_wait_seconds)

    @_threads_mutex.synchronize { @_threads.delete(id) }
  end

  super
end
stop() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/thread.rb, line 111
# Marks every registered thread as no longer running (so callbacks that
# poll #thread_current_running? can leave their loops) and wakes those
# that are currently sleeping. The superclass hook runs first.
#
# Flags are cleared and sleepers are picked under the registry lock; the
# wakeups happen outside of it, re-checking liveness first.
def stop
  super
  sleepers = @_threads_mutex.synchronize do
    @_threads.values.each_with_object([]) do |th, picked|
      th[:_fluentd_plugin_helper_thread_running] = false
      picked << th if th.alive? && th.status == "sleep"
    end
  end
  sleepers.each { |th| th.wakeup if th.alive? }
end
terminate() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/thread.rb, line 151
# Forcibly ends every thread that is still registered: first kills them
# all, then joins each one and removes it from the registry. Finally
# clears @_thread_wait_seconds. The superclass hook runs first.
#
# A thread removes itself from @_threads when its body finishes, so an id
# taken from the key snapshot may no longer resolve to a thread by the time
# it is looked up. Such ids are skipped instead of calling methods on nil.
def terminate
  super
  @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
    thread = @_threads[obj_id]
    next unless thread # already finished and unregistered itself
    log.warn "killing existing thread", thread: thread
    thread.kill
  end
  @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
    thread = @_threads[obj_id]
    # the killed thread may have unregistered itself since the snapshot
    thread.join if thread
    @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
  end
  @_thread_wait_seconds = nil
end
thread_create(title) { || ... } click to toggle source

Ruby 2.2.3 and earlier (and all 2.1.x releases) have a threading bug (“Stack consistency error”)

that is triggered by passing splatted arguments to `yield`.

See bugs.ruby-lang.org/issues/11027. Once support for Ruby 2.1 (and older 2.2.x) has been dropped, arguments can be passed through by defining the method as def thread_create(title, *args) with the following body:

Thread.new(*args) do |*t_args|
  yield *t_args
# File lib/fluent/plugin_helper/thread.rb, line 53
# Starts a new thread that runs the given block and registers it in
# @_threads under its object_id.
#
# title - Symbol stored in the thread-local
#         :_fluentd_plugin_helper_thread_title.
#
# Yields with no arguments inside the new thread.
#
# Returns the created ::Thread.
# Raises ArgumentError if title is not a Symbol or no block is given.
def thread_create(title)
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
  raise ArgumentError, "BUG: callback not specified" unless block_given?
  # Start gate: held by the creating thread until the new thread has been
  # registered in @_threads.
  m = Mutex.new
  m.lock
  thread = ::Thread.new do
    m.lock # blocks until the creator has stored this thread in @_threads and released the gate
    m.unlock
    # becomes true once the block returned normally or raised
    thread_exit = false
    ::Thread.current[:_fluentd_plugin_helper_thread_title] = title
    ::Thread.current[:_fluentd_plugin_helper_thread_started] = true
    ::Thread.current[:_fluentd_plugin_helper_thread_running] = true
    begin
      yield
      thread_exit = true
    rescue Exception => e
      # Log any exception raised by the block, then re-raise it unchanged.
      log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
      thread_exit = true
      raise
    ensure
      # Reached with thread_exit still false when the block neither returned
      # nor raised into the rescue above (e.g. the thread was killed).
      if ::Thread.current.alive? && !thread_exit
        log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
      end
      # The thread unregisters itself and clears its running flag.
      @_threads_mutex.synchronize do
        @_threads.delete(::Thread.current.object_id)
      end
      ::Thread.current[:_fluentd_plugin_helper_thread_running] = false
    end
  end
  # An exception that escapes this thread is propagated to the main thread
  # instead of ending only this thread.
  thread.abort_on_exception = true
  @_threads_mutex.synchronize do
    @_threads[thread.object_id] = thread
  end
  # Registration is done: open the gate so the thread body can run.
  m.unlock
  thread
end
thread_current_running?() click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 29
# For use inside a thread_create callback: tells whether the current
# thread is still flagged as running.
#
# Returns the current thread's :_fluentd_plugin_helper_thread_running
# value when it is truthy, otherwise false.
def thread_current_running?
  flag = ::Thread.current[:_fluentd_plugin_helper_thread_running]
  return false unless flag

  flag
end
thread_exist?(title) click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 90
# Tells whether a registered thread carries the given title.
#
# title - value compared against each thread's
#         :_fluentd_plugin_helper_thread_title thread-local.
#
# Returns true or false.
def thread_exist?(title)
  @_threads.values.any? do |candidate|
    title == candidate[:_fluentd_plugin_helper_thread_title]
  end
end
thread_running?(title) click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 99
# Looks up the first registered thread with the given title and reports
# its running flag.
#
# title - value compared against each thread's
#         :_fluentd_plugin_helper_thread_title thread-local.
#
# Returns the thread's :_fluentd_plugin_helper_thread_running value, or
# nil when no registered thread has that title.
def thread_running?(title)
  match = @_threads.values.find do |candidate|
    title == candidate[:_fluentd_plugin_helper_thread_title]
  end
  match ? match[:_fluentd_plugin_helper_thread_running] : match
end
thread_started?(title) click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 94
# Looks up the first registered thread with the given title and reports
# its started flag.
#
# title - value compared against each thread's
#         :_fluentd_plugin_helper_thread_title thread-local.
#
# Returns the thread's :_fluentd_plugin_helper_thread_started value, or
# nil when no registered thread has that title.
def thread_started?(title)
  match = @_threads.values.find do |candidate|
    title == candidate[:_fluentd_plugin_helper_thread_title]
  end
  match ? match[:_fluentd_plugin_helper_thread_started] : match
end
thread_wait_until_start() click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 34
# Blocks until every registered thread has set its
# :_fluentd_plugin_helper_thread_started flag, polling every 0.1 seconds.
# Returns nil immediately when no thread is registered.
def thread_wait_until_start
  loop do
    all_started = @_threads_mutex.synchronize do
      @_threads.values.all? { |th| th[:_fluentd_plugin_helper_thread_started] }
    end
    break if all_started

    sleep 0.1
  end
end
thread_wait_until_stop() click to toggle source
# File lib/fluent/plugin_helper/thread.rb, line 40
# Blocks until no registered thread has its
# :_fluentd_plugin_helper_thread_running flag set, polling every
# 0.1 seconds. Returns nil immediately when no thread is registered.
def thread_wait_until_stop
  # The flag has to be read from each thread; negating the bare Array
  # literal [:_fluentd_plugin_helper_thread_running] is always false.
  until @_threads_mutex.synchronize{ @_threads.values.none?{|t| t[:_fluentd_plugin_helper_thread_running] } }
    sleep 0.1
  end
end