class Fluent::ForwardInput

Constants

LISTEN_PORT

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input.new
# File lib/fluent/plugin/in_forward.rb, line 30
def initialize
  super
  require 'fluent/plugin/socket_util'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_forward.rb, line 88
def configure(conf)
  super

  if @security
    if @security.user_auth && @security.users.empty?
      raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
    end
    if !@security.allow_anonymous_source && @security.clients.empty?
      raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled"
    end

    @nodes = []

    @security.clients.each do |client|
      if client.host && client.network
        raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
      end
      if !client.host && !client.network
        raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
      end
      source = nil
      if client.host
        begin
          source = IPSocket.getaddress(client.host)
        rescue SocketError => e
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end
      source_addr = begin
                      IPAddr.new(source || client.network)
                    rescue ArgumentError => e
                      raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
                    end
      @nodes.push({
          address: source_addr,
          shared_key: (client.shared_key || @security.shared_key),
          users: client.users
        })
    end
  end
end
listen(client) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 171
def listen(client)
  log.info "listening fluent socket on #{@bind}:#{@port}"
  sock = client.listen_tcp(@bind, @port)
  s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection))
  s.listen(@backlog) unless @backlog.nil?
  s
end
run() click to toggle source

config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen

if File.exist?(@path)
  File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.debug "listening fluent socket on #{@path}"
Coolio::UNIXServer.new(@path, Handler, method(:on_message))

end

# File lib/fluent/plugin/in_forward.rb, line 189
def run
  @loop.run(@blocking_timeout)
