# MessagePack FixArray header; the array length is 3 when @extend_internal_protocol is enabled, and 2 otherwise.
# File lib/fluent/plugin/out_forward.rb, line 42
# Sets up the plugin instance: runs the parent initializer, loads the socket
# utility library, and starts with an empty list of forwarding nodes.
def initialize
  super

  # Loaded here rather than at file load time.
  require 'fluent/plugin/socket_util'

  # Filled by #configure, one entry per <server> element.
  @nodes = []  #=> [Node]
end
# File lib/fluent/plugin/out_forward.rb, line 94
# Reads the plugin configuration and builds one node per <server> element.
#
# A top-level `host` (with optional `port`) is converted into an equivalent
# <server> element first, so the old flat style keeps working.
#
# conf - configuration element; it is read with [] and #elements and may be
#        extended through #add_element.
#
# Raises ConfigError when dns_round_robin is combined with the udp heartbeat
# type, or when no <server> element produced a node.
def configure(conf)
  super

  # backward compatibility: translate the flat host/port form into <server>
  if legacy_host = conf['host']
    legacy_port = conf['port']
    legacy_port = legacy_port ? legacy_port.to_i : DEFAULT_LISTEN_PORT
    legacy_server = conf.add_element('server')
    legacy_server['host'] = legacy_host
    legacy_server['port'] = legacy_port.to_s
  end

  recover_sample_size = @recover_wait / @heartbeat_interval

  # Extend this condition whenever a new option relies on the extended protocol.
  @extend_internal_protocol = !!@require_ack_response

  if @dns_round_robin && @heartbeat_type == :udp
    raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
  end

  servers = conf.elements.select {|element| element.name == "server" }
  servers.each do |server|
    host = server['host']

    port = server['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT

    weight = server['weight']
    weight = weight ? weight.to_i : 60

    standby = !!server['standby']

    # An unnamed server is identified by its address.
    name = server['name'] || "#{host}:#{port}"

    failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)

    node_conf = NodeConfig.new(name, host, port, weight, standby, failure,
                               @phi_threshold, recover_sample_size, @expire_dns_cache,
                               @phi_failure_detector, @dns_round_robin)

    node_class = @heartbeat_type == :none ? NoneHeartbeatNode : Node
    @nodes << node_class.new(log, node_conf)

    log.info "adding forwarding server '#{name}'", host: host, port: port, weight: weight, plugin_id: plugin_id
  end

  raise ConfigError, "forward output plugin requires at least one <server> is required" if @nodes.empty?
end
# File lib/fluent/plugin/out_forward.rb, line 191
# Thread body: runs the event loop stored in @loop, if there is one.
# Any StandardError escaping the loop is logged (message plus backtrace)
# instead of being propagated.
def run
  return unless @loop

  @loop.run
rescue => e
  log.error "unexpected error", error: e.to_s
  log.error_backtrace
end
# File lib/fluent/plugin/out_forward.rb, line 181
# Stops the heartbeat machinery. Each step is skipped when the corresponding
# object was never created.
def shutdown
  # Checked by #on_timer so that no further heartbeats are sent.
  @finished = true

  if @loop
    @loop.watchers.each(&:detach)
    @loop.stop
  end

  # Wait for the loop thread to finish before closing the socket.
  @thread.join if @thread
  @usock.close if @usock
end
# File lib/fluent/plugin/out_forward.rb, line 156
# Starts the plugin: builds the initial weight array and, unless heartbeats
# are disabled, creates the event loop with its heartbeat watchers and runs it
# on a new thread.
def start
  super

  # The seed is kept so that #rebuild_weight_array can reuse it on every rebuild.
  @rand_seed = Random.new.seed
  rebuild_weight_array
  @rr = 0

  return if @heartbeat_type == :none

  @loop = Coolio::Loop.new

  if @heartbeat_type == :udp
    # assuming all hosts use udp
    @usock = SocketUtil.create_udp_socket(@nodes.first.host)
    @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
    @loop.attach(@hb)
  end

  @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
  @loop.attach(@timer)

  @thread = Thread.new(&method(:run))
