Getting Started with Parallelism: A Producer-Consumer Implementation in Java


Author: 王强儿 | Published 2017-05-14 12:11

    For work I recently ran into a job with a very large data volume (mainly generating hundreds of millions of test records and submitting them to an index server), and the program was unacceptably slow. So I went looking for ways to speed it up.

    When Java multithreading comes up, the first thing that comes to mind is to extend Thread, override the run method, create a Thread object, and call start.

    Threads

    
    package com.github.yfor.bigdata.tdg;
    
    /**
     * Created by wq on 2017/5/14.
     */
    public class TestThread extends Thread {
    
        public TestThread() {
        }
    
        @Override
        public void run() {
            // do something
        }
    
        public static void main(String[] args) {
            TestThread tt = new TestThread();
            tt.start();
            // some other work
        }
    }
    
    
    
    
    

    This is already concurrent. I won't cover the thread lifecycle or methods like join and yield here; a proper book is the more reliable place for those. For many people, this is where their study of multithreading ends.
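
    Incidentally, the same thing is more idiomatically written by implementing Runnable rather than subclassing Thread; the task becomes a plain object that any Thread (or, later, a thread pool) can execute. A minimal variant:

    package com.github.yfor.bigdata.tdg;
    
    /**
     * The same example with Runnable: the task is decoupled from the
     * threading mechanics, which is also what thread pools expect.
     */
    public class TestRunnable implements Runnable {
    
        @Override
        public void run() {
            // do something
        }
    
        public static void main(String[] args) {
            Thread t = new Thread(new TestRunnable());
            t.start();
        }
    }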

    This most primitive approach has problems, yet surprisingly many people use it in production, and proudly so (I used to as well). How many threads should be created? Who cares, apparently. But creating threads without limit devours resources. Anyone with a little experience will suggest a thread pool instead.

    Using a thread pool

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    // Executors.newCachedThreadPool();      // an elastic pool that may grow up to Integer.MAX_VALUE threads
    // Executors.newSingleThreadExecutor();  // a pool with a single worker thread
    // Executors.newFixedThreadPool(int);    // a pool with a fixed number of threads
    
    class MyTask implements Runnable {
    
        public MyTask() {
        }
    
        @Override
        public void run() {
            // do something
        }
    }
    
    public class PoolDemo {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newFixedThreadPool(5);
            executor.execute(new MyTask());
            executor.shutdown(); // finish queued tasks, then let the workers exit
        }
    }
    

    On the size of each submission: smaller batches execute faster individually, but there are more of them, so the total time grows; submit too much at once and execution becomes very slow, to the point of failing outright. After repeated testing, batches of a few thousand to ten thousand records proved acceptable.
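
    To make the batching idea concrete, here is a minimal sketch; the batch size of 5000 and the submitBatch method are illustrative placeholders, not part of the actual program:

    import java.util.ArrayList;
    import java.util.List;
    
    public class BatchSubmit {
    
        // a few thousand to ten thousand worked best in my tests
        private static final int BATCH_SIZE = 5000;
    
        // placeholder for the real call that submits one batch to the index server
        static void submitBatch(List<String> batch) {
            System.out.println("submitting " + batch.size() + " records");
        }
    
        public static void main(String[] args) {
            List<String> buffer = new ArrayList<>(BATCH_SIZE);
            for (int i = 0; i < 1_000_000; i++) {
                buffer.add("record" + i);
                if (buffer.size() == BATCH_SIZE) {
                    submitBatch(buffer);
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {
                submitBatch(buffer); // don't forget the final partial batch
            }
        }
    }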

    Which pool should you choose: a fixed-size one, or one that grows without bound? And what happens when tasks arrive faster than the limit allows? Be careful here: the Executors factories above back their pools with an unbounded queue (or unbounded thread growth), so excess work quietly piles up until resources run out, while a hand-built ThreadPoolExecutor with a bounded queue rejects overflow by throwing RejectedExecutionException.

    A somewhat more experienced colleague will say, a little dismissively, that a pool fed by a wait queue is basically reliable: bound the queue with a blocking queue and let it absorb the bursts. A small drawback is that the pool still keeps creating threads up to its maximum, which never felt entirely right to me.

    A thread pool with a bounded queue

    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(5));
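
    When both the queue and the pool are full, the default AbortPolicy makes execute() throw RejectedExecutionException. If you would rather have backpressure, the JDK's CallerRunsPolicy makes the submitting thread run the overflow task itself, which naturally slows submission down. A small runnable sketch:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class BoundedPoolDemo {
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5),
                    // default is AbortPolicy (throws RejectedExecutionException);
                    // CallerRunsPolicy runs overflow tasks on the submitting thread
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            for (int i = 0; i < 100; i++) {
                final int n = i;
                executor.execute(() -> System.out.println(
                        "task " + n + " on " + Thread.currentThread().getName()));
            }
            executor.shutdown();
        }
    }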
    
    

    A few days earlier I had happened to be reading a book on multithreading, and it suddenly struck me that my program was an obvious fit for the producer-consumer pattern, so I reworked it.



    Main program

    
    
    package com.github.yfor.bigdata.tdg;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by wq on 2017/5/14.
     */
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
            // twice the number of cores worked well for the consumer side
            int threadNum = Runtime.getRuntime().availableProcessors() * 2;
            ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
            ExecutorService executor = Executors.newFixedThreadPool(threadNum);
            for (int i = 0; i < threadNum; i++) {
                executor.execute(new Consumerlocal(queue));
            }
            Thread pt = new Thread(new Producerlocal(queue));
            pt.start();
            pt.join(); // wait for the producer to queue everything
            // one poison pill per consumer, so every consumer thread exits
            for (int i = 0; i < threadNum; i++) {
                queue.put("poisonpill");
            }
    
            executor.shutdown();
            executor.awaitTermination(10L, TimeUnit.DAYS);
        }
    }
    
    

    Producer

    
    package com.github.yfor.bigdata.tdg;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * Created by wq on 2017/5/14.
     */
    public class Producerlocal implements Runnable {
    
    
        ArrayBlockingQueue<String> queue;
    
        public Producerlocal(ArrayBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
    
            try {
                // put() blocks whenever the queue is full, which throttles the producer
                for (int i = 0; i < 1000; i++) {
                    queue.put("s" + i);
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    Consumer

    package com.github.yfor.bigdata.tdg;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * Created by wq on 2017/5/14.
     */
    public class Consumerlocal implements Runnable {
    
        ArrayBlockingQueue<String> queue;
    
        public Consumerlocal(ArrayBlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // take() blocks whenever the queue is empty
                    final String take = queue.take();
    
                    // a poison pill tells this consumer to exit
                    if ("poisonpill".equals(take)) {
                        return;
                    }
                    // do something
                    System.out.println(take);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    The program uses a blocking queue with a bounded capacity: put() blocks once the queue is full, and take() blocks once it is empty (the curious can read the JDK source). The number of consumer threads is twice the CPU count. Using these classes well requires reading the docs and writing test code. Shutting the threads down cleanly also takes a small trick: enqueue enough poison pills, one per consumer.
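
    For reference, ArrayBlockingQueue also has timed variants of these calls, useful when blocking indefinitely is not acceptable; a small sketch (the timeout values are arbitrary):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class QueueDemo {
        public static void main(String[] args) throws InterruptedException {
            ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
    
            queue.put("item");          // blocks while the queue is full
            String item = queue.take(); // blocks while the queue is empty
    
            // timed variants give up instead of blocking forever
            boolean accepted = queue.offer("item", 1, TimeUnit.SECONDS); // false on timeout
            String polled = queue.poll(1, TimeUnit.SECONDS);             // null on timeout
            System.out.println(item + " " + accepted + " " + polled);
        }
    }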

    With the new pattern in place the program was noticeably faster, and that is essentially the whole producer-consumer pattern. The next time your program needs multithreading and the problem fits, this pattern is a good one to reach for. In the meantime, the best thing you can do is roll up your sleeves and write some test code until the pattern feels natural.

    Still, the program spent most of its time in HTTP requests, so the overall runtime remained unacceptable. The next idea was asynchronous IO instead of blocking HTTP. The problem was that this time the HTTP client had been modified for security: encryption checks and single sign-on meant adapting a new client would be difficult and time-consuming, and I was afraid I couldn't pull it off. As the earlier batch-size experiments showed, non-blocking is not automatically better! Honestly, I never quite figured out how to use it across multiple threads, so I can't say what effect it would have had.

    Maven dependencies

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpasyncclient</artifactId>
                <version>4.1.3</version>
            </dependency>
    

    Asynchronous HTTP

    /*
     * ====================================================================
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     * ====================================================================
     *
     * This software consists of voluntary contributions made by many
     * individuals on behalf of the Apache Software Foundation.  For more
     * information on the Apache Software Foundation, please see
     * <http://www.apache.org/>.
     *
     */
    package com.github.yfor.bigdata.tdg;
    
    import org.apache.http.HttpHost;
    import org.apache.http.HttpRequest;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;
    import org.apache.http.nio.IOControl;
    import org.apache.http.nio.client.methods.AsyncCharConsumer;
    import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
    import org.apache.http.protocol.HttpContext;
    
    import java.io.IOException;
    import java.nio.CharBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;
    
    /**
     * This example demonstrates pipelined execution of multiple HTTP request / response exchanges
     * with full content streaming.
     */
    public class MainPhttpasyncclient {
    
        public static void main(final String[] args) throws Exception {
            CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
            try {
                httpclient.start();
    
                HttpHost targetHost = new HttpHost("httpbin.org", 80);
            HttpGet[] requests = {
                        new HttpGet("/"),
                        new HttpGet("/ip"),
                        new HttpGet("/headers"),
                        new HttpGet("/get")
                };
    
                List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
                List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
            for (HttpGet request : requests) {
                    requestProducers.add(new MyRequestProducer(targetHost, request));
                    responseConsumers.add(new MyResponseConsumer(request));
                }
    
                Future<List<Boolean>> future = httpclient.execute(
                        targetHost, requestProducers, responseConsumers, null);
                future.get();
                System.out.println("Shutting down");
            } finally {
                httpclient.close();
            }
            System.out.println("Done");
        }
    
        static class MyRequestProducer extends BasicAsyncRequestProducer {
    
            private final HttpRequest request;
    
            MyRequestProducer(final HttpHost target, final HttpRequest request) {
                super(target, request);
                this.request = request;
            }
    
            @Override
            public void requestCompleted(final HttpContext context) {
                super.requestCompleted(context);
                System.out.println();
                System.out.println("Request sent: " + this.request.getRequestLine());
                System.out.println("=================================================");
            }
        }
    
        static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
    
            private final HttpRequest request;
    
            MyResponseConsumer(final HttpRequest request) {
                this.request = request;
            }
    
            @Override
            protected void onResponseReceived(final HttpResponse response) {
                System.out.println();
                System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
                System.out.println("=================================================");
            }
    
            @Override
            protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
                while (buf.hasRemaining()) {
                    buf.get();
                }
            }
    
            @Override
            protected void releaseResources() {
            }
    
            @Override
            protected Boolean buildResult(final HttpContext context) {
                System.out.println();
                System.out.println("=================================================");
                System.out.println();
                return Boolean.TRUE;
            }
    
        }
    
    }
    
    

    Running the program on the server itself reduced IO and put all 16 CPU cores to work (multiple consumer threads), but the result was still not good enough.

    I then asked a colleague for advice. My program was still single-node, while our receiving servers run as a cluster, so I changed it to look up the IPs of the cluster nodes in ZooKeeper and spread the requests across them (multiple receiving nodes). I also raised the commit interval in the request parameters, doing soft commits rather than committing in real time. With that, the program sped up markedly and finally reached an acceptable range.
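
    The article does not show that client code, but since Solr is in the references, here is roughly what the idea looks like with SolrJ. This is a hypothetical sketch, not the actual program: CloudSolrClient discovers the cluster's live nodes through ZooKeeper, and commitWithin asks the server to commit within a window (a soft commit by default) instead of committing per request. The zkHost, collection, and field names are made up, and the builder API differs between SolrJ versions (6.x style shown):

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;
    
    public class SoftCommitDemo {
        public static void main(String[] args) throws Exception {
            // CloudSolrClient reads the live node list from ZooKeeper, so requests
            // are spread across the cluster rather than hitting a single node
            // (zkHost and collection are placeholders)
            CloudSolrClient client = new CloudSolrClient.Builder()
                    .withZkHost("localhost:2181")
                    .build();
            client.setDefaultCollection("collection1");
    
            UpdateRequest req = new UpdateRequest();
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "1");      // placeholder fields
            doc.addField("name", "test");
            req.add(doc);
            // let the server commit within 10s instead of committing per request
            req.setCommitWithin(10000);
            req.process(client);
    
            client.close();
        }
    }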

    We also built a version of the program on Kafka, the distributed queue, taking producers and consumers to their logical conclusion.

    Configuration

    
    package com.github.yfor.bigdata.tdg;
    
    /**
     * Created by wq on 2017/4/29.
     */
    public interface KafkaProperties {
        final static String zkConnect = "localhost:2181";
        final static String groupId = "group21";
        final static String topic = "topic4";
        final static String kafkaServerURL = "localhost";
        final static int kafkaServerPort = 9092;
        final static int kafkaProducerBufferSize = 64 * 1024;
        final static int connectionTimeOut = 20000;
        final static int reconnectInterval = 10000;
    
        final static String clientId = "SimpleConsumerDemoClient";
    }
    

    Configuring Kafka takes a little time; follow the official documentation to install and run it.

    Producer thread

    package com.github.yfor.bigdata.tdg;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class Producer extends Thread {
        private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;
        private final int size;
    
        public Producer(String topic) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<Integer, String>(props);
            this.topic = topic;
            this.isAsync = true;
            this.size = producer.partitionsFor(topic).size();
    
        }
    
        @Override
        public void run() {
            int messageNo = 1;
            while (messageNo < 100) {
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                if (isAsync) { // send asynchronously with a callback
                    producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
                            new DemoCallBack(startTime, messageNo, messageStr));
                } else { // send synchronously, blocking on the future
                    try {
                        producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
    
        }
    
    }
    
    class DemoCallBack implements Callback {
    
        private final long startTime;
        private final int key;
        private final String message;
    
        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }
    
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                        "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                                "), " +
                                "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
    
        }
    
    
    }
    

    Consumer thread

    
    package com.github.yfor.bigdata.tdg;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author leicui bourne_cui@163.com
     */
    public class KafkaConsumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;
        private final int size;
    
        public KafkaConsumer(String topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig());
            this.topic = topic;
            this.size = 5;
        }
    
        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaProperties.zkConnect);
            props.put("group.id", KafkaProperties.groupId);
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    
        @Override
        public void run() {
            try {
                sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, size); // ask Kafka for `size` streams on this topic
    
            // reuse the connector created in the constructor
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            ExecutorService executor = Executors.newFixedThreadPool(size);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new KafkaConsumerThread(stream));
            }
    
        }
    }
    
    class KafkaConsumerThread implements Runnable {
    
        private KafkaStream<byte[], byte[]> stream;
    
        public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }
    
    
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> mam = it.next();
                System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                        + "offset[" + mam.offset() + "], " + new String(mam.message()));
    
            }
        }
    
    }
    

    The example code here borrows from the approach of several blog posts; it processes messages with multiple consumers.

    Producer client

    package com.github.yfor.bigdata.tdg;
    
    public class MainP {
    
        public static void main(String[] args) {
            Producer producerThread = new Producer(KafkaProperties.topic);
            producerThread.start();
        }
    }
    
    

    Consumer client

    package com.github.yfor.bigdata.tdg;
    
    public class MainC {
    
        public static void main(String[] args) {
            KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
            consumerThread.start();
        }
    }
    
    

    To keep it easy to follow, the example code is trimmed down and stripped of business logic; the hope is that you see a general, reusable pattern design rather than a toy. I recommend running the examples and experimenting; adding a distributed message queue to your toolbox is also a real boost to your competitiveness as a job candidate.

    If it doesn't all make sense yet, don't worry: I haven't fully mastered some of the details myself, and it only started to sink in once I had used it in practice.

    References

    • Seven Concurrency Models in Seven Weeks
    • Solr Reference Guide
    • Apache HttpAsyncClient documentation
    • Kafka official documentation
    • Various articles found online