rescue => e
  log.error "unexpected error", error: e
  log.error_backtrace
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_forward.rb, line 152
def shutdown
  # In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached.
  # In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached
  # and therfore RuntimeError occurs saying that it is not attached to a loop.
  # It occurs only when testing for sending responses to ForwardOutput.
  # Sending responses needs to write the socket that is previously used only to read
  # and a handler has 2 watchers that is used to read and to write.
  # This problem occurs possibly because those watchers are thought to be related to each other
  # and when detaching one of them the other is also detached for some reasons.
  # As a workaround, check if watchers are attached before detaching them.
  @loop.watchers.each {|w| w.detach if w.attached? }
  @loop.stop
  @usock.close
  @thread.join
  @lsock.close

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_forward.rb, line 130
def start
  super

  @loop = Coolio::Loop.new

  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  client = ServerEngine::SocketManager::Client.new(socket_manager_path)

  @lsock = listen(client)
  @loop.attach(@lsock)

  @usock = client.listen_udp(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  @thread = Thread.new(&method(:run))
end

Private Instance Methods

add_source_host(es, host) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 401
def add_source_host(es, host)
  new_es = MultiEventStream.new
  es.each { |time, record|
    record[@source_hostname_key] = host
    new_es.add(time, record)
  }
  new_es
end
check_and_skip_invalid_event(tag, es, peeraddr) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 389
def check_and_skip_invalid_event(tag, es, peeraddr)
  new_es = MultiEventStream.new
  es.each { |time, record|
    if invalid_event?(tag, time, record)
      log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
      next
    end
    new_es.add(time, record)
  }
  new_es
end
check_ping(message, remote_addr, user_auth_salt, nonce) click to toggle source
Authentication Handshake
  1. (client) connect to server

* Socket handshake, checks certificate and its significate (in client, if using SSL)
  1. (server)

* check network/domain acl (if enabled)
* disconnect when failed
  1. (server) send HELO

* ['HELO', options(hash)]
* options:
  * nonce: string (required)
  * auth: string or blank_string (string: authentication required, and its salt is this value)
  1. (client) send PING

* ['PING', selfhostname, sharedkey_salt, sha512_hex(sharedkey_salt + selfhostname + nonce + sharedkey), username || '', sha512_hex(auth_salt + username + password) || '']
  1. (server) check PING

* check sharedkey
* check username / password (if required)
* send PONG FAILURE if failed
* ['PONG', false, 'reason of authentication failure', '', '']
  1. (server) send PONG

* ['PONG', bool(authentication result), 'reason if authentication failed', selfhostname, sha512_hex(salt + selfhostname + nonce + sharedkey)]
  1. (client) check PONG

* check sharedkey
* disconnect when failed
  1. connection established

* send data from client
# File lib/fluent/plugin/in_forward.rb, line 460
def check_ping(message, remote_addr, user_auth_salt, nonce)
  log.debug "checking ping"
  # ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
  unless message.size == 6 && message[0] == 'PING'
    return false, 'invalid ping message'
  end
  _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

  node = @nodes.select{|n| n[:address].include?(remote_addr) rescue false }.first
  if !node && !@security.allow_anonymous_source
    log.warn "Anonymous client disallowed", address: remote_addr, hostname: hostname
    return false, "anonymous source host '#{remote_addr}' denied", nil
  end

  shared_key = node ? node[:shared_key] : @security.shared_key
  serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(nonce).update(shared_key).hexdigest
  if shared_key_hexdigest != serverside
    log.warn "Shared key mismatch", address: remote_addr, hostname: hostname
    return false, 'shared_key mismatch', nil
  end

  if @security.user_auth
    users = select_authenticate_users(node, username)
    success = false
    users.each do |user|
      passhash = Digest::SHA512.new.update(user_auth_salt).update(username).update(user[:password]).hexdigest
      success ||= (passhash == password_digest)
    end
    unless success
      log.warn "Authentication failed", address: remote_addr, hostname: hostname, username: username
      return false, 'username/password mismatch', nil
    end
  end

  return true, shared_key_salt, shared_key
end
generate_helo(nonce, user_auth_salt) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 427
def generate_helo(nonce, user_auth_salt)
  log.debug "generating helo"
  # ['HELO', options(hash)]
  ['HELO', {'nonce' => nonce, 'auth' => (@security ? user_auth_salt : ''), 'keepalive' => !@deny_keepalive}]
end
generate_pong(auth_result, reason_or_salt, nonce, shared_key) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 497
def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
  log.debug "generating pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed', self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
  unless auth_result
    return ['PONG', false, reason_or_salt, '', '']
  end

  shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest
  ['PONG', true, '', @security.self_hostname, shared_key_digest_hex]
end
generate_salt() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 423
def generate_salt
  OpenSSL::Random.random_bytes(16)
end
handle_connection(conn) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 198
def handle_connection(conn)
  send_data = ->(serializer, data){ conn.write serializer.call(data) }

  log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port
  state = :established
  nonce = nil
  user_auth_salt = nil

  if @security
    # security enabled session MUST use MessagePack as serialization format
    state = :helo
    nonce = generate_salt
    user_auth_salt = generate_salt
    send_data.call(:to_msgpack.to_proc, generate_helo(nonce, user_auth_salt))
    state = :pingpong
  end

  log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port

  read_messages(conn) do |msg, chunk_size, serializer|
    case state
    when :pingpong
      success, reason_or_salt, shared_key = check_ping(msg, conn.remote_addr, user_auth_salt, nonce)
      unless success
        send_data.call(serializer, generate_pong(false, reason_or_salt, nonce, shared_key))
        conn.close
        next
      end
      send_data.call(serializer, generate_pong(true, reason_or_salt, nonce, shared_key))

      log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
      state = :established
    when :established
      options = on_message(msg, chunk_size, conn.remote_addr)
      if options && r = response(options)
        send_data.call(serializer, r)
        log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
        if @deny_keepalive
          conn.on_write_complete do
            conn.close
          end
        end
      else
        if @deny_keepalive
          conn.close
        end
      end
    else
      raise "BUG: unknown session state: #{state}"
    end
  end
end
invalid_event?(tag, time, record) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 385
def invalid_event?(tag, time, record)
  !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
end
on_heartbeat_request(host, port, msg) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 609
def on_heartbeat_request(host, port, msg)
  #log.trace "heartbeat request from #{host}:#{port}"
  begin
    @usock.send "\0", 0, host, port
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
  end
end
on_message(msg, chunk_size, peeraddr) click to toggle source

message Entry {

1: long time
2: object record

}

message Forward {

1: string tag
2: list<Entry> entries
3: object option (optional)

}

message PackedForward {

1: string tag
2: raw entries  # msgpack stream of Entry
3: object option (optional)

}

message Message {

1: string tag
2: long? time
3: object record
4: object option (optional)

}

# File lib/fluent/plugin/in_forward.rb, line 314
def on_message(msg, chunk_size, peeraddr)
  if msg.nil?
    # for future TCP heartbeat_request
    return
  end

  # TODO: raise an exception if broken chunk is generated by recoverable situation
  unless msg.is_a?(Array)
    log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg
    return
  end

  tag = msg[0]
  entries = msg[1]

  if @chunk_size_limit && (chunk_size > @chunk_size_limit)
    log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size
    return
  elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
    log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size
  end

  case entries
  when String
    # PackedForward
    option = msg[2]
    size = (option && option['size']) || 0
    es_class = (option && option['compressed'] == 'gzip') ? CompressedMessagePackEventStream : MessagePackEventStream
    es = es_class.new(entries, nil, size.to_i)
    es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
    es = add_source_host(es, peeraddr[2]) if @source_hostname_key
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = if @skip_invalid_event
           check_and_skip_invalid_event(tag, entries, peeraddr)
         else
           es = MultiEventStream.new
           entries.each { |e|
             record = e[1]
             next if record.nil?
             time = e[0]
             time = (now ||= Engine.now) if time.to_i == 0
             es.add(time, record)
           }
           es
         end
    es = add_source_host(es, peeraddr[2]) if @source_hostname_key
    router.emit_stream(tag, es)
    option = msg[2]

  else
    # Message
    time = msg[1]
    record = msg[2]
    if @skip_invalid_event && invalid_event?(tag, time, record)
      log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record
      return msg[3] # retry never succeeded so return ack and drop incoming event.
    end
    return if record.nil?
    time = Engine.now if time.to_i == 0
    record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
    router.emit(tag, time, record)
    option = msg[3]
  end

  # return option for response
  option
end
read_messages(conn, &block) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 251
def read_messages(conn, &block)
  feeder = nil
  serializer = nil
  bytes = 0
  conn.on_data do |data|
    # only for first call of callback
    unless feeder
      first = data[0]
      if first == '{' || first == '[' # json
        parser = Yajl::Parser.new
        parser.on_parse_complete = ->(obj){
          block.call(obj, bytes, serializer)
          bytes = 0
        }
        serializer = :to_json.to_proc
        feeder = ->(d){ parser << d }
      else # msgpack
        parser = Fluent::Engine.msgpack_factory.unpacker
        serializer = :to_msgpack.to_proc
        feeder = ->(d){
          parser.feed_each(d){|obj|
            block.call(obj, bytes, serializer)
            bytes = 0
          }
        }
      end
    end

    bytes += data.bytesize
    feeder.call(data)
  end
end
response(option) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 284
def response(option)
  if option && option['chunk']
    return { 'ack' => option['chunk'] }
  end
  nil
end
select_authenticate_users(node, username) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 415
def select_authenticate_users(node, username)
  if node.nil? || node[:users].empty?
    @security.users.select{|u| u.username == username}
  else
    @security.users.select{|u| node[:users].include?(u.username) && u.username == username}
  end
end
source_message(peeraddr) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 410
def source_message(peeraddr)
  _, port, host, addr = peeraddr
  "host: #{host}, addr: #{addr}, port: #{port}"
end