class Fluent::DetachProcessManager::DelayedForwarder

Public Class Methods

new(w, interval) click to toggle source
# File lib/fluent/process.rb, line 242
# Builds a forwarder around +w+ and immediately starts its background thread.
#
# w        - object handed to to_msgpack as the write target in #run.
# interval - value passed to sleep between flush passes in #run.
#
# The per-tag buffer starts empty and is guarded by a fresh Mutex; the
# thread is created last so that all state exists before #run begins.
def initialize(w, interval)
  @w, @interval = w, interval
  @mutex = Mutex.new
  @buffer = {}
  Thread.new { run }
end

Public Instance Methods

emit(tag, es) click to toggle source
# File lib/fluent/process.rb, line 250
# Buffers the msgpack stream of +es+ under +tag+ until the next flush.
#
# tag - key under which the stream is accumulated in @buffer.
# es  - object responding to to_msgpack_stream; the result must support
#       being appended with << (presumably a binary String - confirm
#       against the event stream classes).
#
# Streams emitted for the same tag are concatenated in emit order. The
# whole update runs under @mutex because #run drains @buffer from another
# thread.
def emit(tag, es)
  stream = es.to_msgpack_stream
  @mutex.synchronize do
    if (pending = @buffer[tag])
      pending << stream
    else
      # Keep a private copy: later emits append to the buffered string in
      # place, which would otherwise mutate the object returned by
      # es.to_msgpack_stream (and raise if that object is frozen).
      @buffer[tag] = stream.dup
    end
  end
end
run() click to toggle source
# File lib/fluent/process.rb, line 261
# Flush loop executed on the background thread started by the constructor.
#
# Each pass sleeps for @interval, removes every entry from @buffer while
# holding @mutex, and then - with the lock released - calls
# to_msgpack(@w) on each [tag, stream] pair. Any StandardError is logged
# through $log and re-raised, which ends the loop.
def run
  begin
    # Kept as `while true`: Kernel#loop would rescue StopIteration itself
    # instead of letting it reach the handler below.
    while true
      sleep @interval

      drained = @mutex.synchronize do
        @buffer.keys.each_with_object([]) do |tag, acc|
          stream = @buffer.delete(tag)
          # Entries whose stored value is nil/false are dropped, not forwarded.
          acc << [tag, stream] if stream
        end
      end

      drained.each { |entry| entry.to_msgpack(@w) }
    end
  rescue => e
    $log.error "error on forwerder thread", error: e.to_s
    $log.error_backtrace
    raise
  end
end