From c49a670b2a5dcb143b4c49d9d60b64d48b29dea4 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 24 Jul 2023 15:38:21 -0700 Subject: [PATCH] add oauth --- .github/scripts/profile.json.gpg | Bin 4107 -> 4210 bytes .github/scripts/profile_azure.json.gpg | Bin 4120 -> 4228 bytes .github/scripts/profile_gcs.json.gpg | Bin 4121 -> 4229 bytes .../connector/SnowflakeSinkConnector.java | 29 ++- .../SnowflakeSinkConnectorConfig.java | 4 + .../com/snowflake/kafka/connector/Utils.java | 207 ++++++++++++++++- .../connector/internal/InternalUtils.java | 76 +++++-- .../SnowflakeConnectionServiceFactory.java | 3 +- .../connector/internal/SnowflakeErrors.java | 24 ++ .../connector/internal/SnowflakeURL.java | 2 +- .../internal/streaming/StreamingUtils.java | 44 ++++ .../kafka/connector/ConnectorConfigTest.java | 61 +++++ .../kafka/connector/ConnectorIT.java | 49 ++++ .../SnowflakeSinkTaskForStreamingIT.java | 18 +- .../internal/ConnectionServiceIT.java | 7 +- .../connector/internal/InternalUtilsTest.java | 10 +- .../kafka/connector/internal/TestUtils.java | 213 +++++++++++++++++- .../streaming/SnowflakeSinkServiceV2IT.java | 25 +- 18 files changed, 728 insertions(+), 44 deletions(-) diff --git a/.github/scripts/profile.json.gpg b/.github/scripts/profile.json.gpg index 2b9bf9a62835f25cacadc5246b3b614699b4dc3a..d931860e9d18122b06be12055d1dc1bd0fce5adb 100644 GIT binary patch literal 4210 zcmV-&5RLDQ4Fm}T2!=X5jq05bQ2o;C0cD#i1Xk%nfcboSSf68^9s+on7-~!s2$Cw6 zq>=n@O_~!^?F*&MbAtNl1D zF?z#6K{uXWQoE1#@13y_D|bgAY`>VO|NuBDMy z=tzQ63m2sAQ7)BBL5|xz-{@^r)yQ3iXUC~dE`ChUq;5|e%iMp)O9;o!nC1$O*LlE4 zwxxv>0V|98aT;@?LLPDqvFpXlr65+b;6Faf!HLep5#4(;WiOC?Enhx# z=2QkZ2h91B*maCmvzuqLUhD7=c7x&Manae8eOqXIHD`A5gI_jEFUbA=KcQtSoP)*2 z377!WEq^rC^KttHIz64A(M(10DQVWhEk*WvW@a45D*w zdIn0ZrL8G9;=1lqVlEChYQo~`*pkPq{&{!_!C==xUg?`dw7;>N<^E9$1x;{a*R7-I zC>)N_(8M4uA^;s_Hi9za1xM`bF)TC6R+d`HAK`LAT{j$I-_j-AEdnZe-0@jvsX&i> zO^N^IFze9hTs@8w=K3-=6%V%-CV|qhoyqk7p`UAa@UY=-0Tqq)+2GqU0DVKID6sDW zdug{SZANAZW|sLX&@G2okam306-1mf`8%7uOVL%_S;>jP{RbR?mQA0=I_xt)6;z`$ z^%AZpl}0KTW4jtYwCZ2}l6zQvzl^kheFgOp5wA)Y<%w`m1b|ICK7qc5A^cH3QtFHb zT~W@&mYDCukKSg7v?~pkibWgkLFo4(Y*)x=n%nwS_H4q#<=ETC6`CfhjI5>E4np|~ z`#F(iLEe}`3nhKNbGl7(PpT@BsJuxm(VpK^(PdvBR?^!0Bwy^`=lYoEnnxUPj2tOa z|5cwfhDFJQY#@nAh8|g)pkNWP`{<<`f=k@s_oC}(d2OvDx7G!pL1 zl}6U~!z@5GZz+FoNzz0gv*5O8OX~SvB@lx9Ro%(;x|Q1?1vO@6TmJ;C6(3d)YIyc* zRf=C>SBm{BY|2zHooCnGV{+LHruyezPQd-1VkeDbI>#4Yu?1auyJ;3ML%&GwxP{l{ zaPpK@VtdADLnt#F`qqP~l*q!!Tiz`oakqH6+#Cv-6IH4}pb=bQAj zALc2W1igk6>cFzcC*pFJl{Yu;wCnFT9B&2`V{HnjVR_qFCpEfoAt9RfK%fv({?^W+mHqy)vDhY>hCC|rQ`4XsX!?@Cl zbyAJ_?h;k@a$EY=g|*=<@uVV|zJfDawBlHDlSK>JjBkJnyyvy}z^oot;b>9W*;`)I zWR0!h=(4~S{MgxfrRjvMvsGwxWeWq(36S0kopRyMxCTLO@!Wz1je$iGkM=(_Pz0vL z8@AUzUb&2y`DnktO{s=N`c2h3YF?46_=725SUfY+;#yDq^8%mj zA;4!(r^|};ePFFOx%+I7Wau)o-3e!4ubDJj^{-~=VETPg=%BvF1&s>d_Hm8dfl~Q5 z2r@0!{NV}!vAH0uw+bp%WU-FTZyxeH&{yhsV-xwRpFhFU$l_jgg#YpxjL=P)F4o+= zj1NH-nt}O;!SM0LH6Vc3TAA4alF$|z4V(-S1M_#Ph#9|YEiD&D0NEQh4m7ZU8_Qba zewP;}W98Lp3v>Y2hPhBN&W~Bncjr_zz>3@AH2766-~V%m3j7Vb4+pxd(d0_TYRPqVF`fHi zo6bf@)2ZY%IJ@<-MPPg^V;n4YI(^~eJAE&c{U{w%83NzVO$SvOV(&_bl9@Kz+?)xP z#28X{`CO(!E^!%DT#@t*rtzv347=z?H7cjbsh3}eH9{|yTB2>z<7KCilLSp31I^i9 zVS{oA$RQz6hMkYer z33Jy@l+1iNBrh$3Oy46s^oD`?e-@wEgZMR{iZ9i$z>m-8e=&)%@Cx&eivL;$udLyd@Z~N@{gaaHB=G;RsDz_<3 zRCZchN;^s^Lz8WlK0hD1HHHDp@Jn;6;>~~O;8;&EU6eh!WLW1w$j49gNC!uBFfxcW zXJfRKD)woe?tP+`h7m!((zgAFymNr${;Ada)Bi!nORgmn!}OXbnB{IH?G$d z6w*}u$0{4V_N!hELFj075ZosLo7EnR`UP#|yyHsyz%7+e>bNabvz~NYn6(H=Lr7I; zL23z3^QTOAR)`}CKG;*vM$1^+-MFjeFV!rIrlia<;t0!0h?Yl*un8ENBTlsQ!LT%s zOhojbNW*mDRTkoJI>@~HIcTJPr2g8z-}!Z& z=OENn4@zUAs&_N{f#?)<$(oMHb;#cO0ov{MtLV?_XrsCIBvKU<9>~LNRU)4zSfae9 z(?5~rJaABt4=k29l3@th#1)b>jcx0uSDF-aPQUWrm7ll?!NJNPc~=9i5i6f7V$X|z zT3uFvEaG5YSevl6to0xRpq?~_iaqD0TcB! zKT#Pm=}4)l(_|#M+OG!pk}1!+XCz+S>vQ2%c(lgUcosoV49 z1Qkn27e&!-nc4Cp&OiY=xB0rpFZdQjrtf7QN$ef=z2|9wZ$_|L96Q8XhE9#F_e3yN zl>=U*ou$U*wB)1Vu}N;d(?G(PGfGmRDi0+yQ9Yw9ANP3XuHg&>3T^e+hWk zNsnSMQvL9K$aX5yQVe$g@+9*Df~Q-6dw9(C9K^m0qoC3y2(UP_OQ9l^h)R|DdpT>w z3`ZpD}TJBL2G`MLK9XZJ6b>5#v~5V3X7RNPoLIgZG;t zDeKjAhylmQWkHpz@V!|3aOc?+LcTBT7fInNH2&MLM)}T8)n@;!yfw`j)9jDGMXq#g zIeXPG@~T;fN%jcF{wVs*7wJ=~V$+yHqoM#JasB?at6j3Qz8bAQ3EWG^2GpiBZBAlY z^5nLsJKnoANO0`l^vU3Po}4Yl0+qyGGWL~rYUYj|9)w6z%N|{Um9q*o~d82bDOSNu5(ka1Tt6M zxji@X4j^ajCxb&MM=8fr6wNs`XL;E78TpOdRisW+<1Lz= zD8y>i2jA%{XSmP+&Kw7YwX~1Bx2Q^2V>M=Dp3*Luz5O{hfzhcg9TEIAdF~~uzXG?a zKd6D`nDP|>(ZF(i1))(pwywyfKT+`H8prOID?bV{BV?NBs?~v(JOzZI@0DUp@R^8A zDfSwLo;#~LQh)fuGD|!+{v%XtS+=a&G5B;We<7iN*_oN4MIF}lA*c~{;gws(42GF@ zbfM4m)2=7M9F5pEJXoa*H9mCQ)6m{Y^>JOKy8KW+@^&x<78OtX8+)a-=*=0ze_u12 zXGupq#h`_P_p*~J@2xn0woGvN`(V}=Y11jSKh43M7tt}4{ll@

