SparkR: Connecting to PostgreSQL via JDBC

  • Spark needs the matching org.postgresql:postgresql JDBC driver on its classpath; we use org.postgresql:postgresql:9.4-1201-jdbc41, configured through Cloudera Manager via Spark's spark.jars.packages parameter. The packages currently included are:

    spark.jars.packages=org.apache.commons:commons-csv:1.2,com.databricks:spark-csv_2.10:1.4.0,org.mongodb.spark:mongo-spark-connector_2.10:1.0.0,org.postgresql:postgresql:9.4-1201-jdbc41
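
    If the cluster-wide Cloudera Manager setting is not an option, the same package
    can also be requested per session when initializing SparkR. A minimal sketch,
    assuming SparkR 1.5+, where sparkR.init accepts a sparkPackages argument that
    resolves artifacts from Maven Central:

    # Per-session alternative to the cluster-wide spark.jars.packages setting
    # (assumption: SparkR 1.5+)
    sc <- sparkR.init(sparkPackages = "org.postgresql:postgresql:9.4-1201-jdbc41")
    sqlContext <- sparkRSQL.init(sc)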
    
  • Use the JDBC data source to read data from PostgreSQL as a DataFrame

    df <- read.df(sqlContext, source = "jdbc",
                  url = "jdbc:postgresql://Master:5432/r_test",
                  dbtable = "r_test_table",
                  driver = "org.postgresql.Driver",
                  user = "postgres",
                  password = "163700gf")

    Here Master:5432 is the host and port of the PostgreSQL server, r_test is the test database, r_test_table is a table inside r_test, and user and password are the database credentials.
    Once df has been obtained, its contents can be displayed:

    > show(df)
    DataFrame[id:int, name:string, count:int]
    
    > collect(df)
      id name count
    1  1   a1     1
    2  2   a2     2
    3  3   a3     3
    4  4   a4     4
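
    Beyond collect, the DataFrame can also be registered as a temporary table and
    queried through Spark SQL. A minimal sketch, assuming the same sqlContext and
    df as above; the temporary table name r_test_tmp is arbitrary:

    # Register df under a temporary name, then let Spark evaluate a SQL filter
    registerTempTable(df, "r_test_tmp")
    filtered <- sql(sqlContext, "SELECT id, name FROM r_test_tmp WHERE count > 2")
    collect(filtered)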
    
  • Use the JDBC data source to write the DataFrame back into PostgreSQL

    > write.df(df, path="NULL", source="jdbc", url="jdbc:postgresql://Master:5432/r_test", dbtable="r_test_table", driver="org.postgresql.Driver", user="postgres", password="163700gf", mode="append")
    
    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
      java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:259)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:2027)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
        at io.netty.channel.SimpleChannelIn
    

Appending the data back into the table raises the error above. From the Mail Archive thread, this is a known bug in the current version of Spark: the JDBC data source in Spark 1.x does not implement the create-table-as-select save path that write.df goes through, so even mode="append" fails. The bug has been fixed in Spark 2.0.
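
For reference, SparkR in Spark 2.0 exposes this write path directly through a write.jdbc function. A minimal sketch, assuming the cluster has been upgraded and reusing the same connection details; the extra named arguments are passed through as JDBC connection properties:

    # SparkR 2.0+: append df into the existing PostgreSQL table
    # (assumption: Spark 2.x, where the append bug is fixed)
    write.jdbc(df, "jdbc:postgresql://Master:5432/r_test", "r_test_table",
               mode = "append",
               user = "postgres", password = "163700gf",
               driver = "org.postgresql.Driver")

Until the upgrade, a pragmatic interim workaround is to collect(df) to the driver and insert the rows with an ordinary R database package such as RPostgreSQL, which is only feasible while the data fits in driver memory.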
