From 952d786314dfb5c8cb77ab6e35aa007ec1c32b54 Mon Sep 17 00:00:00 2001 From: Gildas Lebel Date: Wed, 5 Jun 2024 15:33:05 -0400 Subject: [PATCH 1/4] Fix: Resolve date formats handling --- build/ci/go.mod | 6 +- build/ci/go.sum | 3 + go.mod | 13 +- go.sum | 34 +- pkg/codegen/generators/helpers.go | 6 + pkg/codegen/generators/v2/generator.go | 14 +- pkg/codegen/generators/v2/imports.go | 9 +- .../generators/v2/templates/imports.tmpl | 2 + .../generators/v2/templates/schema_name.tmpl | 4 +- pkg/codegen/generators/v3/generator.go | 15 +- pkg/codegen/generators/v3/imports.go | 9 +- .../generators/v3/templates/imports.tmpl | 2 + pkg/utils/reflect.go | 36 ++ test/v2/issues/222/asyncapi.gen.go | 485 ++++++++++++++++++ test/v2/issues/222/asyncapi.yaml | 23 + test/v2/issues/222/suite_test.go | 36 ++ test/v3/issues/222/asyncapi.gen.go | 485 ++++++++++++++++++ test/v3/issues/222/asyncapi.yaml | 23 + test/v3/issues/222/suite_test.go | 36 ++ 19 files changed, 1199 insertions(+), 42 deletions(-) create mode 100644 pkg/utils/reflect.go create mode 100644 test/v2/issues/222/asyncapi.gen.go create mode 100644 test/v2/issues/222/asyncapi.yaml create mode 100644 test/v2/issues/222/suite_test.go create mode 100644 test/v3/issues/222/asyncapi.gen.go create mode 100644 test/v3/issues/222/asyncapi.yaml create mode 100644 test/v3/issues/222/suite_test.go diff --git a/build/ci/go.mod b/build/ci/go.mod index 5a148588..8dd2cc4c 100644 --- a/build/ci/go.mod +++ b/build/ci/go.mod @@ -29,10 +29,10 @@ require ( github.com/skeema/knownhosts v1.2.1 // indirect github.com/stretchr/testify v1.9.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/tools v0.16.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect ) diff --git a/build/ci/go.sum b/build/ci/go.sum index bdb08594..56a6ba53 100644 --- a/build/ci/go.sum +++ b/build/ci/go.sum @@ -88,6 +88,7 @@ golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -104,6 +105,7 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -124,6 +126,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= diff --git a/go.mod b/go.mod index 090f7e35..d3abd33e 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,19 @@ go 1.21 toolchain go1.21.4 require ( + cloud.google.com/go v0.114.0 github.com/fatih/color v1.15.0 github.com/ghodss/yaml v1.0.0 github.com/go-git/go-git/v5 v5.11.0 github.com/go-playground/validator/v10 v10.20.0 github.com/google/go-cmp v0.6.0 - github.com/google/uuid v1.3.1 + github.com/google/uuid v1.6.0 github.com/iancoleman/strcase v0.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats.go v1.31.0 github.com/segmentio/kafka-go v0.4.42 github.com/spf13/cobra v1.8.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 golang.org/x/tools v0.16.1 ) @@ -53,11 +54,11 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index ee3dba49..4fb9567f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.114.0 h1:OIPFAdfrFDFO2ve2U7r/H5SwSbBzEdrBdE7xkgwc+kY= +cloud.google.com/go v0.114.0/go.mod h1:ZV9La5YYxctro1HTPug5lXH/GefROyW8PPD4T8n9J8E= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= @@ -50,8 +52,8 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -116,8 +118,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -132,8 +134,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= @@ -146,13 +148,13 @@ golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -167,15 +169,15 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -184,8 +186,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/pkg/codegen/generators/helpers.go b/pkg/codegen/generators/helpers.go index 97212660..b452fa88 100644 --- a/pkg/codegen/generators/helpers.go +++ b/pkg/codegen/generators/helpers.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/lerenn/asyncapi-codegen/pkg/asyncapi" + "github.com/lerenn/asyncapi-codegen/pkg/utils" ) func appendDirectiveIfDefined(directives []string, tagName string, value float64) []string { @@ -68,3 +69,8 @@ func appendEnumDirectives[T any](schema asyncapi.Validations[T], directives []st } return directives } + +// DateTimeFormatInSpec recursively iterates to find any Schema, and returns true if their format is "date-time". +func DateTimeFormatInSpec[SPEC any](spec SPEC) bool { + return utils.FieldValueExists(spec, "Format", "date-time") +} diff --git a/pkg/codegen/generators/v2/generator.go b/pkg/codegen/generators/v2/generator.go index 40ad337d..4f0009f8 100644 --- a/pkg/codegen/generators/v2/generator.go +++ b/pkg/codegen/generators/v2/generator.go @@ -55,11 +55,17 @@ func (g Generator) generateImports(opts options.Options) (string, error) { return "", fmt.Errorf("failed to generate custom imports: %w", err) } + var requiredImports []string + if generators.DateTimeFormatInSpec(g.Specification) { + requiredImports = append(requiredImports, "\"cloud.google.com/go/civil\"") + } + return ImportsGenerator{ - PackageName: opts.PackageName, - ModuleVersion: g.ModuleVersion, - ModuleName: g.ModulePath, - CustomImports: imps, + PackageName: opts.PackageName, + ModuleVersion: g.ModuleVersion, + ModuleName: g.ModulePath, + RequiredImports: requiredImports, + CustomImports: imps, }.Generate() } diff --git a/pkg/codegen/generators/v2/imports.go b/pkg/codegen/generators/v2/imports.go index 615bc7a8..8f3beb37 100644 --- a/pkg/codegen/generators/v2/imports.go +++ b/pkg/codegen/generators/v2/imports.go @@ -8,10 +8,11 @@ import ( // to the code, being asyncapi-codegen packages, standard library packages or // custom packages. type ImportsGenerator struct { - PackageName string - ModuleVersion string - ModuleName string - CustomImports []string + PackageName string + ModuleVersion string + ModuleName string + RequiredImports []string + CustomImports []string } // Generate will generate the imports code. diff --git a/pkg/codegen/generators/v2/templates/imports.tmpl b/pkg/codegen/generators/v2/templates/imports.tmpl index 41fba115..b30339b0 100644 --- a/pkg/codegen/generators/v2/templates/imports.tmpl +++ b/pkg/codegen/generators/v2/templates/imports.tmpl @@ -17,6 +17,8 @@ import ( "github.com/google/uuid" "github.com/nats-io/nats.go" + {{ range .RequiredImports }}{{.}} + {{end}} {{ range .CustomImports }}{{.}} {{end}} ) diff --git a/pkg/codegen/generators/v2/templates/schema_name.tmpl b/pkg/codegen/generators/v2/templates/schema_name.tmpl index 6a8b918c..818b5f4f 100644 --- a/pkg/codegen/generators/v2/templates/schema_name.tmpl +++ b/pkg/codegen/generators/v2/templates/schema_name.tmpl @@ -16,7 +16,9 @@ bool {{- /* --------------------------- Type String -------------------------- */ -}} {{- else if eq .Type "string" -}} -{{- if and .Format (or (eq .Format "date") (eq .Format "date-time")) -}} +{{- if and .Format (eq .Format "date") -}} +civil.Date +{{- else if and .Format (eq .Format "date-time") -}} time.Time {{- else -}} string diff --git a/pkg/codegen/generators/v3/generator.go b/pkg/codegen/generators/v3/generator.go index b2533f79..ff43ce05 100644 --- a/pkg/codegen/generators/v3/generator.go +++ b/pkg/codegen/generators/v3/generator.go @@ -55,11 +55,18 @@ func (g Generator) generateImports(opts options.Options) (string, error) { return "", fmt.Errorf("failed to generate custom imports: %w", err) } + var requiredImports []string + + if generators.DateTimeFormatInSpec(g.Specification) { + requiredImports = append(requiredImports, "\"cloud.google.com/go/civil\"") + } + return ImportsGenerator{ - PackageName: opts.PackageName, - ModuleVersion: g.ModuleVersion, - ModuleName: g.ModulePath, - CustomImports: imps, + PackageName: opts.PackageName, + ModuleVersion: g.ModuleVersion, + ModuleName: g.ModulePath, + RequiredImports: requiredImports, + CustomImports: imps, }.Generate() } diff --git a/pkg/codegen/generators/v3/imports.go b/pkg/codegen/generators/v3/imports.go index 3a4cf467..fb978ba9 100644 --- a/pkg/codegen/generators/v3/imports.go +++ b/pkg/codegen/generators/v3/imports.go @@ -8,10 +8,11 @@ import ( // to the code, being asyncapi-codegen packages, standard library packages or // custom packages. type ImportsGenerator struct { - PackageName string - ModuleVersion string - ModuleName string - CustomImports []string + PackageName string + ModuleVersion string + ModuleName string + RequiredImports []string + CustomImports []string } // Generate will generate the imports code. diff --git a/pkg/codegen/generators/v3/templates/imports.tmpl b/pkg/codegen/generators/v3/templates/imports.tmpl index 41fba115..b30339b0 100644 --- a/pkg/codegen/generators/v3/templates/imports.tmpl +++ b/pkg/codegen/generators/v3/templates/imports.tmpl @@ -17,6 +17,8 @@ import ( "github.com/google/uuid" "github.com/nats-io/nats.go" + {{ range .RequiredImports }}{{.}} + {{end}} {{ range .CustomImports }}{{.}} {{end}} ) diff --git a/pkg/utils/reflect.go b/pkg/utils/reflect.go new file mode 100644 index 00000000..bf8d2290 --- /dev/null +++ b/pkg/utils/reflect.go @@ -0,0 +1,36 @@ +package utils + +import "reflect" + +// FieldValueExists uses reflection to check if any fields with the given name have the given value +// The is heavily adapted from https://stackoverflow.com/a/38407429 +func FieldValueExists[T comparable](v interface{}, name string, expectedValue T) bool { + queue := []reflect.Value{reflect.ValueOf(v)} + for len(queue) > 0 { + v := queue[0] + queue = queue[1:] + + for v.Kind() == reflect.Ptr { + v = v.Elem() + } + + // ignore if this is not a struct + if v.Kind() != reflect.Struct { + continue + } + // iterate through fields looking for match on name + t := v.Type() + for i := 0; i < v.NumField(); i++ { + if t.Field(i).Name == name { + // found it! + + if val, ok := v.Field(i).Interface().(T); ok { + return val == expectedValue + } + } + // push field to queue + queue = append(queue, v.Field(i)) + } + } + return false +} diff --git a/test/v2/issues/222/asyncapi.gen.go b/test/v2/issues/222/asyncapi.gen.go new file mode 100644 index 00000000..ca58222a --- /dev/null +++ b/test/v2/issues/222/asyncapi.gen.go @@ -0,0 +1,485 @@ +// Package "issue222" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package issue222 + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "cloud.google.com/go/civil" + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// PublishV2Issue222Test will publish messages to 'v2.issue222.test' channel +func (c *AppController) PublishV2Issue222Test( + ctx context.Context, + msg V2Issue222TestMessage, +) error { + // Get channel path + path := "v2.issue222.test" + + // Set context + ctx = addAppContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "publication") + + // Convert to BrokerMessage + brokerMsg, err := msg.toBrokerMessage() + if err != nil { + return err + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, brokerMsg.String()) + + // Publish the message on event-broker through middlewares + return c.executeMiddlewares(ctx, &brokerMsg, func(ctx context.Context) error { + return c.broker.Publish(ctx, path, brokerMsg) + }) +} + +// UserSubscriber represents all handlers that are expecting messages for User +type UserSubscriber interface { + // V2Issue222Test subscribes to messages placed on the 'v2.issue222.test' channel + V2Issue222Test(ctx context.Context, msg V2Issue222TestMessage) error +} + +// UserController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels + c.UnsubscribeAll(ctx) + + c.logger.Info(ctx, "Closed user controller") +} + +// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. +// For channels with parameters, they should be subscribed independently. +func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { + if as == nil { + return extensions.ErrNilUserSubscriber + } + + if err := c.SubscribeV2Issue222Test(ctx, as.V2Issue222Test); err != nil { + return err + } + + return nil +} + +// UnsubscribeAll will unsubscribe all remaining subscribed channels +func (c *UserController) UnsubscribeAll(ctx context.Context) { + c.UnsubscribeV2Issue222Test(ctx) +} + +// SubscribeV2Issue222Test will subscribe to new messages from 'v2.issue222.test' channel. +// +// Callback function 'fn' will be called each time a new message is received. +func (c *UserController) SubscribeV2Issue222Test( + ctx context.Context, + fn func(ctx context.Context, msg V2Issue222TestMessage) error, +) error { + // Get channel path + path := "v2.issue222.test" + + // Set context + ctx = addUserContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "reception") + + // Check if there is already a subscription + _, exists := c.subscriptions[path] + if exists { + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) + c.logger.Error(ctx, err.Error()) + return err + } + + // Subscribe to broker channel + sub, err := c.broker.Subscribe(ctx, path) + if err != nil { + c.logger.Error(ctx, err.Error()) + return err + } + c.logger.Info(ctx, "Subscribed to channel") + + // Asynchronously listen to new messages and pass them to app subscriber + go func() { + for { + // Wait for next message + acknowledgeableBrokerMessage, open := <-sub.MessagesChannel() + + // If subscription is closed and there is no more message + // (i.e. uninitialized message), then exit the function + if !open && acknowledgeableBrokerMessage.IsUninitialized() { + return + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, acknowledgeableBrokerMessage.String()) + + // Execute middlewares before handling the message + if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { + // Process message + msg, err := brokerMessageToV2Issue222TestMessage(acknowledgeableBrokerMessage.BrokerMessage) + if err != nil { + return err + } + + // Execute the subscription function + if err := fn(ctx, msg); err != nil { + return err + } + + acknowledgeableBrokerMessage.Ack() + + return nil + }); err != nil { + c.errorHandler(ctx, path, &acknowledgeableBrokerMessage, err) + // On error execute the acknowledgeableBrokerMessage nack() function and + // let the BrokerAcknowledgment decide what is the right nack behavior for the broker + acknowledgeableBrokerMessage.Nak() + } + } + }() + + // Add the cancel channel to the inside map + c.subscriptions[path] = sub + + return nil +} + +// UnsubscribeV2Issue222Test will unsubscribe messages from 'v2.issue222.test' channel. +// A timeout can be set in context to avoid blocking operation, if needed. +func (c *UserController) UnsubscribeV2Issue222Test(ctx context.Context) { + // Get channel path + path := "v2.issue222.test" + + // Check if there subscribers for this channel + sub, exists := c.subscriptions[path] + if !exists { + return + } + + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription + sub.Cancel(ctx) + + // Remove if from the subscribers + delete(c.subscriptions, path) + + c.logger.Info(ctx, "Unsubscribed from channel") +} + +// AsyncAPIVersion is the version of the used AsyncAPI document +const AsyncAPIVersion = "1.2.3" + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // subscriptions is a map of all subscriptions + subscriptions map[string]extensions.BrokerChannelSubscription + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware + // handler to handle errors from consumers and middlewares + errorHandler extensions.ErrorHandler +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +// WithErrorHandler attaches a errorhandler to handle errors from subscriber functions +func WithErrorHandler(handler extensions.ErrorHandler) ControllerOption { + return func(controller *controller) { + controller.errorHandler = handler + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// V2Issue222TestMessage is the message expected for 'V2Issue222TestMessage' channel. +type V2Issue222TestMessage struct { + // Payload will be inserted in the message payload + Payload TestSchema +} + +func NewV2Issue222TestMessage() V2Issue222TestMessage { + var msg V2Issue222TestMessage + + return msg +} + +// brokerMessageToV2Issue222TestMessage will fill a new V2Issue222TestMessage with data from generic broker message +func brokerMessageToV2Issue222TestMessage(bMsg extensions.BrokerMessage) (V2Issue222TestMessage, error) { + var msg V2Issue222TestMessage + + // Unmarshal payload to expected message payload format + err := json.Unmarshal(bMsg.Payload, &msg.Payload) + if err != nil { + return msg, err + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from V2Issue222TestMessage data +func (msg V2Issue222TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Marshal payload to JSON + payload, err := json.Marshal(msg.Payload) + if err != nil { + return extensions.BrokerMessage{}, err + } + + // There is no headers here + headers := make(map[string][]byte, 0) + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} + +// TestSchema is a schema from the AsyncAPI specification required in messages +type TestSchema struct { + DateProp *civil.Date `json:"DateProp"` + DateTimeProp *time.Time `json:"DateTimeProp"` +} + +const ( + // V2Issue222TestPath is the constant representing the 'V2Issue222Test' channel path. + V2Issue222TestPath = "v2.issue222.test" +) + +// ChannelsPaths is an array of all channels paths +var ChannelsPaths = []string{ + V2Issue222TestPath, +} diff --git a/test/v2/issues/222/asyncapi.yaml b/test/v2/issues/222/asyncapi.yaml new file mode 100644 index 00000000..9fd819c4 --- /dev/null +++ b/test/v2/issues/222/asyncapi.yaml @@ -0,0 +1,23 @@ +asyncapi: 2.6.0 +info: + title: Sample App + version: 1.2.3 + +channels: + v2.issue222.test: + subscribe: + message: + payload: + $ref: '#/components/schemas/Test' + +components: + schemas: + Test: + type: object + properties: + DateProp: + type: string + format: date + DateTimeProp: + type: string + format: date-time diff --git a/test/v2/issues/222/suite_test.go b/test/v2/issues/222/suite_test.go new file mode 100644 index 00000000..185dd3fc --- /dev/null +++ b/test/v2/issues/222/suite_test.go @@ -0,0 +1,36 @@ +//go:generate go run ../../../../cmd/asyncapi-codegen -p issue222 -i ./asyncapi.yaml -o ./asyncapi.gen.go + +package issue222 + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestSuite(t *testing.T) { + suite.Run(t, NewSuite()) +} + +type Suite struct { + suite.Suite +} + +func NewSuite() *Suite { + return &Suite{} +} + +const val = ` +{ + "DateProp": "2024-02-02", + "DateTimeProp": "2024-06-05T13:45:30.0000Z" +} +` + +func (suite *Suite) TestMarshalling() { + var res TestSchema + err := json.Unmarshal([]byte(val), &res) + assert.NoError(suite.T(), err) +} diff --git a/test/v3/issues/222/asyncapi.gen.go b/test/v3/issues/222/asyncapi.gen.go new file mode 100644 index 00000000..af3b4d1c --- /dev/null +++ b/test/v3/issues/222/asyncapi.gen.go @@ -0,0 +1,485 @@ +// Package "issue222" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package issue222 + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "cloud.google.com/go/civil" + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// PublishV3Issue222Test will publish messages to 'v3.issue222.test' channel +func (c *AppController) PublishV3Issue222Test( + ctx context.Context, + msg V3Issue222TestMessage, +) error { + // Get channel path + path := "v3.issue222.test" + + // Set context + ctx = addAppContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "publication") + + // Convert to BrokerMessage + brokerMsg, err := msg.toBrokerMessage() + if err != nil { + return err + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, brokerMsg.String()) + + // Publish the message on event-broker through middlewares + return c.executeMiddlewares(ctx, &brokerMsg, func(ctx context.Context) error { + return c.broker.Publish(ctx, path, brokerMsg) + }) +} + +// UserSubscriber represents all handlers that are expecting messages for User +type UserSubscriber interface { + // V3Issue222Test subscribes to messages placed on the 'v3.issue222.test' channel + V3Issue222Test(ctx context.Context, msg V3Issue222TestMessage) error +} + +// UserController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels + c.UnsubscribeAll(ctx) + + c.logger.Info(ctx, "Closed user controller") +} + +// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. +// For channels with parameters, they should be subscribed independently. +func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { + if as == nil { + return extensions.ErrNilUserSubscriber + } + + if err := c.SubscribeV3Issue222Test(ctx, as.V3Issue222Test); err != nil { + return err + } + + return nil +} + +// UnsubscribeAll will unsubscribe all remaining subscribed channels +func (c *UserController) UnsubscribeAll(ctx context.Context) { + c.UnsubscribeV3Issue222Test(ctx) +} + +// SubscribeV3Issue222Test will subscribe to new messages from 'v3.issue222.test' channel. +// +// Callback function 'fn' will be called each time a new message is received. +func (c *UserController) SubscribeV3Issue222Test( + ctx context.Context, + fn func(ctx context.Context, msg V3Issue222TestMessage) error, +) error { + // Get channel path + path := "v3.issue222.test" + + // Set context + ctx = addUserContextValues(ctx, path) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "reception") + + // Check if there is already a subscription + _, exists := c.subscriptions[path] + if exists { + err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) + c.logger.Error(ctx, err.Error()) + return err + } + + // Subscribe to broker channel + sub, err := c.broker.Subscribe(ctx, path) + if err != nil { + c.logger.Error(ctx, err.Error()) + return err + } + c.logger.Info(ctx, "Subscribed to channel") + + // Asynchronously listen to new messages and pass them to app subscriber + go func() { + for { + // Wait for next message + acknowledgeableBrokerMessage, open := <-sub.MessagesChannel() + + // If subscription is closed and there is no more message + // (i.e. uninitialized message), then exit the function + if !open && acknowledgeableBrokerMessage.IsUninitialized() { + return + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, acknowledgeableBrokerMessage.String()) + + // Execute middlewares before handling the message + if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { + // Process message + msg, err := brokerMessageToV3Issue222TestMessage(acknowledgeableBrokerMessage.BrokerMessage) + if err != nil { + return err + } + + // Execute the subscription function + if err := fn(ctx, msg); err != nil { + return err + } + + acknowledgeableBrokerMessage.Ack() + + return nil + }); err != nil { + c.errorHandler(ctx, path, &acknowledgeableBrokerMessage, err) + // On error execute the acknowledgeableBrokerMessage nack() function and + // let the BrokerAcknowledgment decide what is the right nack behavior for the broker + acknowledgeableBrokerMessage.Nak() + } + } + }() + + // Add the cancel channel to the inside map + c.subscriptions[path] = sub + + return nil +} + +// UnsubscribeV3Issue222Test will unsubscribe messages from 'v3.issue222.test' channel. +// A timeout can be set in context to avoid blocking operation, if needed. +func (c *UserController) UnsubscribeV3Issue222Test(ctx context.Context) { + // Get channel path + path := "v3.issue222.test" + + // Check if there subscribers for this channel + sub, exists := c.subscriptions[path] + if !exists { + return + } + + // Set context + ctx = addUserContextValues(ctx, path) + + // Stop the subscription + sub.Cancel(ctx) + + // Remove if from the subscribers + delete(c.subscriptions, path) + + c.logger.Info(ctx, "Unsubscribed from channel") +} + +// AsyncAPIVersion is the version of the used AsyncAPI document +const AsyncAPIVersion = "1.2.3" + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // subscriptions is a map of all subscriptions + subscriptions map[string]extensions.BrokerChannelSubscription + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware + // handler to handle errors from consumers and middlewares + errorHandler extensions.ErrorHandler +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +// WithErrorHandler attaches a errorhandler to handle errors from subscriber functions +func WithErrorHandler(handler extensions.ErrorHandler) ControllerOption { + return func(controller *controller) { + controller.errorHandler = handler + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// V3Issue222TestMessage is the message expected for 'V3Issue222TestMessage' channel. +type V3Issue222TestMessage struct { + // Payload will be inserted in the message payload + Payload TestSchema +} + +func NewV3Issue222TestMessage() V3Issue222TestMessage { + var msg V3Issue222TestMessage + + return msg +} + +// brokerMessageToV3Issue222TestMessage will fill a new V3Issue222TestMessage with data from generic broker message +func brokerMessageToV3Issue222TestMessage(bMsg extensions.BrokerMessage) (V3Issue222TestMessage, error) { + var msg V3Issue222TestMessage + + // Unmarshal payload to expected message payload format + err := json.Unmarshal(bMsg.Payload, &msg.Payload) + if err != nil { + return msg, err + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from V3Issue222TestMessage data +func (msg V3Issue222TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Marshal payload to JSON + payload, err := json.Marshal(msg.Payload) + if err != nil { + return extensions.BrokerMessage{}, err + } + + // There is no headers here + headers := make(map[string][]byte, 0) + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} + +// TestSchema is a schema from the AsyncAPI specification required in messages +type TestSchema struct { + DateProp *civil.Date `json:"DateProp"` + DateTimeProp *time.Time `json:"DateTimeProp"` +} + +const ( + // V3Issue222TestPath is the constant representing the 'V3Issue222Test' channel path. + V3Issue222TestPath = "v3.issue222.test" +) + +// ChannelsPaths is an array of all channels paths +var ChannelsPaths = []string{ + V3Issue222TestPath, +} diff --git a/test/v3/issues/222/asyncapi.yaml b/test/v3/issues/222/asyncapi.yaml new file mode 100644 index 00000000..8ae8f85b --- /dev/null +++ b/test/v3/issues/222/asyncapi.yaml @@ -0,0 +1,23 @@ +asyncapi: 2.6.0 +info: + title: Sample App + version: 1.2.3 + +channels: + v3.issue222.test: + subscribe: + message: + payload: + $ref: '#/components/schemas/Test' + +components: + schemas: + Test: + type: object + properties: + DateProp: + type: string + format: date + DateTimeProp: + type: string + format: date-time diff --git a/test/v3/issues/222/suite_test.go b/test/v3/issues/222/suite_test.go new file mode 100644 index 00000000..185dd3fc --- /dev/null +++ b/test/v3/issues/222/suite_test.go @@ -0,0 +1,36 @@ +//go:generate go run ../../../../cmd/asyncapi-codegen -p issue222 -i ./asyncapi.yaml -o ./asyncapi.gen.go + +package issue222 + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestSuite(t *testing.T) { + suite.Run(t, NewSuite()) +} + +type Suite struct { + suite.Suite +} + +func NewSuite() *Suite { + return &Suite{} +} + +const val = ` +{ + "DateProp": "2024-02-02", + "DateTimeProp": "2024-06-05T13:45:30.0000Z" +} +` + +func (suite *Suite) TestMarshalling() { + var res TestSchema + err := json.Unmarshal([]byte(val), &res) + assert.NoError(suite.T(), err) +} From a3ba379ff8e74f41561bb7ef5281d75d20aa9fd4 Mon Sep 17 00:00:00 2001 From: Louis FRADIN Date: Sun, 9 Jun 2024 14:12:29 +0200 Subject: [PATCH 2/4] Remove required imports from templating As there is already a processing from 'imports' official golang tools, the uneeded imports are automatically wiped out at generation. We can just add necessary imports in the 'imports.tmpl' files and the ones that are not needed will be automatically removed. --- pkg/codegen/generators/helpers.go | 6 ------ pkg/codegen/generators/v2/generator.go | 14 ++++---------- pkg/codegen/generators/v2/imports.go | 9 ++++----- pkg/codegen/generators/v2/templates/imports.tmpl | 14 +++++++++++--- pkg/codegen/generators/v3/generator.go | 15 ++++----------- pkg/codegen/generators/v3/imports.go | 9 ++++----- pkg/codegen/generators/v3/templates/imports.tmpl | 14 +++++++++++--- test/v2/issues/222/asyncapi.gen.go | 3 ++- test/v3/issues/222/asyncapi.gen.go | 3 ++- 9 files changed, 42 insertions(+), 45 deletions(-) diff --git a/pkg/codegen/generators/helpers.go b/pkg/codegen/generators/helpers.go index b452fa88..97212660 100644 --- a/pkg/codegen/generators/helpers.go +++ b/pkg/codegen/generators/helpers.go @@ -5,7 +5,6 @@ import ( "strings" "github.com/lerenn/asyncapi-codegen/pkg/asyncapi" - "github.com/lerenn/asyncapi-codegen/pkg/utils" ) func appendDirectiveIfDefined(directives []string, tagName string, value float64) []string { @@ -69,8 +68,3 @@ func appendEnumDirectives[T any](schema asyncapi.Validations[T], directives []st } return directives } - -// DateTimeFormatInSpec recursively iterates to find any Schema, and returns true if their format is "date-time". -func DateTimeFormatInSpec[SPEC any](spec SPEC) bool { - return utils.FieldValueExists(spec, "Format", "date-time") -} diff --git a/pkg/codegen/generators/v2/generator.go b/pkg/codegen/generators/v2/generator.go index 4f0009f8..40ad337d 100644 --- a/pkg/codegen/generators/v2/generator.go +++ b/pkg/codegen/generators/v2/generator.go @@ -55,17 +55,11 @@ func (g Generator) generateImports(opts options.Options) (string, error) { return "", fmt.Errorf("failed to generate custom imports: %w", err) } - var requiredImports []string - if generators.DateTimeFormatInSpec(g.Specification) { - requiredImports = append(requiredImports, "\"cloud.google.com/go/civil\"") - } - return ImportsGenerator{ - PackageName: opts.PackageName, - ModuleVersion: g.ModuleVersion, - ModuleName: g.ModulePath, - RequiredImports: requiredImports, - CustomImports: imps, + PackageName: opts.PackageName, + ModuleVersion: g.ModuleVersion, + ModuleName: g.ModulePath, + CustomImports: imps, }.Generate() } diff --git a/pkg/codegen/generators/v2/imports.go b/pkg/codegen/generators/v2/imports.go index 8f3beb37..615bc7a8 100644 --- a/pkg/codegen/generators/v2/imports.go +++ b/pkg/codegen/generators/v2/imports.go @@ -8,11 +8,10 @@ import ( // to the code, being asyncapi-codegen packages, standard library packages or // custom packages. type ImportsGenerator struct { - PackageName string - ModuleVersion string - ModuleName string - RequiredImports []string - CustomImports []string + PackageName string + ModuleVersion string + ModuleName string + CustomImports []string } // Generate will generate the imports code. diff --git a/pkg/codegen/generators/v2/templates/imports.tmpl b/pkg/codegen/generators/v2/templates/imports.tmpl index b30339b0..26a99fd3 100644 --- a/pkg/codegen/generators/v2/templates/imports.tmpl +++ b/pkg/codegen/generators/v2/templates/imports.tmpl @@ -4,6 +4,8 @@ package {{.PackageName}} import ( + {{/* ------------------- Standard library imports ------------------- */ -}} + "encoding/json" "time" "errors" @@ -12,13 +14,19 @@ import ( "encoding/binary" "math" + {{/* ------------------- AsyncAPI Codegen imports ------------------- */ -}} + + {{- /* For extensions */}} "github.com/lerenn/asyncapi-codegen/pkg/extensions" + {{/* ----------------------- External imports ----------------------- */ -}} + + {{- /* For UUID */}} "github.com/google/uuid" - "github.com/nats-io/nats.go" - {{ range .RequiredImports }}{{.}} - {{end}} + {{- /* For Date & Time formatting */}} + "cloud.google.com/go/civil" + {{ range .CustomImports }}{{.}} {{end}} ) diff --git a/pkg/codegen/generators/v3/generator.go b/pkg/codegen/generators/v3/generator.go index ff43ce05..b2533f79 100644 --- a/pkg/codegen/generators/v3/generator.go +++ b/pkg/codegen/generators/v3/generator.go @@ -55,18 +55,11 @@ func (g Generator) generateImports(opts options.Options) (string, error) { return "", fmt.Errorf("failed to generate custom imports: %w", err) } - var requiredImports []string - - if generators.DateTimeFormatInSpec(g.Specification) { - requiredImports = append(requiredImports, "\"cloud.google.com/go/civil\"") - } - return ImportsGenerator{ - PackageName: opts.PackageName, - ModuleVersion: g.ModuleVersion, - ModuleName: g.ModulePath, - RequiredImports: requiredImports, - CustomImports: imps, + PackageName: opts.PackageName, + ModuleVersion: g.ModuleVersion, + ModuleName: g.ModulePath, + CustomImports: imps, }.Generate() } diff --git a/pkg/codegen/generators/v3/imports.go b/pkg/codegen/generators/v3/imports.go index fb978ba9..3a4cf467 100644 --- a/pkg/codegen/generators/v3/imports.go +++ b/pkg/codegen/generators/v3/imports.go @@ -8,11 +8,10 @@ import ( // to the code, being asyncapi-codegen packages, standard library packages or // custom packages. type ImportsGenerator struct { - PackageName string - ModuleVersion string - ModuleName string - RequiredImports []string - CustomImports []string + PackageName string + ModuleVersion string + ModuleName string + CustomImports []string } // Generate will generate the imports code. diff --git a/pkg/codegen/generators/v3/templates/imports.tmpl b/pkg/codegen/generators/v3/templates/imports.tmpl index b30339b0..26a99fd3 100644 --- a/pkg/codegen/generators/v3/templates/imports.tmpl +++ b/pkg/codegen/generators/v3/templates/imports.tmpl @@ -4,6 +4,8 @@ package {{.PackageName}} import ( + {{/* ------------------- Standard library imports ------------------- */ -}} + "encoding/json" "time" "errors" @@ -12,13 +14,19 @@ import ( "encoding/binary" "math" + {{/* ------------------- AsyncAPI Codegen imports ------------------- */ -}} + + {{- /* For extensions */}} "github.com/lerenn/asyncapi-codegen/pkg/extensions" + {{/* ----------------------- External imports ----------------------- */ -}} + + {{- /* For UUID */}} "github.com/google/uuid" - "github.com/nats-io/nats.go" - {{ range .RequiredImports }}{{.}} - {{end}} + {{- /* For Date & Time formatting */}} + "cloud.google.com/go/civil" + {{ range .CustomImports }}{{.}} {{end}} ) diff --git a/test/v2/issues/222/asyncapi.gen.go b/test/v2/issues/222/asyncapi.gen.go index ca58222a..ee9f178c 100644 --- a/test/v2/issues/222/asyncapi.gen.go +++ b/test/v2/issues/222/asyncapi.gen.go @@ -9,8 +9,9 @@ import ( "fmt" "time" - "cloud.google.com/go/civil" "github.com/lerenn/asyncapi-codegen/pkg/extensions" + + "cloud.google.com/go/civil" ) // AppController is the structure that provides publishing capabilities to the diff --git a/test/v3/issues/222/asyncapi.gen.go b/test/v3/issues/222/asyncapi.gen.go index af3b4d1c..057cccdf 100644 --- a/test/v3/issues/222/asyncapi.gen.go +++ b/test/v3/issues/222/asyncapi.gen.go @@ -9,8 +9,9 @@ import ( "fmt" "time" - "cloud.google.com/go/civil" "github.com/lerenn/asyncapi-codegen/pkg/extensions" + + "cloud.google.com/go/civil" ) // AppController is the structure that provides publishing capabilities to the From 67d18bcb67faf878a8525092c52256e9504cb2f4 Mon Sep 17 00:00:00 2001 From: Louis FRADIN Date: Sun, 9 Jun 2024 14:22:23 +0200 Subject: [PATCH 3/4] add tests from FieldValueExists --- pkg/utils/reflect_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 pkg/utils/reflect_test.go diff --git a/pkg/utils/reflect_test.go b/pkg/utils/reflect_test.go new file mode 100644 index 00000000..5f60aaee --- /dev/null +++ b/pkg/utils/reflect_test.go @@ -0,0 +1,34 @@ +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFieldValueExists(t *testing.T) { + // Parameters + type testStruct struct { + Field1 string + Field2 int + Field3 bool + } + test := testStruct{ + Field1: "test", + Field2: 1, + Field3: true, + } + + // Test with field that exists and correct values + assert.True(t, FieldValueExists(test, "Field1", "test")) + assert.True(t, FieldValueExists(test, "Field2", 1)) + assert.True(t, FieldValueExists(test, "Field3", true)) + + // Test with field that exists and incorrect values + assert.False(t, FieldValueExists(test, "Field1", "test2")) + assert.False(t, FieldValueExists(test, "Field2", 2)) + assert.False(t, FieldValueExists(test, "Field3", false)) + + // Test with field that does not exist + assert.False(t, FieldValueExists(test, "Field4", "test")) +} From cf40709d998f5920f05942f92948be9f557a9693 Mon Sep 17 00:00:00 2001 From: Louis FRADIN Date: Sun, 9 Jun 2024 14:38:52 +0200 Subject: [PATCH 4/4] fix test for v3 --- .../generators/v3/templates/schema_name.tmpl | 4 +- test/v3/issues/222/asyncapi.gen.go | 303 +++++++++--------- test/v3/issues/222/asyncapi.yaml | 17 +- 3 files changed, 172 insertions(+), 152 deletions(-) diff --git a/pkg/codegen/generators/v3/templates/schema_name.tmpl b/pkg/codegen/generators/v3/templates/schema_name.tmpl index 6a8b918c..818b5f4f 100644 --- a/pkg/codegen/generators/v3/templates/schema_name.tmpl +++ b/pkg/codegen/generators/v3/templates/schema_name.tmpl @@ -16,7 +16,9 @@ bool {{- /* --------------------------- Type String -------------------------- */ -}} {{- else if eq .Type "string" -}} -{{- if and .Format (or (eq .Format "date") (eq .Format "date-time")) -}} +{{- if and .Format (eq .Format "date") -}} +civil.Date +{{- else if and .Format (eq .Format "date-time") -}} time.Time {{- else -}} string diff --git a/test/v3/issues/222/asyncapi.gen.go b/test/v3/issues/222/asyncapi.gen.go index 057cccdf..f6320ebb 100644 --- a/test/v3/issues/222/asyncapi.gen.go +++ b/test/v3/issues/222/asyncapi.gen.go @@ -14,7 +14,13 @@ import ( "cloud.google.com/go/civil" ) -// AppController is the structure that provides publishing capabilities to the +// AppSubscriber contains all handlers that are listening messages for App +type AppSubscriber interface { + // HandleTestingOperationReceived receive all TestMessageMessageFromTestingChannel messages from Testing channel. + HandleTestingOperationReceived(ctx context.Context, msg TestMessageMessageFromTestingChannel) error +} + +// AppController is the structure that provides sending capabilities to the // developer and and connect the broker with the App type AppController struct { controller @@ -101,51 +107,146 @@ func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.B return wrapped(ctx, msg) } -func addAppContextValues(ctx context.Context, path string) context.Context { +func addAppContextValues(ctx context.Context, addr string) context.Context { ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") - return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) } // Close will clean up any existing resources on the controller func (c *AppController) Close(ctx context.Context) { // Unsubscribing remaining channels + c.UnsubscribeFromAllChannels(ctx) + + c.logger.Info(ctx, "Closed app controller") +} + +// SubscribeToAllChannels will receive messages from channels where channel has +// no parameter on which the app is expecting messages. For channels with parameters, +// they should be subscribed independently. +func (c *AppController) SubscribeToAllChannels(ctx context.Context, as AppSubscriber) error { + if as == nil { + return extensions.ErrNilAppSubscriber + } + + if err := c.SubscribeToHandleTestingOperation(ctx, as.HandleTestingOperationReceived); err != nil { + return err + } + + return nil +} + +// UnsubscribeFromAllChannels will stop the subscription of all remaining subscribed channels +func (c *AppController) UnsubscribeFromAllChannels(ctx context.Context) { + c.UnsubscribeFromHandleTestingOperation(ctx) } -// PublishV3Issue222Test will publish messages to 'v3.issue222.test' channel -func (c *AppController) PublishV3Issue222Test( +// SubscribeToHandleTestingOperation will receive TestMessageMessageFromTestingChannel messages from Testing channel. +// +// Callback function 'fn' will be called each time a new message is received. +// +// NOTE: for now, this only support the first message from AsyncAPI list. +// +// NOTE: for now, this only support the first message from AsyncAPI list. +// If you need support for other messages, please raise an issue. +func (c *AppController) SubscribeToHandleTestingOperation( ctx context.Context, - msg V3Issue222TestMessage, + fn func(ctx context.Context, msg TestMessageMessageFromTestingChannel) error, ) error { - // Get channel path - path := "v3.issue222.test" + // Get channel address + addr := "v3.issue222.test" // Set context - ctx = addAppContextValues(ctx, path) - ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "publication") + ctx = addAppContextValues(ctx, addr) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "reception") - // Convert to BrokerMessage - brokerMsg, err := msg.toBrokerMessage() + // Check if the controller is already subscribed + _, exists := c.subscriptions[addr] + if exists { + err := fmt.Errorf("%w: controller is already subscribed on channel %q", extensions.ErrAlreadySubscribedChannel, addr) + c.logger.Error(ctx, err.Error()) + return err + } + + // Subscribe to broker channel + sub, err := c.broker.Subscribe(ctx, addr) if err != nil { + c.logger.Error(ctx, err.Error()) return err } + c.logger.Info(ctx, "Subscribed to channel") - // Set broker message to context - ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, brokerMsg.String()) + // Asynchronously listen to new messages and pass them to app receiver + go func() { + for { + // Wait for next message + acknowledgeableBrokerMessage, open := <-sub.MessagesChannel() - // Publish the message on event-broker through middlewares - return c.executeMiddlewares(ctx, &brokerMsg, func(ctx context.Context) error { - return c.broker.Publish(ctx, path, brokerMsg) - }) -} + // If subscription is closed and there is no more message + // (i.e. uninitialized message), then exit the function + if !open && acknowledgeableBrokerMessage.IsUninitialized() { + return + } + + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, acknowledgeableBrokerMessage.String()) + + // Execute middlewares before handling the message + if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { + // Process message + msg, err := brokerMessageToTestMessageMessageFromTestingChannel(acknowledgeableBrokerMessage.BrokerMessage) + if err != nil { + return err + } + + // Execute the subscription function + if err := fn(ctx, msg); err != nil { + return err + } + + acknowledgeableBrokerMessage.Ack() + + return nil + }); err != nil { + c.errorHandler(ctx, addr, &acknowledgeableBrokerMessage, err) + // On error execute the acknowledgeableBrokerMessage nack() function and + // let the BrokerAcknowledgment decide what is the right nack behavior for the broker + acknowledgeableBrokerMessage.Nak() + } + } + }() + + // Add the cancel channel to the inside map + c.subscriptions[addr] = sub + + return nil +} // UnsubscribeFromHandleTestingOperation will stop the reception of TestMessageMessageFromTestingChannel messages from Testing channel. +// A timeout can be set in context to avoid blocking operation, if needed. +func (c *AppController) UnsubscribeFromHandleTestingOperation( + ctx context.Context, +) { + // Get channel address + addr := "v3.issue222.test" + + // Check if there receivers for this channel + sub, exists := c.subscriptions[addr] + if !exists { + return + } + + // Set context + ctx = addAppContextValues(ctx, addr) + + // Stop the subscription + sub.Cancel(ctx) -// UserSubscriber represents all handlers that are expecting messages for User -type UserSubscriber interface { - // V3Issue222Test subscribes to messages placed on the 'v3.issue222.test' channel - V3Issue222Test(ctx context.Context, msg V3Issue222TestMessage) error + // Remove if from the receivers + delete(c.subscriptions, addr) + + c.logger.Info(ctx, "Unsubscribed from channel") } -// UserController is the structure that provides publishing capabilities to the +// UserController is the structure that provides sending capabilities to the // developer and and connect the broker with the User type UserController struct { controller @@ -232,137 +333,45 @@ func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions. return wrapped(ctx, msg) } -func addUserContextValues(ctx context.Context, path string) context.Context { +func addUserContextValues(ctx context.Context, addr string) context.Context { ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") - return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) } // Close will clean up any existing resources on the controller func (c *UserController) Close(ctx context.Context) { // Unsubscribing remaining channels - c.UnsubscribeAll(ctx) - - c.logger.Info(ctx, "Closed user controller") -} - -// SubscribeAll will subscribe to channels without parameters on which the app is expecting messages. -// For channels with parameters, they should be subscribed independently. -func (c *UserController) SubscribeAll(ctx context.Context, as UserSubscriber) error { - if as == nil { - return extensions.ErrNilUserSubscriber - } - - if err := c.SubscribeV3Issue222Test(ctx, as.V3Issue222Test); err != nil { - return err - } - - return nil -} - -// UnsubscribeAll will unsubscribe all remaining subscribed channels -func (c *UserController) UnsubscribeAll(ctx context.Context) { - c.UnsubscribeV3Issue222Test(ctx) } -// SubscribeV3Issue222Test will subscribe to new messages from 'v3.issue222.test' channel. +// SendToHandleTestingOperation will send a TestMessageMessageFromTestingChannel message on Testing channel. // -// Callback function 'fn' will be called each time a new message is received. -func (c *UserController) SubscribeV3Issue222Test( +// NOTE: for now, this only support the first message from AsyncAPI list. +// If you need support for other messages, please raise an issue. +func (c *UserController) SendToHandleTestingOperation( ctx context.Context, - fn func(ctx context.Context, msg V3Issue222TestMessage) error, + msg TestMessageMessageFromTestingChannel, ) error { - // Get channel path - path := "v3.issue222.test" + // Set channel address + addr := "v3.issue222.test" // Set context - ctx = addUserContextValues(ctx, path) - ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "reception") - - // Check if there is already a subscription - _, exists := c.subscriptions[path] - if exists { - err := fmt.Errorf("%w: %q channel is already subscribed", extensions.ErrAlreadySubscribedChannel, path) - c.logger.Error(ctx, err.Error()) - return err - } + ctx = addUserContextValues(ctx, addr) + ctx = context.WithValue(ctx, extensions.ContextKeyIsDirection, "publication") - // Subscribe to broker channel - sub, err := c.broker.Subscribe(ctx, path) + // Convert to BrokerMessage + brokerMsg, err := msg.toBrokerMessage() if err != nil { - c.logger.Error(ctx, err.Error()) return err } - c.logger.Info(ctx, "Subscribed to channel") - // Asynchronously listen to new messages and pass them to app subscriber - go func() { - for { - // Wait for next message - acknowledgeableBrokerMessage, open := <-sub.MessagesChannel() - - // If subscription is closed and there is no more message - // (i.e. uninitialized message), then exit the function - if !open && acknowledgeableBrokerMessage.IsUninitialized() { - return - } - - // Set broker message to context - ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, acknowledgeableBrokerMessage.String()) - - // Execute middlewares before handling the message - if err := c.executeMiddlewares(ctx, &acknowledgeableBrokerMessage.BrokerMessage, func(ctx context.Context) error { - // Process message - msg, err := brokerMessageToV3Issue222TestMessage(acknowledgeableBrokerMessage.BrokerMessage) - if err != nil { - return err - } - - // Execute the subscription function - if err := fn(ctx, msg); err != nil { - return err - } - - acknowledgeableBrokerMessage.Ack() - - return nil - }); err != nil { - c.errorHandler(ctx, path, &acknowledgeableBrokerMessage, err) - // On error execute the acknowledgeableBrokerMessage nack() function and - // let the BrokerAcknowledgment decide what is the right nack behavior for the broker - acknowledgeableBrokerMessage.Nak() - } - } - }() - - // Add the cancel channel to the inside map - c.subscriptions[path] = sub - - return nil -} - -// UnsubscribeV3Issue222Test will unsubscribe messages from 'v3.issue222.test' channel. -// A timeout can be set in context to avoid blocking operation, if needed. -func (c *UserController) UnsubscribeV3Issue222Test(ctx context.Context) { - // Get channel path - path := "v3.issue222.test" - - // Check if there subscribers for this channel - sub, exists := c.subscriptions[path] - if !exists { - return - } - - // Set context - ctx = addUserContextValues(ctx, path) - - // Stop the subscription - sub.Cancel(ctx) - - // Remove if from the subscribers - delete(c.subscriptions, path) + // Set broker message to context + ctx = context.WithValue(ctx, extensions.ContextKeyIsBrokerMessage, brokerMsg.String()) - c.logger.Info(ctx, "Unsubscribed from channel") + // Send the message on event-broker through middlewares + return c.executeMiddlewares(ctx, &brokerMsg, func(ctx context.Context) error { + return c.broker.Publish(ctx, addr, brokerMsg) + }) } // AsyncAPIVersion is the version of the used AsyncAPI document @@ -423,21 +432,21 @@ func (e *Error) Error() string { return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) } -// V3Issue222TestMessage is the message expected for 'V3Issue222TestMessage' channel. -type V3Issue222TestMessage struct { +// TestMessageMessageFromTestingChannel is the message expected for 'TestMessageMessageFromTestingChannel' channel. +type TestMessageMessageFromTestingChannel struct { // Payload will be inserted in the message payload Payload TestSchema } -func NewV3Issue222TestMessage() V3Issue222TestMessage { - var msg V3Issue222TestMessage +func NewTestMessageMessageFromTestingChannel() TestMessageMessageFromTestingChannel { + var msg TestMessageMessageFromTestingChannel return msg } -// brokerMessageToV3Issue222TestMessage will fill a new V3Issue222TestMessage with data from generic broker message -func brokerMessageToV3Issue222TestMessage(bMsg extensions.BrokerMessage) (V3Issue222TestMessage, error) { - var msg V3Issue222TestMessage +// brokerMessageToTestMessageMessageFromTestingChannel will fill a new TestMessageMessageFromTestingChannel with data from generic broker message +func brokerMessageToTestMessageMessageFromTestingChannel(bMsg extensions.BrokerMessage) (TestMessageMessageFromTestingChannel, error) { + var msg TestMessageMessageFromTestingChannel // Unmarshal payload to expected message payload format err := json.Unmarshal(bMsg.Payload, &msg.Payload) @@ -450,8 +459,8 @@ func brokerMessageToV3Issue222TestMessage(bMsg extensions.BrokerMessage) (V3Issu return msg, nil } -// toBrokerMessage will generate a generic broker message from V3Issue222TestMessage data -func (msg V3Issue222TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { +// toBrokerMessage will generate a generic broker message from TestMessageMessageFromTestingChannel data +func (msg TestMessageMessageFromTestingChannel) toBrokerMessage() (extensions.BrokerMessage, error) { // TODO: implement checks on message // Marshal payload to JSON @@ -476,11 +485,11 @@ type TestSchema struct { } const ( - // V3Issue222TestPath is the constant representing the 'V3Issue222Test' channel path. - V3Issue222TestPath = "v3.issue222.test" + // TestingChannelPath is the constant representing the 'TestingChannel' channel path. + TestingChannelPath = "v3.issue222.test" ) // ChannelsPaths is an array of all channels paths var ChannelsPaths = []string{ - V3Issue222TestPath, + TestingChannelPath, } diff --git a/test/v3/issues/222/asyncapi.yaml b/test/v3/issues/222/asyncapi.yaml index 8ae8f85b..4f9047ad 100644 --- a/test/v3/issues/222/asyncapi.yaml +++ b/test/v3/issues/222/asyncapi.yaml @@ -1,15 +1,24 @@ -asyncapi: 2.6.0 +asyncapi: 3.0.0 info: title: Sample App version: 1.2.3 channels: - v3.issue222.test: - subscribe: - message: + testing: + address: v3.issue222.test + messages: + testMessage: payload: $ref: '#/components/schemas/Test' +operations: + handleTesting: + action: "receive" + channel: + $ref: "#/channels/testing" + messages: + - $ref: "#/channels/testing/messages/testMessage" + components: schemas: Test: