美文网首页
vertx, kotlin, postgrest reactiv

vertx, kotlin, postgres reactive

作者: 赐我理由在披甲上阵 | 来源:发表于2020-05-07 10:57 被阅读0次

build.gradle

// Gradle plugins applied to this build.
plugins {
  id 'java'
  id 'java-library'
  id 'application'
  // NOTE(review): this version is spelled out separately from kt_version in gradle.properties;
  // keep the two in step when upgrading Kotlin.
  id 'org.jetbrains.kotlin.jvm' version '1.3.40'
  id 'com.github.johnrengelman.shadow' version '5.2.0'
  id 'maven-publish'
//  id("io.vertx.vertx-plugin") version "0.8.0" apply false
}

// Coordinates of the artifact produced by this build.
group 'citi.rio'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8
// Fully-qualified entry point; assigned to application.mainClassName further down.
def mainClass = 'citi.ken.Starter'

// Library versions ($kt_version, $vertx_version, ...) come from gradle.properties.
dependencies {
  // kotlin
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kt_version"
  implementation "org.jetbrains.kotlin:kotlin-reflect:$kt_version"
  implementation "com.fasterxml.jackson.module:jackson-module-kotlin:$jackson_version"

  implementation "io.vertx:vertx-core:$vertx_version"
  implementation "io.vertx:vertx-web:$vertx_version"
  implementation "io.vertx:vertx-pg-client:$vertx_version"
//    implementation "io.vertx:vertx-redis-client:$vertx_version"
//    implementation "io.vertx:vertx-jdbc-client:$vertx_version"

  // kotlin vertx
  implementation "io.vertx:vertx-lang-kotlin:$vertx_version"
  implementation "io.vertx:vertx-lang-kotlin-coroutines:$vertx_version"

  // log
  implementation "org.slf4j:jul-to-slf4j:$slf4j_version"
  // `runtime` is deprecated; `runtimeOnly` keeps logback off the compile classpath.
  runtimeOnly "ch.qos.logback:logback-classic:$logback_version"

  // java -cp .\h2-1.4.199.jar org.h2.tools.Server -tcp -tcpAllowOthers -web -webAllowOthers -browser -ifNotExists
  testImplementation("com.h2database:h2:1.4.199")
  testImplementation "org.junit.jupiter:junit-jupiter:5.4.2"
  testImplementation "io.vertx:vertx-junit5:$vertx_version"
  testImplementation "io.vertx:vertx-web-client:$vertx_version"
  testImplementation("org.testcontainers:postgresql:1.11.3")

  testImplementation "junit:junit:4.12"
}

// Artifact repositories, consulted in the order listed.
repositories {
  mavenLocal()
  maven {
    url = uri("https://www.artifactrepository.citigroup.net:443/artifactory/maven-icg-dev")
    credentials {
      // Credentials must never be committed with the build script. Supply them through
      // ~/.gradle/gradle.properties (artifactoryUser / artifactoryPassword), -P flags,
      // or the environment variables below. Any previously committed password should be rotated.
      username = findProperty('artifactoryUser') ?: System.getenv('ARTIFACTORY_USER')
      password = findProperty('artifactoryPassword') ?: System.getenv('ARTIFACTORY_PASSWORD')
    }
  }
  mavenCentral()
  jcenter()
}

// Java and Kotlin sources share src/main/kotlin; each compiler writes to its own output directory.
sourceSets {
  main {
    java {
      srcDirs += 'src/main/kotlin'
      outputDir = file("$buildDir/classes/java/main")
    }
    kotlin {
      // Was misspelled as "classes/kotin/main".
      outputDir = file("$buildDir/classes/kotlin/main")
    }
  }
}

// Compile the main Kotlin sources with JVM target 1.8.
compileKotlin {
  kotlinOptions.jvmTarget = "1.8"
  // NOTE(review): this adds the Kotlin output directory to the `compile` configuration from inside
  // the task configuration; presumably so Java code can see the compiled Kotlin classes — confirm.
  dependencies {
    compile files("$sourceSets.main.kotlin.outputDir")
  }
}
// Compile the test Kotlin sources with the same JVM target.
compileTestKotlin {
  kotlinOptions.jvmTarget = "1.8"
}

