vert.x系列一

作者: suxin1932 | 来源:发表于2019-03-23 14:47 被阅读0次

学习资源

1.wiki:
https://github.com/vert-x3/wiki/wiki
2.
https://github.com/quanke/vert-x-core-manual-for-java
3.
https://github.com/vert-x3/vertx-examples/tree/master/maven-simplest/src/main/java/io/vertx/example
4.
https://vertxchina.github.io/vertx-translation-chinese/core/Core.html
5.
https://blog.csdn.net/king_kgh/article/details/80848571
6.
https://www.jdon.com/concurrent/vertx.html
7.
https://github.com/vert-x3/vertx-awesome
8.
https://blog.csdn.net/king_kgh/article/details/80772657
9.
https://me.csdn.net/king_kgh
10.
http://www.yidianzixun.com/article/0IIL5gDE
11.vertx降级与熔断,限流
https://www.jdon.com/51802
https://github.com/migibert/vertx-in-production/blob/master/src/main/java/co/teemo/blog/handlers/PokemonHandler.java
12.vertx异步handler原理
https://blog.csdn.net/lee_star1/article/details/86490093
13.异步请求处理
https://www.jianshu.com/p/062c2c6e21da?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
14.vertx全异步框架
https://www.2cto.com/net/201702/599910.html
15.重要参考
https://blog.csdn.net/janwen2010/article/details/72954162
https://blog.csdn.net/weixin_41308834/article/details/78947968
https://www.sczyh30.com/posts/Vert-x/vertx-advanced-demystifying-thread-model/#Event_Loop_%E7%BA%BF%E7%A8%8B
16.
https://www.jdon.com/45898
https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/Sender.java
17.vert.x监控
https://my.oschina.net/chkui/blog/707632

一些概念

#事件总线(EventBus)
事件总线是对发布-订阅模式的一种实现。
它是一种集中式事件处理机制,
允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
eventbus.png
spring5.0(spring5.0逐步替代springmvc.)或者vert.x均是reactive(响应式编程), 是一种思想, 特点:
事件驱动(发挥单台机器的性能):单台机器上, 用少量线程处理大量并发
异步处理请求(多个服务间通信阻塞较少, 响应时间降低): 服务器集群通信效率提高

1.关于vert.x

Eclipse Vert.x is a tool-kit for building reactive applications on the JVM.

Vert.x是一个异步无阻塞的网络框架,其参照物是node.js。
Vert.x最大的特点就在于异步(底层基于Netty), 
通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,
大大降低了传统阻塞模型中线程对于操作系统的开销。
因此相比较传统的阻塞模型,异步模型能够很大层度的提高系统的并发量。
Vert.x利用Netty4的EventLoop来做单线程的事件循环,
所以跑在Vert.x上的业务"不能做CPU密集型的运算",这样会导致整个线程被阻塞。

Vert.x除了异步之外,还提供了非常多的吸引人的技术,比如EventBus,
通过EventBus可以非常简单的实现分布式消息,进而为分布式系统调用,微服务奠定基础。
除此之外,还提供了对多种客户端的支持,比如Redis,RabbitMQ,Kafka等等。

1.1什么是Vert.x

简单说,Vert.x就是一堆的jar包,提供了一系列的编程API接口。
通过这些API,可以实现"异步编程"。

