mapreduce - Check if an element is in the documents of an RDD
I have the following rdd1 in PySpark (please excuse minor syntax errors):
[(id1,(1,2,3)), (id2,(3,4,5))]
I also have rdd2 holding the following: (2,3,4).
Now I want to see in how many of the rdd1 sublists each element of rdd2 occurs. The expected output is an RDD (or a collected list, I don't care) like:
(2, [id1]),(3,[id1,id2]),(4,[id2])
This is what I have so far (note that rdd2 must be the first item in the line/algorithm):
rdd2.map(lambda x: (x, x in rdd1.map(lambda y: y[1])))
Even though this would give me True/False as the second item of the pair tuple (I could live with that), it doesn't work: it fails when trying to perform the map on rdd1 inside the anonymous function passed to rdd2's map. Any idea how to get going in the right direction?
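For reference, here is a minimal sketch of the setup described above; the string IDs, the local master, and the SparkContext name sc are assumptions made purely for illustration:

from pyspark import SparkContext

sc = SparkContext("local", "example")  # assumed local context, for illustration only

# rdd1: (id, tuple of values) pairs as described above
rdd1 = sc.parallelize([("id1", (1, 2, 3)), ("id2", (3, 4, 5))])
# rdd2: the values to look up
rdd2 = sc.parallelize([2, 3, 4])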
If rdd2 is relatively small (fits in memory):
pairs1 = rdd1.flatMap(lambda kv: ((v, kv[0]) for v in kv[1]))  # (value, id) pairs
vals_set = sc.broadcast(set(rdd2.collect()))  # small lookup set shipped to every worker

(pairs1
    .filter(lambda kv: kv[0] in vals_set.value)
    .groupByKey())
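A quick sanity check on the sample data (a sketch; mapValues(list) is only there to make the grouped values printable):

result = (pairs1
    .filter(lambda kv: kv[0] in vals_set.value)
    .groupByKey()
    .mapValues(list))
print(result.collect())  # e.g. [(2, ['id1']), (3, ['id1', 'id2']), (4, ['id2'])], order may vary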
If not, you can take pairs1 from the previous part and use a join:
pairs2 = rdd2.map(lambda x: (x, None))

(pairs2
    .leftOuterJoin(pairs1)
    .map(lambda kv: (kv[0], kv[1][1]))
    .groupByKey())
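Note that leftOuterJoin keeps every element of rdd2, so a value that occurs in no document comes out as (value, [None]). If you only want values that actually occur, an inner join is enough; a sketch of that variant:

(pairs2
    .join(pairs1)  # inner join drops rdd2 values with no match
    .map(lambda kv: (kv[0], kv[1][1]))
    .groupByKey())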
As always, if this is only an intermediate structure, you should consider reduceByKey, aggregateByKey or combineByKey instead of groupByKey. If it is the final structure, you can add .mapValues(list).
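For example, here is a sketch of the broadcast variant rewritten with aggregateByKey, which builds the ID lists directly instead of shuffling all values for a later groupByKey:

result = (pairs1
    .filter(lambda kv: kv[0] in vals_set.value)
    .aggregateByKey(
        [],                        # zero value: empty list per key
        lambda acc, x: acc + [x],  # merge an id into a partition-local list
        lambda a, b: a + b))       # merge lists across partitions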
Finally, you can try to use Spark DataFrames:
df1 = sqlContext.createDataFrame(
    rdd1.flatMap(lambda kv: ({'k': k, 'v': kv[0]} for k in kv[1])))
df2 = sqlContext.createDataFrame(rdd2.map(lambda k: {'k': k}))

(df1
    .join(df2, df1.k == df2.k, 'leftsemi')
    .map(lambda r: (r.k, r.v))  # on Spark 2.x use .rdd.map(...) instead
    .groupByKey())
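If you would rather stay in the DataFrame API, you could aggregate with collect_list instead of dropping back to RDDs; a sketch assuming Spark 2.x, where collect_list works without Hive support:

from pyspark.sql.functions import collect_list

(df1
    .join(df2, df1.k == df2.k, 'leftsemi')
    .groupBy('k')
    .agg(collect_list('v')))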