@@ -76,6 +76,19 @@ void setSecureMsg(HTTPMessage& msg,
7676 msg.setSecure (setupTransportInfo.secure );
7777}
7878
79+ using CoroWtSession = proxygen::coro::detail::CoroWtSession;
80+ class CoroWtSessionImpl : public CoroWtSession {
81+ public:
82+ using CoroWtSession::CoroWtSession;
83+ HTTPSessionContextPtr ka{}; // ensures session isn't destroyed before WtHandle
84+ const folly::SocketAddress& getLocalAddress () const noexcept override {
85+ return ka->getLocalAddress ();
86+ }
87+ const folly::SocketAddress& getPeerAddress () const noexcept override {
88+ return ka->getPeerAddress ();
89+ }
90+ };
91+
7992} // namespace
8093
8194using folly::coro::co_error;
@@ -91,6 +104,37 @@ HTTPSource* getErrorResponse(uint16_t statusCode, const std::string& body) {
91104 return resp;
92105}
93106
107+ struct HTTPCoroSession ::WtHelper {
108+ HTTPCoroSession& sess;
109+ auto createEgressSource () const noexcept {
110+ return detail::EgressSourcePtr (new detail::EgressSource (
111+ sess.eventBase_ .get (),
112+ /* id=*/ folly::none,
113+ /* callback=*/ nullptr ,
114+ /* egressBufferSize=*/ sess.getStreamSendFlowControlWindow ()));
115+ }
116+ auto createHttpSourceTransport (detail::EgressSourcePtr&& egress,
117+ HTTPSourceHolder&& ingress) const noexcept {
118+ return detail::makeHttpSourceTransport (
119+ sess.eventBase_ .get (), std::move (egress), std::move (ingress));
120+ }
121+ auto createWtSession (std::unique_ptr<folly::coro::TransportIf> transport,
122+ WebTransportHandler::Ptr wtHandler) noexcept {
123+ using namespace proxygen ::detail;
124+ auto dir = sess.isDownstream () ? WtDir::Server : WtDir::Client;
125+ auto wt = std::make_shared<CoroWtSessionImpl>(
126+ sess.eventBase_ .get (),
127+ dir,
128+ getWtConfig (sess.codec_ ->getIngressSettings (),
129+ sess.codec_ ->getEgressSettings ()),
130+ std::move (wtHandler),
131+ std::move (transport));
132+ wt->ka = sess.acquireKeepAlive ();
133+ wt->start (wt);
134+ return wt;
135+ }
136+ };
137+
94138struct HTTPCoroSession ::StreamState {
95139 private:
96140 // Detach Criteria: source and egressCoro start as complete until the stream
@@ -3715,19 +3759,6 @@ folly::coro::Task<WtReqResult> makeInternalEx(std::string_view err) {
37153759 HTTPError{HTTPErrorCode::INTERNAL_ERROR, std::string (err)});
37163760}
37173761
3718- class CoroWtSessionImpl : public detail ::CoroWtSession {
3719- public:
3720- using CoroWtSession::CoroWtSession;
3721- HTTPSessionContextPtr ka_{}; // ensures session isn't destructed prior to
3722- // WtHandle
3723- const folly::SocketAddress& getLocalAddress () const noexcept override {
3724- return ka_->getLocalAddress ();
3725- }
3726- const folly::SocketAddress& getPeerAddress () const noexcept override {
3727- return ka_->getPeerAddress ();
3728- }
3729- };
3730-
37313762}; // namespace
37323763
37333764/* *
@@ -3769,11 +3800,8 @@ folly::coro::Task<WtReqResult> HTTPUniplexTransportSession::sendWtReq(
37693800 }
37703801
37713802 // valid wt req
3772- auto egressSource = detail::EgressSourcePtr (new detail::EgressSource (
3773- eventBase_.get (),
3774- /* id=*/ folly::none,
3775- /* callback=*/ nullptr ,
3776- /* egressBufferSize=*/ getStreamSendFlowControlWindow ()));
3803+ WtHelper wtHelper{*this };
3804+ auto egressSource = wtHelper.createEgressSource ();
37773805 egressSource->validateHeadersAndSkip (msg);
37783806
37793807 auto res = sendRequestImpl (/* headers=*/ msg,
@@ -3793,19 +3821,9 @@ folly::coro::Task<WtReqResult> HTTPUniplexTransportSession::sendWtReq(
37933821 }
37943822
37953823 // wt upgrade successful
3796- auto transport = detail::makeHttpSourceTransport (
3797- eventBase_.get (), std::move (egressSource), std::move (*res));
3798- auto wt = std::make_shared<CoroWtSessionImpl>(
3799- eventBase_.get (),
3800- ::proxygen::detail::WtDir::Client,
3801- ::proxygen::detail::getWtConfig (codec_->getIngressSettings (),
3802- codec_->getEgressSettings()),
3803- std::move(wtHandler),
3804- std::move(transport));
3805- wt->ka_ = acquireKeepAlive();
3806- wt->start (wt);
3807-
3808- ret.wt = std::move (wt);
3824+ auto transport = wtHelper.createHttpSourceTransport (std::move (egressSource),
3825+ std::move (*res));
3826+ ret.wt = wtHelper.createWtSession (std::move (transport), std::move (wtHandler));
38093827 co_return ret;
38103828}
38113829
0 commit comments