class Fluent::Plugin::TailInput

Constants

FILE_PERMISSION

Attributes

paths[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin.new
# File lib/fluent/plugin/in_tail.rb, line 39
# Runs the superclass initializer, then starts with no paths, no watchers
# and no position file.
def initialize
  super
  @paths, @tails = [], {}
  @pf_file, @pf = nil, nil
end

Public Instance Methods

close_watcher(tw, close_io = true) click to toggle source

Fluent::Plugin::TailInput::TailWatcher#close is called by another thread during the shutdown phase. That causes a 'can't modify string; temporarily locked' error in IOHandler, so the close_io argument was added to avoid the problem. At shutdown, IOHandler's io is released automatically after it is detached from the event loop.

# File lib/fluent/plugin/in_tail.rb, line 266
# Closes a watcher, flushes its pending line buffer and, when the watcher was
# marked unwatched and a position file is in use, records the unwatched
# position for its path.
#
# @param tw the watcher to close
# @param close_io forwarded to tw.close
def close_watcher(tw, close_io = true)
  tw.close(close_io)
  flush_buffer(tw)
  return unless tw.unwatched && @pf

  @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
end
close_watcher_after_rotate_wait(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 274
# Schedules a one-shot timer that closes the given watcher after
# @rotate_wait has elapsed.
def close_watcher_after_rotate_wait(tw)
  timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) { close_watcher(tw) }
end
configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_tail.rb, line 79
# Validates and applies the plugin configuration.
#
# Requires a <parse> section that has an @type, copies any top-level
# format1..formatN parameters into that section, lets the superclass process
# the configuration, and then derives the path list, tag parts, encodings,
# line handler, file permission and parser.
#
# @param conf the configuration element for this input
# @raise [Fluent::ConfigError] when <parse> or parse/@type is missing, or when
#   the path parameter yields no paths
def configure(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section is required." unless parser_config
  raise Fluent::ConfigError, "parse/@type is required." unless parser_config["@type"]

  # Copy top-level formatN parameters into the <parse> section.
  (1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n|
    parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
  end

  super

  @paths = @path.split(',').map(&:strip)
  if @paths.empty?
    raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
  end

  # Use the plugin logger (as the rest of this class does) rather than the
  # global $log.
  unless @pos_file
    log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_tag
  configure_encoding

  # Truthy (a match index) when the parser type name contains "multiline".
  @multiline_mode = parser_config["@type"] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
  @file_perm = system_config.file_permission || FILE_PERMISSION
  @parser = parser_create(conf: parser_config)
end
configure_encoding() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 128
# Resolves the configured encoding names into Encoding objects.
#
# @raise [Fluent::ConfigError] when from_encoding is given without encoding
def configure_encoding
  if @from_encoding && !@encoding
    raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  if @encoding
    @encoding = parse_encoding_param(@encoding)
  end
  if @from_encoding
    @from_encoding = parse_encoding_param(@from_encoding)
  end
end
configure_tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 118
# Splits a tag containing '*' into @tag_prefix and @tag_suffix; both are nil
# when the tag has no '*'.
#
# When the tag has a '*', both parts are always strings. "*".split('*')
# returns an empty array, so without the defaults below a bare "*" tag would
# leave the prefix nil and break the `prefix + tag + suffix` concatenation
# done when events are emitted.
def configure_tag
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
end
convert_line_to_event(line, es, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 328
# Parses one line and appends the resulting event to +es+.
#
# The line is chomped in place and, when @encoding is set, transcoded from
# @from_encoding or simply relabelled. Lines the parser cannot match are
# logged as warnings; any exception is logged and swallowed.
#
# @param line the raw line (mutated in place)
# @param es the event stream that receives parsed events
# @param tail_watcher supplies the path stored under @path_key
def convert_line_to_event(line, es, tail_watcher)
  line.chomp!  # remove \n
  if @encoding && @from_encoding
    line.encode!(@encoding, @from_encoding)
  elsif @encoding
    line.force_encoding(@encoding)
  end
  @parser.parse(line) do |time, record|
    if time && record
      # Only fill in the path when the record does not already carry one.
      record[@path_key] ||= tail_watcher.path unless @path_key.nil?
      es.add(time, record)
    else
      log.warn "pattern not match: #{line.inspect}"
    end
  end
rescue => e
  log.warn line.dump, error: e.to_s
  log.debug_backtrace(e.backtrace)
end
expand_paths() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 167
# Expands the configured path patterns into the list of files to watch.
#
# Each entry of @paths and @exclude_path is first run through strftime with
# the current time. Entries containing '*' are globbed; for @paths, globbed
# files that are not readable are dropped with a warning.
#
# @return [Array<String>] target paths with excluded paths removed
def expand_paths
  now = Time.now

  excluded = @exclude_path.map { |pattern|
    resolved = now.strftime(pattern)
    resolved.include?('*') ? Dir.glob(resolved) : resolved
  }.flatten.uniq

  candidates = @paths.flat_map do |pattern|
    resolved = now.strftime(pattern)
    if resolved.include?('*')
      Dir.glob(resolved).select do |file|
        readable = File.readable?(file)
        log.warn "#{file} unreadable. It is excluded and would be examined next time." unless readable
        readable
      end
    else
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      [resolved]
    end
  end

  candidates - excluded
end
flush_buffer(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 280
# Parses whatever is left in the watcher's line buffer and emits it as a
# single event. Does nothing when the watcher has no line buffer.
#
# The buffer is chomped in place and, when @encoding is set, transcoded from
# @from_encoding or simply relabelled. Content the parser cannot match is
# reported with a warning.
#
# @param tw the watcher whose line buffer is flushed
def flush_buffer(tw)
  lb = tw.line_buffer
  return unless lb

  lb.chomp!
  if @encoding
    if @from_encoding
      lb.encode!(@encoding, @from_encoding)
    else
      lb.force_encoding(@encoding)
    end
  end
  @parser.parse(lb) { |time, record|
    if time && record
      # Interpolation tolerates a nil prefix or suffix (e.g. prefix nil with
      # suffix ''), where `prefix + tag + suffix` would raise NoMethodError.
      tag = if @tag_prefix || @tag_suffix
              "#{@tag_prefix}#{tw.tag}#{@tag_suffix}"
            else
              @tag
            end
      # Only fill in the path when the record does not already carry one.
      record[@path_key] ||= tw.path unless @path_key.nil?
      router.emit(tag, time, record)
    else
      log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
    end
  }
end
parse_encoding_param(encoding_name) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 139
# Looks up an Encoding by name.
#
# @param encoding_name the encoding name, or nil
# @return [Encoding, nil] nil when no name is given
# @raise [Fluent::ConfigError] when the name is not a known encoding
def parse_encoding_param(encoding_name)
  return nil unless encoding_name

  Encoding.find(encoding_name)
rescue ArgumentError => error
  raise Fluent::ConfigError, error.message
end
parse_multilines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 360
# Assembles multi-line events from +lines+, carrying incomplete content over
# in the watcher's line buffer between calls.
#
# When the parser has a first-line pattern, a new first line converts the
# buffered content into an event and starts a new buffer; other lines are
# appended to the buffer (or warned about if nothing is buffered yet).
# Otherwise each line is appended and the buffer is converted as soon as the
# parser accepts it.
#
# @param lines the newly read lines
# @param tail_watcher owner of the line buffer and flush timer
# @return [Fluent::MultiEventStream] the events completed by this call
def parse_multilines(lines, tail_watcher)
  pending = tail_watcher.line_buffer
  es = Fluent::MultiEventStream.new
  if @parser.has_firstline?
    flusher = tail_watcher.line_buffer_timer_flusher
    flusher.reset_timer if flusher
    lines.each do |line|
      if @parser.firstline?(line)
        convert_line_to_event(pending, es, tail_watcher) if pending
        pending = line
      elsif pending.nil?
        log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
      else
        pending << line
      end
    end
  else
    pending ||= ''
    lines.each do |line|
      pending << line
      @parser.parse(pending) do |time, record|
        next unless time && record

        convert_line_to_event(pending, es, tail_watcher)
        pending = ''
      end
    end
  end
  tail_watcher.line_buffer = pending
  es
end
parse_singleline(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 352
# Converts every line into an event on a fresh event stream.
#
# @param lines the newly read lines
# @param tail_watcher passed through to convert_line_to_event
# @return [Fluent::MultiEventStream] the stream the lines were converted into
def parse_singleline(lines, tail_watcher)
  Fluent::MultiEventStream.new.tap do |es|
    lines.each { |line| convert_line_to_event(line, es, tail_watcher) }
  end
end
receive_lines(lines, tail_watcher) click to toggle source

@return true if the emit succeeds or fails with an unrecoverable error; false if a BufferOverflowError is raised

# File lib/fluent/plugin/in_tail.rb, line 307
# Turns newly read lines into events via @receive_handler and emits them.
#
# @param lines the newly read lines
# @param tail_watcher the watcher the lines came from; its tag is used when
#   the configured tag contains a wildcard
# @return [Boolean] false only when the emit raised BufferOverflowError;
#   true when nothing was emitted, the emit succeeded, or it failed with any
#   other error
def receive_lines(lines, tail_watcher)
  es = @receive_handler.call(lines, tail_watcher)
  return true if es.empty?

  # Interpolation tolerates a nil prefix or suffix (e.g. prefix nil with
  # suffix ''), where `prefix + tag + suffix` would raise NoMethodError.
  tag = if @tag_prefix || @tag_suffix
          "#{@tag_prefix}#{tail_watcher.tag}#{@tag_suffix}"
        else
          @tag
        end
  begin
    router.emit_stream(tag, es)
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    return false
  rescue
    # ignore non BufferOverflowError errors because in_tail can't recover. Engine shows logs and backtraces.
    return true
  end

  true
end
refresh_watchers() click to toggle source

in_tail with a '*' path doesn't check whether a file is a rotated copy of one it already watches during the refresh phase, so you should not use a '*' path when your logs are rotated by another tool: it will cause log duplication after the watched file list is updated. In that case, keep rotated logs in a separate directory and specify two paths in the path parameter, e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file

# File lib/fluent/plugin/in_tail.rb, line 196
# Reconciles the running watchers with the currently expanded path list:
# paths that are no longer targets are stopped (flagged as unwatched) and
# newly appearing paths get a watcher.
def refresh_watchers
  wanted = expand_paths
  watched = @tails.keys

  vanished = watched - wanted
  appeared = wanted - watched

  stop_watchers(vanished, false, true) unless vanished.empty?
  start_watchers(appeared) unless appeared.empty?
end
setup_watcher(path, pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 207
# Builds and attaches a TailWatcher for +path+.
#
# A line-buffer timer flusher is created only in multiline mode with a flush
# interval configured. On attach, a one-second timer is registered when the
# watcher has its watch timer enabled, and the watcher's stat trigger is
# attached to the event loop.
#
# @param path the file path to watch
# @param pe the position entry passed to the watcher (may be nil)
# @return the attached TailWatcher
def setup_watcher(path, pe)
  line_buffer_timer_flusher =
    if @multiline_mode && @multiline_flush_interval
      TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer))
    end
  tw = TailWatcher.new(
    path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit,
    method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)
  )
  tw.attach do |watcher|
    if watcher.enable_watch_timer
      watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify))
    end
    event_loop_attach(watcher.stat_trigger)
  end
  tw
end
shutdown() click to toggle source
Calls superclass method Fluent::Plugin::Base#shutdown
# File lib/fluent/plugin/in_tail.rb, line 160
# Stops every running watcher immediately, closes the position file if one is
# open, then hands over to the superclass.
def shutdown
  watched_paths = @tails.keys
  stop_watchers(watched_paths, true)
  @pf_file.close if @pf_file

  super
end
start() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#start
# File lib/fluent/plugin/in_tail.rb, line 147
# Starts the input: opens and parses the position file when pos_file is
# configured, sets up the initial watchers, and registers a timer that
# refreshes the watcher set every @refresh_interval.
def start
  super

  if @pos_file
    open_flags = File::RDWR | File::CREAT | File::BINARY
    @pf_file = File.open(@pos_file, open_flags, @file_perm)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  refresh_watchers
  timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
start_watchers(paths) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 217
# Creates a watcher for each path and records it in @tails.
#
# When a position file is in use, the path's position entry is handed to the
# watcher. With read_from_head enabled and an entry whose inode is still
# zero, the entry is first set to the file's current inode at position 0.
#
# @param paths the paths to start watching
def start_watchers(paths)
  paths.each { |path|
    pe = nil
    if @pf
      pe = @pf[path]
      if @read_from_head && pe.read_inode.zero?
        begin
          pe.update(Fluent::FileWrapper.stat(path).ino, 0)
        rescue Errno::ENOENT
          # Use the plugin logger (as the rest of this class does) rather
          # than the global $log.
          log.warn "#{path} not found. Continuing without tailing it."
        end
      end
    end

    @tails[path] = setup_watcher(path, pe)
  }
end
stop_watchers(paths, immediate = false, unwatched = false) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 235
# Removes the watchers for +paths+ from @tails and closes them. Paths without
# a registered watcher are skipped.
#
# @param paths the paths whose watchers should stop
# @param immediate close right away (without closing the io) instead of
#   waiting for rotate_wait
# @param unwatched value stored on each watcher's unwatched flag
def stop_watchers(paths, immediate = false, unwatched = false)
  paths.each do |path|
    tw = @tails.delete(path)
    next unless tw

    tw.unwatched = unwatched
    if immediate
      close_watcher(tw, false)
    else
      close_watcher_after_rotate_wait(tw)
    end
  end
end
update_watcher(path, pe) click to toggle source

#refresh_watchers calls @tails.keys, so for safety we don't use a stop_watcher -> start_watcher sequence here.

# File lib/fluent/plugin/in_tail.rb, line 250
# Replaces the watcher for +path+ with a fresh one and schedules the previous
# watcher to be closed after rotate_wait.
#
# When a position file is in use and the given entry's inode no longer
# matches the stored entry for the path, the update is skipped.
#
# @param path the watched path
# @param pe the position entry the caller is working with
def update_watcher(path, pe)
  if @pf && pe.read_inode != @pf[path].read_inode
    log.trace "Skip update_watcher because watcher has been already updated by other inotify event"
    return
  end

  previous = @tails[path]
  @tails[path] = setup_watcher(path, pe)
  close_watcher_after_rotate_wait(previous) if previous
end