starksummer 2019-03-31
Find Data in DynamoDB in Spark
I got a small task recently. In our DynamoDB, one table has a column named extension; at first we put bad data into the table, storing extension as Number, and only later learned it should be String.
That left us with a mix of Number and String values in the same column. We need to find all the Number ones and update them to String.
Here are my steps.
First of all, I do a scan:
> aws dynamodb scan --table-name sillycat_device-stage-devicePairingInfo --query "Items[].[extension.N]" --output text > ./extensionnumber.csv
In the file extensionnumber.csv, I will have data similar to:
extension
None
None
None
None
None
None
243074
I need to filter out all the None rows; what remains are the Number ones (a small Scala sketch for this cleanup follows below).
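In case you do not want to clean the file by hand, a few lines of plain Scala can do the filtering; this is just a sketch, and the file names simply follow the ones used elsewhere in this post.
import java.io.PrintWriter
import scala.io.Source

// Drop the "None" rows from the scan output; file names follow this post
val writer = new PrintWriter("extensionnumber2.csv")
Source.fromFile("extensionnumber.csv")
  .getLines()
  .filter(_.trim != "None")   // keeps the header line and the Number values
  .foreach(writer.println)
writer.close()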
Then I dump the whole table:
> export-dynamodb -t sillycat_device-stage-devicePairingInfo -f csv -o devicepairinginfodb.csv
I put these 2 tables into Spark, do a join and a select, and that finds all the rows whose extension is a Number.
In Spark, we can read DynamoDB directly if the table does not have this kind of Number/String conflict:
%spark.dep
z.load("mysql:mysql-connector-java:5.1.47")
z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")

import com.github.traviscrawford.spark.dynamodb._
val accountDF = sqlContext.read.dynamodb("us-west-1", "sillycat_device-stage-devicePairingInfo")
accountDF.printSchema()
accountDF.registerTempTable("devicepairing")
Or load the CSV file:
val devicePairingDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/convertdevicepairingextension/devicepairinginfodb.csv")
devicePairingDF.printSchema()
devicePairingDF.createOrReplaceTempView("devicepairing2")
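A quick sanity check on the registered view does not hurt; for example, using the view name registered above:
// Peek at a few extension values to confirm the dump loaded correctly
sqlContext.sql("select extension from devicepairing2").show(5)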
Load the second file, the one with the Number values:
val extensionRawDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/convertdevicepairingextension/extensionnumber2.csv")
// Lower-case the column names
val extensionRaw1DF = extensionRawDF.toDF(extensionRawDF.columns.map(_.toLowerCase): _*)
// Replace whitespace in column names with underscores so they are easy to reference in SQL
val extensionDF = extensionRaw1DF.columns.foldLeft(extensionRaw1DF)((curr, n) => curr.withColumnRenamed(n, n.replaceAll("\\s", "_")))
extensionDF.printSchema()
extensionDF.createOrReplaceTempView("extension")
Join the 2 tables:
%sql
select t1.* from devicepairing2 t1, extension t2 where t1.extension = t2.extension
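The same join can also be written with the DataFrame API instead of a %sql paragraph; a rough equivalent using the two DataFrames defined above (assuming both expose an extension column, as the SQL does) would be:
// Inner join on the shared extension column; the surviving rows are the Number ones
val numberRowsDF = devicePairingDF.join(extensionDF, Seq("extension"))
numberRowsDF.show(2)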
Output the JSON file:
val extensionUpdateDF = sqlContext.sql("""
  select t1.* from devicepairing2 t1, extension t2 where t1.extension = t2.extension
""")
extensionUpdateDF.show(2)
// repartition(1) so the result lands in a single JSON part file
extensionUpdateDF.repartition(1).write.json("hdfs://localhost:9000/convertdevicepairingextension/extension_to_update")
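The post stops at producing the JSON file; the remaining step is to rewrite those rows as String in DynamoDB. A minimal sketch with the AWS SDK for Java v1 might look like the following; note that the key attribute name deviceId is only a placeholder, since the table's real key schema is not shown in this post.
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, UpdateItemRequest}
import scala.collection.JavaConverters._

val client = AmazonDynamoDBClientBuilder.standard().withRegion("us-west-1").build()

// "deviceId" is a placeholder key attribute name; replace it with the table's real hash key
def extensionToString(deviceId: String, extension: String): Unit = {
  val request = new UpdateItemRequest()
    .withTableName("sillycat_device-stage-devicePairingInfo")
    .withKey(Map("deviceId" -> new AttributeValue().withS(deviceId)).asJava)
    .withUpdateExpression("SET extension = :ext")
    .withExpressionAttributeValues(Map(":ext" -> new AttributeValue().withS(extension)).asJava)
  client.updateItem(request)
}
Each row of the exported JSON (read back with sqlContext.read.json, for example) could then be fed through a helper like this.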