TiSpark with multiple catalogs

shiyuhang0 edited this page Jun 20, 2022 · 2 revisions

This article describes how to use TiSpark with multiple catalogs, using Hive as an example.

Version

  • TiSpark: 2.5.1
  • Spark: 3.1

Config

Add the following configuration to spark-defaults.conf:

spark.sql.extensions  org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses  ${your_pd_address}
spark.sql.catalog.tidb_catalog  org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses  ${your_pd_address}
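
The settings above can also be appended from a shell. This is a minimal sketch, not the only way to do it: it assumes SPARK_HOME points at your Spark installation (falling back to the current directory), and PD_ADDRESSES is a hypothetical variable whose default, 127.0.0.1:2379, is only a placeholder for your own PD endpoint.

```shell
# Assumption: SPARK_HOME is your Spark installation; falls back to the current directory.
SPARK_HOME="${SPARK_HOME:-.}"
# Assumption: placeholder PD address; replace it with your own PD endpoint(s).
PD_ADDRESSES="${PD_ADDRESSES:-127.0.0.1:2379}"

mkdir -p "$SPARK_HOME/conf"
# Append (>>) so existing settings are kept; running this twice duplicates the lines.
cat >> "$SPARK_HOME/conf/spark-defaults.conf" <<EOF
spark.sql.extensions  org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses  $PD_ADDRESSES
spark.sql.catalog.tidb_catalog  org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses  $PD_ADDRESSES
EOF
```

Both pd.addresses keys take the same value here, matching the configuration listed above.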

Use TiSpark with Hive

  1. Create a TiDB table with JDBC
CREATE TABLE `test`.`tidb` (
  `id` int(11) NOT NULL, 
  `name` varchar(255) NOT NULL, 
  PRIMARY KEY (`id`)
);
  2. Run spark-sql
./bin/spark-sql --jars tispark-assembly-3.0-2.5.1.jar
  3. Create a Hive table and insert a row
CREATE TABLE IF NOT EXISTS hive (id INT, sex STRING) USING hive;
INSERT INTO spark_catalog.default.hive VALUES (1, 'male');
  4. Select from TiDB and Hive
SELECT * FROM tidb_catalog.test.tidb a LEFT JOIN spark_catalog.default.hive b ON a.id = b.id;

Use TiSpark with MySQL

Make sure the mysql-connector-java JAR has been added to Spark's jars.

  1. Create a TiDB table with JDBC
CREATE TABLE `test`.`tidb` (
  `id` int(11) NOT NULL, 
  `name` varchar(255) NOT NULL, 
  PRIMARY KEY (`id`)
);
  2. Create a MySQL table with JDBC
CREATE TABLE `test`.`mysql`(
  `id` int(11) NOT NULL, 
  `age` int(11) NOT NULL, 
  PRIMARY KEY (`id`)
);
  3. Run spark-sql
./bin/spark-sql --jars tispark-assembly-3.0-2.5.1.jar
  4. Create a temporary view for the MySQL table
CREATE TEMPORARY VIEW jdbcTable USING jdbc OPTIONS(url "jdbc:mysql://127.0.0.1:3306/test?useSSL=false", dbtable "mysql", user 'root', password 'password');
  5. Select from TiDB and MySQL
SELECT * FROM tidb_catalog.test.tidb a LEFT JOIN jdbcTable b ON a.id = b.id;

Use TiSpark with HDFS

  1. Create a TiDB table
CREATE TABLE `test`.`tidb` (
  `id` int(11) NOT NULL, 
  `name` varchar(255) NOT NULL, 
  PRIMARY KEY (`id`)
);
  2. Create data.csv

This example uses 0x01 as the field delimiter (shown as ^A below) and \n as the line separator.

2^Ashi
3^Ayu
  3. Upload the CSV file to HDFS
hdfs dfs -put data.csv /
  4. Start spark-shell
./bin/spark-shell --jars tispark-assembly-3.0-2.5.1.jar
  5. Write to TiDB from HDFS

You can also do this with the Spark JDBC data source.

import org.apache.spark.sql.types._

// Read from HDFS
val schema = new StructType().add("id", IntegerType).add("name", StringType)
val df = spark.read.format("csv").option("delimiter", "\u0001").schema(schema).load("hdfs://${ip:port}/data.csv")

// Write to TiDB
val tidbOptions = Map(
  "tidb.addr" -> "${ip}",
  "tidb.password" -> "",
  "tidb.port" -> "4000",
  "tidb.user" -> "root"
)
df.write.format("tidb").options(tidbOptions).option("database", "test").option("table", "tidb").mode("append").save()

// To replace existing rows, set the replace option to true
df.write.format("tidb").options(tidbOptions).option("database", "test").option("table", "tidb").option("replace", "true").mode("append").save()

  6. Write to HDFS from TiDB

You can also do this with the Spark JDBC data source.

// Read from TiDB
val df = spark.sql("select * from tidb_catalog.test.tidb")

// Write to HDFS
df.write.format("csv").option("delimiter", "\u0001").save("hdfs://${ip:port}/save")
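
The data.csv file used in this section contains a literal 0x01 byte (displayed as ^A in the sample), which is awkward to type in an editor. The following is a minimal sketch of one way to generate the file from a shell, assuming a POSIX printf; the tr line is there only to make the delimiter visible when checking the result.

```shell
# Write two rows; \001 is the octal escape for the 0x01 delimiter byte.
printf '2\001shi\n3\001yu\n' > data.csv

# Show the file with the delimiter swapped for a comma, for inspection only.
tr '\001' ',' < data.csv
# prints:
# 2,shi
# 3,yu
```

The same tr filter can be applied to the output of hdfs dfs -cat when inspecting delimited files that were written back to HDFS.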

Compressed files

Spark can read compressed files from HDFS directly. For example, to read a gzip (.gz) file:

val schema = new StructType().add("id",IntegerType).add("name",StringType)
val df = spark.read.format("csv").option("delimiter","\u0001").schema(schema).load("hdfs://${ip:port}/data.csv.gz")
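
To try the gzip case, a compressed copy of the sample file has to exist in HDFS first. A minimal sketch, assuming the same two-row data.csv used earlier and an hdfs CLI on the PATH (the upload is skipped when it is not available):

```shell
# Recreate the sample file so this snippet is self-contained.
printf '2\001shi\n3\001yu\n' > data.csv

# -c writes the compressed stream to stdout, leaving data.csv in place.
gzip -c data.csv > data.csv.gz

# Upload only when the hdfs CLI is available; -f overwrites an existing file.
if command -v hdfs >/dev/null 2>&1; then
  hdfs dfs -put -f data.csv.gz /
fi
```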