Spark需要包含
org.postgresql:postgresql
对应版本的链接库,我们使用的是org.postgresql:postgresql:9.4-1201-jdbc41
版本,通过Cloudera Manager设置了Spark的spark.jars.packages
参数,目前包含的库有spark.jars.packages=org.apache.commons:commons-csv:1.2,com.databricks:spark-csv_2.10:1.4.0,org.mongodb.spark:mongo-spark-connector_2.10:1.0.0,org.postgresql:postgresql:9.4-1201-jdbc41
调用JDBC库从PostgreSQL中读取Dataframe形式的数据
df <- read.df(sqlContext, source="jdbc", url="jdbc:postgresql://Master:5432/r_test", dbtable="r_test_table", "driver"="org.postgresql.Driver", "user"="postgres", "password"="163700gf")
其中
Master:5432
是目前postgresql的ip和端口,r_test
是测试用的database,r_test_table
是r_test
中的一个表,user
和password
是数据库的帐号和密码。
获取到df以后,就可以输出内容了。> show(df) DataFrame[id:int, name:string, count:int] > collect(df) id name count 1 1 a1 1 2 2 a2 2 3 3 a3 3 4 4 a4 4
调用JDBC库将Dataframe写入PostgreSQL数据库中
> write.df(df, path="NULL", source="jdbc", url="jdbc:postgresql://Master:5432/r_test", "dbtable"="r_test_table", "driver"="org.postgresql.Driver", "user"="postgres", "password"="163700gf", mode="append") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select. at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:259) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:2027) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38) at io.netty.channel.SimpleChannelIn
重新将数据append到表中,提示错误,可以根据Mail Archive得知是Spark目前的BUG,已经在Spark 2.0中修复。