什么是异步编程?异步编程是Vert.x的一大特性,也是Vert.x的核心.
如ajax调用时代码执行顺序, 反映了异步。
Vert.x可以开发Web应用,但Vert.x不仅仅是一个Web开发框架,他更像Spring,是一个技术栈
(Vert.x生态可以查看https://github.com/vert-x3/vertx-awesome),或者说是一个Vert.x生态体系。
在这个体系中,Vert.x只是提供了Web开发的能力。下面对Vertx和Spring做一个对比:
项目 Spring Vertx
核心框架 spring-core vertx-core
Web开发 spring-webmvc vertx-web
jdbc框架 spring-jdbc vertx-jdbc-client
redis spring-data-redis vertx-redis-client
微服务 spring-cloud vertx-hazelcast
可以说,很多的spring能做的事情,Vertx也都能实现。
那么既然如此,Spring如此强大,社区如此活跃,为何还会有Vertx呢?
他们之前区别的核心点就只有一个,Spring的操作是同步的,Vertx的操作是异步的。
异步带来了更高的性能,但同时也带来了编码和调试的复杂度,
但不得不说异步可能是未来的一个趋势,至少在Java实现高性能服务器上的一个趋势。

1.2Vertx能干什么

#Java能做的,Vert.x都能做。

(1)Web开发
Vert.x封装了Web开发常用的组件,支持路由、Session管理、模板等,可以非常方便的进行Web开发。
..................................不需要容器................................

(2)TCP/UDP开发
Vert.x底层基于Netty,提供了丰富的IO类库,支持多种网络应用开发。
不需要处理底层细节(如拆包和粘包),注重业务代码编写。

(3)提供对WebSocket的支持
可以做网络聊天室,动态推送等。

(4)Event Bus(事件总线)
是Vert.x的神经系统,通过Event Bus可以实现分布式消息,远程方法调用等等。
正是因为Event Bus的存在,Vert.x可以非常便捷的开发微服务应用。

(5)支持主流的数据和消息的访问
redis mongodb rabbitmq kafka 等

(6)分布式锁,分布式计数器,分布式map的支持

1.3Vert.x的一些优势

(1)异步非阻塞**
Vert.x就像是跑在JVM之上的Nodejs,所以Vert.x的第一个优势就是这是一个异步非阻塞框架。
"异步也是Vert.x于其他的JavaWeb框架的主要区别。"

#先打印 1,再打印 3,最后打印 2。
-----------------------------------------------------------
System.out.println("1")
 
WebClient
    .create(vertx)
    .postAbs(REQUEST_URL) // 这里指定的是请求的地址
    .sendBuffer(buffer, res -> { // buffer是请求的数据
 
        if (res.succeeded()) {  
            // 请求远程服务成功
            System.out.println("2")
            
        } else {
            // 请求失败
            resultHandler.handle(Future.failedFuture("请求服务器失败..."));
        }
    });
 
System.out.println("3")
-----------------------------------------------------------
(2)Vertx支持多种编程语言
(3)不依赖中间件
Vert.x的底层依赖Netty,因此在使用Vert.x构建Web项目时,不依赖中间件。
像Node一样,可以直接创建一个HttServer,
可以直接运行main方法,启动一个Http服务,而不需要使用类似于Tomcat的中间件。
不依赖中间件进行开发,相对会更灵活一些,安全性也会更高一些。
(4)完善的生态
Vert.x和Spring的对比,有一种使用MacOS和Windows对比的感觉。
Vert.x和庞大的Spring家族体系不同,
Vert.x提供数据库操作,Redis操作,Web客户端操作等。
(5)为微服务而生
Vert .x提供了各种组件来构建基于微服务的应用程序。
通过EventBus可以非常容易的进行服务之间的交互。
并且提供了HAZELCAST来实现分布式。

* 对于复杂的业务,可能会遇到Callback Hell问题(解决方案也有很多)
* 由于异步的特征、契约创建、更高级的错误处理机制使得开发过程会相对更复杂。

1.4Vert.x技术体系

(1)core模块
Vert.x核心模块包含一些基础的功能,
如HTTP,TCP,文件系统访问,EventBus、WebSocket、延时与重复执行、缓存等其他基础的功能,
可以通过vertx-core模块引用即可。
(2)Web模块
Vert.x Web是一个工具集,虽然core模块提供了HTTP的支持,
但是要开发复杂的Web应用,还需要路由、Session、请求数据读取、Rest支持等等还需要Web模块,
Web模块提供了上述的这些功能的API,便于开发。

除了对Web服务的开发以外,还提供了对Web客户端请求的支持,
通过vertx-web-client即可方便的访问HTTP服务。

使用Vert.x一定要注意,Vert.x是一个异步框架,请求HTTP服务是一个耗时操作,
所有的耗时,都会阻塞EventBus,导致整体性能被拖垮,
因此,对于请求Web服务,一定要使用Vert.x提供的vertx-web-client模块
(3)数据访问模块
Vert.x提供了对关系型数据库、NoSQL、消息中间件的支持,
传统的客户端因为是阻塞的,会严重影响系统的性能,
因此Vert.x提供了对以上客户端的异步支持。
具体支持的数据访问如下:
MongoDB client,
JDBC client,
SQL common,
Redis client,
MySQL/PostgreSQLclient
(4)Reactive响应式编程
复杂的异步操作,会导致异步回调地狱的产生,下面的代码嵌套的层次太多,而通过reactive可以最小化的简化异步回调地狱。


----------------------------------------------------------------------------------
// create a test table
execute(conn.result(), "create table test(id int primary key, name varchar(255))", create -> {
  // start a transaction
  startTx(conn.result(), beginTrans -> {
    // insert some test data
    execute(conn.result(), "insert into test values(1, 'Hello')", insert -> {
      // commit data
      rollbackTx(conn.result(), rollbackTrans -> {
        // query some data
        query(conn.result(), "select count(*) from test", rs -> {
          for (JsonArray line : rs.getResults()) {
            System.out.println(line.encode());
          }
 
          // and close the connection
          conn.result().close(done -> {
            if (done.failed()) {
              throw new RuntimeException(done.cause());
            }
          });
        });
      });
    });
  });
});
----------------------------------------------------------------------------------


"处理后的代码:"
----------------------------------------------------------------------------------
public void scanPay(JsonObject data, Handler<AsyncResult<JsonObject>> resultHandler) {
    paramCheckStep(data) // 参数校验
            .flatMap(this::insertPayDtlStep) // 插入流水
            .flatMap(x -> requestStep(x, config)) // 请求上游
            .flatMap(this::cleanStep) //参数清理
            .subscribe(ok -> {
                        logger.info("成功结束");
                        resultHandler.handle(Future.succeededFuture(ok));
                    },
                    err -> {
                        logger.error("正在结束", err);
                        resultHandler.handle(Future.failedFuture(err));
                    }
 
            );
}
----------------------------------------------------------------------------------
(5)整合其他模块
"邮件客户端"
Vert.x提供了一简单STMP邮件客户端,所以你可以在应用程序中发送电子邮件。

"STOMP客户端与服务端"
Vert.x提供了STOMP协议的实现包括客户端与服务端。

"Consul Client"
consul是google开源的一个使用go语言开发的服务发现、配置管理中心服务。
内置了服务注册与发现框 架、分布一致性协议实现、健康检查、Key/Value存储、多数据中心方案。

"RabbitMQ Client  Kafka Client"
消息队里的客户端支持

"JCA适配器"
Vert.x提供了Java连接器架构适配器,这允许同任意JavaEE应用服务器进行互操作。
(6)认证与授权
Vert.x提供了简单API用于在应用中提供认证和授权。
Auth common 通用的认证API,可以通过重写AuthProvider类来实现自己的认证:
JDBC auth 后台为JDBC的认证实现
JWT auth 用JSON Web tokens认证实现
Shiro auth 使用Apache Shiro认证实现
MongoDB auth MongoDB认证实现
OAuth 2 Oauth2协义认证实现
htdigest auth 这个是新增一种认证的支持
(7)微服务

Vert.x提供多个组件构建基于微服务的应用程序。
比如服务发现(Vert.x Service Discovery)、断路器(Vert.x Circuit Breaker)、配置中心(Vert.x Config)等。

2.demo

2.1简单demo01-server

2.1gradle配置

plugins {
    id 'java'
    id 'war'
}

group 'com.zy'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    // mavenCentral()
    maven {
        url 'http://maven.aliyun.com/nexus/content/groups/public/'
    }
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile 'io.vertx:vertx-core:3.6.3'
    compile 'io.vertx:vertx-web:3.6.3'
}

