Wednesday, March 6, 2019

Using Apache Spark to query a remote authenticated MongoDB server

[1] First, download and extract Spark

$ wget http://apache.spinellicreations.com/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz 
$ tar -xf spark-2.4.0-bin-hadoop2.7.tgz

$ cd spark-2.4.0-bin-hadoop2.7



Create conf/spark-defaults.conf by copying conf/spark-defaults.conf.template, and add the following line to it:

spark.debug.maxToStringFields=1000
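One way to do this, from the extracted spark-2.4.0-bin-hadoop2.7 directory:

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
$ echo "spark.debug.maxToStringFields=1000" >> conf/spark-defaults.conf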


[2] Now connect to MongoDB on the remote server.

We use the MongoDB Spark Connector.

First, make sure mongod on the remote server has bindIp set to the server's private IP (not just localhost), and that authentication is enabled, as in the sketch below.
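A minimal sketch of the relevant mongod.conf settings (the 192.168.1.32 address follows this example; adjust it and the file location to your server):

net:
  bindIp: 127.0.0.1,192.168.1.32    # bind to the server's private IP, not only localhost
security:
  authorization: enabled            # require username/password authentication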

In the command below, root and password are the credentials of your authenticated MongoDB deployment, and 192.168.1.32 is the private IP of the remote server where Mongo is running. We read the oplog.rs collection in the local database and write output to the sparkoutput database; change these to match your setup.

spark-2.4.0-bin-hadoop2.7]$ ./bin/pyspark \
    --conf "spark.mongodb.input.uri=mongodb://root:password@192.168.1.32:27017/local.oplog.rs?readPreference=primaryPreferred" \
    --conf "spark.mongodb.output.uri=mongodb://root:password@192.168.1.32:27017/sparkoutput" \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0



Python 2.7.5 (default, Oct 30 2018, 23:45:53)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/pkathi2/.ivy2/cache
The jars for the packages stored in: /home/pkathi2/.ivy2/jars
:: loading settings :: url = jar:file:/home/pkathi2/spark-2.4.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-33a37e02-1a24-498d-9217-e7025eeebd10;1.0
    confs: [default]
    found org.mongodb.spark#mongo-spark-connector_2.11;2.4.0 in central
    found org.mongodb#mongo-java-driver;3.9.0 in central
:: resolution report :: resolve 256ms :: artifacts dl 5ms
    :: modules in use:
    org.mongodb#mongo-java-driver;3.9.0 from central in [default]
    org.mongodb.spark#mongo-spark-connector_2.11;2.4.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-33a37e02-1a24-498d-9217-e7025eeebd10
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/6ms)
19/03/06 08:24:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.

>>> from pyspark.sql import SparkSession

>>> my_spark = SparkSession \
...     .builder \
...     .appName("myApp") \
...     .config("spark.mongodb.input.uri", "mongodb://root:password@192.168.1.32:27017/local.oplog.rs?authSource=admin") \
...     .config("spark.mongodb.output.uri", "mongodb://root:password@192.168.1.32:27017/sparkoutput?authSource=admin") \
...     .getOrCreate()


Make sure you are using the correct authentication source (authSource), i.e., the database in which you authenticate yourself on the Mongo server; in this example it is admin.


[3] Perform queries on the Mongo collection.

Now you can perform queries on your remote Mongo collection through the Spark session. (Because getOrCreate() returns the already-running session, spark and my_spark refer to the same SparkSession, now carrying the Mongo URIs configured above.) For example, the query below reads the collection and prints its inferred schema.

>>> df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
>>> df.printSchema()                                                           


root
 |-- h: long (nullable = true)
 |-- ns: string (nullable = true)
 |-- o: struct (nullable = true)
 |    |-- $set: struct (nullable = true)
 |    |    |-- lastUse: timestamp (nullable = true)
 |    |-- $v: integer (nullable = true)
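You can also filter the DataFrame and write results back to the output database configured above. Below is a minimal sketch in the same pyspark session; mydb.mycollection (the oplog namespace to match) and the results output collection are hypothetical names, so substitute your own:

>>> # keep only oplog entries for one (hypothetical) namespace
>>> filtered = df.filter(df["ns"] == "mydb.mycollection")
>>> filtered.select("ns", "h").show(5)
>>> # project a couple of plain fields and append them to the sparkoutput database
>>> filtered.select("ns", "h") \
...     .write.format("com.mongodb.spark.sql.DefaultSource") \
...     .mode("append") \
...     .option("database", "sparkoutput") \
...     .option("collection", "results") \
...     .save()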
