Skip to content

Commit

Permalink
feat(udf): support user-defined table function (UDTF) (#8255)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
Co-authored-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
wangrunji0408 and xxchan committed Mar 15, 2023
1 parent 61191c2 commit f92d7f6
Show file tree
Hide file tree
Showing 24 changed files with 676 additions and 63 deletions.
121 changes: 113 additions & 8 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 66 additions & 7 deletions dashboard/proto/gen/expr.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions e2e_test/udf/python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ create function gcd(int, int, int) returns int language python as gcd3 using lin
statement error exists
create function gcd(int, int) returns int language python as gcd using link 'http://localhost:8815';

# Create a table function.
statement ok
create function series(int) returns table (x int) language python as series using link 'http://localhost:8815';

statement ok
create function series2(int) returns table (x int, s varchar) language python as series2 using link 'http://localhost:8815';

query I
select int_42();
----
Expand All @@ -39,6 +46,26 @@ select gcd(25, 15, 3);
----
1

query I
select series(5);
----
0
1
2
3
4

# FIXME: support table function with multiple columns
# query IT
# select series2(5);
# ----
# (0,0)
# (1,1)
# (2,2)
# (3,3)
# (4,4)


# TODO: drop function without arguments

# # Drop a function but ambiguous.
Expand Down
17 changes: 16 additions & 1 deletion e2e_test/udf/test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import sys
from typing import Iterator
sys.path.append('src/udf/python') # noqa

from risingwave.udf import udf, UdfServer
from risingwave.udf import udf, udtf, UdfServer


@udf(input_types=[], result_type='INT')
Expand All @@ -21,9 +22,23 @@ def gcd3(x: int, y: int, z: int) -> int:
return gcd(gcd(x, y), z)


@udtf(input_types='INT', result_types='INT')
def series(n: int) -> Iterator[int]:
for i in range(n):
yield i


@udtf(input_types=['INT'], result_types=['INT', 'VARCHAR'])
def series2(n: int) -> Iterator[tuple[int, str]]:
for i in range(n):
yield i, str(i)


if __name__ == '__main__':
server = UdfServer()
server.add_function(int_42)
server.add_function(gcd)
server.add_function(gcd3)
server.add_function(series)
server.add_function(series2)
server.serve()
Loading

0 comments on commit f92d7f6

Please sign in to comment.