
Vert.x in Practice, Part 1: Clustering and Monitoring

Author: 天谕ty | Published 2020-07-28 17:15

1. Deploying the Management Center

The Hazelcast Management Center can be downloaded from the Hazelcast website.

[screenshot]

After extracting it, the directory looks like this:

[screenshot]

I start it from the command line:

start.bat 7070 mancenter

This means the Management Center listens on port 7070 with the application context path mancenter.

Start it up!

[screenshot]

2. Vert.x Cluster Code

The launcher class:

package org.example;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * @Author: Administrator
 * @Description:
 * @Date: 2020/7/16 10:42
 * @Version: 1.0
 */
public class AppCluster {

    public static void main(String[] args) throws UnknownHostException {
        final VertxOptions vertxOptions = new VertxOptions();
        EventBusOptions eventBusOptions = new EventBusOptions();
        // LAN IP of this machine
        String hostAddress = InetAddress.getLocalHost().getHostAddress();
        // Host the clustered event bus binds to; other nodes send data to this address
        eventBusOptions.setHost(hostAddress);
        vertxOptions.setEventBusOptions(eventBusOptions);

        // Start in clustered mode, using Hazelcast as the cluster manager
        HazelcastClusterManager clusterManager = new HazelcastClusterManager();
        vertxOptions.setClusterManager(clusterManager);

        Vertx.clusteredVertx(vertxOptions, res -> {
            if (res.failed()) {
                res.cause().printStackTrace();
                return;
            }
            Vertx vertx = res.result();
            vertx.deployVerticle(new MainClusterVerticle(), r -> {
                if (r.succeeded()) {
                    System.out.println(MainClusterVerticle.class.getName() + " --> deployed successfully");
                } else {
                    r.cause().printStackTrace();
                    System.err.println(MainClusterVerticle.class.getName() + " --> deployment failed: " + r.cause().getMessage());
                }
            });
        });
    }
}
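
With the no-argument constructor used above, the Hazelcast cluster manager typically picks up a cluster.xml found on the classpath (for example under src/main/resources), falling back to its bundled default otherwise. If you would rather load the configuration from an explicit file path, a minimal sketch could look like the following; the class name and the relative path "cluster.xml" are my own choices, not from the original project:

package org.example;

import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

import java.io.FileNotFoundException;

public class AppClusterExplicitConfig {

    public static void main(String[] args) throws FileNotFoundException {
        // Assumption: cluster.xml sits in the working directory; adjust the path as needed
        Config hazelcastConfig = new FileSystemXmlConfig("cluster.xml");

        // Hand the explicitly loaded Hazelcast config to the cluster manager
        HazelcastClusterManager clusterManager = new HazelcastClusterManager(hazelcastConfig);
        VertxOptions vertxOptions = new VertxOptions().setClusterManager(clusterManager);

        Vertx.clusteredVertx(vertxOptions, res -> {
            if (res.succeeded()) {
                System.out.println("Clustered Vert.x started");
            } else {
                res.cause().printStackTrace();
            }
        });
    }
}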

The Verticle class:

package org.example;

import io.vertx.core.AbstractVerticle;

public class MainClusterVerticle extends AbstractVerticle {

    @Override
    public void start() {
        // to do something
    }
}
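
The verticle body is left empty in the article. As a rough sketch of what it could do (the class name and the address cluster.ping are my own, not from the original), a verticle might register a consumer on the clustered event bus and periodically publish to the same address, so traffic between the nodes becomes visible:

package org.example;

import io.vertx.core.AbstractVerticle;

public class PingClusterVerticle extends AbstractVerticle {

    @Override
    public void start() {
        // Hypothetical address; any node in the cluster can send to it over the clustered event bus
        vertx.eventBus().<String>consumer("cluster.ping", msg -> {
            System.out.println("Received: " + msg.body());
            msg.reply("pong from " + deploymentID());
        });

        // Publish every 5 seconds so consumers on the other nodes also receive the message
        vertx.setPeriodic(5000, id ->
                vertx.eventBus().publish("cluster.ping", "ping from " + deploymentID()));
    }
}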

cluster.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
  ~
  ~ Licensed under the Apache License, Version 2.0 (the "License");
  ~ you may not use this file except in compliance with the License.
  ~ You may obtain a copy of the License at
  ~
  ~ http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<!--
  The default Hazelcast configuration.

  This XML file is used when no cluster.xml is present.

  To learn how to configure Hazelcast, please see the schema at
  https://hazelcast.com/schema/config/hazelcast-config-3.12.xsd
  or the Reference Manual at https://hazelcast.org/documentation/
-->

<!--suppress XmlDefaultAttributeValue -->
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
           http://www.hazelcast.com/schema/config/hazelcast-config-3.12.xsd">

    <group>
        <name>dev</name>
    </group>
    <!-- Management Center address -->
    <management-center enabled="true">http://192.168.2.112:7070/mancenter</management-center>
    <network>
        <!-- TCP listen port 5700; if it is already in use, the next port is tried (up to 100 ports, 5700-5799) -->
        <port auto-increment="true" port-count="100">5700</port>
        <outbound-ports>
            <!--
            Allowed port range when connecting to other nodes.
            0 or * means use system provided port.
            -->
            <ports>0</ports>
        </outbound-ports>
        <join>
            <!-- Cluster discovery via multicast -->
            <multicast enabled="true">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="false">
                <interface>192.168.2.112</interface>
                <member-list>
                    <member>192.168.2.121</member>
                </member-list>
            </tcp-ip>
            <aws enabled="false">
                <access-key>my-access-key</access-key>
                <secret-key>my-secret-key</secret-key>
                <!--optional, default is us-east-1 -->
                <region>us-west-1</region>
                <!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->
                <host-header>ec2.amazonaws.com</host-header>
                <!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->
                <security-group-name>hazelcast-sg</security-group-name>
                <tag-key>type</tag-key>
                <tag-value>hz-nodes</tag-value>
            </aws>
            <gcp enabled="false">
                <zones>us-east1-b,us-east1-c</zones>
            </gcp>
            <azure enabled="false">
                <client-id>CLIENT_ID</client-id>
                <client-secret>CLIENT_SECRET</client-secret>
                <tenant-id>TENANT_ID</tenant-id>
                <subscription-id>SUB_ID</subscription-id>
                <cluster-id>HZLCAST001</cluster-id>
                <group-name>GROUP-NAME</group-name>
            </azure>
            <kubernetes enabled="false">
                <namespace>MY-KUBERNETES-NAMESPACE</namespace>
                <service-name>MY-SERVICE-NAME</service-name>
                <service-label-name>MY-SERVICE-LABEL-NAME</service-label-name>
                <service-label-value>MY-SERVICE-LABEL-VALUE</service-label-value>
            </kubernetes>
            <eureka enabled="false">
                <self-registration>true</self-registration>
                <namespace>hazelcast</namespace>
            </eureka>
            <discovery-strategies>
            </discovery-strategies>
        </join>
        <interfaces enabled="false">
            <interface>10.10.1.*</interface>
        </interfaces>
        <ssl enabled="false"/>
        <socket-interceptor enabled="false"/>
        <symmetric-encryption enabled="false">
            <!--
               encryption algorithm such as
               DES/ECB/PKCS5Padding,
               PBEWithMD5AndDES,
               AES/CBC/PKCS5Padding,
               Blowfish,
               DESede
            -->
            <algorithm>PBEWithMD5AndDES</algorithm>
            <!-- salt value to use when generating the secret key -->
            <salt>thesalt</salt>
            <!-- pass phrase to use when generating the secret key -->
            <password>thepass</password>
            <!-- iteration count to use when generating the secret key -->
            <iteration-count>19</iteration-count>
        </symmetric-encryption>
        <failure-detector>
            <icmp enabled="false"/>
        </failure-detector>
    </network>
    <partition-group enabled="false"/>
    <executor-service name="default">
        <pool-size>16</pool-size>
        <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
        <queue-capacity>0</queue-capacity>
    </executor-service>
    <security>
        <client-block-unmapped-actions>true</client-block-unmapped-actions>
    </security>
    <queue name="default">
        <!--
            Maximum size of the queue. When a JVM's local queue size reaches the maximum,
            all put/offer operations will get blocked until the queue size
            of the JVM goes down below the maximum.
            Any integer between 0 and Integer.MAX_VALUE. 0 means
            Integer.MAX_VALUE. Default is 0.
        -->
        <max-size>0</max-size>
        <!--
            Number of backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. 0 means no backup.
        -->
        <backup-count>1</backup-count>

        <!--
            Number of async backups. 0 means no backup.
        -->
        <async-backup-count>0</async-backup-count>

        <empty-queue-ttl>-1</empty-queue-ttl>

        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </queue>
    <map name="default">
        <!--
           Data type that will be used for storing recordMap.
           Possible values:
           BINARY (default): keys and values will be stored as binary data
           OBJECT : values will be stored in their object forms
           NATIVE : values will be stored in non-heap region of JVM
        -->
        <in-memory-format>BINARY</in-memory-format>

        <!--
            Metadata creation policy for this map. Hazelcast may process objects of supported types ahead of time to
            create additional metadata about them. This metadata then is used to make querying and indexing faster.
            Metadata creation may decrease put throughput.
            Valid values are:
            CREATE_ON_UPDATE (default): Objects of supported types are pre-processed when they are created and updated.
            OFF: No metadata is created.
        -->
        <metadata-policy>CREATE_ON_UPDATE</metadata-policy>

        <!--
            Number of backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. 0 means no backup.
        -->
        <backup-count>1</backup-count>
        <!--
            Number of async backups. 0 means no backup.
        -->
        <async-backup-count>0</async-backup-count>
        <!--
            Maximum number of seconds for each entry to stay in the map. Entries that are
            older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
            will get automatically evicted from the map.
            Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0
        -->
        <time-to-live-seconds>0</time-to-live-seconds>
        <!--
            Maximum number of seconds for each entry to stay idle in the map. Entries that are
            idle(not touched) for more than <max-idle-seconds> will get
            automatically evicted from the map. Entry is touched if get, put or containsKey is called.
            Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
        -->
        <max-idle-seconds>0</max-idle-seconds>
        <!--
            Valid values are:
            NONE (no eviction),
            LRU (Least Recently Used),
            LFU (Least Frequently Used).
            NONE is the default.
        -->
        <eviction-policy>NONE</eviction-policy>
        <!--
            Maximum size of the map. When max size is reached,
            map is evicted based on the policy defined.
            Any integer between 0 and Integer.MAX_VALUE. 0 means
            Integer.MAX_VALUE. Default is 0.
        -->
        <max-size policy="PER_NODE">0</max-size>
        <!--
            `eviction-percentage` property is deprecated and will be ignored when it is set.

            As of version 3.7, eviction mechanism changed.
            It uses a probabilistic algorithm based on sampling. Please see documentation for further details
        -->
        <eviction-percentage>25</eviction-percentage>
        <!--
            `min-eviction-check-millis` property is deprecated  and will be ignored when it is set.

            As of version 3.7, eviction mechanism changed.
            It uses a probabilistic algorithm based on sampling. Please see documentation for further details
        -->
        <min-eviction-check-millis>100</min-eviction-check-millis>
        <!--
            While recovering from split-brain (network partitioning),
            map entries in the small cluster will merge into the bigger cluster
            based on the policy set here. When an entry merge into the
            cluster, there might an existing entry with the same key already.
            Values of these entries might be different for that same key.
            Which value should be set for the key? Conflict is resolved by
            the policy set here. Default policy is PutIfAbsentMapMergePolicy

            There are built-in merge policies such as
            com.hazelcast.map.merge.PassThroughMergePolicy; entry will be overwritten if merging entry exists for the key.
            com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
            com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
            com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
        -->
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>

        <!--
           Control caching of de-serialized values. Caching makes query evaluation faster, but it cost memory.
           Possible Values:
                        NEVER: Never cache deserialized object
                        INDEX-ONLY: Caches values only when they are inserted into an index.
                        ALWAYS: Always cache deserialized values.
        -->
        <cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>

    </map>

    <!--
           Configuration for an event journal. The event journal keeps events related
           to a specific partition and data structure. For instance, it could keep
           map add, update, remove, merge events along with the key, old value, new value and so on.
        -->
    <event-journal enabled="false">
        <mapName>mapName</mapName>
        <capacity>10000</capacity>
        <time-to-live-seconds>0</time-to-live-seconds>
    </event-journal>

    <event-journal enabled="false">
        <cacheName>cacheName</cacheName>
        <capacity>10000</capacity>
        <time-to-live-seconds>0</time-to-live-seconds>
    </event-journal>

    <!--
       Configuration for a merkle tree.
       The merkle tree is a data structure used for efficient comparison of the
       difference in the contents of large data structures. The precision of
       such a comparison mechanism is defined by the depth of the merkle tree.
    -->
    <merkle-tree enabled="false">
        <mapName>mapName</mapName>
        <depth>10</depth>
    </merkle-tree>

    <multimap name="default">
        <backup-count>1</backup-count>
        <value-collection-type>SET</value-collection-type>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </multimap>

    <replicatedmap name="default">
        <in-memory-format>OBJECT</in-memory-format>
        <async-fillup>true</async-fillup>
        <statistics-enabled>true</statistics-enabled>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </replicatedmap>

    <list name="default">
        <backup-count>1</backup-count>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </list>

    <set name="default">
        <backup-count>1</backup-count>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </set>

    <jobtracker name="default">
        <max-thread-size>0</max-thread-size>
        <!-- Queue size 0 means number of partitions * 2 -->
        <queue-size>0</queue-size>
        <retry-count>0</retry-count>
        <chunk-size>1000</chunk-size>
        <communicate-stats>true</communicate-stats>
        <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
    </jobtracker>

    <semaphore name="default">
        <initial-permits>0</initial-permits>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
    </semaphore>

    <reliable-topic name="default">
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>BLOCK</topic-overload-policy>
        <statistics-enabled>true</statistics-enabled>
    </reliable-topic>

    <ringbuffer name="default">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </ringbuffer>

    <flake-id-generator name="default">
        <prefetch-count>100</prefetch-count>
        <prefetch-validity-millis>600000</prefetch-validity-millis>
        <id-offset>0</id-offset>
        <node-id-offset>0</node-id-offset>
        <statistics-enabled>true</statistics-enabled>
    </flake-id-generator>

    <atomic-long name="default">
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </atomic-long>

    <atomic-reference name="default">
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </atomic-reference>

    <count-down-latch name="default"/>

    <serialization>
        <portable-version>0</portable-version>
    </serialization>

    <services enable-defaults="true"/>

    <lite-member enabled="false"/>

    <cardinality-estimator name="default">
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <merge-policy batch-size="100">HyperLogLogMergePolicy</merge-policy>
    </cardinality-estimator>

    <scheduled-executor-service name="default">
        <capacity>100</capacity>
        <durability>1</durability>
        <pool-size>16</pool-size>
        <merge-policy batch-size="100">com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
    </scheduled-executor-service>

    <crdt-replication>
        <replication-period-millis>1000</replication-period-millis>
        <max-concurrent-replication-targets>1</max-concurrent-replication-targets>
    </crdt-replication>

    <pn-counter name="default">
        <replica-count>2147483647</replica-count>
        <statistics-enabled>true</statistics-enabled>
    </pn-counter>

    <cp-subsystem>
        <cp-member-count>0</cp-member-count>
        <group-size>0</group-size>
        <session-time-to-live-seconds>300</session-time-to-live-seconds>
        <session-heartbeat-interval-seconds>5</session-heartbeat-interval-seconds>
        <missing-cp-member-auto-removal-seconds>14400</missing-cp-member-auto-removal-seconds>
        <fail-on-indeterminate-operation-state>false</fail-on-indeterminate-operation-state>
        <raft-algorithm>
            <leader-election-timeout-in-millis>2000</leader-election-timeout-in-millis>
            <leader-heartbeat-period-in-millis>5000</leader-heartbeat-period-in-millis>
            <max-missed-leader-heartbeat-count>5</max-missed-leader-heartbeat-count>
            <append-request-max-entry-count>100</append-request-max-entry-count>
            <commit-index-advance-count-to-snapshot>10000</commit-index-advance-count-to-snapshot>
            <uncommitted-entry-count-to-reject-new-appends>100</uncommitted-entry-count-to-reject-new-appends>
            <append-request-backoff-timeout-in-millis>100</append-request-backoff-timeout-in-millis>
        </raft-algorithm>
    </cp-subsystem>
</hazelcast>

Project directory structure:

[screenshot]

3. Cluster Startup and How It Works

[screenshot]
  • This process has three important TCP ports:
  1. Port 58327 connects to 192.168.2.112:7070 and sends this Vert.x service's JVM data to the Management Center.
  2. Port 58331 is the event bus port; other Vert.x services in the cluster use it to send data to this service over the event bus.
  3. Port 5700 is the Hazelcast cluster port. Observed experimentally: when n services are started in the cluster, each one opens a connection to the other n-1 services and keeps it alive with heartbeats, so when one service dies the others see the dropped connection and thereby learn that a cluster node has gone down.
  • The process also has one important UDP port, the multicast port, which members use to tell each other which cluster ports they listen on (such as 5700 above) so they can connect and form the cluster. (A sketch for pinning the event bus port, instead of letting it be chosen at random, follows this list.)
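
Because the launcher above only sets the event bus host, the event bus port (58331 in this run) is chosen at random at startup. If a fixed port is preferred, for example so firewall rules can be written for it, EventBusOptions also accepts a port. A minimal sketch, where the port number 41232 and the class name are my own choices:

package org.example;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class AppClusterFixedPort {

    public static void main(String[] args) throws UnknownHostException {
        EventBusOptions eventBusOptions = new EventBusOptions()
                // LAN IP of this machine, as in AppCluster
                .setHost(InetAddress.getLocalHost().getHostAddress())
                // Fixed event bus port instead of a randomly chosen one; 41232 is an arbitrary example
                .setPort(41232);

        VertxOptions vertxOptions = new VertxOptions()
                .setEventBusOptions(eventBusOptions)
                .setClusterManager(new HazelcastClusterManager());

        Vertx.clusteredVertx(vertxOptions, res -> {
            if (res.succeeded()) {
                System.out.println("Clustered Vert.x started, event bus on port 41232");
            } else {
                res.cause().printStackTrace();
            }
        });
    }
}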

Console of the first Vert.x service after startup:

[screenshot]

Console of the second Vert.x service after startup:

[screenshot]

Console of the third Vert.x service after startup:

[screenshot]

As you can see, each service keeps its own copy of the cluster member list.
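
If you want to verify this from code rather than from the console output, the cluster manager interface exposes the member list. A minimal sketch, assuming the clusterManager variable from AppCluster is still in scope (for example inside the deployVerticle callback):

// Print this node's id and the ids of all members this node currently sees
System.out.println("This node: " + clusterManager.getNodeID());
System.out.println("Cluster members: " + clusterManager.getNodes());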

Looking at the sockets of the first process (PID 4980):

[screenshot]

You can see that the first process keeps TCP connections to the other two cluster members through its cluster port (5700); those connections are what form the cluster.

Looking at the sockets of the second process (PID 3376):

[screenshot]

You can see that the second process connects from port 50674 to the first process's cluster port (5700), and from port 50787 to the third process's cluster port (5702); both TCP connections are kept open to maintain the cluster.

A diagram may make this clearer:

[screenshot]
