A Look at Flink's SocketClientSink

Author: go4it | Published: 2018-12-02 11:31 | Views: 60

    This article takes a look at Flink's SocketClientSink.

    DataStream.writeToSocket

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

        /**
         * Writes the DataStream to a socket as a byte array. The format of the
         * output is specified by a {@link SerializationSchema}.
         *
         * @param hostName
         *            host of the socket
         * @param port
         *            port of the socket
         * @param schema
         *            schema for serialization
         * @return the closed DataStream
         */
        @PublicEvolving
        public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
            DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
            returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
            return returnStream;
        }
    
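    • DataStream.writeToSocket creates a SocketClientSink internally, passing four constructor arguments: hostName, port, schema, and maxNumRetries (0 here). It also sets the sink's parallelism to 1, because multiple instances connecting to the same port would not work.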
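
    Because the output format is determined entirely by the SerializationSchema, any record delimiter has to come from the schema itself. The following is a minimal sketch of a line-oriented schema. ByteSchema is a hypothetical stand-in that mirrors only the serialize method of Flink's SerializationSchema so that the snippet runs without Flink on the classpath; LineSchemaSketch is likewise an illustrative name, not part of Flink.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Flink's SerializationSchema<T>; only serialize() is mirrored.
interface ByteSchema<T> {
    byte[] serialize(T element);
}

// Illustrative schema: UTF-8 text terminated by '\n' so a line-based reader can split records.
final class LineSchemaSketch implements ByteSchema<String> {

    @Override
    public byte[] serialize(String element) {
        return (element + "\n").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = new LineSchemaSketch().serialize("hello");
        System.out.println("serialized length: " + bytes.length);
    }
}
```

    In a real job the same logic would presumably live in a class implementing SerializationSchema<String> and be passed as the third argument of writeToSocket.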

    SocketClientSink

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

    /**
     * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
     *
     * <p>The sink can be set to retry message sends after the sending failed.
     *
     * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
     * message. This significantly reduced throughput, but also decreases message latency.
     *
     * @param <IN> data to be written into the Socket.
     */
    @PublicEvolving
    public class SocketClientSink<IN> extends RichSinkFunction<IN> {
    
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
    
        private static final int CONNECTION_RETRY_DELAY = 500;
    
    
        private final SerializableObject lock = new SerializableObject();
        private final SerializationSchema<IN> schema;
        private final String hostName;
        private final int port;
        private final int maxNumRetries;
        private final boolean autoFlush;
    
        private transient Socket client;
        private transient OutputStream outputStream;
    
        private int retries;
    
        private volatile boolean isRunning = true;
    
        /**
         * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
         * and will not auto-flush the stream.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
            this(hostName, port, schema, 0);
        }
    
        /**
         * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
         * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
         * The sink will not auto-flush the stream.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         * @param maxNumRetries The maximum number of retries after a message send failed.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
            this(hostName, port, schema, maxNumRetries, false);
        }
    
        /**
         * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
         * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         * @param maxNumRetries The maximum number of retries after a message send failed.
         * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                                int maxNumRetries, boolean autoflush) {
            checkArgument(port > 0 && port < 65536, "port is out of range");
            checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
    
            this.hostName = checkNotNull(hostName, "hostname must not be null");
            this.port = port;
            this.schema = checkNotNull(schema);
            this.maxNumRetries = maxNumRetries;
            this.autoFlush = autoflush;
        }
    
        // ------------------------------------------------------------------------
        //  Life cycle
        // ------------------------------------------------------------------------
    
        /**
         * Initialize the connection with the Socket in the server.
         * @param parameters Configuration.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            try {
                synchronized (lock) {
                    createConnection();
                }
            }
            catch (IOException e) {
                throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
            }
        }
    
    
        /**
         * Called when new data arrives to the sink, and forwards it to Socket.
         *
         * @param value The value to write to the socket.
         */
        @Override
        public void invoke(IN value) throws Exception {
            byte[] msg = schema.serialize(value);
    
            try {
                outputStream.write(msg);
                if (autoFlush) {
                    outputStream.flush();
                }
            }
            catch (IOException e) {
                // if no re-tries are enable, fail immediately
                if (maxNumRetries == 0) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
                }
    
                LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                        ". Trying to reconnect..." , e);
    
                // do the retries in locked scope, to guard against concurrent close() calls
                // note that the first re-try comes immediately, without a wait!
    
                synchronized (lock) {
                    IOException lastException = null;
                    retries = 0;
    
                    while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
    
                        // first, clean up the old resources
                        try {
                            if (outputStream != null) {
                                outputStream.close();
                            }
                        }
                        catch (IOException ee) {
                            LOG.error("Could not close output stream from failed write attempt", ee);
                        }
                        try {
                            if (client != null) {
                                client.close();
                            }
                        }
                        catch (IOException ee) {
                            LOG.error("Could not close socket from failed write attempt", ee);
                        }
    
                        // try again
                        retries++;
    
                        try {
                            // initialize a new connection
                            createConnection();
    
                            // re-try the write
                            outputStream.write(msg);
    
                            // success!
                            return;
                        }
                        catch (IOException ee) {
                            lastException = ee;
                            LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                        }
    
                        // wait before re-attempting to connect
                        lock.wait(CONNECTION_RETRY_DELAY);
                    }
    
                    // throw an exception if the task is still running, otherwise simply leave the method
                    if (isRunning) {
                        throw new IOException("Failed to send message '" + value + "' to socket server at "
                                + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                    }
                }
            }
        }
    
        /**
         * Closes the connection with the Socket server.
         */
        @Override
        public void close() throws Exception {
            // flag this as not running any more
            isRunning = false;
    
            // clean up in locked scope, so there is no concurrent change to the stream and client
            synchronized (lock) {
                // we notify first (this statement cannot fail). The notified thread will not continue
                // anyways before it can re-acquire the lock
                lock.notifyAll();
    
                try {
                    if (outputStream != null) {
                        outputStream.close();
                    }
                }
                finally {
                    if (client != null) {
                        client.close();
                    }
                }
            }
        }
    
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------
    
        private void createConnection() throws IOException {
            client = new Socket(hostName, port);
            client.setKeepAlive(true);
            client.setTcpNoDelay(true);
    
            outputStream = client.getOutputStream();
        }
    
        // ------------------------------------------------------------------------
        //  For testing
        // ------------------------------------------------------------------------
    
        int getCurrentNumberOfRetries() {
            synchronized (lock) {
                return retries;
            }
        }
    }
    
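    • SocketClientSink extends RichSinkFunction; its autoFlush field is false unless the five-argument constructor is used.
    • open() calls createConnection() to establish the socket connection; if an IOException occurs at this point, it fails fast by rethrowing it. createConnection() sets both keepAlive and tcpNoDelay to true on the socket.
    • invoke() first calls schema.serialize to serialize the value, then writes the bytes with outputStream.write; if autoFlush is true, it flushes the stream immediately. If an IOException occurs and maxNumRetries is 0, it rethrows at once; otherwise the retry logic runs directly inside the catch block, bounded by maxNumRetries (-1 means retry indefinitely). Each retry closes the old stream and socket, calls createConnection, and then calls outputStream.write again. The first retry happens immediately, and later retries are separated by CONNECTION_RETRY_DELAY (500 ms).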
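
    The retry loop described above can be sketched in isolation as follows. This is a simplified illustration, not the sink's actual code: RetrySketch and the Attempt callback are hypothetical names, Attempt stands in for "createConnection() followed by outputStream.write(msg)", the isRunning flag and the resource cleanup are left out, and Thread.sleep replaces lock.wait. As in the sink, the maxNumRetries == 0 case is expected to be handled before the loop is entered.

```java
import java.io.IOException;

final class RetrySketch {

    // Hypothetical callback standing in for "reconnect, then write the message again".
    interface Attempt {
        void run() throws IOException;
    }

    /**
     * Runs the attempt until it succeeds or the retry budget is used up.
     * A negative maxNumRetries means "retry indefinitely".
     * Returns the number of retries that were needed.
     */
    static int retry(Attempt attempt, int maxNumRetries, long delayMs)
            throws IOException, InterruptedException {
        IOException lastException = null;
        int retries = 0;

        while (maxNumRetries < 0 || retries < maxNumRetries) {
            retries++;
            try {
                attempt.run(); // the first retry comes immediately, without a wait
                return retries;
            } catch (IOException e) {
                lastException = e;
            }
            // The sink waits on its lock here, so close() can wake it up via notifyAll().
            Thread.sleep(delayMs);
        }
        throw new IOException("Failed after " + retries + " retries.", lastException);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated connection that fails twice and then succeeds.
        int used = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated failure");
            }
        }, 5, 10);
        System.out.println("succeeded after " + used + " retries");
    }
}
```

    Note that the delay is only paid after a failed retry, which is why a transient failure that recovers on the first reconnect adds no waiting time.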

    Summary

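    • DataStream.writeToSocket creates a SocketClientSink internally with maxNumRetries set to 0; because it does not call the constructor that takes an autoflush argument, autoFlush stays at its default of false.
    • The socket created in open() has keepAlive and tcpNoDelay both set to true; if an IOException occurs during open(), it is rethrown and the task fails.
    • invoke() is fairly simple: it serializes the value with the SerializationSchema and writes the result to outputStream. It includes basic retry-on-failure logic with a delay of CONNECTION_RETRY_DELAY (500 ms) between attempts; in this version the retry is performed synchronously inside invoke().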
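
    Since open() fails when nothing is listening on the target port, trying the sink locally needs a receiver. The sketch below uses only the JDK to show the round trip: a one-shot receiver on an ephemeral loopback port, and a sender that applies the same socket options as createConnection() and flushes after the write, as autoFlush=true would. SocketRoundTripSketch and roundTrip are hypothetical names, and the newline framing is an assumption of this example rather than something the sink adds.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SocketRoundTripSketch {

    /** Sends one newline-terminated message to a one-shot local receiver and returns what it read. */
    static String roundTrip(String message) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            // Receiver: accept a single connection and read a single line.
            CompletableFuture<String> received = CompletableFuture.supplyAsync(() -> {
                try (Socket peer = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(peer.getInputStream(), StandardCharsets.UTF_8))) {
                    return in.readLine();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

            // Sender: same socket options as createConnection() in the sink.
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                client.setKeepAlive(true);
                client.setTcpNoDelay(true);
                OutputStream out = client.getOutputStream();
                out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush(); // what autoFlush=true does after each message
            }
            return received.get(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("receiver got: " + roundTrip("hello"));
    }
}
```

    For a quick manual check, any tool that listens on a TCP port and prints what it receives would serve the same purpose as the receiver half of this sketch.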

    Original link: https://www.haomeiwen.com/subject/qckjcqtx.html