end
# File lib/fluent/plugin/out_forward.rb, line 198
# Sends a chunk to the next available node, walking the weight array in
# round-robin order starting after the previously used slot.
#
# tag   - tag passed through to #send_data.
# chunk - chunk to send; nothing happens when it is empty.
#
# Returns nil after the first successful send. When every slot was tried
# without success, re-raises the most recent send error, or raises a
# RuntimeError if no node was available at all.
def write_objects(tag, chunk)
  return if chunk.empty?

  last_error = nil
  slots = @weight_array.length

  slots.times do
    @rr = (@rr + 1) % slots
    candidate = @weight_array[@rr]
    next unless candidate.available?

    begin
      send_data(candidate, tag, chunk)
      return
    rescue
      # for load balancing during detecting crashed servers
      last_error = $!  # use the latest error
    end
  end

  raise last_error if last_error
  raise "no nodes are available"  # TODO message
end
# File lib/fluent/plugin/out_forward.rb, line 376
# Opens a TCP connection to the given node.
#
# node - object responding to #resolved_host and #port.
#
# Returns the newly created TCPSocket.
def connect(node)
  # TODO unix socket?
  host = node.resolved_host
  port = node.port
  TCPSocket.new(host, port)
end
# File lib/fluent/plugin/out_forward.rb, line 274
# Returns the header constant to write at the start of a forwarded message:
# FORWARD_HEADER_EXT when the extended internal protocol is enabled,
# FORWARD_HEADER otherwise.
def forward_header
  @extend_internal_protocol ? FORWARD_HEADER_EXT : FORWARD_HEADER
end
# File lib/fluent/plugin/out_forward.rb, line 436
# Callback for a received heartbeat packet.
#
# sockaddr - packed socket address of the sender; it is compared as-is with
#            each node's #sockaddr.
# msg      - payload of the packet (not inspected).
#
# The node whose address matches records the heartbeat; when that reports a
# change, the weight array is rebuilt. Packets from unknown senders are
# ignored.
def on_heartbeat(sockaddr, msg)
  # The address was previously unpacked into port/host locals that were never
  # read; the lookup only needs the packed form.
  if node = @nodes.find {|n| n.sockaddr == sockaddr }
    #log.trace "heartbeat from '#{node.name}'", :host=>node.host, :port=>node.port
    if node.heartbeat
      rebuild_weight_array
    end
  end
end
# File lib/fluent/plugin/out_forward.rb, line 394
# Timer callback: ticks every node and sends it a heartbeat.
#
# Does nothing once @finished is set. A node whose #tick reports a change
# triggers a weight array rebuild. Heartbeats go through #send_heartbeat_tcp
# for the :tcp type; otherwise a single NUL byte is sent over @usock.
# The listed transient socket errors are logged at debug level and do not
# interrupt the loop over the remaining nodes.
def on_timer
  return if @finished

  @nodes.each {|n|
    rebuild_weight_array if n.tick

    begin
      #log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat_type}"
      if @heartbeat_type == :tcp
        send_heartbeat_tcp(n)
      else
        # One NUL byte is the whole UDP heartbeat packet.
        @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
      end
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNREFUSED => e
      # TODO log
      log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", error: e.to_s
    end
  }
end
# File lib/fluent/plugin/out_forward.rb, line 228
# Recomputes @weight_array, the list that #write_objects walks in round-robin
# order. Each node appears in proportion to its weight.
#
# Standby nodes are only included to compensate for the weight of regular
# nodes that are currently unavailable. Unavailable regular nodes stay in the
# array; #write_objects skips them at send time.
#
# When no node can be placed in the array (no regular nodes, or every weight
# is zero) the array is set to empty rather than raising ZeroDivisionError.
#
# Returns the new weight array.
def rebuild_weight_array
  standby_nodes, regular_nodes = @nodes.partition {|n| n.standby? }

  lost_weight = regular_nodes.reject {|n| n.available? }.map {|n| n.weight }.inject(0, :+)
  log.debug "rebuilding weight array", lost_weight: lost_weight

  if lost_weight > 0
    standby_nodes.each {|n|
      next unless n.available?

      regular_nodes << n
      log.warn "using standby node #{n.host}:#{n.port}", weight: n.weight
      lost_weight -= n.weight
      # Stop as soon as the standby nodes cover the lost weight.
      break if lost_weight <= 0
    }
  end

  # Reduce weights by their greatest common divisor to keep the array short.
  gcd = regular_nodes.map {|n| n.weight }.inject(0) {|r, w| r.gcd(w) }

  weight_array = []
  # gcd is 0 when there are no nodes or all weights are 0; dividing by it
  # would raise, so nothing is added in that case.
  unless gcd.zero?
    regular_nodes.each {|n|
      (n.weight / gcd).times { weight_array << n }
    }
  end

  if weight_array.empty?
    log.warn "no nodes are available to build the weight array"
    return @weight_array = weight_array
  end

  # for load balancing during detecting crashed servers
  coe = (regular_nodes.size * 6) / weight_array.size
  weight_array *= coe if coe > 1

  # Shuffle with the stored seed so that every rebuild yields the same order
  # for the same set of nodes.
  r = Random.new(@rand_seed)
  weight_array.sort_by! { r.rand }

  @weight_array = weight_array
