Change offset column from Long Type to String Type (#253)
* Fix offset type in EventDataSchema * fix type in readme
This commit is contained in:
Родитель
e16631bc4e
Коммит
da175a86a5
|
@ -72,7 +72,7 @@ private[eventhubs] class EventHubsRelation(override val sqlContext: SQLContext,
|
|||
ed =>
|
||||
InternalRow(
|
||||
ed.getBytes,
|
||||
ed.getSystemProperties.getOffset.toLong,
|
||||
UTF8String.fromString(ed.getSystemProperties.getOffset),
|
||||
ed.getSystemProperties.getSequenceNumber,
|
||||
DateTimeUtils.fromJavaTimestamp(
|
||||
new java.sql.Timestamp(ed.getSystemProperties.getEnqueuedTime.toEpochMilli)),
|
||||
|
|
|
@ -295,7 +295,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
|
|||
val rdd = new EventHubsRDD(sc, ehConf, offsetRanges, clientFactory).map { ed =>
|
||||
InternalRow(
|
||||
ed.getBytes,
|
||||
ed.getSystemProperties.getOffset.toLong,
|
||||
UTF8String.fromString(ed.getSystemProperties.getOffset),
|
||||
ed.getSystemProperties.getSequenceNumber,
|
||||
DateTimeUtils.fromJavaTimestamp(
|
||||
new java.sql.Timestamp(ed.getSystemProperties.getEnqueuedTime.toEpochMilli)),
|
||||
|
|
|
@ -151,7 +151,7 @@ private[sql] object EventHubsSourceProvider extends Serializable {
|
|||
StructType(
|
||||
Seq(
|
||||
StructField("body", BinaryType),
|
||||
StructField("offset", LongType),
|
||||
StructField("offset", StringType),
|
||||
StructField("sequenceNumber", LongType),
|
||||
StructField("enqueuedTime", TimestampType),
|
||||
StructField("publisher", StringType),
|
||||
|
|
|
@ -550,7 +550,7 @@ class EventHubsSourceSuite extends EventHubsSourceTest {
|
|||
assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
|
||||
val row = rows(0)
|
||||
assert(row.getAs[Array[Byte]]("body") === "1".getBytes(UTF_8), s"Unexpected results: $row")
|
||||
assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
|
||||
assert(row.getAs[String]("offset") === "0", s"Unexpected results: $row")
|
||||
assert(row.getAs[Long]("sequenceNumber") === 0, s"Unexpected results: $row")
|
||||
assert(row.getAs[String]("publisher") === null, s"Unexpected results: $row")
|
||||
assert(row.getAs[String]("partitionKey") === null, s"Unexpected results: $row")
|
||||
|
|
|
@ -243,7 +243,7 @@ Each row in the source has the following schema:
|
|||
| Column | Type |
|
||||
| ------ | ---- |
|
||||
| body | binary |
|
||||
| offset | long |
|
||||
| offset | string |
|
||||
| sequenceNumber | long |
|
||||
| enqueuedTime | timestamp |
|
||||
| publisher | string |
|
||||
|
|
Загрузка…
Ссылка в новой задаче