// Values interpolated into the arguments of the `run` task below.
def mainVerticleName = 'citi.ken.verticle.MainVerticle'
def watchForChange = 'src/**/*'
def doOnChange = 'gradle classes'

// Entry point used by the application plugin.
application {
  mainClassName = mainClass
}

// `gradle run` starts the main class with:
//   run <verticle> --redeploy=<glob> --launcher-class=<main class> --on-redeploy=<command> --conf=<file>
run {
  args = ['run', mainVerticleName, "--redeploy=$watchForChange", "--launcher-class=$mainClassName",
          "--on-redeploy=$doOnChange", '--conf=config/config_local.json']
}

// Builds the "-sources" jar that is attached to the Maven publication.
task sourceJar(type: Jar) {
  classifier 'sources'
  // allJava only matches *.java files, which would leave the Kotlin sources out of the jar;
  // allSource contains every source file of the main source set.
  from sourceSets.main.allSource
}

// Publishes the java component together with the sources jar under the project's own coordinates.
publishing {
  publications {
    maven(MavenPublication) {
      from components.java
      artifact tasks.sourceJar
      groupId = project.group
      artifactId = project.name
    }
  }
}

// Debug helper: prints the file name of every entry on the runtime classpath.
task testRunner() {
  doLast {
    // `each` rather than `collect`: only the println side effect is wanted, not a result list.
    // The former `libs` filter (rio_registryservice / ocean-common) was computed but never used.
    configurations.runtimeClasspath.each { println it.name }
  }
}

// tasks demo
//task startUmb1(type: Exec, group: 'gemfire', dependsOn: [installGemfire, build]) {
//  workingDir projectDir
//  environment 'GEMFIRE_HOME', installDir
//  environment 'PATH', gemfirePath
////    environment 'JAVA_HOME', 'C:/Users/hw83770/Java/jdk1.8.0_161'
//  if (System.getProperty('os.name').toLowerCase(Locale.ROOT).contains('windows')) {
//    commandLine cmd, "run --file=${projectDir}/scripts/start_umb1.gfsh"
//  } else {
//    commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/start_umb1.gfsh"
//  }
//}
//
//task cleanYumeServer(group: 'gemfire') {
//  doLast {
//    delete 'yume_locator'
//    delete 'yume_server1'
//    delete 'yume_server2'
//  }
//}

gradle.properties

version=1.0-SNAPSHOT
### remote
#gemfireRepositoryUrl = https://globaldeploymentservicesforunixsystems.citigroup.net/cgi-bin/down.cgi?lookup=pivotal-gemfire-9.8.4.tgz&location=Linux&dl=yes
### local
gemfireRepositoryUrl=C:/Users/hw83770/Desktop/gemfire-9.8.4/pivotal-gemfire-9.8.4.tgz
gemfireReleaseUrl=
# dependency versions
#assertjVersion = 3.6.2
#awaitilityVersion = 1.7.0
#junitVersion = 4.12
#mockitocoreVersion = 2.19.1
#log4jVersion = 2.11.0
#systemrulesVersion = 1.16.1
#lombokVersion = 1.18.8
#guavaVersion = 25.1-jre

# Versions referenced from build.gradle as $kt_version, $vertx_version, ...
# (jackson_version used to be listed twice with the same value; one entry is enough.)
kt_version=1.3.40
vertx_version=3.8.2
jackson_version=2.9.10
slf4j_version=1.7.25
logback_version=1.2.3

config_local.json

{
  "http.port": 9991,
  "pgClient": {
    "database": "meta_int",
    "host": "oceanap02d.nam.nsroot.net",
    "user": "admin",
    "password": "password",
    "port": 5432,
    "max_pool_size": 30
  }
}

log-back.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: everything at info level and above goes to the console. -->
<configuration>

    <!-- Console appender; each line is: timestamp [thread] LEVEL logger(max 36 chars) - message -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <Pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</Pattern>
        </encoder>
    </appender>

    <!-- Application packages; currently the same level as the root logger. -->
    <logger name="citi.ken" level="info"/>

    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>