end
# File lib/fluent/plugin/out_forward.rb, line 298
# Sends one chunk to a node over a fresh connection and, when ACKs are
# required, waits for the node's response.
#
# node  - destination node.
# tag   - tag written after the header (serialized with #to_msgpack).
# chunk - chunk written as a raw payload; its #unique_id is used as ACK id.
#
# Returns the unpacked ACK response when one was read, otherwise nil
# (the return value is intended for tests).
#
# Raises ForwardOutputConnectionClosedError, ForwardOutputResponseError or
# ForwardOutputACKTimeoutError when ACK handling fails. The socket is closed
# in every case.
def send_data(node, tag, chunk)
  sock = connect(node)
  begin
    linger = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    send_timeout = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    # array header (see #forward_header)
    sock.write forward_header

    # tag
    sock.write tag.to_msgpack

    # payload size header: always "raw 32" (0xdb followed by a 32-bit
    # big-endian size), regardless of how small the chunk is
    sock.write [0xdb, chunk.size].pack('CN')

    # payload body
    chunk.write_to(sock)

    res = nil
    if @extend_internal_protocol
      option = {}
      option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response
      sock.write option.to_msgpack

      if @require_ack_response && @ack_response_timeout > 0
        # Waiting for a response here reduces throughput because a chunk queue
        # is locked meanwhile. Avoiding that would require keeping a list of
        # chunks awaiting responses and processing them asynchronously.
        unless IO.select([sock], nil, nil, @ack_response_timeout)
          # IO.select returns nil on timeout. No response can mean either that
          # (1) the node does not support sending responses, or
          # (2) the node supports them but the response did not arrive.
          @log.warn "no response from #{node.host}:#{node.port}. regard it as unavailable."
          node.disable!
          raise ForwardOutputACKTimeoutError, "node #{node.host}:#{node.port} does not return ACK"
        end

        raw_data = sock.recv(1024)

        # A socket closed by the remote host is readable and #recv returns an
        # empty string (EOF). The data is then assumed undelivered and retried.
        if raw_data.empty?
          @log.warn "node #{node.host}:#{node.port} closed the connection. regard it as unavailable."
          node.disable!
          raise ForwardOutputConnectionClosedError, "node #{node.host}:#{node.port} closed connection"
        end

        # The response uses the same serialization as the sent data.
        res = MessagePack.unpack(raw_data)
        if res['ack'] != option['chunk']
          # A mismatching id suggests something went wrong, so send the chunk again.
          raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different"
        end
      end
    end

    node.heartbeat(false)
    return res  # for test
  ensure
    sock.close
  end
end
# Heartbeat payload for TCP: the forward header followed by an empty tag and
# an empty array, both MessagePack-encoded. Frozen so the shared constant
# cannot be modified in place.
FORWARD_TCP_HEARTBEAT_DATA = (FORWARD_HEADER + ''.to_msgpack + [].to_msgpack).freeze
# File lib/fluent/plugin/out_forward.rb, line 283
# Performs a TCP heartbeat: connects to the node and, if that succeeds,
# records a heartbeat on it. The socket is always closed afterwards.
#
# node - node to probe.
#
# Returns the result of node.heartbeat(true). Errors raised by #connect
# propagate to the caller.
def send_heartbeat_tcp(node)
  sock = connect(node)
  begin
    linger = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # don't send any data to not cause a compatibility problem: no send
    # timeout is configured and FORWARD_TCP_HEARTBEAT_DATA is not written;
    # a successful connect alone counts as the heartbeat.
    node.heartbeat(true)
  ensure
    sock.close
  end
end