本周主要是学了以下三个方面:
- Redis哨兵模式主从配置
- 初步学习Netty网络编程框架
- 初步学习spring boot
一、Redis哨兵模式主从配置
监视.png发现主服务器下线.png
升级从服务器.png
之前学习了redis哨兵模式主从配置的理论,主要的流程就如上图所示。这里就不再多说。主要在Linux下做了一个主,二个从,三个哨兵的配置。ip地址分配分别为
主 127.0.0.1:6379
从 127.0.0.1:6389
从 127.0.0.1:6399
哨兵 127.0.0.1:26379
哨兵 127.0.0.1:26389
哨兵 127.0.0.1:26399
主服务redis.conf默认保持不变,主要是对两个从服务redis的redis.conf做如下配置
#端口号
port 6389
#守护进程
daemonize yes
#设置主服务器IP,端口号
slaveof 127.0.0.1:6379
#只读
slave-read-only yes
修改三个哨兵的sentinel.conf
#三个哨兵端口号不同,这里举其中一个例
port 26379
#配置监控的主服务器IP地址和端口以及达成客观下线的哨兵个数
sentinel monitor mymaster 127.0.0.1 6379 2
启动服务后,可查看三台redis的从属关系,测试kill掉主服务器后,哨兵检测到主服务器下线,再将主从配置对调,将127.0.0.1:6389的从服务器升级为主服务器。
二、学习Netty网络编程框架
1.Netty简介
Netty 是一个利用 Java 的高级网络的能力,隐藏了Java背后的复杂性然后提供了一个易于使用的 API 的客户端/服务器框架。
架构
2.核心组件
Bootstrap:Netty的引导类应用程序网络层配置提供容器,其涉及将进程绑定到给定端口或连接一个进程到在指定主机上指定端口上运行的另一进程。引导类分为客户端引导Bootstrap和服务端引导ServerBootstrap。
Channel:基础的IO操作,如绑定、连接、读写等都依赖于底层网络传输所提供的原语,在Java的网络编程中,基础核心类是Socket,而Netty的Channel提供了一组API,极大地简化了直接与Socket进行操作的复杂性,并且Channel是很多类的父类,如EmbeddedChannel、LocalServerChannel、NioDatagramChannel、NioSctpChannel、NioSocketChannel等。
ChannelHandler:ChannelHandler是最重要的组件,其中存放用来处理进站和出站数据的用户逻辑。ChannelHandler的方法被网络事件触发,ChannelHandler可以用于几乎任何类型的操作,如将数据从一种格式转换为另一种格式或处理抛出的异常。其子接口ChannelInboundHandler,接受进站的事件和数据以便被用户定义的逻辑处理,或者当响应所连接的客户端时刷新ChannelInboundHandler的数据。
ChannelPipeline: ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API。当创建Channel时,会自动创建一个附属的ChannelPipeline
EventLoop:EventLoop定义了处理在连接过程中发生的事件的核心抽象
ChannelFuture:Netty中的所有IO操作都是异步的,不会立即返回,需要在稍后确定操作结果。因此Netty提供了ChannelFuture,其addListener方法可以注册一个ChannelFutureListener,当操作完成时,不管成功还是失败,均会被通知。ChannelFuture存储了之后执行的操作的结果并且无法预测操作何时被执行,提交至Channel的操作按照被唤醒的顺序被执行
3.Channel、ChannelHandler和ChannelPipeline
3.1. Channel生命周期
当状态发生变化时,就会产生相应的事件。
3.2.ChannelPipeline接口
一个ChannelInitializer的实现在ServerBootstrap中进行注册。 当ChannelInitializer的initChannel方法被调用时,ChannelInitializer在管道中安装一组自定义的ChannelHandlers。 ChannelInitializer从ChannelPipeline中移除自身。ChannelHandler可被当做放置任何代码的容器,用于处理到达并通过ChannelPipeline的事件或者数据,数据可以沿着处理链进行传递。
ChannelHandler的两个重要的子类:
ChannelInboundHandler,处理各种入站的数据和状态的变化。
ChannelOutboundHandler,处理出站数据并允许拦截的所有操作。
修改ChannelPipeline:ChannelHandler可实时修改ChannelPipeline的布局,如添加、删除、替换其他ChannelHandler
3.3. ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler与ChannelPipeline之间的关联,当ChannelHandler被添加至ChannelPipeline中时其被创建,ChannelHandlerContext的主要功能是管理相关ChannelHandler与同一ChannelPipeline中的其他ChannelHandler的交互。
ChannelContext
4.EventLoop
- 一个EventLoopGroup包含一个或多个EventLoop。
- 一个EventLoop在生命中周期绑定到一个Thread上。
- EventLoop使用其对应的Thread处理IO事件。
- 一个Channel使用EventLoop进行注册。
- 一个EventLoop可被分配至一个或多个Channel。
image.png
4.2线程管理
Netty的线程模型的优越性能取决于确定当前正在执行的线程的身份,即是否为分配给当前Channel及其EventLoop的线程。如果调用的是EventLoop的线程,那么直接执行该代码块,否则,EventLoop调度一个任务以供稍后执行,并将其放入内部队列中,当处理下个事件时,会处理队列中的事件
EventLoop线程管理
4.3线程分配
异步传输时,只使用少量的EventLoopGroup,在当前的模型中其在通道中共享。这允许通道由最小数量的线程提供服务,而不是为每个通道分配一个线程,节约线程资源,EventLoop可被分配给多个通道,这让我想到了NIO中的选择器
异步传输时的线程分配
5.Bootstarp
引导类分为客户端引导Bootstrap和服务端引导ServerBootstrap
ServerBootstrap绑定到指定端口来监听客户端连接请求,Bootstrap连接至远程服务端。ServerBootstrap包含两个EventLoopGroup
Bootstrap只包含一个EventLoopGroup。
ServerBootstrap包含两组通道,第一组包含一个ServerChannel,表示服务器绑定到本地端口的监听套接字;第二组包含用来处理客户端连接所创建的所有通道,每接受一个连接时便会创建一个通道
ServerBootstrap的两个EventLoopGroup:
image.png
6.ByteBuf
Netty使用ByteBuf作为Java nio中ByteBuffer的替换方案
ByteBuf维护两个不同的读索引和写索引,当读ByteBuf时,readerIndex会随着数据的读取而不断增加,同理,writerIndex也相同,对于空的ByteBuf而言,其readerIndex和writerIndex均初始化为0
6.1ByteBuf使用模式
常用的模式有如下几种:
堆缓冲:将数据存储在JVM的堆空间中,使用backing array提供支持,这种模式在不使用池的情况下提供快速分配和释放。
直接缓冲:通过JVM的本地调用分配内存,这可避免每次调用本地I / O操作之前(或之后)将缓冲区的内容复制到(或从)中间缓冲区。
复合缓冲:呈现多个ByteBufs的聚合视图,可以添加或删除ByteBuf实例,由Netty中的CompositeByteBuf提供支持,CompositeByteBuf中的ByteBuf实例包含直接或非直接的分配。
6.2 随机访问索引
ByteBuf的第一个索引编号为0,最后一个编号为capacity() - 1,可用getByte(int i)读取ByteBuf的数据
6.3顺序访问索引
Netty的ByteBuf有读写两个索引,而JDK的ByteBuffer只有一个索引,因此需要使用flip方法进行读写切换,读写索引将ByteBuf划分为三个区域。
6.4 可舍弃字节
可舍弃的字节表示那些已经被读取的数据,可通过调用discardReadBytes() 方法舍弃并且回收该部分空间。当调用了discardReadBytes方法后整个容量未变,但是此时readerIndex的值变为0,可写的容量大小扩大。
image.png
6.5索引管理
可通过调用markReaderIndex(), markWriterIndex(), resetReaderIndex(), and resetWriterIndex()方法来标记和重置readerIndex和writerIndex,也可通过调用readerIndex(int)、writerIndex(int) 方法来将readerIndex和writerIndex设置为指定值,也可通过调用clear()方法将readerIndex和writerIndex设置为0,但是并不会清空内容。 调用clear之前的布局如下
image
调用clear之后的布局如下
image
clear()方法比discardReadBytes()方法性能更优
7.使用Netty框架做的简单的多人聊天室
ChatServerHandler.java主要是重写的回调函数处理客户端连接的事件
package chat.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
public static final ChannelGroup group = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext arg0, String arg1)
throws Exception {
Channel channel = arg0.channel();
for (Channel ch : group) {
if (ch == channel) {
ch.writeAndFlush("[you]:" + arg1 + "\n");
} else {
ch.writeAndFlush(
"[" + channel.remoteAddress() + "]: " + arg1 + "\n");
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
for (Channel ch : group) {
ch.writeAndFlush(
"[" + channel.remoteAddress() + "] " + "is comming");
}
group.add(channel);
}
@Override
public voiced handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
for (Channel ch : group) {
ch.writeAndFlush(
"[" + channel.remoteAddress() + "] " + "is comming");
}
group.remove(channel);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println("[" + channel.remoteAddress() + "] " + "online");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println("[" + channel.remoteAddress() + "] " + "offline");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println(
"[" + ctx.channel().remoteAddress() + "]" + "exit the room");
ctx.close().sync();
}
}
ServerIniterHandler.java去给channelpipelie入站出站添加上字符串编解码器添加消息处理器
package chat.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ServerIniterHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
pipeline.addLast("docode",new StringDecoder());
pipeline.addLast("encode",new StringEncoder());
pipeline.addLast("chat",new ChatServerHandler());
}
}
服务端绑定和启动
package chat.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class ServerMain {
private int port;
public ServerMain(int port) {
this.port = port;
}
public static void main(String[] args) {
new ServerMain(Integer.parseInt(args[0])).run();
}
public void run() {
EventLoopGroup acceptor = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.group(acceptor, worker);//设置循环线程组,前者用于处理客户端连接事件,后者用于处理网络IO
bootstrap.channel(NioServerSocketChannel.class);//用于构造socketchannel工厂
bootstrap.childHandler(new ServerIniterHandler());//为处理accept客户端的channel中的pipeline添加自定义处理函数
try {
Channel channel = bootstrap.bind(port).sync().channel();//绑定端口(实际上是创建serversocketchannnel,并注册到eventloop上),同步等待完成,返回相应channel
System.out.println("server strart running in port:" + port);
channel.closeFuture().sync();//在这里阻塞,等待关闭
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//退出
acceptor.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端启动
package chat.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class ClientMain {
private String host;
private int port;
private boolean stop = false;
public ClientMain(String host, int port) {
this.host = host;
this.port = port;
}
public static void main(String[] args) throws IOException {
new ClientMain(args[0], Integer.parseInt(args[1])).run();
}
public void run() throws IOException {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ClientIniter());
try {
Channel channel = bootstrap.connect(host, port).sync().channel();
while (true) {
BufferedReader reader = new BufferedReader(
new InputStreamReader(System.in));
String input = reader.readLine();
if (input != null) {
if ("quit".equals(input)) {
System.exit(1);
}
channel.writeAndFlush(input);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(1);
}
}
public boolean isStop() {
return stop;
}
public void setStop(boolean stop) {
this.stop = stop;
}
}
服务端的channelhandler打印服务端收到的消息
package chat.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext arg0, String arg1)
throws Exception {
System.out.println(arg1);
}
}
客户端channel handlerpipeline添加字符串编码器、解码器、消息处理器
package chat.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ClientIniter extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
pipeline.addLast("stringD", new StringDecoder());
pipeline.addLast("stringC", new StringEncoder());
pipeline.addLast("http", new HttpClientCodec());
pipeline.addLast("chat", new ChatClientHandler());
}
}
Netty框架还没有学完,还需继续深入学习。
三、初步学习springboot
主要是看了中文官方文档和博客,初步学习了以下,还没深入到微服务的层次去,这个接下来学习。
1.简介
Spring Boot(英文中是“引导”的意思),是用来简化Spring应用的搭建到开发的过程。应用开箱即用,只要通过 just run,就可以启动项目。二者,Spring Boot 只要很少的Spring配置文件。因为习惯优先于配置的原则,使得Spring Boot在快速开发应用和微服务架构实践中得到广泛应用。
注解很大部分与spring一样
Maven添加相关依赖,根据自己的需求添加其他依赖,这里举例简单的依赖,就可以开始
<!-- Spring Boot 启动父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>
<!-- Spring Boot web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
启动类
@SpringBootApplication:Spring Boot 应用的标识
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
Application很简单,一个main函数作为主入口。SpringApplication引导应用,并将Application本身作为参数传递给run方法。run方法会启动嵌入式的Tomcat并初始化Spring环境及其各Spring组件
配置文件
按照以下列表的优先级排列:
1.命令行参数
2.java:comp/env 里的 JNDI 属性
3.JVM 系统属性
4.操作系统环境变量
5.RandomValuePropertySource 属性类生成的 random.* 属性
6.应用以外的 application.properties(或 yml)文件
7.打包在应用内的 application.properties(或 yml)文件
8.在应用 @Configuration 配置类中,用 @PropertySource 注解声明的属性文件
9.SpringApplication.setDefaultProperties 声明的默认属性
根据优先级可以灵活配置,比如可以在测试或生产环境中快速地修改命令行配置参数值,而不需要重新打包和部署应用。
application.properties 文件或者 application.yml 文件
logging.config=classpath:log4j2.xml
spring.mail.host=smtp.163.com
spring.mail.username=jiangyh98@163.com
spring.mail.password=jiang123
spring.mail.default-encoding=UTF-8
spring.mail.port=587�
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
mail.fromMail.addr=jiangyh98@163.com
通过 @ConfigurationProperties(prefix = "XXX”) 注解,将配置文件中以 XXX前缀的属性值自动绑定到对应的字段中
2.整合mybatis
添加相关依赖
<!-- Spring Boot Mybatis 依赖 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring-boot}</version>
</dependency>
<!-- MySQL 连接驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector}</version>
</dependency>
在appliaction.properties里加入
## Mybatis 配置
mybatis.typeAliasesPackage=org.spring.springboot.domain
mybatis.mapperLocations=classpath:mapper/*.xml
在启动类里加入@MapperScan("org.spring.springboot.dao")这个注解注册 Mybatis mapper 接口类。
为了节约篇幅这里就不贴具体整合代码了
3.整合Redis
添加相关依赖
<!-- Spring Boot Reids 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>${spring-boot-starter-redis-version}</version>
</dependency>
同样在application.properties配置Redis相关配置参数,
## Redis 配置
## Redis数据库索引(默认为0)
spring.redis.database=0
## Redis服务器地址
spring.redis.host=127.0.0.1
## Redis服务器连接端口
spring.redis.port=6379
## Redis服务器连接密码(默认为空)
spring.redis.password=
## 连接池最大连接数(使用负值表示没有限制)
spring.redis.pool.max-active=8
## 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.pool.max-wait=-1
## 连接池中的最大空闲连接
spring.redis.pool.max-idle=8
## 连接池中的最小空闲连接
spring.redis.pool.min-idle=0
## 连接超时时间(毫秒)
spring.redis.timeout=0
参数跟在spring中整合类似。操作跟在spring中类似使用redisTemplate.
为了节约篇幅这里就不贴具体整合代码了
4.完成简单的邮件服务
添加依赖
<!--log4j2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--Email-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
发送简单邮件、html邮件、嵌入静态资源邮件、带附件的邮件业务
import org.springframework.stereotype.Service;
import java.io.File;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.FileSystemResource;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
@Service
public class EmailServiceIml implements EmailService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JavaMailSender mailSender;//spring 提供的邮件发送类
@Value("${mail.fromMail.addr}")
private String from;
@Override
public void sendSimpleEmail(String to, String subject, String content) {
SimpleMailMessage message = new SimpleMailMessage();//创建简单邮件消息
message.setFrom(from);//设置发送人
message.setTo(to);//设置收件人
/* String[] adds = {"xxx@qq.com","yyy@qq.com"}; //同时发送给多人
message.setTo(adds);*/
message.setSubject(subject);//设置主题
message.setText(content);//设置内容
try {
mailSender.send(message);//执行发送邮件
logger.info("简单邮件已经发送。");
} catch (Exception e) {
logger.error("发送简单邮件时发生异常!", e);
}
}
@Override
public void sendHtmlEmail(String to, String subject, String content) {
MimeMessage message = mailSender.createMimeMessage();//创建一个MIME消息
try {
//true表示需要创建一个multipart message
MimeMessageHelper helper = new MimeMessageHelper(message, true);
helper.setFrom(from);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);
mailSender.send(message);
logger.info("html邮件发送成功");
} catch (MessagingException e) {
logger.error("发送html邮件时发生异常!", e);
}
}
@Override
public void sendAttachmentsEmail(String to, String subject, String content, String filePath) {
MimeMessage message = mailSender.createMimeMessage();//创建一个MINE消息
try {
MimeMessageHelper helper = new MimeMessageHelper(message, true);
helper.setFrom(from);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);// true表示这个邮件是有附件的
FileSystemResource file = new FileSystemResource(new File(filePath));//创建文件系统资源
String fileName = filePath.substring(filePath.lastIndexOf(File.separator));
helper.addAttachment(fileName, file);//添加附件
mailSender.send(message);
logger.info("带附件的邮件已经发送。");
} catch (MessagingException e) {
logger.error("发送带附件的邮件时发生异常!", e);
}
}
@Override
public void sendInlineResourceEmail(String to, String subject, String content, String rscPath, String rscId) {
MimeMessage message = mailSender.createMimeMessage();
try {
MimeMessageHelper helper = new MimeMessageHelper(message, true);
helper.setFrom(from);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);
FileSystemResource res = new FileSystemResource(new File(rscPath));
//添加内联资源,一个id对应一个资源,最终通过id来找到该资源
helper.addInline(rscId, res);//添加多个图片可以使用多条 <img src='cid:" + rscId + "' > 和 helper.addInline(rscId, res) 来实现
mailSender.send(message);
logger.info("嵌入静态资源的邮件已经发送。");
} catch (MessagingException e) {
logger.error("发送嵌入静态资源的邮件时发生异常!", e);
}
}
}
appliaction.properties,这里需要开通163邮箱的授权码
logging.config=classpath:log4j2.xml
spring.mail.host=smtp.163.com
spring.mail.username=jiangyh98@163.com
spring.mail.password=jiang123
spring.mail.default-encoding=UTF-8
spring.mail.port=587�
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
mail.fromMail.addr=jiangyh98@163.com
编写测试类,调用service几种方法,发邮件测试成功。
网友评论