Error when running sparkStreaming in IDEA: Excepti

Author: yimengtianya1 | Published: 2019-07-30 15:33

    Running sparkStreaming in IDEA fails with Exception in thread "main" java.lang.NullPointerException

    Exception in thread "main" java.lang.NullPointerException   
    at java.io.Reader.<init>(Reader.java:78)    
    at java.io.InputStreamReader.<init>(InputStreamReader.java:129)     
    at scala.io.BufferedSource.reader(BufferedSource.scala:24)  
    at scala.io.BufferedSource.bu
    

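    Cause: no resources root is configured in IDEA, so the program cannot find the file. When names.csv is not on the classpath, getResourceAsStream returns null, and passing that null stream to scala.io.Source.fromInputStream raises the NullPointerException in the Reader constructor: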

    val namesResource = this.getClass.getResourceAsStream("/names.csv")
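
A minimal sketch of how the null result could be caught before it reaches the Reader constructor. The object name `ResourceCheck` and the helper `readResourceLines` are hypothetical and not part of the original program; the sketch only assumes the same `/names.csv` resource path used above.

```scala
import scala.io.Source

// Hypothetical helper, not part of the original program.
object ResourceCheck {

  /** Returns the lines of a classpath resource, or None if it is not on the classpath. */
  def readResourceLines(path: String): Option[List[String]] =
    // getResourceAsStream returns null for a missing resource; Option(null) is None
    Option(getClass.getResourceAsStream(path)).map { in =>
      val source = Source.fromInputStream(in, "UTF-8")
      try source.getLines().toList
      finally source.close()
    }

  def main(args: Array[String]): Unit =
    readResourceLines("/names.csv") match {
      case Some(lines) =>
        println(s"Loaded ${lines.size} line(s) from /names.csv")
      case None =>
        println("/names.csv is not on the classpath; check the resources root in IDEA")
    }
}
```

Wrapping the stream in `Option` turns the missing file into an explicit case instead of a stack trace that points into `java.io.Reader`.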
    

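    Solution: mark the folder that contains the data file as a resources root in IDEA.
    This is typically done by right-clicking the folder and choosing Mark Directory as > Resources Root, or under File > Project Structure > Modules > Sources.


    [Two screenshots of the IDEA settings: 图片.png, 图片.png]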
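
To confirm that the setting took effect, one option is to check whether any directory on the runtime classpath actually contains names.csv. The sketch below is an assumption-laden diagnostic, not part of the original post: `ClasspathCheck` and `dirsContaining` are hypothetical names, it only inspects plain directories (not jars), and it relies on the `java.class.path` system property, which may not list every entry in every launch configuration.

```scala
import java.io.File

// Hypothetical diagnostic, not part of the original program.
object ClasspathCheck {

  /** Classpath directories that directly contain a file with the given name. */
  def dirsContaining(fileName: String, classpath: String): Seq[File] =
    classpath
      .split(File.pathSeparator)
      .toSeq
      .filter(_.nonEmpty)
      .map(entry => new File(entry))
      // Only plain directories are checked; jar entries are skipped
      .filter(dir => dir.isDirectory && new File(dir, fileName).isFile)

  def main(args: Array[String]): Unit = {
    val hits = dirsContaining("names.csv", System.getProperty("java.class.path"))
    if (hits.isEmpty) println("names.csv was not found in any classpath directory")
    else hits.foreach(dir => println(s"names.csv found in: $dir"))
  }
}
```

If the file shows up in the module's output directory after a rebuild, `getResourceAsStream("/names.csv")` should no longer return null.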

    Source code:

    import java.io.PrintWriter
    import java.net.ServerSocket
    import org.apache.log4j.{Level, Logger} // used to set the log level
    
    import scala.util.Random
    
    object StreamingProducer {
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR) // print only ERROR-level logs
    
        val random = new Random()
    
        // Maximum number of events per second
        val MaxEvents = 6
    
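        // Read the list of possible names. getResourceAsStream returns null when
        // names.csv is not on the classpath, so fail early with a clear message.
        val namesResource = this.getClass.getResourceAsStream("/names.csv")
        require(namesResource != null, "names.csv not found on the classpath")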
        val names = scala.io.Source.fromInputStream(namesResource)
          .getLines()
          .toList
          .head
          .split(",")
          .toSeq
    
        // Generate a sequence of possible products
        val products = Seq(
          "iPhone Cover" -> 9.99,
          "Headphones" -> 5.49,
          "Samsung Galaxy Cover" -> 8.95,
          "iPad Cover" -> 7.49
        )
    
        /** Generate a number of random product events */
        def generateProductEvents(n: Int) = {
          (1 to n).map { i =>
            val (product, price) = products(random.nextInt(products.size))
            val user = random.shuffle(names).head
            (user, product, price)
          }
        }
    
        // create a network producer
        val listener = new ServerSocket(9999)
        println("Listening on port: 9999")
    
        while (true) {
          val socket = listener.accept()
          new Thread() {
            override def run(): Unit = {
              println("Got client connected from: " + socket.getInetAddress)
              val out = new PrintWriter(socket.getOutputStream(), true)
    
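              // PrintWriter swallows IOExceptions, so poll checkError() to stop
              // once the client disconnects and let socket.close() below run
              while (!out.checkError()) {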
                Thread.sleep(1000)
                val num = random.nextInt(MaxEvents)
                val productEvents = generateProductEvents(num)
                productEvents.foreach{ event =>
                  out.write(event.productIterator.mkString(","))
                  out.write("\n")
                }
                out.flush()
                println(s"Created $num events...")
              }
              socket.close()
            }
          }.start()
        }
      }
    }
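
    To check the producer without starting a Spark job, a plain socket client can connect to port 9999 and print a few events. The sketch below is only an illustration under assumptions: `ProducerClient`, `Purchase`, and `parseEvent` are hypothetical names, the host is assumed to be localhost, and the line format is taken from the `user,product,price` tuples written by the producer above.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import scala.util.Try

// Hypothetical test client, not part of the original program.
object ProducerClient {

  final case class Purchase(user: String, product: String, price: Double)

  /** Parses one "user,product,price" line; returns None if the line is malformed. */
  def parseEvent(line: String): Option[Purchase] =
    line.split(",") match {
      case Array(user, product, price) =>
        Try(Purchase(user, product, price.toDouble)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Assumes StreamingProducer is already listening on localhost:9999
    val socket = new Socket("localhost", 9999)
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      // Read at most 10 lines, stopping early if the producer closes the connection
      Iterator
        .continually(in.readLine())
        .takeWhile(_ != null)
        .take(10)
        .foreach { line =>
          println(parseEvent(line).getOrElse(s"Unparseable line: $line"))
        }
    } finally socket.close()
  }
}
```

    Run `StreamingProducer` first, then `ProducerClient`; the producer should log "Got client connected from: ..." and the client should print parsed events as they arrive.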


        Article link: https://www.haomeiwen.com/subject/rtsnrctx.html