bb_ZcIe1ko8$5)2 z=m!JUm&LslFs6WJ+EjT(cVU`5eV*SP(DyHA%KqbdStru7Q5s0ri3tOtLDY(rYW1(& zmrO<^Y(*L$f<)puZVmtnLWH9x?B9y?MAyw_Yo*Qr7d^IGn1O_o+M<6Of(zD@#|(;# zwgYC#Ib*bY`?CEKe-q?p``hbiswdu2vxUR;zwpYKG3>qY7nhk4{}WhK<#X8F5ZwF0H&d33R6Ot0 z5qa6G=hqglLq+?dGwh?IvHH0xp7_FrL}XG{2a-w5FsrCdq+C@=D_zyquiSnP+uLFl zE{%!Hx|_jR8tVPWCIxPoBb$vc9D;7FI%avQ)>}v?N$=bJwh8N%7zH#v&?sl&Q z)%}3que8gK!x6h*j%Z$0wEWW?pOE8&9obH~M@{JzMGgA?@6(qgPy?~)bV5xK`sjuF zneJ2za`uLGe#p>;uR6&ECUi;f(?3A@tl6~4Z;FaM+}Wt{l=RT9<|@`cYf}zYMs0VQ*Y;i%=EyaA>9qC zLnw9z+DTNs`-cee)Dy)A4b)mk`uEbtppkg~KYS&aS;9@TA-@cc!cs>ax66X?L7r*uB5B}a&bBt`BX>%5Az@SuI3YfLn zAoIwK7{e$n?a%`S*<3o)tkr+&+&#=75WB?OYBuOUa5g28=^biijljCC8-(kW5e$BGkjto+GBe=4sRmfFV%kjWjkU zAv{N0Ph_O&3KJDHk(<(FH9J-*dyU)n&H^Ie`JRvl&>#19wC*npuTL2`*2AVfce+yd zn!JJ=X~q@{9*m)TxkT`Le4;&)pTC%rp9eY0Lhe?3B&v$x22QcSk)Sy{b%~kFOlYoP z9dTCj)Sw$Z_FZ(fOBSr&=(6Oo3!_O1m1cgvNqgUa)-z!BsY%7(?$7N}VTIz&A=yle zw`Tt$XT1F5AgH9ON2gV^TgZC7Xkco@;~8gF4#Kc0RJ#S{LtkT&>>unm4wuz|2EZG2 z(%f#@f}6s3mJk19)1lcO?@e4IFkVq=CQ2tHR0{kvoLHR$>o55kF7cPmq}8Zx#K4M!isD(M-9hcq$Eq%NM^R{a&FKbld&G-M9RZRq3p8wAW2C zw?R!2+nX%Uu=@2ZJtIg6Ws^FXM+AEnvkd>-=nb1EWu*B-jg9)b+;76EX3$O$Wr5Gd zh_H;QA_iFo7N$ttJ735l6Yi?g1j^9)HsNXSx%w%loJq}S86HN9G=;mQ1!(+kfD&))DoQ zru%yZw$XCBa*Af{fOyBzHIg-U+6xs!K$%5{4>*_Ro9JBxaZ#Tyqf;S;&0`}Sc|F{j5!5Z;lZKR6F%eQI%ch4b@>i*T!Cej=uh zX=#zLFcyb#vaCc`*O?)zB@RnqG63yb3fV-psbcSSA%UC&1EBj->ieh;6zB4DNZn)G zJ@N^-f7_A(1pn?$t7wpDK7bZ zh=79~N;`cw)coVSPP4*vIwm-(t3Vnct>UR*2B5eud026QMG+Y$%s%1@BK5l^wZAWQ zX`toP+_`_+{c&H6*XLt_*-+;aBNH{DD&WeG;0t~dH%{{!XFv*y?dd823;`cIR%Z>G z%yx-Jm3dheaJX`!#U5V%AeKF=hJ3wF3F?mxQjOpD6sPiA3T zyT^ZZHi|0elL5%N|E6iUjlu=m3`AbB*4h*w6P>85P=^$K*T?11ru8dK{$Pu-Brkge z(pcR6%bn6(dVJzzU4TW=BmFjvOX1}PCLd8%Q4*iBoD2MM!?;--F#P(i^X6>)$@+Pp z9!4#n-UM`>Jv>os-D^`b=t0i1s~fW3!Ma}5(Jjp+{K>cqTD9<>9LL2SlIkoQ2_g@n z4z^hEB|LU-Y&9yWg)7hL?T`9{^*ehUNdYySksPC{7yL6FHhkFbV}jo?oV_<J-z+hM2TaKe&9Adi|4lDfvwF_(4ZMZp6gSE#J?(k04;EM^^vaQ5+ z_8GwfMy>eR$_8woE`=feuhhZomGCh>(s!yO8wI?5=UDj4IaSgd883{hriEnf>!or)xQ)imDtiJg+#3{AgjF20z&EWt;A zWLqqUf<*nd%Zy}`OU!36xaI>&VF_>DXONxr#Zkwi9>*2|Oi$|1p^$uk6Tyrps;&N} z3-uyy#-28ivv7<>~qt+gmJ#>HwQq1#i99_>Jou!!(<*rLW%l& zz-*}Jw5PB_mJxyOjH@i2PBf~7&SRBpNYXkPI%r$jfB^hNYz$|$S2WmwINn?3`1|8Y zB`+yo24j2k9)UGLuJ~f2&|4euvaFAcQ4ioXV`<6&)UFoFmTRETvI-|V1&qCWO+1Xq zNyKpx9D$$3Q}{SHtIvI0+_Czy^snUc$hmW;epp4{rCRvE>ZC$6kwg41eNVEY*I=}pEUkh|vV5xq%O;d`Gi>1N;>mcWOd z@{Isc_Xr@!HU3FLfLs3M7C(_JS95PbFkN>B8*Y&rgIJ#D#ykVbKz!)=j!{cL(;4b^ zpZA%&3Y+#Ab}X?Lz>B8cD)SS0<31M28V&G;&Ty#!m`wt`Vbv$aBoz_G0{0HxR)IRs z^jw~c;|Iz_#$@JeM)Rv=z+eYcqco#c<#xkZo&$3jjrF8gb1-K*#h(rM7(dwrOzzHc zl*L#$CXke-4X@(^GGLU+pVI@C2IyOFTZ~i#P(WgKu2#VT_R8!10usA2N-T{O;c9)i z3l%pgLi?`Znf-I-D!sX!`0^OJjc6wTu#VBoc)|B%t~jgrw{&8bgIC8RVPLeew8W*1I>0t=Z*U%xTpQO%rB2* z=!>`a5qzBj?JDl*gZud4z_q=E5Y*AwX?x=1taihgc-*4h}YsE(oWl~7s03$p?6z^OT*uSZRRhTc=0<2fj^Q2HeBt48T9gP#JX!PWat}_GI zhSECs`}%plYrtqpotOod8c(%j>+;oX_5#{vOhC!SMALL+N2aI zWg}i8VS=XazgJ`ZhwP7%U1+H+f~b1B6ok_Y-Hr&?U1r^m>`&6MK!5>HoMp6=P1v0m zNYfzESVS}`UVREHjq~}k&awTm{=ZFvojZtHD{{!kU#8h;6Dlg7v0suIct8k^TQfuFd8EqB(agyq-j-7*GYQ>c? z<8KXEq$K)IGouA@NJ+MS*|Hu4{%;#LniYcVf~#0GxzSg63+B5@-CKK`@tL!b!eO+22mfBY)ZoWEIR>CU8Q7A#n#+Vxj>3`L zq7Jx}!Aq+*XW0BAp!bnZJ8AD$R9hi%P@<57@jV*~js1;ra4Y zgG*hQO3TDN*<{u0I$8xO$6nYECQ%kX)8ANcSi@rfmL(_q!rL(005d-2#3OwnQEMsN z+FkeRZMUNGSi4&R!+uncb1lbNQy%f}UK8LO;TSlyZve?ML zE~r6x7d6H1HaxlQK*~SEEH;OD(3yTg;k6$zd_Cx;Gj?Czo6j<1=L%!SNU>CdQfY^E zuw{tkoO56Kon{+$#+R0At`{5lX3qC#I~*E^O`P{_SUP+mq@bEUUzEPOu|+0jy%;s# zY(T%k+m}AJVfxT`Pcf|e+4$@ z99Ma6HPG8evD}uUZO9*Bv+{hkCy{aBi%i;>J7)^0usjrvrdYPNS;R3X+$2GH@YT;-w84Fm}T2nOCv(KX)4wf)lS0YGSW#*Q=>R*bM8a~=*-i9rSJj$7CI{4)Ri z%qa9K$tLbu;fkOG^{rT+PjQktI7o2Tgi-zF{4c-0ihd!B#nOp}*AqN96k`r&80y%4 zYSljEGOwRUv?B^T{Wk!bP`|js;0Jd_ORPVEQAQg1H}y**P2Q|EzM0oo2@8p!)>q%4 zu`V2QlY(Dr?pRf@uZHS(USYBz1!E5T4rXb=YH}6)eWKVfr2Z*0)&79=a8lM0!nW_Q zxd2c0z0AfSW7GI&MqZzoN}%^K6F~mS)=}&%%0r>8{npba61wm68+*7)X);Rb(eOq`^fGeH4?#fR55iUx~uMT=StpQ@3S$MPAWm5jo{o+hen% zv33h;GzlVivJq%@A6Km?b-fSh`o!v&a7?c|lOh?pwSyJeL88k>z1|LABJEpC(UKUQ z{~)FFQP=*lWBCLa&q7D8S3(|7Yg<3O5l6EhQ(=a&V}`1A4LBk$-#r#>xM}AK^Y@}u0N!Q9` zy{dvVED$@L7=&~!e*X+3@)YTZ45hFW`<4v=LkVsbcq2c7|Ix7VJY2)Hid5;R4?>+o^nG^u^D`;IYUl9mg!eN!A5-~N6m9Op3s+))~U zQZL-)p_}j`odj>wNzoNE(SsVJHbV3j8J@4VCN^&E__%^n=%r*{M9;lXc(wyN;joAt zAlh@Gx33yXjgZKXNty8eY}PaiF2wHuX^*F)_Jm006UROb>HLyXKekk_-!F)@P`Y;# zYT>t=?5p;_e!>K3vTr*^Mvda}R?_eE@wY32~G9x*7g&qZE4im}6{__TtMDZCuN&!Mdg zNWqB;3)e4xo6kNY^E~dSVSZy-F4WKm8`V#~Y(`*bKk+qQ`%*0@J_Vw!RE5&LATbJia0^su|RsZDK9z=C+;V;%s_gVkA0tJip5KR@M<9w z40@+GByhr4{vxm?S#D8{YBc^4A(4jGYufWnqVK?@OQIb*e@J6R&2`qeZN%&K{~G9m z%tf1Cf)9dcrHW8%h6ad%-a|rhE;bZG7Y*;M(7&T&-mEwz`ANM~V@-ZaiQj8;ra5nB z*|`vqBG=Z$=%PYuJm=<$N*SuWFW8LBb+q+w+WoRyU2!<`|!QCiQhU zxMhIU;SRW+tk!qZykeiLwK?3b34F~MKz3xajiaM;i3wXdnrp_oX?iHfQ?G;cGW(wyky8_&&Rn!3&*x>?A7r8n$Ow-a1z}Pc#YT!>u?!}Y9q}> zqUPfDNXkTVyw3_bUa7cPt+aGfB*FPY7wYCUrb!A`kYIyagF(qLW+C*64%HePoNTH$8l4Wk_!`W6B`dk*TeqNb# z)@A)F8X~ISDLOx~`BxZ21&0z+C0V}_Tu54V;~)q8m)OEX25V6u1G4q5ty?|fiQH3= zVDT@zn=)}8MLTlL4G$!IL(Cfte!sl#B z1;isRT)6OXd+9{O>%=Fkrhefe;*Hv*7;sxTp>0L20KN8k@i{0&JDqhi`{8 z@&*c`^>d!hM&6c#e71mk`QzGihJ707Q^9yyMwD#sKAmltXIDM6y!2E_PLm5kI8XEk zXJdPgqxoDs%yXOLmPT@Ie-aQDD*KDUQghmRbrM9(vL@~ zQ}Y_i1{*VG26aWzYEU3bjW$jU`^L_&>*|OcMd!gOwW#1*ZzjpJPLM6{b zV(R6qg3^x@pN(0Ufrb)$4j2OCRIScQN7^3-p{^5^Cm`+EOa2?$_T4#mx|1JiGn@Iq z-FYnArT*#+%4Et)QoRZV32R}^-|vxakx9$5k1&%aw#6AU^D)?X+ZBUh3)kw=21mr; z8`cpE>vR>R-sCu1tG55}Db_y(gtL=G(u<;_ZvIb$O^Nf<5?F zYVns}js7`ZNX0O&`tg}#<#98X@$&YE;^-Tj9NuVOuJE<+a?Cd;h_*x)d)3IR_2Zi^%n6XyK;6 z{l{CD1M-R4yZGG}HaVmuS%NO_Km-c+DU(XrvO1Q*v?0Mx+810cMPre!t7vfvkH(a^ zj(%}{^cD?$dZsww!LPc=Z+q&HQouB?9%zz;Vh38?V#6xtr*ULqjFHPFT~Sb<(D=3O zQ6{LMZiJT+J)C*FLNFlC_R-Fjc3?(Zlv%xUyp+dSOAEA`4VV4??GEPQviMt02XHHW zf{v4~)w2G#L9iBksRguL3fnn#EOaj5FeJy2aSx9&R?}5@-3WR7D1`?T!a;URTr-J zQQPqIo?SqOQAwrt7GZcx^Uw9+m~StR!m3vnw%smV6Je$JMq7*^~-H^!MGe*GH1Gx+H$-5MQMmK4zwq2;;VpJw1De ziWI9eu94ZS+X}8^Nf=T1Om+VP*~0BO+^FHucN?MH6{~Mjxk&U)kPT-sdrS?PX!4QrB?dS;L3eq{67A7pXPSR0zYw5l#qx0&ee+ zu3%+@`Ky;Oi-tZ`df$)mr5B&QMaFuGYCr|SP<290a&}J_JaB}%fr@Z|VmH@zSmegh zF0x++ktv45(NTyrounN(2kzNilplyqTBV(WL%A;<(s5&$Duo34104gqqPyGgt_U@y9+RWTC(7U(^aH`CXIJzDD zXCcR@dB~ON aeHfaBiFbb#GZ9I+1@9rc>3;`VfTix3%0LSiEk>r>C28o+ z+!onAt8yWTG;+?;8S{UcONw8x7pum*^%&vX28%PlpQ&|hu3L40J2Is*;VqQ8f2~(? zZp$^J!rv;BjyZ#&uO)uQ?ow2h?j9Q%RjT=z!@hCcLS?WzYNjn;XrIJ&joB04OsKZ; z*}If}zS(|V;s*M`C3;=uRPG%etM{Oe2jYUM8Q3IW?;C7-D8p0k^{B>mqn|1^amw2! z|C+9hJhk^?*Z;FrFFq7R*8ymF^15x}UYjcDw##y8}KSi+NW?Ahx+`*^Psnt+In3)Rb> z$I#cJ=JTAT_OCp`8dvWrCKw3)XLWU8Q5@a(s_uDD*@bX!7N`TJV@vB?m}%iiTXf7< zp^4+pYi)_z;%XWnEOO(Q#_3m}JpE=8VUyBOwPWk9-f3+Y0&Nd!$*}4KLiWnTy$$K} zttHZ$35!hbuxRN-28kGDyWXO8`9SE_YV{oB-LlyAF#{MoPak1}j?o_Ac%NlVC~%O8 zASLV4h&U*Z^n%8k>XC`!*LFskq!PzXh<~=9o3?&oCPpFo#{On(`e{h?4&Z_Yvz`0Evh&#~H)k^*dq$_A{UF+u8KB@e1&dqUqn~Ga z!{CWqgVZGSJxEgPQ>`rr+OUKqNp2EC*Kha_!-3jK|iDV#ZgAK0*tYr?hU3mg)X{YH=Xfn|k#ZYNIm zIuZ=u%A_WkvhIUgD0Gd{e6G>Gp;FVRXLcaim2ypzL&Gji&o(YR?iSci0YAdtyLfH* zs*XKg>`N!OJL_KLMA_dU7j}lCCFgGB3TQlgVQ{;S(#$4kmvjUCyQC_Y*rw8%TU?4m z)C3YLNKfQ630*a15r9w5zMd$Z*Q|t|p~8A*Nm&%2oeWcdEj`OJp^%uv_udEm@aIej zm*U}OS9vg5_&mzmkLizsbH*e}iFxSuM{O?Z=Gi)Tu*YoNSjOE@DR3sI(c&;GDlZ&{oHdZs}ys+gWqFLQrKn% z)G3?KE_KCH-Z!nD;7oJ_mQk5|SZqn>v+7zmjNxcLoEm;cRB;S_!x#@ z{F<%@Ke}rFwWQR8FY~>NUjPoV0f!|D_j07%_cbIks>P{6;X}CeDOP76_-cFYXyM_` zhRiMe?Jgme8UJkyR>ilIln>Cl6qfoWbD(>PHy-t8VZg3+eOPm38u$o+&0{;cLcM2H zdwfsf&snDDGs(+|JjHEz>JFtWagi^)U3KfkQWNRPf@1q?G+vm_1EqnD0&7GM=cS)7 zsS@SCPt)_Fduy1JRZZRg1=nx+9$}h;3%-Uz^vk1re;AVrRXADweZkJj6hW3_TmFtl zRBO0#nM_c{i4yTsG7s898v_JwPew-0z&^9M^k~}rbEan;{&m(E!(|@RVLqYJCiTta z|1X$rJwdE7RWk41WJ2$2`b57`h(=n- zD<^7Q=gN>S;Kt^fAkNa&*O0HyzYOslqI83%UMOs)+5`k(aa=`&ZDp!pF$S}7*~rMr z(bUIZg~$YehzCp~v5hrwOAl@|o+6&^eNr8Qg*W*NGs<+Nmk=nHgdj3y$tQ|s>19z1 zS&6n%(e91xzQQZ~?!Jj5y z^C1t+70axxb}_If2j*%B$~z+AND#ApAyi|c^&4?HKY8LQhi9rLN^o4eP0nA ztVe$!(Gq*HNRi2%GID|!H$P)sA#5r-u=2 z{TcKoo{f)N8%*~_EN-iEOyO-);+A!x%8<`B>6w>j?$Xya6>_Ulb8)1 z?j~e8igNe2%YHr9UXK5XL;|t{Mi**6J8btDnz6_$jznH^Omm>0B6%YfKFrSs6%2xi zR*(JWIEk17DRjOFHC8J+J(SK1KhR!>q|9wu0rWdE8!8dXprPEr0PphGSfdqnT~;d0 z!>6mW&M@)69SzLm>o@jQ(X5SMr3sk4zbAbLyd$Y61yW zZ$Ma^y1;(li09|tBkj0A!PzE;iJ(1$UJ4Gnd|IU3X5e88pUvU}_BR|`EgvY21DGHzh*a#tAa zmD;mg15v{%AAtNQhvFO>G2OeG)ml-{>8+;^v8|zs3X7`u2>gTPIwt=$jK1F8a0mA zzMmZkl!{lpv|#$}y@6G72SRv;a@ooGR$PF@#wWi<0rdJ*D+Q8CHz^P(l5a0sW)Xl4 z={GL%1H8E1nWH4-Kjpr_Ot7heEI#MJ9XxE^Q6K|W%RJ&cp|)IgUiVK~<&b(5QUh<$ zx?Bl$q*oF`Y9{ty7w=;VJ8iiUzGN{5@FHS-G*dzxD$ZM`aQEokkPZ-a9P^)4 zD9eTm0FhMkCl(lH(R7r+9kZ)8t_fJgs9W4@PzmoPm@Z46_VD7EFlRcw0j{1vU|5>o z#@HD@f1!-SndwHqE#i(}1c5{U*1;2M{JV?Tr>Hru&L&o^@GIdNG{y1FE3p?F7N|1P zoCj?J=NR)h0~J*+xN|mG!NBfG+p?v&m2eOq=OvgrkIr-!4Gx^kC- zwPew2BbR2nh&5eA(-MRNi9Jo3p2_!L;~E^v+CBJmKA`32%CxxSr$lC#h|wy2#rXqY z-YFA565x+p%I^=MDnm#!&qxelysN1!J}x^or^Pe2a_dN8TDrPj``UkB=W6u&MR=Ro zTXQXJ3mASp!~wXBU|zG&I(?f{O{6?&QfsiQ{OhFK7jbNI3Kq%u?!za`6#@*t9=DV2 z?eOwu<;dGWE7;c$T?w7xsH6d+%>GsWQ+`xkQFI~01H!CR_^SMGHyx~1YQAymj=3lP zb?1?tWKO*VSaY0y7oxqCUyE<*?3;-Ict3;>SUY(sSlJ6S$MuCag`CeGr30W;DJK`< zA~b`bwe3o6=Ku0wWVAHIpt{Ldv zPHHR`xc_dTHI!$F*n~=k%EuF>!mRxO&j(Czd9h9`%W2t2i-=PWp*_g#iH7NR2Olts z(gkH?deZ|jj5(#VbF6w%9IODms~1>_+I2y~0_Ez~s9jC;5D{ugb? z@RPPfBb%RyVg243Y%SF$&meP@O7}^_SSAp8oT1{=I=>Dj;h#- zZk(90#F}5gs)@dsaV&y{vOxAzEcJPn_|z&#_+r{1e!G3~pq^HcDtU40t%lYm3yrGg z@MX*A`x(|Er~h8BEUa9Qg$5zX8f+dE+hh3`Rk-QDISOn_?+*wZWtG7MDX1X;uU_?Lq z@}S?NvyAT=u6&+uho<1$(GF0X<=~dF%{AKY*j=2C+kzVOx7l_gl>ENiYz63KVkL`I zJGgaCob%alsp$Hijd{PH@64U7wvjPO4?*R=j;FvXJ#hNz!?-EqQmsLPd0QilDynao z6ZYk-8COO?XGhMz)u=E>Rk-|!&&YT@oxlRlvx zs55P_S41dl51F8{w)Q7EjNR{ zxP%Tu7(8Zn+xRiTgfZF6M6OCwIfzpO5kUwdNQ~TE9h6x4e%1Ea5=rH-bD}X9eylVRQOy~-oZ+k zlmaM7i=MP7#tn5HML>$c!2mFHFgwCKcbVq+U+#k(}J|{dL zMq7?o#afSphotBO4l*J23#OHbQ>*7Af*MVpRH4R|xF*EoRCYZ{3s%}pix)Su W-Oa1 z;3_j|1*m*g@Kx^%%$B%yzrwk!#3rfo~vx}nZs8=r`l89d}($# zqE6F01iZ1P0M8GJk^VbpAceWQp7*);V>xpuSa;rB3%Rq28UK%4&hVg{jneQ!F>UaQ zf1Jc34hE;f&);0+P1jLg@iQXCp=0;`c1W|LtZ*_ny=Aa_k>w`oJG5S=)N7eH5TEP& z5&hsXT>)<>Dbn5z_=?o!Nl6$Dt@qnzr+&*2IpPvxOp6)J3yuOuPJCq-z0B`85YGf_ z4>XtB;~e52#QZrS@&oagRbE>uRMe5RJV>WWKb@T&!-t!L2>g->q|tBbfa+AzQna@6 zkFA~*A)E@CvdSEz&Vln)K>$TIJ_aGBP+>tn4^_4T+&Uv$N;oP4UiYH(z;lBv#ySKR zJM9}2!-Nz(f-CvVbm$43@YNLTZDm4#eczQNU`Mp8y9^gQNZO(%qp8RQSsr!-Wik3# zw7)PgGF{W&wu+&x+ZA>K46pKzW^lKH+KI6S4ld)9>i;Hr)(U&NETS76<%VKyF{Qw8I z9A9^iiuINvr|xr5B>Up%ve$kngwa0%NzhKT7wwB%?UCh}@-saT7OlWWk7{1Dhk6wY zD(KXUXekE$-(3lGu|M@^cIg7Ny)c36Gnj>4G~7P@fc3PW8JPsF=R0K%{qG6CB6OAi z1d4rIKatZVJNWscRdB7y0&Z&Bk|s>ATU#gWr%Sn zbf7qi7Dx$DeMwGHLWgN?6`yeopDQyvEk@WJ2ahnnI#CI`396k_@YlVf zac5e|ZgxvK*nR(vcIb9-mB@d*M`af3rCe*Hb-Jz#{>mAr5no4S zdlpWLUU+V}?gx;ZHGA{XxUoP{3VL2IwpL>eO;Y=+%@9TLAVRi6_yD)V^$iF@yM+xy z_&oPmHCkz&^N63@3xxvF-(YPP@VUzR;Syn+!z}hY8&Wd$Nw_)U=rXO1c3i;bP3Y|` ziAQ>@$K8%-g)i|;*j%@TdpUol_qd!x{cXG)MA93g363b*GMgn~^g6UboEAibmGG%+ ziKIIEV29lP>Ih~P$s8xRR-Z6b{}DrwJE%oyig4ZV%{cNpa?yH@(7&zu)U|^Od+FVz z$~hfBnYhg|z~vKC#sUyPq)jdA1MILyK^=&vwGDo6tXRsu`W>vN427A7f$rW5@`4#? z>lU`9eeB*ocvbaV!3p@#?<#VHIjVetY@eHRl#n`6fMUPZ9p57Rd2nxPN@o@8E=Oqg zV$ap_9vz*U+#7Xt8e^z;llKhTTFmI9n5nB2LX5NwHkF2B6hZ+d4!LtM@YqS+zgKu% zVeF!eC%Gie90@2pQX+#ZSCbYpb@h{G~jR!?E zSxU#RX_VmnoI5uJ3??;3kUt_PNG=_*Nt8QsB--}dv1QO=Xa ze90+}Ig$}DG>PDsw5C`nscknOY$0@-Dn2VjGV=B%9pSdyCpbopf3T3C#!1)^lSMWh zvsNLslbSxy2L`)o5qiZ@=A<**?rRVO{QIZ6sDrj8B;_e_R;UD($$T6|r)+c#n}Z#0plyXO2P zikKxdXKC$W>NO8Ru9je^^>CxA2vkP&Do66^9iSBH1gOJEFyi-U_g=Vs${JY5EZSTL z$}8*kwBXT8m?8(z&_AyY{uO_xm3co)_PN7de?%#Xar+)v_c#QFi5Xu+zUx11(kO^T zJNthdhoA~wG1S>~kcAAVu(UN+#B3nJ3mz9+9actrP#N!6vC(6Ra8k6S&jNAEg@g?Y}N~>C&D9SN7l;aVyz`YxP^gp!=FOyn^s|x{OwgT zSH$gi^d=bDawFb)8i)p0LTHhZM(@G}-e~b*`KG=geE|W1M@m-PHhLc*z(tUmH3s4t>sOE{)`RT1=L%qacXv^O}N43 zlL(L`ba8m>3WdFCDxm5iEZJpQ$Jls+dbJi&3>N-cKBI>2v=g%rl&5dTC|ic1QRW?x zj^>`luzp4XOyt9f4k&BF6W#xjW+8*-IK;ndzu20InJ5gh+M^Odw6Uf91*Z9sX+KB6 z#qK}>BgEOBQ2QgPXi`(5FYmJ@IXboBg>ao^lRxG=9aPF~`2e@Tu-LoQ*}HRl*Gn11 z2Bpk-I5Xpr!BU~)p*W*lbhx_`%k}ERwKZnUR@XroCV`q{`JpP&aix4llCo5r+If2# zx!9}rI+D(7TFcK_15Ow13loNk~4lQPeBTu0Y1ICcrPPc z)m^JY9J81N4gV~~kiKihD9N5kD3L$k;63F=+VYM4|NJjdw#Azvke`=0%2Q{v@Ak=U zgx33sZaYjn@T6<|#IOW(EjkVHv4bqUrcBi$iN>B-TB4@irFoPiDFwxFOrp=A8TTdJOUM875)lXphc+lX#7}i*aye-8 zmcnl>uswJ}nhye(r+MM{qHQ=8P&1~);7a(s`}P;L}76fYoe%KEXN>ns0lw}CiPc4UOuwzF7K$dBdn zrh`5`nV1ePT;9e`_8t6ZxF5nGlGuAMk?debhpE)wg7SmmgEAi3?qUmG=$G++m5z8P z>b{?8WhZjMY$z4!n*psu7Flr~hdsg+=@+RhWMZ1I6d`~z!sr*_+V%?2dNGO#wH%ji z{eJ?(vN!aYZfUEiYZ`S`+5yU5Q(1o=)wq+~Zw+O>iz&DYv&HaED^c{GF5-OFv6}hT zqyE)QS&|5nU}V=>h8Z4JMx|GW6Y}>qvL#JVo{va z7+FhYVNy5I^?LvDA{dg0hs>Wtkm4&1<}eF}K4COVi^MN8`2l~#qaY|MW=fiSI<9l9 zdBqK5TV+`HbUMsPyfwfMRV2b6*|PXlC36+cxZh7f-b|d2W}PYc#wH((Om5Cy-R*&t zDH(Zntilj`-4YTTDmv!$ZH|oH%0uB=MGFt8CAe!CLw1B+u z=~-Xr+gE)i@Z?TBn(~u}c=AYeJDX!A9N0kJBF?nOI=_gGxOW8Cx#@wq$L?$q zhyy%_793hpq)4*?CJ~b?yAC(y?Lxt|R%{L;QuzDNX0#-mI3IFT~-3i=Q9d?4!(Qn-EdWZKvFId;e!CdI=7PNuL3?ZDPDqv zgp4xJpHyD;vxF*!xowiQU<;9QvOQAbPlz~h`d#yEjRTC7a7>SxBuiD^w^#r=+{W*P ztO8~w{czI!iK7&8zkVm|6!ZLrDZ2LwZir~)82ZMPZi+wh;(oT7;aw%xDmnKO?~bO( zNRdFM9W5yCmj5a=Es_bbeFMRvCEXTh@ghP!eZXq<^cpa<^9*7mQap54dDQM)dq%f-%m8Zu2afBX~S zs>FZ}re30_EsaBmBr|Qu4V2zQ9$v)+E9HYM{Ip=~I)ukH3mC$zMz8!$(du?c-c5%o bCTfutxn6vp(KUc0jc>ypD39C{5>E_s+agJH literal 4121 zcmV+!5a#cU4Fm}T0zk=%JVB|uzv$BH0Vu7!mTFNceLhEu@5nW;Y2mem6Ro`}U9Vc- z;$SXeV2aZmycN+kH%cSp4!G}C%1aoQj=|-w{%cMWGpA9|-NEOFvsATnzq+_KV|d`6 zG<=xRB0R5}Cf@7KOY*SGSb}C3p3}yq;|gqDX`!|{u}mY-qYC%fkOvL)5qiBH7=ws8 z6W20F%R~HQ=xXw)A+mi{7z)4HpX-^1m`qV{K;Nr;$J!pqAR?M@^4QtcqW!}5KSy{$ zh-BG9Q6jaTQIBK30`s@7j!(0=-f{) z6X=H!$Nj6(^J+kkP=YzAb`>d%KD_l^@3+~>wu&t*Y^nmO%gxSDn>i)(&+U!gLDgs~UvNyyJaSFl97-d@)>inUo)hxP4`RfkG zN?vsv2D1j_k%O(xC>N#!Pni9xy4lczDo1653Qx(+;GjpnWys0|!%pSc0-yDV%d@Ir zm@!VOprE6cveoSWamD5yh#W+V#qXBRKwL`okw>OwzuBzT zM1ld~x0>xO`}lG#pf%TObmMb@Lo}P1)ry=II+ziLFXe zs*6{DTPH|s#4bb;Qjv8Lv(EabIcKr#E(}yWt@t41bu5uO$XMJk!Z~P!kiYuYj1M>N zm$8bS+XBsH*oHb%WYLXIYC3GkJ^x_)kuO1@tyRD^r8%3)&29(07X5Xzaw}Fxr)N_` zw2QX}(#&%$u6N+&IwJX@^E)&RI7 z8U=N6pf~i#wca6q@irE-!F4U)ehr5A>?6IKmKAWH0mQKW)nB5^&DU@gOmtDd(~91fvp8P5Fe zJHzAxh@Aq$5I)R#OUm;IL`6IKHhwJJYmg76jqW5WpI%y&{RmH*q7IX*oD;>u^L1&+^6;H_~@)W?nmY43)(E zi?j+^q)SL5&?!B@;RaRpz9g0p1;ywf6!5~vg6eBG0J#TaDwEi7?E-f*L&j~n@m;$$ zzz%#I7j-Y$8h>&Fb|Dofmt&!mew_l_@9^%Vn_TbVWDs@;Ul1;)fPw)K*zuf&5n+xh z2KL+;9(=sJ|KC((_0mDFVqgt#10T%^-}p#$k5W9~$~FBMnJdel0}ph+3S!UcF2;5P z(TA|suWs-A66)E*VARc=XA+EK?k;5316+?4N{<0`y!>v>A0fPoKjgvxXR^w9g1b2) zNkUo?9QlNgiM$1~*PNXbbeu4-!y!-DA3$H146RhJM_>n)XO${Ei(N#c8owD6^5~H5wQwy;nu_zEXZtshmAXwsOIo; zH{nS*r%Gau;V)JMjUE>kUtf;T0q)v()FtG!7HrUqwl+-U@*>0SAp-Kqiy2?Jf7ET9 zVv<8W`CONet6R5*yH(O0@N-}WOYV6Qj1v-^UV~5QzO^v==2cLVHP{f2gYo5cT;R3# zxJy4B4cpF}$$^gZJ(P;@PRGlCoqGn&Z}P+w_A`Nbb7qiT$w@2i1S$hVfY$qq@SO_V zswa#pbef+UlUMi;HVWjazC(`l?CiZ&tc>_j116%dl{VpE@*lRfV&JHMIm>GAluIbb z#WJBzfq|1hQ>$WV2o?;jr~7n|twg(wb@o_TXTHl5@dTX?j>ZCcV>jE0Y+pTVBrTXR zm#*)|N*GWyrQ^$y)jA>iR%H8$V2*PzZUK-h)4nXL~A*N`I0SpCjR$N2ha1j$fC@JHTtXsx4$e$dvZ)3pi#V;!~NI7HDAJ~BtF@!eLug%POVOddhxrc4OK+W1mpjg zPM=Ck3J`+;qA}S)V@a6W9PeE{Y+TUJX7cq|l$w-(PWUs?Nna9+3%D%Svg)9AoR$B@ z7RvZ@R0PU20D{L@GS;}`r%Jbio5NvNTFzY(0iR{DZAtvKY%8e5vciFQ6tpQ&SJcGE zU^uW)ZzNgp>2=V2W#M#j==U*Uqd9ef0D)W+;ez$o@>yX6Jiy4Ciu%$!fzZks!@IfA z&tn4fIz3>hDsL0`wwr!Eyo)3KF1!y?7x}}PY#e*KUpxr|PBgq8x+Dw7wB?n@j19=E zD~1uzHX#F}&7Th<*7wJ;I7kdx@Fr;mlMpp`%O9JrzYm~xCZz)`wG#h`yX6!DfE`Fy z&`lR+@x8)Z+LcA>DHpy4+kv+*!d;uaW_!HUGxrq=jaShL|8WPvW#nU)Yk4>2T!baO z^V>lNlFEFh+Nfx~Esx7z0!dA28{7Hsuo>_OKNhwt?d9^KQ}eCVbmHFWQ=D!9XJnA+ z5Aaf7mH11&RLQx%tT!O$WLsyS0Z3%3_nIugo#`bSu~K`KY5HOVSGRqUI~i=yB$cpL zQTd-y&Sui9jz^4}jJz0X%0Ikp1b;OVJ5Ucv!&x)2VK4$dWg7rT)RWA4s0u8_rm@hz zb}CVA#sh(3V8s1X0{$sq$c8HqYXPUg4p;K1e+U}8l1;^PLG36{2&~g#TvmX9D#8qi zFFUYd?^Vo3=6`=2vrFuKk(E9Y8tqh~9V2eLh0T2QP@ zzfK!hK#rKfLEWq4YCzScX%$0AS&`ggWSs2qo*h2<)^KK)d?mSFrhRl#|0EQ z&!RLQGX^@y3OlZnn7a1j_x)&}Zh*m7TKQvzu)7$#Zg8e4|FMybEe*JPJ(Ns6mx3p) z#k-!$a%bD_d3)(M_|U8t z0+v(kZbxl^XR$nxiH(LUB`9mNdfu9BX*x5kocAMy3|p#s=XFS?_&5P#RmZ>MNbzCd%F6@nEz_y&6h=i4h8*xKNUJ}co+ z=!ikB`V=N_dxE6>oMy`9PdJ?M`?C;}e9c8?iPzLOX?Wlk5kGNShJTM%(`e~!x^FwN zG`zL-iQ&xiE6*td?|6&_)_oW^q7iy7nZAeM@uAwQ;l#vzk59$$95QB!Y0!aS`;8UKHqsPoEho5|E?49;}f zLdW;Hk==FMZHf-ok|q|-_6M$PR+z17dx|_us9uCBKI5=1%6I#r9g@t;3QoPOlZI)N zts41z0wVK(3{+BzjID}lT3cx|a`tCV;SgzuDhT6C`NX_CGg88}u5MIP=3JqbxV5aB zysNU^meJzIwr3`6StOqjs$FUuVP4btY1*9pi7N0v`a~f-%}Fz<_bxNzZkWy>u_JN7 zq*e*rF=VpZUfYD1_`#;!;nzkofbJ07rnH#XH^xK~0)juRzrzgSzP8?yXYFU?0+0r@k zp?FBn5ugEgS&?|f;-Nw9<H6Vc#vIKrl3XFa&| RU}8K-(2ZF`Pta_q*P7)% z(9b#HqnhUjX8vyRPTnLMvm0?`vCA+&RYYr0qKK(EN6whs*A=1REP-qubfhG%0xTsh zwn4{cBeNa9b^Da$Y1in6DHF=`@4muN4RHEl<$4YfVpc|=#TR5e)oCjV?Qn=b%*f?BK^yM?nPje zVe&$T6pjRl!>c6qCf)^((@Mmgj(8UmmyaRk)LxC%cL#1+GxlBx*f+YH+INrF_aPyQ z+*t4`-`IDQ3fuor`(_XUTYNT#IiAq3-nRTQ1>xhdQZBvp^1~{&)#rR?CHZGUo!d`X z^W7;RJEV8Me1oKCtCw7$?{U@{qzVBF$A0+#tm`t$6!oJL-SRC#PBM-(jEhT^+pY69)s zMg9)*Uu_BWVd?y=3$AKsyU!GcnnIHUYyRrU<&{}N!M9I`DWflCiIHG8uJGoW_hLIs*=SK>z diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java index ee48d9752..f1af664b5 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java @@ -210,8 +210,11 @@ public Config validate(Map connectorConfigs) { } // If private key or private key passphrase is provided through file, skip validation - if (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:") - || connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:")) + if (connectorConfigs + .getOrDefault(Utils.SF_AUTHENTICATOR, Utils.SNOWFLAKE_JWT) + .equals(Utils.SNOWFLAKE_JWT) + && (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:") + || connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:"))) return result; // We don't validate name, since it is not included in the return value @@ -244,6 +247,28 @@ public Config validate(Map connectorConfigs) { case "0013": Utils.updateConfigErrorMessage(result, Utils.SF_PRIVATE_KEY, " must be non-empty"); break; + case "0026": + Utils.updateConfigErrorMessage( + result, + Utils.SF_OAUTH_CLIENT_ID, + " must be non-empty when using oauth authenticator"); + break; + case "0027": + Utils.updateConfigErrorMessage( + result, + Utils.SF_OAUTH_CLIENT_SECRET, + " must be non-empty when using oauth authenticator"); + break; + case "0028": + Utils.updateConfigErrorMessage( + result, + Utils.SF_OAUTH_REFRESH_TOKEN, + " must be non-empty when using oauth authenticator"); + break; + case "0029": + Utils.updateConfigErrorMessage( + result, Utils.SF_AUTHENTICATOR, " is not a valid authenticator"); + break; case "0002": Utils.updateConfigErrorMessage( result, Utils.SF_PRIVATE_KEY, " must be a valid PEM RSA private key"); diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index f9abfc9fa..592e77162 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -71,6 +71,10 @@ public class SnowflakeSinkConnectorConfig { static final String SNOWFLAKE_DATABASE = Utils.SF_DATABASE; static final String SNOWFLAKE_SCHEMA = Utils.SF_SCHEMA; static final String SNOWFLAKE_PRIVATE_KEY_PASSPHRASE = Utils.PRIVATE_KEY_PASSPHRASE; + static final String AUTHENTICATOR = Utils.SF_AUTHENTICATOR; + static final String OAUTH_CLIENT_ID = Utils.SF_OAUTH_CLIENT_ID; + static final String OAUTH_CLIENT_SECRET = Utils.SF_OAUTH_CLIENT_SECRET; + static final String OAUTH_REFRESH_TOKEN = Utils.SF_OAUTH_REFRESH_TOKEN; // For Snowpipe Streaming client public static final String SNOWFLAKE_ROLE = Utils.SF_ROLE; diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 8fcd22d12..0b16b7e3b 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -25,23 +25,42 @@ import com.snowflake.kafka.connector.internal.BufferThreshold; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import com.snowflake.kafka.connector.internal.SnowflakeURL; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.streaming.StreamingUtils; import java.io.BufferedReader; import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; import java.net.Authenticator; import java.net.PasswordAuthentication; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; +import java.net.URLEncoder; import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost; +import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder; +import net.snowflake.client.jdbc.internal.apache.http.entity.ContentType; +import net.snowflake.client.jdbc.internal.apache.http.entity.StringEntity; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.HttpClientBuilder; +import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils; +import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes; +import net.snowflake.client.jdbc.internal.google.gson.JsonObject; +import net.snowflake.client.jdbc.internal.google.gson.JsonParser; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; @@ -62,6 +81,10 @@ public class Utils { public static final String SF_SSL = "sfssl"; // for test only public static final String SF_WAREHOUSE = "sfwarehouse"; // for test only public static final String PRIVATE_KEY_PASSPHRASE = "snowflake.private.key" + ".passphrase"; + public static final String SF_AUTHENTICATOR = "snowflake.authenticator"; + public static final String SF_OAUTH_CLIENT_ID = "snowflake.oauth.client.id"; + public static final String SF_OAUTH_CLIENT_SECRET = "snowflake.oauth.client.secret"; + public static final String SF_OAUTH_REFRESH_TOKEN = "snowflake.refresh.token"; /** * This value should be present if ingestion method is {@link @@ -105,6 +128,19 @@ public class Utils { public static final String GET_EXCEPTION_MISSING_MESSAGE = "missing exception message"; public static final String GET_EXCEPTION_MISSING_CAUSE = "missing exception cause"; + // OAuth + public static final String TOKEN_REQUEST_ENDPOINT = "/oauth/token-request"; + public static final String OAUTH_CONTENT_TYPE_HEADER = "application/x-www-form-urlencoded"; + public static final String BASIC_AUTH_HEADER_PREFIX = "Basic "; + public static final String GRANT_TYPE_PARAM = "grant_type"; + public static final String REFRESH_TOKEN = "refresh_token"; + public static final String ACCESS_TOKEN = "access_token"; + public static final String SNOWFLAKE_JWT = "snowflake_jwt"; + public static final String OAUTH = "oauth"; + public static final String REDIRECT_URI = "redirect_uri"; + public static final String DEFAULT_REDIRECT_URI = "https://localhost.com/oauth"; + public static final int OAUTH_MAX_RETRY = 5; + private static final KCLogger LOGGER = new KCLogger(Utils.class.getName()); /** @@ -440,11 +476,51 @@ && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MA Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)); } - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, - Utils.formatString( - "{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)); + switch (config.getOrDefault(SnowflakeSinkConnectorConfig.AUTHENTICATOR, SNOWFLAKE_JWT)) { + case SNOWFLAKE_JWT: + if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, + SNOWFLAKE_JWT)); + } + break; + case OAUTH: + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, + OAUTH)); + } + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, + OAUTH)); + } + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, + OAUTH)); + } + break; + default: + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.AUTHENTICATOR, + Utils.formatString( + "{} should be one of {} or {}.", + SnowflakeSinkConnectorConfig.AUTHENTICATOR, + SNOWFLAKE_JWT, + OAUTH)); } if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)) { @@ -704,6 +780,127 @@ public static String formatString(String format, Object... vars) { return format; } + /** + * Get OAuth access token given refresh token + * + * @param url OAuth server url + * @param clientId OAuth clientId + * @param clientSecret OAuth clientSecret + * @param refreshToken OAuth refresh token + * @return OAuth access token + */ + public static String getSnowflakeOAuthAccessToken( + SnowflakeURL url, String clientId, String clientSecret, String refreshToken) { + return getSnowflakeOAuthToken( + url, clientId, clientSecret, refreshToken, REFRESH_TOKEN, REFRESH_TOKEN, ACCESS_TOKEN); + } + + /** + * Get OAuth token given integration info Snowflake OAuth + * Overview + * + * @param url OAuth server url + * @param clientId OAuth clientId + * @param clientSecret OAuth clientSecret + * @param credential OAuth credential, either az code or refresh token + * @param grantType OAuth grant type, either authorization_code or refresh_token + * @param credentialType OAuth credential key, either code or refresh_token + * @param tokenType type of OAuth token to get, either access_token or refresh_token + * @return OAuth token + */ + public static String getSnowflakeOAuthToken( + SnowflakeURL url, + String clientId, + String clientSecret, + String credential, + String grantType, + String credentialType, + String tokenType) { + Map headers = new HashMap<>(); + headers.put(HttpHeaders.CONTENT_TYPE, OAUTH_CONTENT_TYPE_HEADER); + headers.put( + HttpHeaders.AUTHORIZATION, + BASIC_AUTH_HEADER_PREFIX + + Base64.getEncoder().encodeToString((clientId + ":" + clientSecret).getBytes())); + + Map payload = new HashMap<>(); + payload.put(GRANT_TYPE_PARAM, grantType); + payload.put(credentialType, credential); + payload.put(REDIRECT_URI, DEFAULT_REDIRECT_URI); + + // Encode and convert payload into string entity + String payloadString = + payload.entrySet().stream() + .map( + e -> { + try { + return e.getKey() + "=" + URLEncoder.encode(e.getValue(), "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException(ex); + } + }) + .collect(Collectors.joining("&")); + final StringEntity entity = + new StringEntity(payloadString, ContentType.APPLICATION_FORM_URLENCODED); + + HttpPost post = makeOAuthHttpPost(url, TOKEN_REQUEST_ENDPOINT, headers, entity); + + // Request access token + CloseableHttpClient client = HttpClientBuilder.create().build(); + for (int retries = 0; retries < OAUTH_MAX_RETRY; retries++) { + try (CloseableHttpResponse httpResponse = client.execute(post)) { + String respBodyString = EntityUtils.toString(httpResponse.getEntity()); + + if (httpResponse.getStatusLine().getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) { + JsonObject respBody = JsonParser.parseString(respBodyString).getAsJsonObject(); + if (respBody.has(tokenType)) { + // Trim surrounding quotation marks + return respBody.get(tokenType).toString().replaceAll("^\"|\"$", ""); + } + } + } catch (Exception e) { + // Exponential backoff retires + try { + Thread.sleep((1L << retries) * 1000L); + } catch (InterruptedException ex) { + throw SnowflakeErrors.ERROR_1004.getException(ex); + } + } + } + throw SnowflakeErrors.ERROR_1004.getException("Failed to get access token"); + } + + /** + * Build OAuth http post request base on headers and payload + * + * @param url target url + * @param headers headers key value pairs + * @param entity payload entity + * @return HttpPost request for OAuth + */ + public static HttpPost makeOAuthHttpPost( + SnowflakeURL url, String path, Map headers, StringEntity entity) { + // Build post request + URI uri; + try { + uri = + new URIBuilder().setHost(url.toString()).setScheme(url.getScheme()).setPath(path).build(); + } catch (URISyntaxException e) { + throw SnowflakeErrors.ERROR_1004.getException(e); + } + + // Add headers + HttpPost post = new HttpPost(uri); + for (Map.Entry e : headers.entrySet()) { + post.addHeader(e.getKey(), e.getValue()); + } + + post.setEntity(entity); + + return post; + } + /** * Get the message and cause of a missing exception, handling the null or empty cases of each * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java index a58f04589..a3e40b8bb 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java @@ -30,7 +30,8 @@ class InternalUtils { static final String JDBC_SSL = "ssl"; static final String JDBC_SESSION_KEEP_ALIVE = "client_session_keep_alive"; static final String JDBC_WAREHOUSE = "warehouse"; // for test only - + static final String JDBC_AUTHENTICATOR = "authenticator"; + static final String JDBC_TOKEN = "token"; // internal parameters static final long MAX_RECOVERY_TIME = 10 * 24 * 3600 * 1000; // 10 days @@ -112,29 +113,37 @@ static String timestampToDate(long time) { * config. It assumes the caller wants to use this Property for Snowpipe based Kafka Connector. * * @param conf User provided kafka connector config - * @param sslEnabled is sslEnabled? + * @param url target server url * @return Properties object which will be passed down to JDBC connection */ - static Properties createProperties(Map conf, boolean sslEnabled) { - return createProperties(conf, sslEnabled, IngestionMethodConfig.SNOWPIPE); + static Properties createProperties(Map conf, SnowflakeURL url) { + return createProperties(conf, url, IngestionMethodConfig.SNOWPIPE); } /** * create a properties for snowflake connection * * @param conf a map contains all parameters - * @param sslEnabled if ssl is enabled + * @param url target server url * @param ingestionMethodConfig which ingestion method is provided. * @return a Properties instance */ static Properties createProperties( - Map conf, boolean sslEnabled, IngestionMethodConfig ingestionMethodConfig) { + Map conf, SnowflakeURL url, IngestionMethodConfig ingestionMethodConfig) { Properties properties = new Properties(); // decrypt rsa key String privateKey = ""; String privateKeyPassphrase = ""; + // OAuth params + String oAuthClientId = ""; + String oAuthClientSecret = ""; + String oAuthRefreshToken = ""; + + // OAuth access token + String token = ""; + for (Map.Entry entry : conf.entrySet()) { // case insensitive switch (entry.getKey().toLowerCase()) { @@ -156,21 +165,57 @@ static Properties createProperties( case Utils.PRIVATE_KEY_PASSPHRASE: privateKeyPassphrase = entry.getValue(); break; + case Utils.SF_AUTHENTICATOR: + properties.put(JDBC_AUTHENTICATOR, entry.getValue()); + break; + case Utils.SF_OAUTH_CLIENT_ID: + oAuthClientId = entry.getValue(); + break; + case Utils.SF_OAUTH_CLIENT_SECRET: + oAuthClientSecret = entry.getValue(); + break; + case Utils.SF_OAUTH_REFRESH_TOKEN: + oAuthRefreshToken = entry.getValue(); + break; default: // ignore unrecognized keys } } - if (!privateKeyPassphrase.isEmpty()) { - properties.put( - JDBC_PRIVATE_KEY, - EncryptionUtils.parseEncryptedPrivateKey(privateKey, privateKeyPassphrase)); - } else if (!privateKey.isEmpty()) { - properties.put(JDBC_PRIVATE_KEY, parsePrivateKey(privateKey)); + // Set credential + if (!properties.containsKey(JDBC_AUTHENTICATOR)) { + properties.put(JDBC_AUTHENTICATOR, Utils.SNOWFLAKE_JWT); + } + if (properties.getProperty(JDBC_AUTHENTICATOR).equals(Utils.SNOWFLAKE_JWT)) { + // JWT key pair auth + if (!privateKeyPassphrase.isEmpty()) { + properties.put( + JDBC_PRIVATE_KEY, + EncryptionUtils.parseEncryptedPrivateKey(privateKey, privateKeyPassphrase)); + } else if (!privateKey.isEmpty()) { + properties.put(JDBC_PRIVATE_KEY, parsePrivateKey(privateKey)); + } + } else if (properties.getProperty(JDBC_AUTHENTICATOR).equals(Utils.OAUTH)) { + // OAuth auth + if (oAuthClientId.isEmpty()) { + throw SnowflakeErrors.ERROR_0026.getException(); + } + if (oAuthClientSecret.isEmpty()) { + throw SnowflakeErrors.ERROR_0027.getException(); + } + if (oAuthRefreshToken.isEmpty()) { + throw SnowflakeErrors.ERROR_0028.getException(); + } + String accessToken = + Utils.getSnowflakeOAuthAccessToken( + url, oAuthClientId, oAuthClientSecret, oAuthRefreshToken); + properties.put(JDBC_TOKEN, accessToken); + } else { + throw SnowflakeErrors.ERROR_0029.getException(); } // set ssl - if (sslEnabled) { + if (url.sslEnabled()) { properties.put(JDBC_SSL, "on"); } else { properties.put(JDBC_SSL, "off"); @@ -197,8 +242,9 @@ static Properties createProperties( } } - // required parameter check - if (!properties.containsKey(JDBC_PRIVATE_KEY)) { + // required parameter check, the OAuth parameter is already checked when fetching access token + if (properties.getProperty(JDBC_AUTHENTICATOR).equals(Utils.SNOWFLAKE_JWT) + && !properties.containsKey(JDBC_PRIVATE_KEY)) { throw SnowflakeErrors.ERROR_0013.getException(); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java index 36ec22d26..52a531701 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java @@ -71,8 +71,7 @@ public SnowflakeConnectionServiceBuilder setProperties(Map conf) this.proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf); this.connectorName = conf.get(Utils.NAME); this.ingestionMethodConfig = IngestionMethodConfig.determineIngestionMethod(conf); - this.prop = - InternalUtils.createProperties(conf, this.url.sslEnabled(), ingestionMethodConfig); + this.prop = InternalUtils.createProperties(conf, this.url, ingestionMethodConfig); return this; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 9dc4424f4..61710efc4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -107,6 +107,26 @@ public enum SnowflakeErrors { "Duplicate case-insensitive column names detected", "Duplicate case-insensitive column names detected. Schematization currently does not support" + " this."), + ERROR_0026( + "0026", + "Missed oauth client id in connector config", + "oauth_client_id must be provided with " + + Utils.SF_OAUTH_CLIENT_ID + + " parameter when using oauth as authenticator"), + ERROR_0027( + "0027", + "Missed oauth client secret in connector config", + "oauth_client_secret must be provided with " + + Utils.SF_OAUTH_CLIENT_SECRET + + " parameter when using oauth as authenticator"), + ERROR_0028( + "0028", + "Missed oauth refresh token in connector config", + "oauth_refresh_token must be provided with " + + Utils.SF_OAUTH_REFRESH_TOKEN + + " parameter when using oauth as authenticator"), + ERROR_0029( + "0029", "Invalid authenticator", "Authenticator should be either oauth or snowflake_jwt"), // Snowflake connection issues 1--- ERROR_1001( "1001", @@ -120,6 +140,10 @@ public enum SnowflakeErrors { "1003", "Snowflake connection is closed", "Either the current connection is closed or hasn't connect to snowflake" + " server"), + ERROR_1004( + "1004", + "Fetching OAuth access token fail", + "Fail to get OAuth access token from authorization server"), // SQL issues 2--- ERROR_2001( "2001", "Failed to prepare SQL statement", "SQL Exception, reported by Snowflake JDBC"), diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java index cae68d956..288d44c8a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java @@ -109,7 +109,7 @@ boolean sslEnabled() { return ssl; } - String getScheme() { + public String getScheme() { if (ssl) { return "https"; } else { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java index e5cfd46cb..2c6d877f7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java @@ -65,6 +65,14 @@ public class StreamingUtils { // This is overhead size for calculating while buffering Kafka records. public static final int MAX_RECORD_OVERHEAD_BYTES = DefaultRecord.MAX_RECORD_OVERHEAD; + // TODO: Modify STREAMING_CONSTANT to Constants. after SNOW-352846 is released + public static final String STREAMING_CONSTANT_AUTHORIZATION_TYPE = "authorization_type"; + public static final String STREAMING_CONSTANT_JWT = "JWT"; + public static final String STREAMING_CONSTANT_OAUTH = "OAuth"; + public static final String STREAMING_CONSTANT_OAUTH_CLIENT_ID = "oauth_client_id"; + public static final String STREAMING_CONSTANT_OAUTH_CLIENT_SECRET = "oauth_client_secret"; + public static final String STREAMING_CONSTANT_OAUTH_REFRESH_TOKEN = "oauth_refresh_token"; + /* Maps streaming client's property keys to what we got from snowflake KC config file. */ public static Map convertConfigForStreamingClient( Map connectorConfig) { @@ -90,6 +98,20 @@ public static Map convertConfigForStreamingClient( return value; }); + connectorConfig.computeIfPresent( + Utils.SF_AUTHENTICATOR, + (key, value) -> { + if (value.equals(Utils.SNOWFLAKE_JWT)) { + streamingPropertiesMap.put( + STREAMING_CONSTANT_AUTHORIZATION_TYPE, STREAMING_CONSTANT_JWT); + } + if (value.equals(Utils.OAUTH)) { + streamingPropertiesMap.put( + STREAMING_CONSTANT_AUTHORIZATION_TYPE, STREAMING_CONSTANT_OAUTH); + } + return value; + }); + connectorConfig.computeIfPresent( Utils.SF_PRIVATE_KEY, (key, value) -> { @@ -105,6 +127,28 @@ public static Map convertConfigForStreamingClient( } return value; }); + + connectorConfig.computeIfPresent( + Utils.SF_OAUTH_CLIENT_ID, + (key, value) -> { + streamingPropertiesMap.put(STREAMING_CONSTANT_OAUTH_CLIENT_ID, value); + return value; + }); + + connectorConfig.computeIfPresent( + Utils.SF_OAUTH_CLIENT_SECRET, + (key, value) -> { + streamingPropertiesMap.put(STREAMING_CONSTANT_OAUTH_CLIENT_SECRET, value); + return value; + }); + + connectorConfig.computeIfPresent( + Utils.SF_OAUTH_REFRESH_TOKEN, + (key, value) -> { + streamingPropertiesMap.put(STREAMING_CONSTANT_OAUTH_REFRESH_TOKEN, value); + return value; + }); + return streamingPropertiesMap; } diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java index d20ce6b2f..d44f3febb 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java @@ -4,6 +4,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME; import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS; +import static com.snowflake.kafka.connector.Utils.OAUTH; import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; import static org.junit.Assert.assertEquals; @@ -898,6 +899,66 @@ public void testMultipleInvalidConfigs() { } } + @Test + public void testOAuthAuthenticator() { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, OAUTH); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id"); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret"); + config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token"); + Utils.validateConfig(config); + } + + @Test + public void testInvalidAuthenticator() { + try { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, "invalid_authenticator"); + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception.getMessage().contains(SnowflakeSinkConnectorConfig.AUTHENTICATOR); + } + } + + @Test + public void testEmptyClientId() { + try { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, OAUTH); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret"); + config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token"); + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception.getMessage().contains(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID); + } + } + + @Test + public void testEmptyClientSecret() { + try { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, OAUTH); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id"); + config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token"); + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception.getMessage().contains(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET); + } + } + + @Test + public void testEmptyRefreshToken() { + try { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, OAUTH); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id"); + config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret"); + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception.getMessage().contains(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN); + } + } + private void invalidConfigRunner(List paramsToRemove) { Map config = getConfig(); for (String configParam : paramsToRemove) { diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java b/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java index 6bbcdbcbf..e1b67d28f 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java @@ -4,6 +4,7 @@ import static com.snowflake.kafka.connector.Utils.TASK_ID; import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; import static com.snowflake.kafka.connector.internal.TestUtils.getConf; +import static com.snowflake.kafka.connector.internal.TestUtils.getConfWithOAuth; import java.util.*; import java.util.concurrent.ExecutorService; @@ -96,6 +97,14 @@ static Map getCorrectConfig() { return config; } + static Map getCorrectConfigWithOAuth() { + Map config = getConfWithOAuth(); + config.remove(Utils.SF_WAREHOUSE); + config.remove(Utils.NAME); + config.remove(TASK_ID); + return config; + } + @Test public void testValidateErrorConfig() { Map validateMap = toValidateMap(getErrorConfig()); @@ -129,6 +138,12 @@ public void testValidateCorrectConfig() { assertPropHasError(validateMap, new String[] {}); } + @Test + public void testValidateCorrectConfigWithOAuth() { + Map validateMap = toValidateMap(getCorrectConfigWithOAuth()); + assertPropHasError(validateMap, new String[] {}); + } + @Test public void testValidateErrorURLFormatConfig() { Map config = getCorrectConfig(); @@ -193,6 +208,40 @@ public void testValidateNullPasswordConfig() { validateMap, new String[] {SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY}); } + @Test + public void testValidateNullOAuthClientIdConfig() { + Map config = getCorrectConfigWithOAuth(); + config.remove(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID); + Map validateMap = toValidateMap(config); + assertPropHasError(validateMap, new String[] {SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID}); + } + + @Test + public void testValidateNullOAuthClientSecretConfig() { + Map config = getCorrectConfigWithOAuth(); + config.remove(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET); + Map validateMap = toValidateMap(config); + assertPropHasError( + validateMap, new String[] {SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET}); + } + + @Test + public void testValidateNullOAuthRefreshTokenConfig() { + Map config = getCorrectConfigWithOAuth(); + config.remove(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN); + Map validateMap = toValidateMap(config); + assertPropHasError( + validateMap, new String[] {SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN}); + } + + @Test + public void testValidateInvalidAuthenticator() { + Map config = getCorrectConfig(); + config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR, "invalid_authenticator"); + Map validateMap = toValidateMap(config); + assertPropHasError(validateMap, new String[] {SnowflakeSinkConnectorConfig.AUTHENTICATOR}); + } + @Test public void testValidateFilePasswordConfig() { Map config = getCorrectConfig(); diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java index bc87d1919..b5387a5e6 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java @@ -64,9 +64,12 @@ public void after() { TestUtils.dropTable(topicName); } - @Test - public void testSinkTask() throws Exception { + /** @param useOAuth true if using oauth as authentication, otherwise use snowflake_jwt */ + private void sinkTaskTest(boolean useOAuth) throws Exception { Map config = TestUtils.getConfForStreaming(); + if (useOAuth) { + config = TestUtils.getConfForStreamingWithOAuth(); + } SnowflakeSinkConnectorConfig.setDefaultValues(config); config.put(BUFFER_COUNT_RECORDS, "1"); // override @@ -103,6 +106,17 @@ public void testSinkTask() throws Exception { sinkTask.stop(); } + @Test + public void testSinkTask() throws Exception { + sinkTaskTest(false); + } + + // TODO: Added after SNOW-352846 is released + // @Test + public void testSinkTaskWithOAuth() throws Exception { + sinkTaskTest(true); + } + @Test public void testSinkTaskWithMultipleOpenClose() throws Exception { Map config = TestUtils.getConfForStreaming(); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java b/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java index a9ff9fa02..c16749edd 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java @@ -40,6 +40,11 @@ public void testEncryptedKey() { .build(); } + @Test + public void testOAuthAZ() { + SnowflakeConnectionServiceFactory.builder().setProperties(TestUtils.getConfWithOAuth()).build(); + } + @Test public void testSetSSLProperties() { Map testConfig = TestUtils.getConf(); @@ -108,7 +113,7 @@ public void createConnectionService() { }); SnowflakeURL url = TestUtils.getUrl(); - Properties prop = InternalUtils.createProperties(TestUtils.getConf(), url.sslEnabled()); + Properties prop = InternalUtils.createProperties(TestUtils.getConf(), url); String appName = TestUtils.TEST_CONNECTOR_NAME; service = diff --git a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java index 09ecfd330..dc6980c00 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java @@ -78,7 +78,7 @@ public void testAssertNotEmpty() { public void testCreateProperties() { Map config = TestUtils.getConf(); SnowflakeURL url = TestUtils.getUrl(); - Properties prop = InternalUtils.createProperties(config, url.sslEnabled()); + Properties prop = InternalUtils.createProperties(config, url); assert prop.containsKey(InternalUtils.JDBC_DATABASE); assert prop.containsKey(InternalUtils.JDBC_PRIVATE_KEY); assert prop.containsKey(InternalUtils.JDBC_SCHEMA); @@ -99,7 +99,7 @@ public void testCreateProperties() { () -> { Map t = new HashMap<>(config); t.remove(Utils.SF_PRIVATE_KEY); - InternalUtils.createProperties(t, url.sslEnabled()); + InternalUtils.createProperties(t, url); }); assert TestUtils.assertError( @@ -107,7 +107,7 @@ public void testCreateProperties() { () -> { Map t = new HashMap<>(config); t.remove(Utils.SF_SCHEMA); - InternalUtils.createProperties(t, url.sslEnabled()); + InternalUtils.createProperties(t, url); }); assert TestUtils.assertError( @@ -115,7 +115,7 @@ public void testCreateProperties() { () -> { Map t = new HashMap<>(config); t.remove(Utils.SF_DATABASE); - InternalUtils.createProperties(t, url.sslEnabled()); + InternalUtils.createProperties(t, url); }); assert TestUtils.assertError( @@ -123,7 +123,7 @@ public void testCreateProperties() { () -> { Map t = new HashMap<>(config); t.remove(Utils.SF_USER); - InternalUtils.createProperties(t, url.sslEnabled()); + InternalUtils.createProperties(t, url); }); } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index 0247f7572..b34f490f2 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -26,10 +26,15 @@ import static com.snowflake.kafka.connector.Utils.HTTP_PROXY_USER; import static com.snowflake.kafka.connector.Utils.HTTP_USE_PROXY; import static com.snowflake.kafka.connector.Utils.JDK_HTTP_AUTH_TUNNELING; +import static com.snowflake.kafka.connector.Utils.OAUTH; +import static com.snowflake.kafka.connector.Utils.REFRESH_TOKEN; import static com.snowflake.kafka.connector.Utils.SF_DATABASE; +import static com.snowflake.kafka.connector.Utils.SF_OAUTH_CLIENT_SECRET; import static com.snowflake.kafka.connector.Utils.SF_SCHEMA; import static com.snowflake.kafka.connector.Utils.SF_URL; import static com.snowflake.kafka.connector.Utils.SF_USER; +import static com.snowflake.kafka.connector.Utils.getSnowflakeOAuthToken; +import static com.snowflake.kafka.connector.Utils.makeOAuthHttpPost; import com.snowflake.client.jdbc.SnowflakeDriver; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -43,6 +48,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.PrivateKey; import java.sql.Connection; @@ -58,8 +64,18 @@ import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; +import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse; +import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost; +import net.snowflake.client.jdbc.internal.apache.http.entity.ContentType; +import net.snowflake.client.jdbc.internal.apache.http.entity.StringEntity; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; +import net.snowflake.client.jdbc.internal.apache.http.impl.client.HttpClientBuilder; +import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import net.snowflake.client.jdbc.internal.google.gson.JsonObject; +import net.snowflake.client.jdbc.internal.google.gson.JsonParser; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import org.apache.kafka.common.record.TimestampType; @@ -81,6 +97,33 @@ public class TestUtils { private static final String PRIVATE_KEY = "private_key"; private static final String ENCRYPTED_PRIVATE_KEY = "encrypted_private_key"; private static final String PRIVATE_KEY_PASSPHRASE = "private_key_passphrase"; + private static final String AUTHENTICATOR = "authenticator"; + private static final String OAUTH_CLIENT_ID = "oauth_client_id"; + private static final String OAUTH_CLIENT_SECRET = "oauth_client_secret"; + private static final String OAUTH_REFRESH_TOKEN = "oauth_refresh_token"; + private static final String PASSWORD = "password"; + + // AZ request data key + private static final String AZ_ACCOUNT_NAME = "ACCOUNT_NAME"; + private static final String AZ_LOGIN_NAME = "LOGIN_NAME"; + private static final String AZ_PASSWORD = "PASSWORD"; + private static final String AZ_CLIENT_ID = "clientId"; + private static final String AZ_REDIRECT_URL = "redirectUrl"; + private static final String AZ_RESPONSE_TYPE = "responseType"; + private static final String AZ_SCOPE = "scope"; + private static final String AZ_MASTER_TOKEN = "masterToken"; + + // AZ endpoints + private static final String AZ_LOGIN_ENDPOINT = "/session/authenticate-request"; + private static final String AZ_REQUEST_ENDPOINT = "/oauth/authorization-request"; + + // AZ request value + private static final String AZ_SCOPE_REFRESH_TOKEN_PREFIX = "refresh_token"; + private static final String AZ_SCOPE_ROLE_PREFIX = "session:role:"; + private static final String AZ_RESPONSE_TYPE_CODE = "code"; + private static final String AZ_GRANT_TYPE = "authorization_code"; + private static final String AZ_CREDENTIAL_TYPE_CODE = "code"; + private static final Random random = new Random(); private static final String DES_RSA_KEY = "des_rsa_key"; public static final String TEST_CONNECTOR_NAME = "TEST_CONNECTOR"; @@ -98,6 +141,8 @@ public class TestUtils { private static Map conf = null; + private static Map confWithOAuth = null; + private static Map confForStreaming = null; private static SnowflakeURL url = null; @@ -159,12 +204,35 @@ private static JsonNode getProfile(final String profileFilePath) { private static Map getPropertiesMapFromProfile(final String profileFileName) { Map configuration = new HashMap<>(); - configuration.put(Utils.SF_USER, getProfile(profileFileName).get(USER).asText()); - configuration.put(Utils.SF_DATABASE, getProfile(profileFileName).get(DATABASE).asText()); - configuration.put(Utils.SF_SCHEMA, getProfile(profileFileName).get(SCHEMA).asText()); - configuration.put(Utils.SF_URL, getProfile(profileFileName).get(HOST).asText()); - configuration.put(Utils.SF_WAREHOUSE, getProfile(profileFileName).get(WAREHOUSE).asText()); - configuration.put(Utils.SF_PRIVATE_KEY, getProfile(profileFileName).get(PRIVATE_KEY).asText()); + JsonNode profileJson = getProfile(profileFileName); + configuration.put(Utils.SF_USER, profileJson.get(USER).asText()); + configuration.put(Utils.SF_DATABASE, profileJson.get(DATABASE).asText()); + configuration.put(Utils.SF_SCHEMA, profileJson.get(SCHEMA).asText()); + configuration.put(Utils.SF_URL, profileJson.get(HOST).asText()); + configuration.put(Utils.SF_WAREHOUSE, profileJson.get(WAREHOUSE).asText()); + + if (profileJson.has(AUTHENTICATOR)) { + configuration.put(Utils.SF_AUTHENTICATOR, profileJson.get(AUTHENTICATOR).asText()); + } + if (profileJson.has(PRIVATE_KEY)) { + configuration.put(Utils.SF_PRIVATE_KEY, profileJson.get(PRIVATE_KEY).asText()); + } + if (profileJson.has(OAUTH_CLIENT_ID)) { + configuration.put(Utils.SF_OAUTH_CLIENT_ID, profileJson.get(OAUTH_CLIENT_ID).asText()); + } + if (profileJson.has(OAUTH_CLIENT_SECRET)) { + configuration.put( + Utils.SF_OAUTH_CLIENT_SECRET, profileJson.get(OAUTH_CLIENT_SECRET).asText()); + } + if (profileJson.has(OAUTH_REFRESH_TOKEN)) { + configuration.put( + Utils.SF_OAUTH_REFRESH_TOKEN, profileJson.get(OAUTH_REFRESH_TOKEN).asText()); + } + + // password only appears in test profile + if (profileJson.has(PASSWORD)) { + configuration.put(PASSWORD, profileJson.get(PASSWORD).asText()); + } configuration.put(Utils.NAME, TEST_CONNECTOR_NAME); @@ -215,7 +283,7 @@ private static Connection generateConnectionToSnowflake(final String profileFile SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL)); Properties properties = - InternalUtils.createProperties(getConfFromFileName(profileFileName), url.sslEnabled()); + InternalUtils.createProperties(getConfFromFileName(profileFileName), url); Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties); @@ -260,6 +328,20 @@ public static Map getConfForStreaming() { return configuration; } + /* Get configuration map from profile path. Used against prod deployment of Snowflake */ + public static Map getConfForStreamingWithOAuth() { + Map configuration = getConfWithOAuth(); + + // On top of existing configurations, add + configuration.put(Utils.SF_ROLE, getProfile(PROFILE_PATH).get(ROLE).asText()); + configuration.put(Utils.TASK_ID, "0"); + configuration.put( + SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + + return configuration; + } + /** @return JDBC config with encrypted private key */ static Map getConfWithEncryptedKey() { if (conf == null) { @@ -274,6 +356,22 @@ static Map getConfWithEncryptedKey() { return config; } + public static Map getConfWithOAuth() { + if (confWithOAuth == null) { + Map config = getConf(); + assert (config.containsKey(PASSWORD) || config.containsKey(Utils.SF_OAUTH_REFRESH_TOKEN)) + && config.containsKey(Utils.SF_OAUTH_CLIENT_ID) + && config.containsKey(SF_OAUTH_CLIENT_SECRET); + if (!config.containsKey(OAUTH_REFRESH_TOKEN)) { + config.put(Utils.SF_OAUTH_REFRESH_TOKEN, getRefreshToken(config)); + } + config.put(Utils.SF_AUTHENTICATOR, OAUTH); + config.remove(Utils.SF_PRIVATE_KEY); + confWithOAuth = config; + } + return confWithOAuth; + } + /** * execute sql query * @@ -354,7 +452,7 @@ private static String getPropertyValueFromKey(String name) { static SnowflakeURL getUrl() { if (url == null) { - url = new SnowflakeURL(getPropertyValueFromKey(Utils.SF_URL)); + url = new SnowflakeURL(getProfile(PROFILE_PATH).get(HOST).asText()); } return url; } @@ -380,6 +478,11 @@ public static SnowflakeConnectionService getConnectionService() { return SnowflakeConnectionServiceFactory.builder().setProperties(getConf()).build(); } + /** @return snowflake connection using OAuth authentication */ + public static SnowflakeConnectionService getOAuthConnectionService() { + return SnowflakeConnectionServiceFactory.builder().setProperties(getConfWithOAuth()).build(); + } + public static SnowflakeConnectionService getConnectionServiceForStreaming() { return SnowflakeConnectionServiceFactory.builder().setProperties(getConfForStreaming()).build(); } @@ -746,4 +849,98 @@ public static SnowflakeStreamingIngestClient createStreamingClient( .setProperties(clientProperties) .build(); } + + /** + * Get refresh token using username, password, clientId and clientSecret + * + * @param config config parsed from profile + * @return refresh token + */ + public static String getRefreshToken(Map config) { + assert config.containsKey(Utils.SF_USER) + && config.containsKey(Utils.SF_OAUTH_CLIENT_ID) + && config.containsKey(Utils.SF_OAUTH_CLIENT_SECRET); + return getSnowflakeOAuthToken( + getUrl(), + config.get(Utils.SF_OAUTH_CLIENT_ID), + config.get(Utils.SF_OAUTH_CLIENT_SECRET), + getAZCode(config), + AZ_GRANT_TYPE, + AZ_CREDENTIAL_TYPE_CODE, + REFRESH_TOKEN); + } + + private static String getAZCode(Map config) { + assert config.containsKey(PASSWORD) + && config.containsKey(Utils.SF_USER) + && config.containsKey(Utils.SF_OAUTH_CLIENT_ID) + && config.containsKey(Utils.SF_OAUTH_CLIENT_SECRET); + + CloseableHttpClient client = HttpClientBuilder.create().build(); + SnowflakeURL url = getUrl(); + Map headers = new HashMap<>(); + headers.put(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); + + // Build login request + JsonObject loginData = new JsonObject(); + loginData.addProperty(AZ_ACCOUNT_NAME, url.getAccount().toUpperCase()); + loginData.addProperty(AZ_LOGIN_NAME, config.get(Utils.SF_USER)); + loginData.addProperty(AZ_PASSWORD, config.get(PASSWORD)); + loginData.addProperty(AZ_CLIENT_ID, config.get(Utils.SF_OAUTH_CLIENT_ID)); + loginData.addProperty(AZ_RESPONSE_TYPE, AZ_RESPONSE_TYPE_CODE); + String scopeString = AZ_SCOPE_REFRESH_TOKEN_PREFIX; + if (config.containsKey(Utils.SF_ROLE)) { + scopeString += " " + AZ_SCOPE_ROLE_PREFIX + config.get(ROLE).toUpperCase(); + } + loginData.addProperty(AZ_SCOPE, scopeString); + JsonObject loginPayload = new JsonObject(); + loginPayload.add("data", loginData); + HttpPost loginRequest = + makeOAuthHttpPost( + url, AZ_LOGIN_ENDPOINT, headers, buildStringEntity(loginPayload.toString())); + + // Login + String masterToken; + try (CloseableHttpResponse httpResponse = client.execute(loginRequest)) { + String respBodyString = EntityUtils.toString(httpResponse.getEntity()); + JsonObject respBody = JsonParser.parseString(respBodyString).getAsJsonObject(); + masterToken = + respBody + .get("data") + .getAsJsonObject() + .get(AZ_MASTER_TOKEN) + .toString() + .replaceAll("^\"|\"$", ""); + } catch (Exception e) { + throw SnowflakeErrors.ERROR_1004.getException(e); + } + + // Build AZ code request + loginData.addProperty(AZ_MASTER_TOKEN, masterToken); + HttpPost aZCodeRequest = + makeOAuthHttpPost( + url, AZ_REQUEST_ENDPOINT, headers, buildStringEntity(loginData.toString())); + + // Request AZ code + try (CloseableHttpResponse httpResponse = client.execute(aZCodeRequest)) { + String respBodyString = EntityUtils.toString(httpResponse.getEntity()); + JsonObject respBody = JsonParser.parseString(respBodyString).getAsJsonObject(); + return respBody + .get("data") + .getAsJsonObject() + .get(AZ_REDIRECT_URL) + .toString() + .replaceAll(".*\\bcode=([A-Fa-f0-9]+).*", "$1"); + } catch (Exception e) { + throw SnowflakeErrors.ERROR_1004.getException(e); + } + } + + private static StringEntity buildStringEntity(String payload) { + try { + return new StringEntity(payload); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index e7ca59baa..bcaafb72c 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -57,7 +57,22 @@ public void afterEach() { @Test public void testSinkServiceV2Builder() { - Map config = TestUtils.getConfForStreaming(); + sinkServiceV2BuilderTest(false); + } + + // TODO: Added after SNOW-352846 is released + // @Test + public void testSinkServiceV2BuilderWithOAuth() { + sinkServiceV2BuilderTest(true); + } + + private void sinkServiceV2BuilderTest(boolean useOAuth) { + Map config; + if (useOAuth) { + config = TestUtils.getConfForStreamingWithOAuth(); + } else { + config = TestUtils.getConfForStreaming(); + } SnowflakeSinkConnectorConfig.setDefaultValues(config); SnowflakeSinkService service = @@ -67,19 +82,23 @@ public void testSinkServiceV2Builder() { assert service instanceof SnowflakeSinkServiceV2; // connection test + Map finalConfig = config; assert TestUtils.assertError( SnowflakeErrors.ERROR_5010, () -> SnowflakeSinkServiceFactory.builder( - null, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + null, IngestionMethodConfig.SNOWPIPE_STREAMING, finalConfig) .build()); assert TestUtils.assertError( SnowflakeErrors.ERROR_5010, () -> { SnowflakeConnectionService conn = TestUtils.getConnectionService(); + if (useOAuth) { + conn = TestUtils.getOAuthConnectionService(); + } conn.close(); SnowflakeSinkServiceFactory.builder( - conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + conn, IngestionMethodConfig.SNOWPIPE_STREAMING, finalConfig) .build(); }); }