Skip to content

Commit

Permalink
Allow empty dst field in apply, fix csv sink
Browse files Browse the repository at this point in the history
  • Loading branch information
shwars committed Nov 28, 2018
1 parent 256179d commit c3df1d8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
10 changes: 7 additions & 3 deletions mPyPl/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ def apply(datastream, src_field, dst_field, func,eval_strategy=None):
"""
Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage:
`[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list`
If `dst_field` is `None`, function is just executed on the source field(s), and result is not stored.
This is useful when there are side effects.
"""
def applier(x):
x[dst_field] = (lambda : __fnapply(x,src_field,func)) if lazy_strategy(eval_strategy) else __fnapply(x,src_field,func)
if eval_strategy:
x.set_eval_strategy(dst_field,eval_strategy)
r = (lambda : __fnapply(x,src_field,func)) if lazy_strategy(eval_strategy) else __fnapply(x,src_field,func)
if dst_field is not None and dst_field!='':
x[dst_field]=r
if eval_strategy:
x.set_eval_strategy(dst_field,eval_strategy)
return x
return datastream | select(applier)

Expand Down
5 changes: 4 additions & 1 deletion mPyPl/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

# Different sinks to consume mdict streams

from pipe import *
import csv
import json

@Pipe
def write_csv(l,filename):
with open(filename,'wb') as f:
with open(filename,'a') as f:
w = csv.writer(f)
for x in l:
w.writerow(x)

@Pipe
def write_json(l,filename):
with open(filename,'w') as f:
f.write(json.dumps(l))

0 comments on commit c3df1d8

Please sign in to comment.