How To: Spark with Postgres

One benefit of using Apache Spark is that the read and write operations can be fairly easily modified to work with different data systems without having to change the core logic of the code.  I recently converted some Python read and write functions to use Spark in order to get the benefits of cluster computing.  The implementation is not very challenging and the documentation will get you close, but submitting the jobs with spark-submit took a few parameters worth highlighting.  I will be using Spark 2.2 for these examples.

I mostly used the info in this book section by Jacek Laskowski, https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html, along with this post on connecting to SQL Server, https://stephanefrechette.com/connect-sql-server-using-apache-spark/#.Wg4dp7T83WZ, to figure out the driver and parameters, but I approached it a little differently and with Python instead of Scala.

Prior to running the Spark code we need to have the Postgres JDBC driver available on the machine we use to submit our Spark jobs.  You can download it from the Maven repository and save it to whatever path you want; I will use /opt/jdbc/postgresjdbc/postgresql-42.1.1.jar for this example.
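
For example, one quick way to grab it (assuming Maven Central's standard layout for the org.postgresql artifact, and that you have permission to write to /opt) is:

mkdir -p /opt/jdbc/postgresjdbc
curl -o /opt/jdbc/postgresjdbc/postgresql-42.1.1.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/42.1.1/postgresql-42.1.1.jar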

The first step is to get reading from a table working, so let’s create a file `read_from_database.py` with this Spark code required to read from Postgres:

from pyspark import SparkContext
from pyspark.sql import SparkSession

POSTGRES_HOST = 'localhost'
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'postgres'
POSTGRES_PASSWORD = ''

def load_from_postgres(table):
    spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

    # Read the table through the Postgres JDBC driver
    df = spark.read.format("jdbc").options(
        source="jdbc",
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        dbtable=table).load()

    return df

table = "vehicle_stops"
df = load_from_postgres(table)
df.show()

NOTE: Modify the POSTGRES_* variables for the instance you are using.  I am using the docker image kiasaki/alpine-postgres on my Mac, and I can share additional info for that setup if you need it.

The basic way of running this via spark-submit is:

spark-submit --master local[2] read_from_database.py

but this simple command is missing a parameter to specify the driver, so you will get this error in the Spark logs:

java.sql.SQLException: No suitable driver 

Instead, we need to specify the driver via the --driver-class-path parameter during spark-submit:

spark-submit --master local[2] --driver-class-path /opt/jdbc/postgresjdbc/postgresql-42.1.1.jar read_from_database.py

At this point it should work and print out the contents of whichever table you are using in the table variable.

The next step for me was deploying to an EMR cluster running YARN, and for that I also had to include the driver file in the --jars parameter.  NOTE: I submitted this from a node in the EMR cluster since some YARN configuration files need to be present for it to work.

spark-submit --deploy-mode cluster --master yarn --driver-class-path /usr/lib/jdbc/postgresql-42.1.1.jar --driver-memory 8g --executor-memory 8g --jars /usr/lib/jdbc/postgresql-42.1.1.jar read_from_database.py

There is another parameter you will need to add if you are importing additional Python files such as a config file, for example: --py-files config.py
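
As a sketch (assuming a config.py sitting next to the script), the cluster submit command above would become:

spark-submit --deploy-mode cluster --master yarn --driver-class-path /usr/lib/jdbc/postgresql-42.1.1.jar --driver-memory 8g --executor-memory 8g --jars /usr/lib/jdbc/postgresql-42.1.1.jar --py-files config.py read_from_database.py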

At this point you hopefully have read from a database successfully, so let's see what it looks like to write to a different table in the same database.  Here is the full code with additional steps to write back to Postgres:

from pyspark import SparkContext
from pyspark.sql import SparkSession

POSTGRES_HOST = 'localhost'
POSTGRES_USER = 'postgres'
POSTGRES_DB = 'postgres'
POSTGRES_PASSWORD = ''

def load_from_postgres(table):
    spark = SparkSession.builder.appName("read_from_postgres").getOrCreate()

    df = spark.read.format("jdbc").options(
        source="jdbc",
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        dbtable=table).load()

    return df

def write_to_postgres(df, table, mode='append'):
    db_properties = {
        "user": POSTGRES_USER,
        "password": POSTGRES_PASSWORD
    }

    # Write the DataFrame to Postgres over JDBC
    df.write.jdbc(
        url="jdbc:postgresql://{0}:5432/{1}".format(POSTGRES_HOST, POSTGRES_DB),
        properties=db_properties,
        table=table,
        mode=mode)

table = "vehicle_stops"
df = load_from_postgres(table)
df.show()
target_table = "vehicle_stops2"
write_to_postgres(df, target_table)
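
If you want to replace the contents of the target table instead of appending to it, the same helper accepts Spark's other save modes, for example mode='overwrite' (which by default drops and recreates the table):

write_to_postgres(df, target_table, mode='overwrite')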

And finally, if you were using both Postgres and SQL Server in the same Spark script, you would need to pass multiple values for these parameters.  Here is what it would look like to specify multiple drivers when submitting to a YARN cluster (the SQL Server write code is not included above, but a rough sketch follows the command).

spark-submit --deploy-mode cluster --master yarn --driver-class-path /usr/lib/jdbc/postgresql-42.1.1.jar:/usr/lib/jdbc/sqljdbc42.jar --driver-memory 8g --executor-memory 8g --jars /usr/lib/jdbc/postgresql-42.1.1.jar,/usr/lib/jdbc/sqljdbc42.jar read_from_database.py
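
For reference, here is a minimal sketch of what the SQL Server version of the write function might look like.  It assumes Microsoft's JDBC driver (the sqljdbc42.jar referenced above) and a standard connection string; the SQLSERVER_* values are placeholders for your environment:

SQLSERVER_HOST = 'localhost'
SQLSERVER_DB = 'reporting'
SQLSERVER_USER = 'spark_etl'
SQLSERVER_PASSWORD = ''

def write_to_sqlserver(df, table, mode='append'):
    # Naming the driver class explicitly helps when multiple JDBC drivers are on the classpath
    db_properties = {
        "user": SQLSERVER_USER,
        "password": SQLSERVER_PASSWORD,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }

    df.write.jdbc(
        url="jdbc:sqlserver://{0}:1433;databaseName={1}".format(SQLSERVER_HOST, SQLSERVER_DB),
        properties=db_properties,
        table=table,
        mode=mode)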

Hopefully this was a helpful tour of the basics of reading/writing from databases and submitting to a cluster.  Feel free to send me questions if you hit issues with this code.

Big Data Kickstart

Big Data is everywhere; here is how to manage it…

Managing big data is critical for many organizations. Analytics can improve products and inform critical business decisions. Using data can provide distinct advantages, and it’s likely that an organization’s competitors are already leveraging their data. But if you have not started down the path yet, it can be a challenge to get your big data initiative off the ground.

Often the world of big data sounds like a magical place where smart data geeks just take data and come back with the answers to all of life’s questions. As you may suspect, it doesn’t actually work this way. There is hard technical work and even harder shifts in how organizational leaders think about the role of data. But the hard work is worth it if it makes the company better and customers happier, which we know is possible based on all the success stories that have been shared publicly. If you are wondering how to approach managing and leveraging data to improve your organization, the steps outlined below provide a way to approach data and analytics with a focus on building a foundation. This foundation will help an organization deal with big data (think many terabytes or petabytes). These steps are meant to drive a long term data focus so you may approach some of the steps in parallel and you will definitely iterate on the steps in the long run.

1. Collect

The first step in getting value from data is to collect it, big or small. Rather than spending days, weeks, or months discussing which data to collect I urge you to start with the mindset that all data related to your organization should be collected. I prefer collecting the data into one data ecosystem to minimize the challenge of finding the data when it’s time to do analysis, but even if you choose a decentralized approach to collection and storage it is important to set standards across departments. You should provide guidance across your organization on how data should be captured and history stored for all processes. You should aim to capture both your internal data and data from the third-party services you use. You would expect to capture all the business-side data from CRMs (such as Salesforce), your company website, accounting systems, HR systems, online marketing tools, and so on. There is also a wealth of data to collect on the product or service you provide (for example, a web application). This could include data captured by the system, error and application logs, web analytics (such as Adobe or Google Analytics), and customer feedback through surveys. Analysts in your organization will inevitably choose to tie together data from each source system at some point, so storing it in one big data ecosystem (such as Hadoop) can decrease the effort needed to join the data together.

2. Make Available

The next focus is making sure those who want to get value from the data can access it. The data should be usable from tools they already know how to use. For some teams this will be a spreadsheet and for others it will be more sophisticated reporting software. You will want to focus more on self-service when there is a lot of data to choose from. For big data environments you will want to eliminate roadblocks. A common roadblock is a “one-off” data request process that relies on an overloaded engineering team to create or modify views specific to each analytics request. Ideally all of the data collected will automatically be available for analysts to pull into the tool of their choice without having to request it. Realistically, however, you will likely want to pick some high-value data sets to start. This will allow you to think about how to make all the data available in the future without getting caught up with fully automating steps that might need to change once you have a little more experience.

Once you make your first data set available you should market it to the internal teams and offer support for them to get started. This part of the process is a great place to gather feedback on the value, utility and ease of use for others. With this feedback and the experience gained by collecting and making it available in your big data environment, you should be able to decide how to improve the process to be scalable for many more data sets.

3. Train

Once your first data set is available, you should begin putting significant energy into training. You will need to educate analysts on retrieving data and your organization’s decision makers on how they can get the information they need from the data. You will need to cater to varying levels of technical proficiency ranging from those with formal technology training to those in business units who became analysts along the way, so several levels of training will be most effective. One of the goals of training is that department leaders should know what capabilities exist, in particular who they can talk to regarding their department’s data goals as well as any requests for analytics. You will need to provide the support for those individuals handling departmental analytics to be successful, especially in the early stages of your big data journey. A lot of the training time will be focused on making sure the data analysts/scientists can access the data and transform it in a way that fits their project, but you must not neglect making sure the department leaders know how involved they should be in driving the projects and understanding what was learned with each one.

4. Trust

To unlock the potential of big data, there is a vital need to trust others in your organization with the hard work of making sense of it. You can’t rely entirely on a central data team for all of your reporting and analytics. The reason you need to trust others in the organization is simple: there is too much data with too much potential to improve your organization to keep it locked up where only a small group of privileged data experts can access it. An exception to this is that you will still keep financial and personally identifiable information (PII) locked up tight. One example of trust is when an analyst has an idea to take product usage data and feed it into a model to determine what type of emails should be sent to improve conversions (more future purchases, longer retention, etc.). Senior data scientists may have concerns about the analyst attempting this project on their own when no data experts are available to partner on the project. When you have trust as a component of your data strategy, you should have an open communication channel so the work done by the analyst can happen independently of the central data team and be reviewed if needed. The mindset is that the analyst can be trusted to be honest with their department leaders about their limitations, and the department leaders will be careful to weigh the risks before rolling out customer-impacting changes based on any analysis that hasn’t been vetted by the central data team. In practice, department leaders are unlikely to change strategy based on analysis that contradicts their experience and instinct, so the use of data is a step forward and carries a low risk of steering the organization in the wrong direction without proper due diligence.

5. Improve

There is a strong need in big data initiatives to focus on improvement as a consistent part of the work. Despite what you may have heard, big data systems are not cheap and the amount of data that can be collected is growing every year. Effective data initiatives are less about setting up a perfect process to organize people and more about building systems to automate the steps to collect data and make it available. For most organizations, taking an iterative approach to these steps is the best chance for success since results will be evident in a reasonable timeframe. You should focus on a single use case initially with a plan to refactor the process as you expand to more use cases. It is critical that you make time for the effort to refactor after the first project is deployed to production. This will allow your engineers to spend less time trying to develop perfect automated solutions for the ‘Collect’ and ‘Make Available’ steps and provide a chance for feedback and adjustments in the ‘Train’ and ‘Trust’ steps. Data engineers will find solutions to the collect and make available steps, but these alone will not provide the impact you desire without strong organizational effort to train and trust. All of the prior steps are opportunities to learn and most organizations will need to make adjustments to be successful regardless of how much time they spend on the planning and design.

How To: Python Logging

I shared previously in my post ETL Tool vs Custom Code that I use Python for developing data flows.  In my journey of writing production data flows and applications with Python I dove in head first and didn’t learn some of the useful practices until a year or two in.  But lucky for you, I am going to share some of these foundational Python concepts in various “How To” posts, starting today.  Logging is a major component I procrastinated on learning, but I think any newcomer should learn it in the first week.  To help anyone else getting started (or who is awesome at Python but is still using print every other line), let’s look at both a basic logging example (the minimum expected) and a real-world example (recommended).

To start, the basic example involves adding these lines at the beginning of your code:

import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

Once this is set up, you can add log calls like this:

log.info("Logging is turned on")

This is the minimum amount of logging that I recommend, even if you are writing ad hoc scripts like we often do in data engineering and data science. With the setup above, messages will still print to your screen, but if you decide to deploy your ad hoc script as a job or module you now have the option to turn off those messages. The first thing you will notice is that most of your statements like print("Opened connection"), which are really meant for logging and debugging, start to become log.info("Opened connection"). You should play with different levels of logging as well. Normally I set level=logging.DEBUG while I’m developing and then have statements like log.debug("Load table using query: %s", query) in order to keep an eye on whether my code is doing what I intended.
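
As a small illustrative sketch (the query and table here are made up for the example), switching the level changes which statements show up:

import logging
logging.basicConfig(level=logging.DEBUG)  # change to logging.INFO to silence the debug lines
log = logging.getLogger(__name__)

query = "SELECT * FROM vehicle_stops"  # hypothetical query, just for illustration
log.info("Opened connection")
log.debug("Load table using query: %s", query)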

Now, if you want to take it up a notch and apply your own format consistently across your Python applications, you should take the next step of creating a log configuration file. For starters, save a file named logging.conf in your project directory and populate it with this text:

[loggers]
keys=root

[handlers]
keys=logfile

[formatters]
keys=logfileformatter

[logger_root]
handlers=logfile

[formatter_logfileformatter]
format=%(levelno)s:%(asctime)s:%(name)s:sample-etl:%(message)s

[handler_logfile]
class=handlers.RotatingFileHandler
args=('/var/log/sample_etl/sample_etl.log', 'a', 2000000, 10)
formatter=logfileformatter
Now we have a file that specifies a custom format in the [formatter_logfileformatter] section.  The [loggers], [handlers], and [formatters] sections at the top simply register the names used further down; fileConfig requires all three.  We set up a handler in the [handler_logfile] section that writes to a file and rotates it: once the active log file reaches the size limit (2,000,000 bytes here), the handler rolls over to a new file and keeps up to 10 old files, deleting the oldest.
The next step is to tell your Python code to use this configuration, which you can do by replacing the first piece of log setup code I shared with this:

import logging.config
logging.config.fileConfig('logging.conf')
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG) # Use debug to see the most log entries

Now you should try adding a StreamHandler that uses sys.stderr so that you can see the logging in your console as well. This is something I add to my development environments to save me the effort of tailing the actual files as I develop. To get you started, check out the documentation. If you get stuck, leave a comment.
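
One way to do it, as a sketch that adds the handler in code rather than in logging.conf (the console format string here is just an example):

import logging
import logging.config
import sys

logging.config.fileConfig('logging.conf')
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# Echo log records to the console while developing, in addition to the rotating file
console = logging.StreamHandler(sys.stderr)
console.setFormatter(logging.Formatter('%(levelname)s:%(asctime)s:%(name)s:%(message)s'))
log.addHandler(console)

log.debug("Console logging is wired up")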

In closing, logging is important and you should use it right now and all the time unless you have a good reason to ignore this advice. Saving time is not a good reason…this stuff is easy. Granted, this message is coming from the guy that wrote a lot of production Python code before taking the time to get his head around how to properly log. But you can be better than me :).

ETL tool vs custom code

I used to help sell an ETL tool that had a graphical drag-and-drop interface. I really did like the tool because with a little training you could quickly build a basic ETL job. I still like these types of tools if you are pulling data from a database with a static or slowly changing data model. However, at my current company we do not use an ETL tool because I suggested we are better off without one. While it is possible we will use an ETL tool one day for certain tasks, we currently prefer Python and SQL to move and process our data. The primary reasons we went down this path are increased flexibility, portability, and maintainability.

One of my top regrets leading a Data Warehousing team that used an ETL tool is that we felt limited by what the tool was capable of doing. Elements of ETL that were not as important when the team started were not easily supported by the tool. The best example of this is reading from a RESTful API. Another was working with JSON data as a source. With these examples we could easily find a tool that can do this for us, but what else will we encounter in the future? At my current company we are consuming RabbitMQ messages and using Kafka for data streaming, and we would not have known to plan for a tool that works well for these use cases. Since we are using Python (and Spark and Scala) we have no limits on what is possible for us to build. There are a lot of libraries that are already built which we can leverage, and we can modify our libraries as new ideas come up rather than being stuck with what a tool provides out of the box. We choose to focus on building a data flow engine in many cases over having one script per table or source. This amount of control over the code that moves data allows us to build up an engine that supports many configurations while keeping the base code backward compatible for data sets already flowing through the system. In many cases we trade off having a longer ramp up period to get our first build working in order to have more flexibility and control down the road, but it cuts down the amount of frustrating rework when systems change.

Another benefit of coding your own ETL is that you can change databases, servers, and data formats without applying changes all over your code base. We have already taken our library for reading SQL Server data and written a similar version that works for Postgres. With how our ETL jobs are set up we just switched out the library import on the relevant scripts and didn’t have to dig into the logic that was running. I think this leads to better maintainability as well: if you find something is taking up a lot of your time, you just build the fix into the overall system. I remember having alerts at 2 am because the metadata of a table had changed and our ETL tool couldn’t load the data without us refreshing the metadata in the job. With most of our Python code we can handle new columns added to the source data and either add that column to the destination table or just ignore it until we decide to modify the destination. This really has decreased time spent getting mad at the system administrators who disrupted our morning by adding a new custom field, though there is still plenty of work to do to ease the pain of changed datatypes and renamed columns.
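
As a rough, hypothetical sketch of that idea (the helper name and the print call are just for illustration, not our actual library), the core of it is comparing the incoming DataFrame's columns to the destination table's columns before writing:

def align_to_destination(source_df, destination_columns):
    # Hypothetical helper: keep only the columns the destination table already has,
    # and report anything new so we can decide later whether to add it.
    new_columns = [c for c in source_df.columns if c not in destination_columns]
    if new_columns:
        print("New source columns not yet in destination: {0}".format(new_columns))
    return source_df.select([c for c in source_df.columns if c in destination_columns])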

I am sure there are plenty of different tools out there that do everything you could want to do (at least according to their sales teams), but I love the flexibility, control, and maintainability of writing our own applications to move data. It has worked out well for us as we have transitioned to building out a data platform rather than focusing on just tools to load an analytic data warehouse (but that is a topic for another time).

How To: Kafka 0.9 on Mac

Kafka is a distributed messaging system used for streaming data.  It works as a distributed commit log, and if you want to really understand why you should use Kafka then it’s worth the time to read this article by Jay Kreps.  Now if you just want to get hands-on with Kafka on your laptop, follow these steps from the quick start guide (which should also work on Linux for a sandbox environment).  I didn’t hit errors along the way, so this is pretty similar to what is in the documentation, but I thought it was worth sharing as a reference to the actual commands I used and as a place to point back to when I post more articles about working with Kafka.

  1. Go to http://kafka.apache.org/downloads.html and download the version you want.  I chose kafka_2.11-0.9.0.0.tgz.
  2. Follow instructions here for initial setup: http://kafka.apache.org/documentation.html#quickstart
    1. unzip: tar -xzf kafka_2.11-0.9.0.0.tgz
    2. go to the folder: cd kafka_2.11-0.9.0.0
    3. start zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
    4. open new terminal window and go to folder
    5. start kafka: bin/kafka-server-start.sh config/server.properties
    6. test creating topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    7. test listing a topic: bin/kafka-topics.sh --list --zookeeper localhost:2181
  3. Follow additional steps to get to multiple brokers since you would never use a single broker setup for a real environment (though for a production cluster there would be some different steps and a server per broker, of course)
    1. copy config: cp config/server.properties config/server-1.properties
    2. edit config/server-1.properties:
      broker.id=1
      listeners=PLAINTEXT://:9093
      log.dir=/tmp/kafka-logs-1
    3. copy config again: cp config/server.properties config/server-2.properties
    4. edit config/server-2.properties:
      broker.id=2
      listeners=PLAINTEXT://:9094
      log.dir=/tmp/kafka-logs-2
    5. keep ZooKeeper running but stop Kafka (Ctrl+C in the terminal it is running under)
    6. run all 3 brokers as background processes:
      bin/kafka-server-start.sh config/server.properties &
      bin/kafka-server-start.sh config/server-1.properties &
      bin/kafka-server-start.sh config/server-2.properties &
    7. test creating topic with replication factor of 3: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    8. might as well publish a message (using the original test topic here): bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
      {"value": "Test message 1"}
    9. then test out the consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

The quick start guide and additional documentation have a lot more info that is worth exploring, but if things went well you now have a local instance to test with.  Congrats!
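
If you would rather poke at the brokers from Python instead of the console scripts, here is a minimal sketch using the third-party kafka-python package (an assumption on my part; it is not part of the Kafka download, so install it with pip install kafka-python):

from kafka import KafkaProducer, KafkaConsumer

# Send a test message to the local broker started above
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'{"value": "Test message from Python"}')
producer.flush()

# Read everything on the topic from the beginning, then give up after 5 seconds of silence
consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.value)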

How To: VirtualBox Shared Folders

When using virtual machines, you will likely want to set up a mapping of a local folder on your computer to a folder on the virtual machine. This is a good way to move files from your machine onto the VM and vice versa.  Here are the steps to set that up with VirtualBox using a CentOS image (in this case it is the Cloudera Sandbox VM).

  1. From the VM, go to the VirtualBox menu and choose Devices -> Insert Guest Additions CD Image…
  2. If the CD image does not start automatically then select the drive from the file browser and run “autorun.sh”.  This will install the add-ons needed.
  3. Then go to Devices -> Shared Folders -> Shared Folders Settings and set up your folder.  For this example we’ll use a local folder called “installs”.
  4. Restart virtual machine
  5. You can now find your folder under /media/sf_<foldername> and you’ll probably need elevated permissions.  So for my example the command “sudo ls -l /media/sf_installs” can be used to view files and “sudo cp /media/sf_installs/<filename> ~/” can be used to copy files to a folder local to the VM.

Bonus info: Once Guest Additions are installed you can also set up clipboard sharing to let you copy and paste between your machine and the VM.  This is done by going to Devices -> Shared Clipboard and choosing your option (such as Bidirectional).