constants.kt

package citi.ken.config

/**
 * @Author hw83770
 * @Date 17:04 2019/10/30
 *
 */

/** Resource name; forms the first path segment of every route below. */
const val item = "account"

// Route templates. ":Id" is a path parameter (read with getParam("Id") by the REST verticle).
// Several constants share the same path; the HTTP method is chosen where the route is registered.
const val API_GET = "/$item/:Id"
const val API_LIST_ALL = "/$item"
const val API_CREATE = "/$item"
const val API_UPDATE = "/$item/:Id"
const val API_DELETE = "/$item/:Id"
const val API_DELETE_ALL = "/$item"

package citi.ken.domain

import java.time.LocalDateTime

/**
 * Account entity exchanged over the REST API and mapped from the accounts table.
 *
 * All properties are declared in the primary constructor so that the generated
 * `equals`, `hashCode`, `toString`, `copy` and `componentN` cover every field.
 * Previously only [id] took part in them. Both call shapes keep working:
 * `Account(id)` and `Account(id, username, nickname, avatar, createTime, updateTime)`.
 *
 * @property id identifier of the account
 * @property username defaults to an empty string
 * @property nickname defaults to an empty string
 * @property avatar defaults to an empty string
 * @property createTime defaults to `LocalDateTime.now().toString()` at construction time
 * @property updateTime defaults to `LocalDateTime.now().toString()` at construction time
 *
 * @Author hw83770
 * @Date 17:18 2019/10/30
 */
data class Account(
  val id: Int,
  var username: String = "",
  var nickname: String = "",
  var avatar: String = "",
  var createTime: String = LocalDateTime.now().toString(),
  var updateTime: String = LocalDateTime.now().toString()
)

package citi.ken.domain

import citi.ken.orm.Table
import citi.ken.orm.column
import io.vertx.sqlclient.Row

/**
 * Table definition for `accounts` plus the row-to-entity mapping.
 *
 * @Author hw83770
 * @Date 10:08 2019/10/31
 */
object Accounts : Table<Account>(tblName = "accounts") {

  val id = column("id", "int")
  val username = column("username", "String")
  val nickname = column("nickname", "String")
  val avatar = column("avatar", "String")
  val created_at = column("created_at", "timestamp")
  val updated_at = column("updated_at", "timestamp")

  /**
   * Builds an [Account] from one result row.
   *
   * Text and timestamp columns that are NULL in the row become empty strings. The service's
   * insert statement only writes username, nickname and created_at, so NULL timestamps are
   * possible and used to fail with a NullPointerException here.
   *
   * @param from a row containing all columns declared on this table
   * @return the mapped account
   */
  fun mapToEntity(from: Row): Account = Account(
    id = from.getInteger(id.col),
    username = from.getString(username.col).orEmpty(),
    nickname = from.getString(nickname.col).orEmpty(),
    avatar = from.getString(avatar.col).orEmpty(),
    createTime = from.getLocalDateTime(created_at.col)?.toString().orEmpty(),
    updateTime = from.getLocalDateTime(updated_at.col)?.toString().orEmpty()
  )

}


package citi.ken.orm

/**
 * Describes one table column.
 *
 * @property col column name as it appears in generated SQL
 * @property type free-form type label (callers in this project pass values such as "int",
 *   "String", "timestamp"); nothing visible here interprets it
 *
 * @Author hw83770
 * @Date 18:21 2019/10/30
 */
data class Column(
  val col: String,
  val type: String
)

---

package citi.ken.orm

/**
 * Empty placeholder type; it declares no members yet.
 *
 * @Author hw83770
 * @Date 10:42 2019/10/31
 */
class Expression


package citi.ken.orm

import java.util.LinkedHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlin.reflect.KClass
import kotlin.text.StringBuilder

/**
 * Minimal table descriptor: keeps a registry of columns and renders simple SQL strings.
 *
 * @param tblName name of the table, exposed as [tblName]
 * @param entityClass accepted but not used by the current implementation
 *
 * @Author hw83770
 * @Date 18:11 2019/10/30
 */
abstract class Table<E : Any>(val tblName: String, entityClass: KClass<E>? = null) {
  // Neither _refCounter nor _primaryKeyName is read anywhere in this class yet.
  private val _refCounter = AtomicInteger()
  private val _columns = LinkedHashMap<String, Column>()
  private var _primaryKeyName: String? = null

  /**
   * Adds [colums] to the registry, keyed by its column name.
   *
   * @throws IllegalArgumentException if a column with the same name is already registered
   */
  fun registerColumn(colums: Column) {
    require(!_columns.containsKey(colums.col)) { "Duplicate column name: ${colums.col}" }
    _columns[colums.col] = colums
  }

  /** Creates a [Query] over this table for the given columns. */
  fun select(vararg colums: Column): Query = Query(cols = colums.asList(), from = tblName)

  /**
   * Renders `insert into <source> (<cols>) values (<values>)`.
   * The values are joined into the statement verbatim, without quoting or escaping.
   */
  fun insert(cols: List<Column>, values: List<String>, source: String): String {
    val columnList = cols.joinToString { it.col }
    val valueList = values.joinToString()
    return "insert into $source ($columnList) values ($valueList)"
  }

  /**
   * Renders an insert statement with one positional placeholder (`$1`, `$2`, ...) per column.
   */
  fun insertPrepare(cols: List<Column>, source: String): String {
    val columnList = cols.joinToString { it.col }
    val placeholders = (1..cols.size).joinToString { "\$$it" }
    return "insert into $source ($columnList) values ($placeholders)"
  }

}

/**
 * Result of a select call on a table: the chosen columns and the table name.
 *
 * @property cols columns to select
 * @property from name of the table to read from
 */
data class Query(
  val cols: List<Column>,
  val from: String
) {
  /**
   * Invokes [block] once. The Boolean it returns is discarded and nothing is stored on the
   * query. NOTE(review): looks like a stub for a future filter DSL — confirm.
   */
  fun where(block: () -> Boolean) {
    block()
  }
}


/**
 * Creates a [Column] from [name] and [type], registers it on the receiving table and returns it.
 * Registration failures (such as a duplicate name) propagate from `registerColumn`.
 */
fun <E : Any> Table<E>.column(name: String, type: String): Column =
  Column(name, type).also { created -> registerColumn(colums = created) }

package citi.ken.service

import citi.ken.domain.Account
import citi.ken.domain.Accounts
import io.vertx.core.Vertx
import io.vertx.pgclient.PgPool
import org.slf4j.LoggerFactory
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
import io.vertx.kotlin.pgclient.preparedQueryAwait
import io.vertx.kotlin.pgclient.queryAwait
import io.vertx.kotlin.sqlclient.poolOptionsOf
import io.vertx.sqlclient.Row
import io.vertx.sqlclient.Tuple
import java.time.LocalDateTime


/**
 * Account persistence on top of the reactive PostgreSQL client.
 *
 * Every public operation returns `null` instead of throwing when the database call fails;
 * the failure is logged by [runSafely].
 *
 * @property vertx the Vert.x instance the connection pool is created on
 *
 * @Author hw83770
 * @Date 17:14 2019/10/30
 */
class PgclientService(val vertx: Vertx) {

  val logger = LoggerFactory.getLogger(PgclientService::class.java)

  // Default (synchronized) lazy: with PUBLICATION the initializer may run more than once and
  // every extra PgPool it creates would be dropped without ever being closed.
  private val pgClient by lazy { createPgClient() }

  /**
   * Loads every account.
   *
   * @return all rows mapped to [Account], or `null` if the query failed
   */
  suspend fun getAll(): List<Account>? = runSafely {
    pgClient.queryAwait("select * from ${Accounts.tblName}").map { Accounts.mapToEntity(it) }
  }

  /**
   * Loads a single account by id.
   *
   * The id comes from the request path, so it is parsed and bound as a statement parameter
   * rather than being concatenated into the SQL text.
   *
   * @param accountId textual id; must parse as an integer
   * @return the account, or `null` if the id is not numeric, no row matches, or the query failed
   */
  suspend fun getCertain(accountId: String): Account? {
    val id = accountId.toIntOrNull()
    if (id == null) {
      logger.warn("account id is not a number: $accountId")
      return null
    }
    val sql = "select * from ${Accounts.tblName} where ${Accounts.id.col} = \$1"
    return runSafely {
      pgClient.preparedQueryAwait(sql, Tuple.of(id))
        .firstOrNull()
        ?.let { Accounts.mapToEntity(it) }
    }
  }

  /**
   * Inserts username, nickname and the current time (as created_at) for [account].
   *
   * @return the same [account] instance when at least one row was written, otherwise `null`
   */
  suspend fun insert(account: Account): Account? {
    val sql = Accounts.insertPrepare(
      cols = listOf(Accounts.username, Accounts.nickname, Accounts.created_at),
      source = Accounts.tblName
    )
    logger.info("sql: $sql")
    return runSafely {
      val queryAwait = pgClient
        .preparedQueryAwait(sql, Tuple.of(account.username, account.nickname, LocalDateTime.now()))
      if (queryAwait.rowCount() > 0) account else null
    }
  }

  /**
   * Builds the connection pool from the `pgClient` section of the current context's config.
   *
   * Reads `database`, `host`, `user`, `password`, `port` and `max_pool_size`; the optional
   * `search_path` key falls back to "test", the value that used to be hard-coded.
   *
   * @throws IllegalStateException if no `pgClient` configuration can be found
   */
  private fun createPgClient(): PgPool {
    val config = checkNotNull(Vertx.currentContext()?.config()?.getJsonObject("pgClient")) {
      "No 'pgClient' configuration available on the current Vert.x context"
    }

    val connectOptions = pgConnectOptionsOf(
      database = config.getString("database"),
      host = config.getString("host"),
      user = config.getString("user"),
      password = config.getString("password"),
      port = config.getInteger("port")
    ).addProperty("search_path", config.getString("search_path", "test"))

    val poolOptions = poolOptionsOf(maxSize = config.getInteger("max_pool_size"))
    return PgPool.pool(vertx, connectOptions, poolOptions)
  }

  /**
   * Runs [block] and converts any failure into a logged error and a `null` result.
   *
   * Coroutine cancellation is rethrown so a cancelled caller is not mistaken for "no data".
   *
   * @return the value produced by [block], or `null` if it threw
   */
  inline fun <T> runSafely(block: () -> T?): T? {
    return try {
      block()
    } catch (e: java.util.concurrent.CancellationException) {
      throw e
    } catch (e: Throwable) {
      // Pass the throwable as well so the stack trace is not lost.
      this.logger.error(e.message, e)
      null
    }
  }

}

package citi.ken.utils

import io.vertx.core.http.HttpHeaders
import io.vertx.core.json.Json
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.coroutines.awaitResult
import io.vertx.pgclient.PgPool
import io.vertx.pgclient.impl.RowImpl
import io.vertx.sqlclient.Row
import io.vertx.sqlclient.RowSet
import io.vertx.sqlclient.SqlResult
import java.util.stream.Collector

/**
 * Ends the response with a JSON body.
 *
 * A [String] argument is written as-is; anything else (including `null`) is encoded with
 * `Json.encode`. The content type header is set before the body is encoded.
 *
 * @Author hw83770
 * @Date 17:54 2019/10/30
 */
fun RoutingContext.toJson(obj: Any?) {
  val reply = response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
  val payload: String = when (obj) {
    is String -> obj
    else -> Json.encode(obj)
  }
  reply.end(payload)
}

package citi.ken.verticle

import citi.ken.config.API_CREATE
import citi.ken.config.API_GET
import citi.ken.config.API_LIST_ALL
import citi.ken.domain.Account
import citi.ken.service.PgclientService
import citi.ken.utils.toJson
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.vertx.core.Handler
import io.vertx.core.http.HttpHeaders
import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.Router
import io.vertx.ext.web.RoutingContext
import io.vertx.ext.web.handler.BodyHandler
import io.vertx.ext.web.handler.CorsHandler
import io.vertx.ext.web.handler.LoggerHandler
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory

/**
 * Coroutine verticle exposing the account REST API.
 *
 * Routes: `GET /configs`, `GET` on [API_GET], `GET` on [API_LIST_ALL] and `POST` on [API_CREATE].
 *
 * @Author hw83770
 * @Date 17:46 2019/10/30
 */
class AccountRestVerticleCo : CoroutineVerticle() {
  private val logger = LoggerFactory.getLogger(AccountRestVerticleCo::class.java)

  private val defaultPort = 9991
  private val router by lazy(LazyThreadSafetyMode.NONE) { createRouter() }
  val mapper = jacksonObjectMapper()

  private lateinit var pgclientService: PgclientService


  private val cors: CorsHandler = CorsHandler.create("*")
    .allowedHeaders(
      setOf(
        HttpHeaders.CONTENT_TYPE.toString(),
        HttpHeaders.ORIGIN.toString(),
        HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN.toString()
      )
    )
    .allowedMethods(
      setOf(
        HttpMethod.GET,
        HttpMethod.POST,
        HttpMethod.PUT,
        HttpMethod.PATCH,
        HttpMethod.DELETE,
        HttpMethod.OPTIONS
      )
    )

  /**
   * Creates the service and starts the HTTP server on `http.port` (default 9991).
   *
   * The listen call is awaited, so a bind failure now fails the deployment instead of
   * being silently ignored.
   */
  override suspend fun start() {
    val port = config.getInteger("http.port", defaultPort)
    pgclientService = PgclientService(vertx)
    io.vertx.kotlin.coroutines.awaitResult<io.vertx.core.http.HttpServer> { done ->
      vertx.createHttpServer().requestHandler(router).listen(port, done)
    }
    logger.info("HTTP server listening on port $port")
  }

  /** Builds the router: CORS, body and logging handlers first, then the API routes. */
  private fun createRouter(): Router = Router.router(vertx).apply {
    // cors handle
    route().handler(cors)
    route().handler(BodyHandler.create())
    route().handler(LoggerHandler.create())

    get("/configs").handler { it.toJson(config.toString()) }
    get(API_GET).handler(getCertain)
    get(API_LIST_ALL).handler(getAll)
    post(API_CREATE).handler(createOne)

    //todo:
//      post(API_UPDATE).handler(updateTodo)
//      delete(API_DELETE).handler(deleteTodo)
//      delete(API_DELETE_ALL).handler(deleteAll)

  }


  private val getAll = Handler<RoutingContext> { ctx ->
    doAsync(ctx) { pgclientService.getAll() }
  }

  private val getCertain = Handler<RoutingContext> { ctx ->
    val id = ctx.request().getParam("Id")
    doAsync(ctx) { pgclientService.getCertain(id) }
  }

  // A missing or malformed body is answered with 400 instead of throwing out of the handler.
  private val createOne = Handler<RoutingContext> { ctx ->
    val acc: Account? = try {
      ctx.body?.let { mapper.readValue<Account>(it.bytes) }
    } catch (e: Exception) {
      logger.warn("Rejecting malformed account payload: ${e.message}")
      null
    }
    if (acc == null) {
      ctx.fail(400)
    } else {
      doAsync(ctx) { pgclientService.insert(acc) }
    }
  }


  /**
   * Runs [block] in a coroutine on the Vert.x dispatcher and writes its result as JSON.
   *
   * A `null` result is answered with the text "no data". If [block] or the JSON encoding
   * throws, the request is failed so the client gets an error response rather than waiting
   * forever on a response that is never ended.
   *
   * @param ctx routing context of the request being answered
   * @param block suspending computation producing the response payload
   */
  fun <T> doAsync(ctx: RoutingContext, block: suspend () -> T) {
    launch(vertx.dispatcher()) {
      try {
        logger.info("doAsync1: ${Thread.currentThread().name}")
        val t = block.invoke()
        logger.info("doAsync2: ${Thread.currentThread().name}")
        ctx.toJson(t ?: "no data")
      } catch (e: java.util.concurrent.CancellationException) {
        throw e
      } catch (e: Exception) {
        logger.error("Request handling failed", e)
        if (!ctx.response().ended()) {
          ctx.fail(e)
        }
      }
    }
    logger.info("doAsync3: ${Thread.currentThread().name}")
  }

}