2.2demo-server

package com.zy.server;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

public class Vertx02 {

    public static void main(String[] args) {
        // 从工厂中获取vertx实例
        Vertx vertx = Vertx.vertx();
        // 创建httpServer
        HttpServer httpServer = vertx.createHttpServer();
        // 创建web路由器
        Router router = Router.router(vertx);

        //////////////////////////////////   同步处理的请求   ////////////////////////////////////////
        // 针对/sync路径下的所有类型请求
        router.route("/sync").order(1).handler(context -> {
            HttpServerRequest request = context.request();
            String name = request.getHeader("name");
            context.response().end("good morning " + name);
        });
        // 针对/sync/api/*路径下的所有post请求
        router.route().handler(BodyHandler.create());
        router.post("/sync/api/*").order(-1).handler(context -> {
            HttpServerRequest request = context.request();
            String gender = request.getParam("gender");
           // String formAttribute = request.getFormAttribute();
           // String header = request.getHeader();

           context.response().end("sync线程: " + Thread.currentThread().getName() + "\napi: " + gender);
        });
        //////////////////////////////////   异步处理的请求   ////////////////////////////////////////
        router.get("/async").blockingHandler(context -> {
            // context.request();
            /**
             * 异步处理请求
             * 适合执行耗时操作:
             * 1.数据库访问
             * 2.服务访问
             *
             */
            context.response().end("async线程: " + Thread.currentThread().getName());
        });

        httpServer.requestHandler(router::accept);
        // 监听端口
        httpServer.listen(8091);
    }
}

