How do I connect to a Redshift cluster using Spark in my EMR cluster?

I want to connect to an Amazon Redshift cluster using Apache Spark in my Amazon EMR cluster.

Resolution

Note: Configure your Redshift cluster and your EMR cluster, and install the Spark application on the EMR cluster, before you proceed with the following steps.

Test connectivity from the EMR cluster to the Redshift cluster

1.    Verify that the EMR primary, core, and task node security groups are allowed in the Redshift security group's inbound rules for TCP port 5439. If the EMR and Redshift clusters are deployed in two different Amazon Virtual Private Clouds (Amazon VPCs), configure VPC peering.
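
To double-check the inbound rule from the command line, you can describe the Redshift security group with the AWS CLI. The following is a minimal sketch; the security group ID is a placeholder:

aws ec2 describe-security-groups \
  --group-ids <redshift-security-group-id> \
  --query 'SecurityGroups[].IpPermissions[?ToPort==`5439`]'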

2.    Connect to the EMR primary node using SSH, and then run the following telnet command to verify that you can establish a connection between the EMR cluster and the Redshift cluster. In the following command, replace Redshift_Endpoint with the endpoint of your Redshift cluster.

telnet Redshift_Endpoint 5439

The following is example output for a successful connection:

telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is '^]'.
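
If telnet isn't installed on the primary node, a Bash built-in TCP check is one possible alternative (a minimal sketch; replace Redshift_Endpoint as before):

(echo > /dev/tcp/Redshift_Endpoint/5439) && echo "Port 5439 is reachable"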

Connect to the Redshift cluster using Spark in Amazon EMR 5.x series clusters

Use the Databricks spark-redshift package (library). This library loads data from Amazon Redshift into Spark SQL DataFrames and can also save DataFrames back into Amazon Redshift tables (a write example follows step 5).

1.    Connect to the EMR primary node using SSH.

2.    To work with the spark-redshift library, download the following .jar files into the EMR cluster:

wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    Copy the downloaded JAR files to the default Spark library. The Spark library path is /usr/lib/spark/jars/.

sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Run the spark-shell command using the Amazon Redshift JDBC driver to connect to the Redshift cluster. The JDBC driver is included in Amazon EMR versions 4.7.0 and above.

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
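
As an alternative to copying the JARs into /usr/lib/spark/jars/ in step 3, spark-shell accepts a comma-separated list of paths on --jars, so you can reference the downloaded files directly (assuming they are in the current directory):

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar,spark-redshift_2.11-2.0.1.jar,minimal-json-0.9.4.jar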

5.    After the spark-shell session is initialized, run the following steps to connect to the Redshift cluster. In the following commands, update the Amazon Redshift endpoint, Amazon Simple Storage Service (Amazon S3) bucket name, and table details according to your use case.

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Use the EMR instance profile to obtain temporary credentials for AWS resources
val provider = new InstanceProfileCredentialsProvider()
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
val token = credentials.getSessionToken
val awsAccessKey = credentials.getAWSAccessKeyId
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create a DataFrame from the results of a Redshift query, staged through S3
val df = spark.read.
  format("com.databricks.spark.redshift").
  option("url", jdbcUrl).
  option("tempdir", "s3://<S3-path-to-store-temp-data>").
  option("query", "select * from <table-name>").
  option("temporary_aws_access_key_id", awsAccessKey).
  option("temporary_aws_secret_access_key", awsSecretKey).
  option("temporary_aws_session_token", token).
  load()
df.show(2)
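
The library can also save a DataFrame back to Redshift, as noted above. The following is a minimal sketch that reuses the same session and temporary credentials; the target table name and S3 temporary path are placeholders:

// mode("error") makes the save fail if the target table already exists
df.write.
  format("com.databricks.spark.redshift").
  option("url", jdbcUrl).
  option("dbtable", "<target-table-name>").
  option("tempdir", "s3://<S3-path-to-store-temp-data>").
  option("temporary_aws_access_key_id", awsAccessKey).
  option("temporary_aws_secret_access_key", awsSecretKey).
  option("temporary_aws_session_token", token).
  mode("error").
  save()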

Connect to the Redshift cluster using Spark in Amazon EMR 6.x series clusters

Amazon EMR 6.x and later use Scala 2.12, whereas Amazon EMR 5.x uses Scala 2.11. The spark-redshift_2.11-2.0.1.jar file that works with Amazon EMR 5.x therefore isn't compatible with Amazon EMR 6.x and later. Instead, use the spark-redshift_2.12-4.2.0.jar connector in Amazon EMR 6.x and later clusters.
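
To confirm which Scala version your cluster's Spark build uses, you can check from within spark-shell (the startup banner also prints it); on an EMR 6.x cluster, for example, this returns a 2.12.x version string:

// Print the Scala version of the running REPL
scala.util.Properties.versionString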

1.    Connect to the EMR primary node using SSH.

2.    To work with the spark-redshift library, download the following .jar files into the EMR cluster:

wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    Copy the downloaded JAR files to the default Spark library. The Spark library path is /usr/lib/spark/jars/.

sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Run the spark-shell command using the Amazon Redshift JDBC driver to connect to the Redshift cluster. The JDBC driver is included in Amazon EMR versions 4.7.0 and above.

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    After the spark-shell session is initialized, run the following steps to connect to the Redshift cluster. In the following commands, update the Amazon Redshift endpoint, S3 bucket name, and table details according to your use case.

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Use the EMR instance profile to obtain temporary credentials for AWS resources
val provider = new InstanceProfileCredentialsProvider()
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
val token = credentials.getSessionToken
val awsAccessKey = credentials.getAWSAccessKeyId
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create a DataFrame from the results of a Redshift query, staged through S3
val df = spark.read.
  format("io.github.spark_redshift_community.spark.redshift").
  option("url", jdbcUrl).
  option("tempdir", "s3://<S3-path-to-store-temp-data>").
  option("query", "select * from <table-name>").
  option("temporary_aws_access_key_id", awsAccessKey).
  option("temporary_aws_secret_access_key", awsSecretKey).
  option("temporary_aws_session_token", token).
  load()
df.show(2)
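
Writing back to Redshift works the same way with the community connector; only the format string changes. A minimal sketch, with the target table name and S3 temporary path as placeholders:

// mode("error") makes the save fail if the target table already exists
df.write.
  format("io.github.spark_redshift_community.spark.redshift").
  option("url", jdbcUrl).
  option("dbtable", "<target-table-name>").
  option("tempdir", "s3://<S3-path-to-store-temp-data>").
  option("temporary_aws_access_key_id", awsAccessKey).
  option("temporary_aws_secret_access_key", awsSecretKey).
  option("temporary_aws_session_token", token).
  mode("error").
  save()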
