Find Data in DynamoDB in Spark

starksummer 2019-03-31


I got a small task recently. In our DynamoDB there is one column in one table where we put bad data at first, storing extension as Number; later we realized it should be String.

That ended up with some Number and some String values in the same column. We need to find all the Number ones and update them to String.
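
For context, DynamoDB tags every attribute value with its type, so the same value can sit in the table either as "extension": {"N": "243074"} or as "extension": {"S": "243074"}; that mix is exactly what we need to clean up.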

Here are my steps.

First of all, I do a scan:

> aws dynamodb scan --table-name sillycat_device-stage-devicePairingInfo --query "Items[*].[extension.N]" --output text > ./extensionnumber.csv

In the file extensionnumber.csv, I will have data similar to:

extension

None

None

None

None

None

None

243074

I need to filter out all the None ones, and then I am left with the Number ones.
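
One way to do that filtering is a quick Spark job, shown below as a minimal sketch: it assumes the scan result was uploaded to HDFS as extensionnumber.csv and that the None-free result is what I later load as extensionnumber2.csv (the output path here is only an illustration).

val scanRawDF = sqlContext.read.format("csv")
  .option("header", "true")
  .load("hdfs://localhost:9000/convertdevicepairingextension/extensionnumber.csv")

// keep only the rows where extension actually held a Number value
val numberOnlyDF = scanRawDF.filter("extension != 'None'")

numberOnlyDF.repartition(1).write
  .option("header", "true")
  .csv("hdfs://localhost:9000/convertdevicepairingextension/extensionnumber2")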

Then I dump the whole table:

> export-dynamodb -t sillycat_device-stage-devicePairingInfo -f csv -o devicepairinginfodb.csv

Put these 2 tables in Spark, do a join and select, and I find all the ones with Number.

In Spark, we can directly read from DynamoDB if the table does not have this kind of Number/String conflict.

import com.github.traviscrawford.spark.dynamodb._

%spark.dep
z.load("mysql:mysql-connector-java:5.1.47")
z.load("com.github.traviscrawford:spark-dynamodb:0.0.13")

val accountDF = sqlContext.read.dynamodb("us-west-1", "sillycat_device-stage-devicePairingInfo")
accountDF.printSchema()
accountDF.registerTempTable("devicepairing")

Or load the CSV file:

val devicePairingDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/convertdevicepairingextension/devicepairinginfodb.csv")

devicePairingDF.printSchema()
devicePairingDF.createOrReplaceTempView("devicepairing2")

Load the second file, the one with the Number values:

val extensionRawDF = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/convertdevicepairingextension/extensionnumber2.csv")

// lower-case all the column names
val extensionRaw1DF = extensionRawDF.toDF(extensionRawDF.columns.map(_.toLowerCase): _*)

// replace any whitespace in the column names with underscores
val extensionDF = extensionRaw1DF.columns.foldLeft(extensionRaw1DF)((curr, n) => curr.withColumnRenamed(n, n.replaceAll("\\s", "_")))

extensionDF.printSchema()
extensionDF.createOrReplaceTempView("extension")

Join the 2 tables:

%sql
select t1.* from devicepairing2 t1, extension t2 where t1.extension = t2.extension

Output the JSON file:

val extensionUpdateDF = sqlContext.sql("""
  select t1.* from devicepairing2 t1, extension t2 where t1.extension = t2.extension
""")

extensionUpdateDF.show(2)
extensionUpdateDF.repartition(1).write.json("hdfs://localhost:9000/convertdevicepairingextension/extension_to_update")
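
The records written to extension_to_update can then drive the actual fix in DynamoDB. Below is a minimal sketch of that last step with the AWS SDK for Java (aws-java-sdk-dynamodb), assuming a hypothetical String hash key named deviceId; the real key attribute and schema come from the table definition.

import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, UpdateItemRequest}

val dynamoClient = AmazonDynamoDBClientBuilder.standard().withRegion("us-west-1").build()

// deviceId is a hypothetical key attribute name; extension is the old Number value as text
def extensionToString(deviceId: String, extension: String): Unit = {
  val request = new UpdateItemRequest()
    .withTableName("sillycat_device-stage-devicePairingInfo")
    .addKeyEntry("deviceId", new AttributeValue().withS(deviceId))
    .withUpdateExpression("SET extension = :ext")
    // writing the value back with the S type tag turns it into a String going forward
    .withExpressionAttributeValues(Map(":ext" -> new AttributeValue().withS(extension)).asJava)
  dynamoClient.updateItem(request)
}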
