class Fluent::ForwardInput::Handler

Constants

PEERADDR_FAILED

Attributes

protocol[R]
remote_addr[R]
remote_host[R]
remote_port[R]

Public Class Methods

new(io, linger_timeout, log, on_connect_callback) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_forward.rb, line 513
def initialize(io, linger_timeout, log, on_connect_callback)
  super(io)

  @peeraddr = nil
  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    @peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
    opt = [1, linger_timeout].pack('I!I!')  # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end

  ### TODO: disabling name rev resolv
  proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED )
  if addr == '?'
    port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil
  end
  @protocol = proto
  @remote_port = port
  @remote_addr = addr
  @remote_host = host
  @writing = false
  @closing = false
  @mutex = Mutex.new

  @chunk_counter = 0
  @on_connect_callback = on_connect_callback
  @log = log
  @log.trace {
    begin
      remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
    rescue
      remote_port = nil
      remote_addr = nil
    end
    [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ]
  }
end

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_forward.rb, line 577
def close
  writing = @mutex.synchronize {
    @closing = true
    @writing
  }
  unless writing
    super
  end
end
on_connect() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 550
def on_connect
  @on_connect_callback.call(self)
end
on_data(&callback) click to toggle source

API to register callback for data arrival

# File lib/fluent/plugin/in_forward.rb, line 555
def on_data(&callback)
  @on_read_callback = callback
end
on_read(data) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 559
def on_read(data)
  @on_read_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data from client", address: @remote_addr, error: e
  @log.error_backtrace
  close
end
on_write_complete() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 567
def on_write_complete
  closing = @mutex.synchronize {
    @writing = false
    @closing
  }
  if closing
    close
  end
end