美文网首页
scala 如何向RabbitMQ队列写消息(生产者)

scala 如何向RabbitMQ队列写消息(生产者)

作者: 踽踽独行DIY | 来源:发表于2019-08-06 15:57 被阅读0次

    1.首先,你需要一个RabbitMQ客户端的jar包

    链接: https://pan.baidu.com/s/1JFZ7jblpAIsQ29k2pSaxDA 提取码: w1mb 

    2.我的实现代码:

    import com.rabbitmq.client.{Channel, ConnectionFactory,Address}

    def send(input: JSONObject, source:String):String = {

    val EXCHANGE_NAME = source

    val factory =new ConnectionFactory()

    factory.setPort(指定端口)

    factory.setVirtualHost(指定的VH)

    factory.setUsername(账号)

    factory.setPassword(密码)

    val host1 =new Address(指定的第一个ip,指定的端口)

    val host2 =new Address(指定的第二个ip,指定的端口)

    val site: Array[Address] =Array(host1,host2)

    val conn = factory.newConnection(site)

    val channel = conn.createChannel()

    // durable = true, exclusive = false, autoDelete = false, arguments = null

      val msg = input.toJSONString

    //  channel.exchangeDeclare(EXCHANGE_NAME,"direct",true)

      channel.basicPublish("",EXCHANGE_NAME, null, msg.getBytes())

    println("消息已发送")

    channel.close()

    msg

    }

    val final_value: Array[JSONObject] = post_apply_exp_final_json.collect()

    for (i <-0 until final_value.length){

    send(final_value(i))

    }

    //1.由于我写入的是ObjectJson,因此值类型如此定义。具体请视自身情况修改

    //2.Rabbit常用的是订阅模式(Fanout)和路由模式(Direct),在实现时略有不同,路由需要匹配RoutingKey,订阅模式需要定义ExchangeName,我写的是Direct模式的

    相关文章

      网友评论

          本文标题:scala 如何向RabbitMQ队列写消息(生产者)

          本文链接:https://www.haomeiwen.com/subject/sxkwdctx.html