2.3demo-client-async

package com.zy.client;


import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;

import static io.vertx.core.json.Json.encode;

public class Vertx01 {

    /**
     * Vert.x Web Client(Web客户端)是一个异步的 HTTP 和 HTTP/2 客户端。
     *
     * Web Client使得发送 HTTP 请求以及从 Web 服务器接收 HTTP 响应变得更加便捷,同时提供了额外的高级功能,例如:
     *     JSON体的编码和解码
     *     请求和响应泵
     *     请求参数的处理
     *     统一的错误处理
     *     提交表单
     *
     * 制作Web Client的目的并非为了替换Vert.x Core中的 HttpClient,
     * 而是基于该客户端,扩展并保留其便利的设置和特性,
     * 例如请求连接池(Pooling),HTTP/2的支持,流水线/管线的支持等。
     * 当您需要对 HTTP 请求和响应做细微粒度控制时,您应当使用 HttpClient。
     * 另外Web Client并未提供 WebSocket API,此时您应当使用 HttpClient
     *
     * @param args
     */

    public static void main(String[] args) {
        WebClient webClient = WebClient.create(Vertx.vertx());
    }

    /**
     * 异步返回结果
     * @param client
     * @param company
     * @param numberOfShares
     * @return
     */
    private Future<Double> getValueForCompany(WebClient client, String company, int numberOfShares) {
        // 创建预期对象,它将要在收到估值的时候得到赋值
        Future<Double> future = Future.future();

        client.get("/?name=" + encode(company))
                .as(BodyCodec.jsonObject())
                .send(ar -> {
                    if (ar.succeeded()) {
                        HttpResponse<JsonObject> response = ar.result();
                        if (response.statusCode() == 200) {
                            double v = numberOfShares * response.body().getDouble("bid");
                            future.complete(v);
                        } else {
                            future.complete(0.0);
                        }
                    } else {
                        future.fail(ar.cause());
                    }
                });

        return future;
    }
}

2.3.1demo-client-完整版(重试机制)

https://github.com/spring-projects/spring-retry

项目结构

图片.png
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!--<version>2.1.4.RELEASE</version>-->
        <version>1.5.14.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zy</groupId>
    <artifactId>spring-boot-vertx-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-vertx-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.6.3</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>3.6.3</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web-client</artifactId>
            <version>3.6.3</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>3.6.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

server:
  port: 8080

启动类

package com.zy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;

