I needed to parse server logs and build Spark DataFrames so I could query information from the query string parameters. My naive version kept throwing errors about a mismatch between the number of fields in the schema and the number of fields in the row being queried.

It turned out I was dealing with over 350 different query string params across the logs. The set could change over time, and there was no way I was going to add them to a schema by hand.

The technique I used was to parse out all the various parameters in a first pass, and then, in a second job, make sure that every row carried every field seen anywhere in the logs. I am not sure if there is a more idiomatic Spark way of doing this.

Here is what worked in my particular case.

First, parse the logs using a regex to split out the base fields and the query string parameters.
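
The parseServerLogLine helper used in the snippet below, and the logFile it reads, are not shown here because they depend entirely on your log format. As a rough sketch only, assuming an Apache-style access log whose request line may carry a query string (the regex, file path and base field names are placeholders to adapt):

import re
from pyspark.sql import Row

logFile = 'access.log'  # placeholder; point this at your server log

# Placeholder pattern for an Apache-style access log line; adjust to your format.
LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\S+)'

def parseServerLogLine(line):
    """Parse one raw log line into a Row of base fields plus its query string params."""
    match = re.match(LOG_PATTERN, line)
    if match is None:
        return Row(raw=line)   # keep unparseable lines around for inspection
    full_path = match.group(6)
    fields = {
        'host': match.group(1),
        'client_identd': match.group(2),
        'endpoint': full_path.split('?')[0],
        'response_code': match.group(8),
    }
    # Every query string parameter becomes its own field on the Row.
    if '?' in full_path:
        for param in full_path.split('?', 1)[1].split('&'):
            key, _, value = param.partition('=')
            if key:
                fields[key] = value
    return Row(**fields)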

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark import SparkContext

sc = SparkContext("local", "Parse Logs")
sqlContext = SQLContext(sc)

def parseSchema(line):
    """Return the list of field names present in a parsed Row."""
    return list(line.asDict().keys())

def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseServerLogLine)
                   .cache())
    # Collect every field name that appears anywhere in the parsed rows,
    # with a count of how often each one occurs.
    schema = (parsed_logs
                .flatMap(parseSchema)
                .map(lambda s: (s, 1))
                .reduceByKey(lambda a, b: a + b)
                .cache())
    return parsed_logs, schema

Make sure the parsed logs are cached with cache(). The parseSchema function extracts the field names (the keys of the Row's dictionary) from each Row in the parsed_logs RDD.

The schema RDD accumulates every parameter key seen across the log file; the reduceByKey transformation then builds the final list with a count of occurrences for each parameter.
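
To make this concrete, here is roughly what those steps do for two hypothetical parsed rows (the parameter names and counts are made up for illustration):

# Two parsed rows with different sets of query string parameters:
#   Row(endpoint='/search', host='1.2.3.4', q='spark')
#   Row(endpoint='/search', host='5.6.7.8', q='logs', utm_source='newsletter')
#
# .flatMap(parseSchema)    -> 'endpoint', 'host', 'q', 'endpoint', 'host', 'q', 'utm_source'
# .map(lambda s: (s, 1))   -> ('endpoint', 1), ('host', 1), ('q', 1), ...
# .reduceByKey(lambda a, b: a + b)
#                          -> [('endpoint', 2), ('host', 2), ('q', 2), ('utm_source', 1)]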

As a next step, once we have all the possible keys parsed out, we create a dictionary of all possible parameters via a dictionary comprehension.

parsed_logs, schema = parseLogs()
schemaDict = {pair[0]: '' for pair in schema.collect()}
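
With the hypothetical parameter names from the earlier illustration, the resulting dictionary would look something like this:

# schemaDict == {'endpoint': '', 'host': '', 'q': '', 'utm_source': ''}
# (in my real logs: 350+ keys, each mapped to an empty-string default)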

The next step is to modify the log Rows so that each row has every possible field, albeit empty for the ones that don't exist in that particular row.

By this point we have (1) all the rows parsed, each with a variable number of parameters, and (2) the entire schema in a dictionary. The tricky bit is to realize that since Spark splits the work across multiple workers on different Hadoop nodes, the schema needs to be shared somehow. The Spark way of doing this is with broadcast variables: read-only, serializable values that are cached on each worker node. So we create a new broadcast variable from the schema dictionary we built above.


broadcastVar = sc.broadcast(schemaDict)
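
Every worker then reads the same dictionary through broadcastVar.value instead of having it shipped with every task. A quick, optional sanity check from the driver (the count shown is hypothetical) could be:

# Both partitions should report the same size for the broadcast schema dict.
sc.parallelize([0, 1], 2).map(lambda _: len(broadcastVar.value)).collect()
# e.g. [357, 357] -- one entry per distinct query string parameter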

The final job is to enrich the log Rows with all the missing parameters so that the DataFrame can be created.

def enrichRow(row):
    """Fill in any missing fields so that every Row has the full schema."""
    rowdata = row.asDict()
    # Start from the full schema of empty defaults, then overwrite it with
    # the values this particular row actually has.
    newRow = broadcastVar.value.copy()
    newRow.update(rowdata)
    return Row(**newRow)

def finalLogs():
    final_logs = (parsed_logs
                  .map(enrichRow)
                  .cache())
    return final_logs

final_logs = finalLogs()
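
To make the enrichment concrete, here is what enrichRow does to a single hypothetical row when the broadcast schema holds four keys but the row only carries two of them:

# broadcastVar.value == {'endpoint': '', 'host': '', 'q': '', 'utm_source': ''}
# row                == Row(endpoint='/search', host='1.2.3.4')
#
# enrichRow(row) copies the dict of empty defaults, overlays the row's own
# values, and returns:
#   Row(endpoint='/search', host='1.2.3.4', q='', utm_source='')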

At this point we have the parsed rows, each with every possible parameter, and we are ready to create a DataFrame.

schemaAccess = sqlContext.createDataFrame(final_logs)
schemaAccess.printSchema()

schemaAccess.registerTempTable("access")
schemaAccess.show()

res = sqlContext.sql("select client_identd, endpoint, count(*) from access group by client_identd, endpoint order by count(*) desc")
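
The result of sqlContext.sql is itself a DataFrame, so you can inspect the aggregation the same way, for example:

res.show()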

You will notice that we are letting Spark infer the schema itself, typically by inspecting the first row. If specific types are needed, you will have to define the schema explicitly using StructType and StructField constructs.
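
If you do need explicit types, a minimal sketch might look like the following; the field names here are hypothetical, and the list would have to cover every field your Rows carry:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical, hand-written schema: every field declared as a nullable string,
# which matches the string values produced by the regex parsing above. Swap in
# IntegerType(), TimestampType(), etc. only if the parsed values are converted
# to those types first.
explicitSchema = StructType([
    StructField("client_identd", StringType(), True),
    StructField("endpoint", StringType(), True),
    StructField("utm_source", StringType(), True),
    # ... one StructField per remaining field/parameter
])

# Then pass it explicitly instead of letting Spark infer it:
# schemaAccess = sqlContext.createDataFrame(final_logs, explicitSchema)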

Note that this is not production-ready code; it mostly illustrates the concept of creating DataFrames from data that may not be complete.