百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

在inteillj idea中使用Spark操作Hive

wptr33 2024-12-20 19:04 19 浏览

前言:

都知道,小编前面已经简单介绍过在windows下hadoop和hive环境搭建和基本使用。这次的Spark有点突兀,但是也可以先忽略,重要的是先在IDEA中安装bigData插件连接hadoop已经HDFS,而后再简单介绍使用Spark操作Hive。


Big Data Tools安装:

1. 点击File, 选择Settings,再选择Plugins搜索Big Data Tools,最后下载安装。

2. 下载完毕后,底部和右侧栏会多出Hadoop或Big Data Tools的选项。


连接方法:

1. 进入hadoop的sbin目录,start-all启动成功,打开web控制台127.0.0.1:50070(默认),记住如下标志的节点地址,后面hdfs连接的就是这个。

2. 只要hadoop启动成功后,打开IDEA的hadoop其实就可以正常自动连接了。

3. 或者打开右侧栏的Big Data Tools,添加一个连接,Hadoop。

4. 连接Hdfs。

(1). 点击右侧栏Big Data Tools新增Hdfs。

(2). 重要的就是Authentication type,选择Explicit uri。File system URI填写的就是上面控制台的节点地址。

(3). 连接成功后就可以清晰的看到HDFS的目录,并且可以创建,删除和上传。不过需要对指定路径授权。


Hive操作:

关于操作Hive, 以下基于Maven构建Scala项目。项目创建和Hive就略过了,好像在Kafka一文中介绍过如何新建Maven的Scala,而Hive的产品还是原理介绍网上比较多,以下主要是小编的日志式记录,所以以过程居多,那么就开始了。

1. pom.xml添加如下依赖并安装(其实是我整个文件,不需要的可以根据注释删除)。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>maven_scala_test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.5</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0</hadoop.version>
    <hbase.version>1.2.0</hbase.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
<!--      <scope>test</scope>-->
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
<!--      <scope>test</scope>-->
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>2.2.4</version>
<!--      <scope>test</scope>-->
    </dependency>

    <!--scala-->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>



    <!-- hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!--hbase-->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!--kafka-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.0</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
<!--                <arg>-make:transitive</arg>-->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

2. 项目的resources新建元数据文件,可以是txt,以空格为列,换行为行,这里对hive表格创建时重要。

在通过HQL创建表格,如何没有指定分列和分行表示,再通过HQL的select查询数据都是NULL,具体可以看下面代码演示。

3. 加载源数据文件,只需要项目根目录以下的路径即可。比如resouces下的hello.txt只需要指定

src/main/resources/hello.txt

4. Hive相关操作的代码。

这里需要注意的是,hive中的Default(默认)数据仓库的最原始位置是在hdfs上的 /user/hive/warehouse,也就是以后在默认下,新建的表都在那个目录下。

而仓库的原始位置是本地的/usr/local/hive/conf/hive-default.xml.template文件里配置

package com.xudong

import org.apache.spark.sql.SparkSession

object TestSparkHiveHql {

  def main(args: Array[String]): Unit = {

      // 创建spark环境
      val spark = SparkSession
        .builder()
        .appName("Spark Hive HQL")
        .master("local[*]")
        .config("spark.sql.warehouse.dir","hdfs://rebuildb.xdddsd75.com:9500/user/hive/warehouse")
        .enableHiveSupport()
        .getOrCreate();

    import spark.implicits._
    import spark.sql

    // 显示HDFS数据库
    spark.sql("show databases").show();
    // 使用指定数据库
    spark.sql("use default");
    // 创建表格并约定字段
    spark.sql("CREATE TABLE users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
    // 将本地数据加载到表格
    spark.sql("LOAD DATA LOCAL INPATH 'src/main/resources/hello.txt' overwrite into table users");

    // 查询表格数据HQL
    spark.sql("SELECT * FROM users").show()

    // 聚合统计表格数据条数HQL
    spark.sql("SELECT COUNT(*) FROM users").show()

  }

}

5. hdfs简单操作示例。

package com.xudong

package com.dkl.leanring.spark.hdfs
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import scala.collection.mutable.ArrayBuffer

/**
 * 主要目的是打印某个hdfs目录下所有的文件名,包括子目录下的
 * 其他的方法只是顺带示例,以便有其它需求可以参照改写
 */
object FilesList {

