Some Useful PySpark Commands to Know When Working with Databricks


I had to figure out a bunch of things for a recent Azure Databricks hobby project I was working on. It took a whole lot of Google searches and quite some time, so I thought I would capture some of the useful things I learned along the way:

This was the overall architecture:

[image: overall architecture diagram]

  • Creating a ‘standard’ CSV file:

    When writing CSV files, Spark produces a folder of part files rather than a single file, even for relatively small datasets. Sometimes you just want a straightforward .csv file that you can easily open. This is a routine that does that:
def WriteCsvToLocation(dataframe, location, filename):
    # Coalesce to a single partition and write to a temp folder,
    # so Spark produces just one part file.
    dataframe.coalesce(1).write.option("header", "true").option("escape", "\"").mode("overwrite").format("csv").save(location + "tmp")

    # Find the single .csv part file in the temp folder.
    fileNames = dbutils.fs.ls(location + "tmp")
    name = ''
    for fileName in fileNames:
        if fileName.name.endswith('.csv'):
            name = fileName.name

    # Copy it to the desired location under the desired name, then clean up the temp folder.
    dbutils.fs.cp(location + "tmp/" + name, location + filename + ".csv")
    dbutils.fs.rm(location + "tmp", recurse = True)
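
You can then call it with a dataframe, an output folder, and the file name you want. The dataframe name and dbfs path below are just examples:

WriteCsvToLocation(dfInstitution, "dbfs:/mnt/output/", "institutions")

Note that the location needs a trailing slash, since the routine simply concatenates strings; the call above leaves a single institutions.csv in dbfs:/mnt/output/.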
  • Common functions:

    You can put all common functions and file location details in a single file that you can then include in other files. For instance, I have a file called ‘CommonFunctions’ which holds all the location details and other shared functions.

[image: contents of the CommonFunctions file]

It’s in the same workspace as other files:

[image: workspace showing CommonFunctions alongside the other files]

You can then call the common file at the top of a workbook using:

[image: calling the common file at the top of the workbook]
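
If the screenshot doesn't render: the idea is the Databricks %run magic in a cell of its own, pointing at the common file. The relative path below assumes CommonFunctions sits alongside the workbook:

%run ./CommonFunctions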

From then on, it’s like calling a standard function, e.g.:

[image: calling a shared function from the workbook]
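
For instance, a function defined in CommonFunctions, such as the WriteCsvToLocation routine above, can be called as if it were defined locally. Here dfInstitution and outputLocation are made-up names, with outputLocation assumed to be one of the location details defined in CommonFunctions:

WriteCsvToLocation(dfInstitution, outputLocation, "institutions")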

  • Creating an empty dataframe row and populating only some values:

I wanted to get some data using SQL and append new rows to an existing dataframe (one with around 25 columns). This is what I used to get inactive projects from a database and then append them to a dataframe with an “Inactive” status:

from pyspark.sql.functions import lit

# df is the dataframe you want to append the new rows to,
# already initialized with its full schema.

dfFromSQL = spark.sql("SELECT * FROM yourTable")

for row in dfFromSQL.collect():
    # Build a single all-NULL row that matches df's schema.
    newRowVals = [tuple([None] * len(df.columns))]
    dfNewRow = spark.createDataFrame(newRowVals, df.schema)

    # Populate only the columns we care about; everything else stays NULL.
    dfNewRow = dfNewRow.withColumn("Institution_Id", lit(row['Institution_Id']))
    dfNewRow = dfNewRow.withColumn("Institution", lit(row['Institution']))
    dfNewRow = dfNewRow.withColumn("IsActiveProject", lit("Inactive"))

    df = df.union(dfNewRow)
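
As an aside, the same result can be had without the collect() loop by building all the new rows in a single select against dfFromSQL and NULL-filling the remaining columns. A rough sketch, assuming dfFromSQL uses the same column names as df:

from pyspark.sql.functions import col, lit

# Carry over the columns we want from dfFromSQL, hard-code the status,
# and NULL-fill every other column with the matching type from df's schema.
dfNewRows = dfFromSQL.select(
    *[
        col(c) if c in ("Institution_Id", "Institution")
        else lit("Inactive").alias(c) if c == "IsActiveProject"
        else lit(None).cast(df.schema[c].dataType).alias(c)
        for c in df.columns
    ]
)

df = df.union(dfNewRows)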


  • Creating a temp SQL table from a dataframe

Sometimes it’s just easier to work with SQL when you need to do joins and filtering together with GROUP BYs and HAVING clauses. You can easily convert a dataframe to a SQL temp table using createOrReplaceTempView. E.g.:

dfInstitution.createOrReplaceTempView('tbl_dfInstitution')

From here on, you can use spark.sql to execute SQL commands. E.g.:

spark.sql("select count(distinct Institution_Name) from tbl_dfInstitution").show()

queries the tbl_dfInstitution view created in the previous step. You can register multiple views with createOrReplaceTempView and then write standard SQL against them. To get the result back as a dataframe, simply use:

dfFromSQL = spark.sql("SELECT * FROM yourTable")
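
As an example of the joins plus GROUP BY/HAVING case, suppose a second dataframe dfProject is registered the same way (the dataframe, join key and status value here are all made up for illustration):

# Register a second dataframe as a temp view.
dfProject.createOrReplaceTempView('tbl_dfProject')

# Join the two views, keep only active projects, and return
# institutions with more than five of them.
dfActiveCounts = spark.sql("""
    SELECT i.Institution_Name, COUNT(*) AS ActiveProjects
    FROM tbl_dfInstitution i
    JOIN tbl_dfProject p ON p.Institution_Id = i.Institution_Id
    WHERE p.IsActiveProject = 'Active'
    GROUP BY i.Institution_Name
    HAVING COUNT(*) > 5
""")
dfActiveCounts.show()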