@EnableRetry(proxyTargetClass = true)
@SpringBootApplication
public class SpringBootVertxDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootVertxDemoApplication.class, args);
    }

}

工具类

package com.zy.utils;

import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.ext.web.client.WebClient;

public class VertxWebClient {

    private static final int connectionTimeoutInMills = 60_000;
    private static final int idleTimeoutMills = 60_000;
    private static final int maxTotalConnection = 200;
    private static final WebClient webClient = WebClient.create(vertx(), options());

    private VertxWebClient() {

    }

    public static WebClient buildWebClient() {
        return webClient;
    }

    private static Vertx vertx() {
        return Vertx.vertx();
    }

    private static WebClientOptions options() {
        return new WebClientOptions()
                .setMaxPoolSize(maxTotalConnection)
                .setConnectTimeout(connectionTimeoutInMills)
                .setSsl(false)
                .setIdleTimeout(idleTimeoutMills)
                .setKeepAlive(true);
    }


}

package com.zy.utils;

import com.zy.exception.CustomException;
import io.reactivex.Single;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.ext.web.client.HttpResponse;

import java.util.concurrent.ExecutionException;

public class VertxRequestUtils {

    private static final VertxRequestUtils VERTX_REQUEST_UTILS = new VertxRequestUtils();

    private VertxRequestUtils() {

    }

    public static VertxRequestUtils getInstance() {
        return VERTX_REQUEST_UTILS;
    }

    public int response() throws CustomException {
        try {
            request().toFuture().get().statusCode();
            // return request().toFuture().get().statusCode();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(".......");
        }
        throw new CustomException(".......");
    }

    private Single<HttpResponse<Buffer>> request() {
        System.out.println("------请求来了-------");
        Single<HttpResponse<Buffer>> single = VertxWebClient.buildWebClient().getAbs("http://finance.ifeng.com/")
                .putHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36")
                .rxSend();
        return single;
    }

}

异常类

package com.zy.exception;

public class CustomException extends Exception {

    public CustomException(String message) {
        super(message);
    }
}

service层

package com.zy.service;

import com.zy.exception.CustomException;

public interface VertxService {

    int aaa() throws CustomException;
}
package com.zy.service;

import com.zy.exception.CustomException;

public interface VertxRetryService {

    int hello() throws CustomException;
}
package com.zy.service.impl;

import com.zy.exception.CustomException;
import com.zy.service.VertxService;
import com.zy.service.VertxRetryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class VertxServiceImpl implements VertxService {

    @Autowired
    private VertxRetryService vertxService;

    @Override
    public int aaa() throws CustomException {
        return vertxService.hello();
    }

}
package com.zy.service.impl;

import com.zy.exception.CustomException;
import com.zy.service.VertxRetryService;
import com.zy.utils.VertxRequestUtils;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

@Service
public class VertxRetryServiceImpl implements VertxRetryService {

    @Retryable(value = {CustomException.class},maxAttempts = 2,backoff = @Backoff(delay = 0,multiplier = 0))
    @Override
    public int hello() throws CustomException {
        System.out.println(111111111);
        return VertxRequestUtils.getInstance().response();
        //throw new CustomException(".......");
    }

    @Recover
    public int recover(CustomException e){
        System.out.println("wrong");
        return 100000;
    }

}

controller层

package com.zy.controller;

import com.zy.service.VertxService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class VertxController {

    @Autowired
    private VertxService vertxService;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RequestMapping("/vertx")
    public Object vertx() {
        int hello = 5;
        try {
            hello = vertxService.aaa();
        } catch (Exception e) {
            System.out.println("------------");
        }
        return hello;
    }

}

若是spring项目, 则需要

a.引入依赖:
    <dependency>
      <groupId>org.springframework.retry</groupId>
      <artifactId>spring-retry</artifactId>
      <version>1.1.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjweaver</artifactId>
      <version>1.8.13</version>
    </dependency>
b.在任一加了@Configuration注解的配置类上添加@EnableRetry(proxyTargetClass = true)即可, 如
package com.zy.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry(proxyTargetClass = true)
public class VertxConfig {

}
3.其余设计同上述springboot的service层代码