package citi.ken.verticle

/**
 * @Author hw83770
 * @Date 16:48 2019/10/30
 * MainVerticle to deploy other component verticles
 */
import io.vertx.core.AbstractVerticle
import io.vertx.core.DeploymentOptions
import org.slf4j.LoggerFactory

/**
 * Entry verticle: deploys [AccountRestVerticleCo], handing it this verticle's configuration.
 */
class MainVerticle : AbstractVerticle() {
  val log = LoggerFactory.getLogger(MainVerticle::class.java)

  /** Starts the deployment and logs its outcome; a failure used to go unnoticed. */
  override fun start() {
    log.info("MainVerticle started")
    val options = DeploymentOptions().setConfig(config())
    vertx.deployVerticle(AccountRestVerticleCo(), options) { result ->
      if (result.succeeded()) {
        log.info("AccountRestVerticleCo deployed: ${result.result()}")
      } else {
        log.error("Failed to deploy AccountRestVerticleCo", result.cause())
      }
    }
  }
}

package citi.ken

import io.vertx.core.Launcher
import io.vertx.core.VertxOptions
import org.slf4j.bridge.SLF4JBridgeHandler

/**
 * Application entry point.
 *
 * @Author hw83770
 * @Date 16:50 2019/10/30
 */
object Starter {

  /**
   * Routes java.util.logging output through SLF4J, then hands the command line to [MyLauncher].
   *
   * Example program arguments.
   *
   * Deploy the Kotlin verticle:
   *   run citi.ken.verticle.MainVerticle -conf=config/config_local.json
   *
   * Deploy the Java verticle:
   *   run citi.ken.verticle.JavaMainVerticle -conf=config/config_local.json
   *
   * @param args launcher command line, passed through unchanged
   */
  @JvmStatic
  fun main(args: Array<String>) {
    // Remove the existing root-logger handlers before installing the bridge.
    SLF4JBridgeHandler.removeHandlersForRootLogger()
    SLF4JBridgeHandler.install()
    MyLauncher().dispatch(args)
  }
}

/**
 * Launcher that sets the event-loop and worker pool sizes to 4 before Vert.x starts.
 */
class MyLauncher : Launcher() {

  /** Applies the pool sizes; does nothing when [options] is `null`. */
  override fun beforeStartingVertx(options: VertxOptions?) {
    if (options == null) return
    options.eventLoopPoolSize = 4
    options.workerPoolSize = 4
  }

}

rest_test.http


### hello
GET http://localhost:9991/configs HTTP/1.1

### get all
GET http://localhost:9991/account HTTP/1.1

### get one
GET http://localhost:9991/account/1 HTTP/1.1 

###
POST http://localhost:9991/account HTTP/1.1
content-type: application/json

{
    "username": "sample1",
    "nickname": "test_nick",
    "avatar": "test/url_avatar",
    "createTime":  "2015-11-11 18:27:50"
}


###
https://localhost:8080/geode/v1/publisher?limit=50


###
https://localhost:8080/geode/v1/publisher?limit=50
accept: application/json;charset=UTF-8
Accept-Encoding: gzip, deflate, br
Accept-Language: en-US,en;q=0.9
Authorization: Basic aHc4Mzc3MDoxMjM=
Connection: keep-alive
Host: localhost:8080
Referer: https://localhost:8080/geode/swagger-ui.html
Sec-Fetch-Mode: cors
Sec-Fetch-Site: same-origin
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36


###
https://localhost:8080/geode/v1/publisher/publisher%3Akey%3A0
accept: application/json;charset=UTF-8
Accept-Encoding: gzip, deflate, br
Accept-Language: en-US,en;q=0.9
Authorization: Basic aHc4Mzc3MDoxMjM=

相关文章

网友评论

      本文标题:vertx, kotlin, postgrest reactiv

      本文链接:https://www.haomeiwen.com/subject/lmilwctx.html