class Fluent::Plugin::FileBuffer

Constants

DEFAULT_CHUNK_LIMIT_SIZE
DEFAULT_TOTAL_LIMIT_SIZE
DIR_PERMISSION

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::Buffer.new
# File lib/fluent/plugin/buf_file.rb, line 51
def initialize
  super
  @symlink_path = nil
end

Public Instance Methods

buffer_path_for_test?() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 95
def buffer_path_for_test?
  caller_locations.each do |location|
    # Thread::Backtrace::Location#path returns base filename or absolute path.
    # #absolute_path returns absolute_path always.
    # https://bugs.ruby-lang.org/issues/12159
    if location.absolute_path =~ /\/test_[^\/]+\.rb$/ # location.path =~ /test_.+\.rb$/
      return true
    end
  end
  false
end
configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Buffer#configure
# File lib/fluent/plugin/buf_file.rb, line 56
def configure(conf)
  super

  type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
  if @@buffer_paths.has_key?(@path) && !buffer_path_for_test?
    type_using_this_path = @@buffer_paths[@path]
    raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
  end

  @@buffer_paths[@path] = type_of_owner

  # TODO: create buffer path with plugin_id, under directory specified by system config
  if File.exist?(@path)
    if File.directory?(@path)
      @path = File.join(@path, 'buffer.*.log')
    elsif File.basename(@path).include?('.*.')
      # valid path (buffer.*.log will be ignored)
    elsif File.basename(@path).end_with?('.*')
      @path = @path + '.log'
    else
      # existing file will be ignored
      @path = @path + '.*.log'
    end
  else # path doesn't exist
    if File.basename(@path).include?('.*.')
      # valid path
    elsif File.basename(@path).end_with?('.*')
      @path = @path + '.log'
    else
      # path is handled as directory, and it will be created at #start
      @path = File.join(@path, 'buffer.*.log')
    end
  end

  unless @dir_permission
    @dir_permission = system_config.dir_permission || DIR_PERMISSION
  end
end
generate_chunk(metadata) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 144
def generate_chunk(metadata)
  # FileChunk generates real path with unique_id
  if @file_permission
    Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission, compress: @compress)
  else
    Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, compress: @compress)
  end
end
persistent?() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 113
def persistent?
  true
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 117
def resume
  stage = {}
  queue = []

  Dir.glob(@path) do |path|
    m = new_metadata() # this metadata will be overwritten by resuming .meta file content
                       # so it should not added into @metadata_list for now
    mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
    if mode == :unknown
      log.debug "uknown state chunk found", path: path
      next
    end

    chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
    case chunk.state
    when :staged
      stage[chunk.metadata] = chunk
    when :queued
      queue << chunk
    end
  end

  queue.sort_by!{ |chunk| chunk.modified_at }

  return stage, queue
end
start() click to toggle source
Calls superclass method Fluent::Plugin::Buffer#start
# File lib/fluent/plugin/buf_file.rb, line 107
def start
  FileUtils.mkdir_p File.dirname(@path), mode: @dir_permission

  super
end