@@ -37,18 +37,6 @@ trait Enrichable extends Serializable with Copyable[Enrichable] with JsonConvert
37
37
def get : Any
38
38
def typed [T ]: TypedEnrichable [T ] = this .asInstanceOf [TypedEnrichable [T ]]
39
39
40
- override def copy (): Enrichable = copy(Map .empty)
41
-
42
- protected [enrich] def copy (cloned : Map [String , Enrichable ]): Enrichable = {
43
- val copy = super .copy()
44
- copy._enrichments = (_enrichments.keySet ++ cloned.keySet).map{field =>
45
- val enrichable = cloned.getOrElse(field, _enrichments(field).copy())
46
- enrichable.setHierarchy(copy, field, _root)
47
- (field, enrichable)
48
- }.toMap
49
- copy
50
- }
51
-
52
40
private var excludeFromOutput : Option [Boolean ] = None
53
41
def isExcludedFromOutput : Boolean = excludeFromOutput match {
54
42
case Some (value) => value
@@ -63,19 +51,19 @@ trait Enrichable extends Serializable with Copyable[Enrichable] with JsonConvert
63
51
case None => excludeFromOutput = Some (value)
64
52
}
65
53
66
- private var _field : String = _
54
+ @ transient private var _field : String = _
67
55
def field : String = _field
68
56
69
- private var _parent : Enrichable = _
57
+ @ transient private var _parent : Enrichable = _
70
58
def parent [A ]: TypedEnrichable [A ] = _parent.asInstanceOf [TypedEnrichable [A ]]
71
59
72
- private var _root : EnrichRoot = _
60
+ @ transient private var _root : EnrichRoot = _
73
61
def root [A ]: TypedEnrichRoot [A ] = _root.asInstanceOf [TypedEnrichRoot [A ]]
74
62
75
63
def path : Seq [String ] = if (_parent == null ) Seq .empty else _parent.path :+ _field
76
64
def chain : Seq [Enrichable ] = if (_parent == null ) Seq (this ) else _parent.chain :+ this
77
65
78
- protected [enrich] def setHierarchy (parent : Enrichable , field : String , root : EnrichRoot = null ): Unit = {
66
+ protected [enrich] def setHierarchy (parent : Enrichable , field : String , root : EnrichRoot ): Unit = {
79
67
_field = field
80
68
_parent = parent
81
69
_root = root
@@ -92,21 +80,29 @@ trait Enrichable extends Serializable with Copyable[Enrichable] with JsonConvert
92
80
clone
93
81
}
94
82
95
- def enrichment [D : ClassTag ](key : String ): Option [TypedEnrichable [D ]] = _enrichments.get(field(key)).map(_.asInstanceOf [TypedEnrichable [D ]])
83
+ def enrichment [D : ClassTag ](key : String ): Option [TypedEnrichable [D ]] = {
84
+ val fieldname = field(key)
85
+ _enrichments.get(fieldname) match {
86
+ case Some (enrichable) =>
87
+ enrichable.setHierarchy(this , fieldname, root)
88
+ Some (enrichable.asInstanceOf [TypedEnrichable [D ]])
89
+ case None => None
90
+ }
91
+ }
96
92
97
93
def enrich (fieldName : String , enrichment : Enrichable ): Enrichable = {
98
- val clone = copy(Map (fieldName -> enrichment))
94
+ val clone = copy()
95
+ clone._enrichments = clone._enrichments.updated(fieldName, enrichment)
99
96
clone._lastException = enrichment._lastException
100
97
clone._aliases -= fieldName
101
98
clone
102
99
}
103
100
104
101
def enrichValue [Value ](fieldName : String , value : Value ): Enrichable = {
105
- val enrichable = SingleValueEnrichable [Value ](value, this , fieldName, _root)
106
- enrich(fieldName, enrichable)
102
+ enrich(fieldName, SingleValueEnrichable [Value ](value))
107
103
}
108
104
109
- private [enrich] def enrich [D ](func : EnrichFunc [_, D ], excludeFromOutput : Boolean = false ): Enrichable = {
105
+ private def enrich [D ](func : EnrichFunc [_, D ], excludeFromOutput : Boolean = false ): Enrichable = {
110
106
if (! func.exists(this )) {
111
107
val derivatives = new Derivatives (func.fields, func.aliases)
112
108
var lastException : Option [Exception ] = None
@@ -118,20 +114,28 @@ trait Enrichable extends Serializable with Copyable[Enrichable] with JsonConvert
118
114
// if (ArchiveSpark.conf.catchExceptions) lastException = Some(exception)
119
115
// else throw exception
120
116
}
121
- val clone = copy(derivatives.get )
117
+ val clone = copy()
122
118
clone._lastException = lastException
123
119
clone._aliases ++= derivatives.aliases
124
120
for ((field, enrichment) <- derivatives.get) {
125
121
enrichment.excludeFromOutput(excludeFromOutput, overwrite = false )
122
+ clone._enrichments = clone._enrichments.updated(field, enrichment)
126
123
clone._aliases -= field
127
124
}
128
125
clone
129
- } else if (! excludeFromOutput && func.fields.exists(f => enrichment(f).get.isExcludedFromOutput)) {
130
- val clone = copy()
131
- for (field <- func.fields if enrichment(field).get.isExcludedFromOutput) {
132
- clone.enrichment(field).get.excludeFromOutput(value = false )
126
+ } else if (! excludeFromOutput) {
127
+ val excluded = func.fields.map(enrichment).filter(_.isDefined).map(_.get).filter(_.isExcludedFromOutput)
128
+ if (excluded.nonEmpty) {
129
+ val clone = copy()
130
+ for (enrichment <- excluded) {
131
+ val enrichmentClone = enrichment.copy()
132
+ enrichmentClone.excludeFromOutput(value = false )
133
+ clone._enrichments = clone._enrichments.updated(enrichment.field, enrichmentClone)
134
+ }
135
+ clone
136
+ } else {
137
+ this
133
138
}
134
- clone
135
139
} else {
136
140
this
137
141
}
@@ -158,10 +162,11 @@ trait Enrichable extends Serializable with Copyable[Enrichable] with JsonConvert
158
162
val remaining = path.tail
159
163
enrichment(remaining.head) match {
160
164
case Some (child) => child(remaining.tail)
161
- case None => for (child <- _enrichments.values) {
162
- val target : Option [TypedEnrichable [D ]] = child[D ](path)
163
- if (target.isDefined) return target
164
- }
165
+ case None =>
166
+ for (child <- _enrichments.values) {
167
+ val target : Option [TypedEnrichable [D ]] = child[D ](path)
168
+ if (target.isDefined) return target
169
+ }
165
170
None
166
171
}
167
172
} else if (path.head.matches(" \\ [\\ d+\\ ]" )) {
0 commit comments