  def main(args: Array[String]): Unit = {

    val path = "hdfs://rebuildb.hhyp75.com:9500/tmp/hive"
    println("打印所有的文件名,包括子目录")

    listAllFiles(path)

    println("打印一级文件名")

    listFiles(path)
    println("打印一级目录名")

    listDirs(path)
    println("打印一级文件名和目录名")

    listFilesAndDirs(path)

    // getAllFiles(path).foreach(println)
    // getFiles(path).foreach(println)
    // getDirs(path).foreach(println)
  }

  def getHdfs(path: String) = {
    val conf = new Configuration()
    FileSystem.get(URI.create(path), conf)
  }

  def getFilesAndDirs(path: String): Array[Path] = {
    val fs = getHdfs(path).listStatus(new Path(path))
    FileUtil.stat2Paths(fs)
  }
  /**************直接打印************/

  /**
   * 打印所有的文件名,包括子目录
   */
  def listAllFiles(path: String) {
    val hdfs = getHdfs(path)
    val listPath = getFilesAndDirs(path)
    listPath.foreach(path => {
      if (hdfs.getFileStatus(path).isFile())
        println(path)
      else {
        listAllFiles(path.toString())
      }
    })
  }

  /**
   * 打印一级文件名
   */
  def listFiles(path: String) {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile()).foreach(println)
  }

  /**
   * 打印一级目录名
   */
  def listDirs(path: String) {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory()).foreach(println)
  }

  /**
   * 打印一级文件名和目录名
   */
  def listFilesAndDirs(path: String) {
    getFilesAndDirs(path).foreach(println)
  }

  /**************直接打印************/
  /**************返回数组************/
  def getAllFiles(path: String): ArrayBuffer[Path] = {
    val arr = ArrayBuffer[Path]()
    val hdfs = getHdfs(path)
    val listPath = getFilesAndDirs(path)
    listPath.foreach(path => {
      if (hdfs.getFileStatus(path).isFile()) {
        arr += path
      } else {
        arr ++= getAllFiles(path.toString())
      }
    })
    arr
  }

  def getFiles(path: String): Array[Path] = {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile())
  }

  def getDirs(path: String): Array[Path] = {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory())
  }

  /**************返回数组************/
}

6. spark的wordCount示例。

package com.xudong

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.{SparkContext, SparkConf}

object TestSparkHdfs {

  def main(args: Array[String]): Unit = {

    val conf=new SparkConf().setAppName("SparkHive").setMaster("local")   //可忽略,已经自动创建了
    val sc=new SparkContext(conf)  //可忽略,已经自动创建了

    val textFile = sc.textFile("hdfs://rebuildb.fdfp75.com:9500/tmp/spark/test/workd.txt");
    val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _);
    counts.saveAsTextFile("hdfs://rebuildb.fdfd75.com:9500/tmp/spark/test/wordcount/output");

  }

}
package com.xudong

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.{SparkContext, SparkConf}

object WordCountLocal {

  def main(args: Array[String]) {

    /**
     * SparkContext 的初始化需要一个SparkConf对象
     * SparkConf包含了Spark集群的配置的各种参数
    */
    val conf = new SparkConf()
      .setMaster("local")         // 启动本地化计算
      .setAppName("testRdd")      // 设置本程序名称

    // Spark程序的编写都是从SparkContext开始的
    val sc = new SparkContext(conf)

    // 以上的语句等价与val sc=new SparkContext("local","testRdd")
    val data = sc.textFile("E:\\4work\\27java\\1_1_Movie_Recommend\\maven_scala_test\\src\\main\\resources\\hello.txt") // 读取本地文件

    data.flatMap(_.split(" "))      // 下划线是占位符,flatMap是对行操作的方法,对读入的数据进行分割
      .map((_, 1))                         // 将每一项转换为key-value,数据是key,value是1
      .reduceByKey(_ + _)                  // 将具有相同key的项相加合并成一个
      .collect()                           // 将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操作,并返回结果到驱动程序
      .foreach(println)                   // 循环打印
  }

}

