How To: Spark with Postgres

One benefit of using Apache Spark is that read and write operations can be modified fairly easily to work with different data systems without changing the core logic of the code. I recently converted some Python read and write functions to Spark in order to get the benefits of cluster computing. The implementation is not very challenging and the documentation will get you close, but submitting the jobs with spark-submit required a few parameters worth highlighting. I will be using Spark 2.2 for these examples.

I mostly used the info in this section of Mastering Apache Spark by Jacek Laskowski (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html) and this post on connecting to SQL Server from Spark (https://stephanefrechette.com/connect-sql-server-using-apache-spark/#.Wg4dp7T83WZ) to figure out the driver and parameters, but I approached it a little differently and used Python instead of Scala.

Prior to running the Spark code we need to have a Postgres JDBC driver available on the machine we use to submit our Spark code. You can download the jar from the Maven repository and save it wherever you like; I will use /opt/jdbc/postgresjdbc/postgresql-42.1.1.jar for this example.
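
If you want to script that download, here is a minimal sketch using the Python 3 standard library. The URL assumes the standard Maven Central layout for the org.postgresql artifact, and the target path matches the example above, so adjust both for your setup.

import os
import urllib.request

# Maven Central URL for the driver, assuming the standard repository layout.
JAR_URL = ("https://repo1.maven.org/maven2/org/postgresql/"
           "postgresql/42.1.1/postgresql-42.1.1.jar")
# Local path used throughout this post; change it to wherever you keep jars.
JAR_PATH = "/opt/jdbc/postgresjdbc/postgresql-42.1.1.jar"

os.makedirs(os.path.dirname(JAR_PATH), exist_ok=True)
urllib.request.urlretrieve(JAR_URL, JAR_PATH)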

The first step is to get reading from a table working, so let's create a file `read_from_database.py` with the Spark code required to read from Postgres:

from pyspark.sql import SparkSession

POSTGRES_HOST = 'localhost'
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'postgres'
POSTGRES_PASSWORD = ''

def load_from_postgres(table):
    # One SparkSession per application; getOrCreate() reuses an existing one.
    spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

    # Read the table through the JDBC data source.
    df = spark.read.format("jdbc").options(
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        dbtable=table).load()

    return df

table = "vehicle_stops"
df = load_from_postgres(table)
df.show()

NOTE: Modify the POSTGRES_* variables for the instance you are using. I am using the kiasaki/alpine-postgres Docker image on my Mac and can share additional info on that setup if you need it.
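
If you would rather not hard-code credentials in the script, one option (not part of the original code) is to read them from environment variables; a small sketch, with illustrative variable names:

import os

# Fall back to the defaults used in this post when the variables are not set.
POSTGRES_HOST = os.environ.get('POSTGRES_HOST', 'localhost')
POSTGRES_USER = os.environ.get('POSTGRES_USER', 'postgres')
POSTGRES_DB = os.environ.get('POSTGRES_DB', 'postgres')
POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', '')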

The basic way of running this via spark-submit is:

spark-submit --master local[2] read_from_database.py

but this simple command is missing the parameter that specifies the driver, so you will get this error in the Spark logs:

java.sql.SQLException: No suitable driver 

Instead, we need to specify the driver via the --driver-class-path parameter during spark-submit:

spark-submit --master local[2] --driver-class-path /opt/jdbc/postgresjdbc/postgresql-42.1.1.jar read_from_database.py

At this point it should work and print out the contents of whichever table you are using in the table variable.
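
If you still see driver-related errors, the JDBC data source also accepts an explicit driver option naming the driver class. Here is a hedged variant of the read above with the class name spelled out; the jar still has to be supplied via --driver-class-path, and the POSTGRES_* variables are the ones defined earlier in the script.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

# Same read as before, but with the PostgreSQL driver class named explicitly.
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
    driver="org.postgresql.Driver",
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
    dbtable="vehicle_stops").load()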

The next step for me was deploying to an EMR cluster running YARN, and for that I also had to pass the driver file via the --jars parameter so the jar is distributed to the executors. NOTE: I submitted this from a node in the EMR cluster, since some YARN configuration files need to be present for it to work.

spark-submit --deploy-mode cluster --master yarn --driver-class-path /usr/lib/jdbc/postgresql-42.1.1.jar --driver-memory 8g --executor-memory 8g --jars /usr/lib/jdbc/postgresql-42.1.1.jar read_from_database.py

There is another parameter you will need to add if you are importing additional Python files, such as a config file: --py-files config.py
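
For example, the connection settings could live in a small config.py that the main script imports and that gets shipped alongside it with --py-files config.py. The file and variable names below are just illustrative.

# config.py -- shipped to the cluster with: --py-files config.py
POSTGRES_HOST = 'localhost'
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'postgres'
POSTGRES_PASSWORD = ''

# read_from_database.py -- the main script imports it like any other module
import config

url = "jdbc:postgresql://{0}:5432/{1}".format(config.POSTGRES_HOST, config.POSTGRES_DB)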

At this point you have hopefully read from a database successfully; let's see what it looks like to write to a different table in the same database. Here is the full code with the additional steps to write back to Postgres:

from pyspark.sql import SparkSession

POSTGRES_HOST = 'localhost'
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'postgres'
POSTGRES_PASSWORD = ''

def load_from_postgres(table):
    spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

    # Read the source table through the JDBC data source.
    df = spark.read.format("jdbc").options(
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        dbtable=table).load()

    return df

def write_to_postgres(df, table, mode='append'):
    # Credentials are passed through the JDBC connection properties.
    db_properties = {
        "user": POSTGRES_USER,
        "password": POSTGRES_PASSWORD
    }

    # mode='append' adds rows to the target table; 'overwrite' replaces it.
    df.write.jdbc(
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        table=table,
        mode=mode,
        properties=db_properties)

table = "vehicle_stops"
df = load_from_postgres(table)
df.show()
target_table = "vehicle_stops2"
write_to_postgres(df, target_table)
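
One optional extension once the basic read works: the JDBC source can read a table in parallel if you give it a numeric column to partition on. Below is a hedged sketch using the standard partitionColumn/lowerBound/upperBound/numPartitions options; the column name stop_id and the bounds are hypothetical, so substitute a real numeric column and its actual range.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

# Parallel read of the same table, split into 8 partitions on a numeric column.
# "stop_id" and the bounds are placeholders -- use a real column and its min/max.
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
    dbtable="vehicle_stops",
    partitionColumn="stop_id",
    lowerBound="1",
    upperBound="1000000",
    numPartitions="8").load()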

And finally, if you were using both Postgres and SQL Server in the same Spark script, you would need to pass multiple values for these parameters. Here is what it would look like to specify multiple drivers when submitting to a YARN cluster (the code to write to SQL Server is not included, but it is very similar to the Postgres code).

spark-submit --deploy-mode cluster --master yarn --driver-class-path /usr/lib/jdbc/postgresql-42.1.1.jar:/usr/lib/jdbc/sqljdbc42.jar --driver-memory 8g --executor-memory 8g --jars /usr/lib/jdbc/postgresql-42.1.1.jar,/usr/lib/jdbc/sqljdbc42.jar read_from_database.py
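
The SQL Server write itself is not part of this post's code, but for reference a hedged sketch might look like the Postgres version with a different JDBC URL and an explicit driver class; the host, port, database name, and credentials below are placeholders.

def write_to_sqlserver(df, table, mode='append'):
    # Placeholder credentials; the driver class is provided by sqljdbc42.jar above.
    db_properties = {
        "user": "sa",
        "password": "",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }

    # Placeholder host/port/database in the SQL Server JDBC URL format.
    df.write.jdbc(
        url="jdbc:sqlserver://localhost:1433;databaseName=mydb",
        table=table,
        mode=mode,
        properties=db_properties)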

Hopefully this was a helpful tour of the basics of reading from and writing to databases and submitting to a cluster. Feel free to send me questions if you hit issues with this code.