2.3.2demo-server-eventbus(google-guava版)

项目结构

图片.png

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zy</groupId>
    <artifactId>vertx-http</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.version>4.3.18.RELEASE</spring.version>
        <fastjson.version>1.2.54</fastjson.version>
        <lombok.version>1.16.20</lombok.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web-client</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
            <version>1.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.8.13</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>
    </dependencies>

</project>

bean包

package com.zy.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Card {

    private Integer cardId;
    private String cardName;
}
package com.zy.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    private Integer id;
    private String name;
}

AsyncEventManager

package com.zy.event;

import com.google.common.eventbus.AsyncEventBus;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class AsyncEventManager {

    private static final Executor executor = Executors.newSingleThreadExecutor();
    private static final AsyncEventBus asyncEventBus = new AsyncEventBus(executor);

    public static void register(Object listener) {
        asyncEventBus.register(listener);
    }

    public static void post(Object event) {
        asyncEventBus.post(event);
    }

    public static void unregister(Object listener) {
        asyncEventBus.unregister(listener);
    }
}

EventBusServiceImpl

package com.zy.service;

import com.google.common.eventbus.Subscribe;
import com.zy.bean.Card;
import com.zy.bean.User;

public class EventBusServiceImpl {

    private static volatile EventBusServiceImpl eventBusService;

    private EventBusServiceImpl(){}

    public static EventBusServiceImpl getInstance() {
        if (eventBusService == null) {
            synchronized (EventBusServiceImpl.class) {
                if (eventBusService == null) {
                    eventBusService = new EventBusServiceImpl();
                }
            }
        }
        return eventBusService;
    }

    @Subscribe
    public void helloUser(User user) {
        System.out.println("user >>>>>>>>>>." + user);
    }

    @Subscribe
    public void helloCard(Card card) {
        System.out.println("card >>>>>>>>>>." + card);
    }

}

Server01

package com.zy.server;

import com.alibaba.fastjson.JSON;
import com.zy.service.EventBusServiceImpl;
import com.zy.bean.Card;
import com.zy.bean.User;
import com.zy.event.AsyncEventManager;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.http.HttpServer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.handler.BodyHandler;

public class Server01 extends AbstractVerticle {

    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        // 根据传入参数的对象类型执行实例的方法
        AsyncEventManager.register(EventBusServiceImpl.getInstance());
    }

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        HttpServer httpServer = vertx.createHttpServer();
        Router router = Router.router(vertx);

        router.post("/hello/user")
                .consumes("application/json")
                .produces("application/json")
                .handler(BodyHandler.create())
                .handler(context -> {
                    JsonObject json = context.getBodyAsJson();
                    User user = json.mapTo(User.class);
                    HttpServerResponse response = context.response();
                    response.setChunked(true);
                    response.putHeader("content-type", "application/json");
                    response.write(JSON.toJSONString(user)).end();
                    // 执行实例的方法
                    AsyncEventManager.post(user);
                    System.out.println("-----------------user--------------");
                });

        router.post("/hello/card")
                .consumes("application/json")
                .produces("application/json")
                .handler(BodyHandler.create())
                .handler(context -> {
                    JsonObject json = context.getBodyAsJson();
                    Card card = json.mapTo(Card.class);
                    HttpServerResponse response = context.response();
                    response.setChunked(true);
                    response.putHeader("content-type", "application/json");
                    response.write(JSON.toJSONString(card)).end();
                    // 根据传入参数的对象类型执行实例的方法
                    AsyncEventManager.post(card);
                    System.out.println("----------------card-----------------");
                });


        httpServer.requestHandler(router).listen(8090);
    }

}

入口函数类

package com.zy;

import io.vertx.reactivex.core.Vertx;

public class Apps {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle("com.zy.server.Server01");
    }
}

相关文章

网友评论

    本文标题:vert.x系列一

    本文链接:https://www.haomeiwen.com/subject/tudsvqtx.html