相关推荐

redis的八种使用场景

前言:redis是我们工作开发中,经常要打交道的,下面对redis的使用场景做总结介绍也是对redis举报的功能做梳理。缓存Redis最常见的用途是作为缓存,用于加速应用程序的响应速度。...

基于Redis的3种分布式ID生成策略

在分布式系统设计中,全局唯一ID是一个基础而关键的组件。随着业务规模扩大和系统架构向微服务演进,传统的单机自增ID已无法满足需求。高并发、高可用的分布式ID生成方案成为构建可靠分布式系统的必要条件。R...

基于OpenWrt系统路由器的模式切换与网页设计

摘要:目前商用WiFi路由器已应用到多个领域,商家通过给用户提供一个稳定免费WiFi热点达到吸引客户、提升服务的目标。传统路由器自带的Luci界面提供了工厂模式的Web界面,用户可通过该界面配置路...

这篇文章教你看明白 nginx-ingress 控制器

主机nginx一般nginx做主机反向代理(网关)有以下配置...

如何用redis实现注册中心

一句话总结使用Redis实现注册中心:服务注册...

爱可可老师24小时热门分享(2020.5.10)

No1.看自己以前写的代码是种什么体验?No2.DooM-chip!国外网友SylvainLefebvre自制的无CPU、无操作码、无指令计数器...No3.我认为CS学位可以更好,如...

Apportable:拯救程序员,IOS一秒变安卓

摘要:还在为了跨平台使用cocos2d-x吗,拯救objc程序员的奇葩来了,ApportableSDK:FreeAndroidsupportforcocos2d-iPhone。App...

JAVA实现超买超卖方案汇总,那个最适合你,一篇文章彻底讲透

以下是几种Java实现超买超卖问题的核心解决方案及代码示例,针对高并发场景下的库存扣减问题:方案一:Redis原子操作+Lua脚本(推荐)//使用Redis+Lua保证原子性publicbo...

3月26日更新 快速施法自动施法可独立设置

2016年3月26日DOTA2有一个79.6MB的更新主要是针对自动施法和快速施法的调整本来内容不多不少朋友都有自动施法和快速施法的困扰英文更新日志一些视觉BUG修复就不翻译了主要翻译自动施...

Redis 是如何提供服务的

在刚刚接触Redis的时候,最想要知道的是一个’setnameJhon’命令到达Redis服务器的时候,它是如何返回’OK’的?里面命令处理的流程如何,具体细节怎么样?你一定有问过自己...

lua _G、_VERSION使用

到这里我们已经把lua基础库中的函数介绍完了,除了函数外基础库中还有两个常量,一个是_G,另一个是_VERSION。_G是基础库本身,指向自己,这个变量很有意思,可以无限引用自己,最后得到的还是自己,...

China&#39;s top diplomat to chair third China-Pacific Island countries foreign ministers&#39; meeting

BEIJING,May21(Xinhua)--ChineseForeignMinisterWangYi,alsoamemberofthePoliticalBureau...

移动工作交流工具Lua推出Insights数据分析产品

Lua是一个适用于各种职业人士的移动交流平台,它在今天推出了一项叫做Insights的全新功能。Insights是一个数据平台,客户可以在上面实时看到员工之间的交流情况,并分析这些情况对公司发展的影响...

Redis 7新武器:用Redis Stack实现向量搜索的极限压测

当传统关系型数据库还在为向量相似度搜索的性能挣扎时,Redis7的RedisStack...

Nginx/OpenResty详解,Nginx Lua编程,重定向与内部子请求

重定向与内部子请求Nginx的rewrite指令不仅可以在Nginx内部的server、location之间进行跳转,还可以进行外部链接的重定向。通过ngx_lua模块的Lua函数除了能实现Nginx...