asp.net core源码如何实现监听Http请求,分析Kestrel看一下过程

时间:2022-10-04 19:42:43

#asp.net core#先让我们看一下最小API的代码,通过以下几行代码就可以搭建一个简单的asp.NET core web服务器,是不是十分简洁?

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.MapGet("/", () => "Hello World!");



让我们通过源码整体看一下执行过程,源码以.NET 7为例


/// <summary>
/// Runs an application and block the calling thread until host shutdown.
/// </summary>
/// <param name="url">The URL to listen to if the server hasn't been configured directly.</param>
public void Run(string? url = null)


/// <summary>
    /// Runs an application and returns a <see cref="Task"/> that only completes when the token is triggered or shutdown is triggered.
    /// The <paramref name="host"/> instance is disposed of after running.
    /// </summary>
    /// <param name="host">The <see cref="IHost"/> to run.</param>
    /// <param name="token">The token to trigger shutdown.</param>
    /// <returns>The <see cref="Task"/> that represents the asynchronous operation.</returns>
    public static async Task RunAsync(this IHost host, CancellationToken token = default)
            awAIt host.StartAsync(token).ConfigureAwait(false);
            await host.WaitForShutdownAsync(token).ConfigureAwait(false);
            if (host is IAsyncDisposable asyncDisposable)
                await asyncDisposable.DisposeAsync().ConfigureAwait(false);

3 KestrelServer执行StartAsync()方法

public Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull
    return _innerKestrelServer.StartAsync(application, cancellationToken);


public async Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull

            if (_hasStarted)
                // The server has already started and/or has not been cleaned up yet
                throw new InvalidOperationException(CoreStrings.ServerAlreadyStarted);
            _hasStarted = true;


            async Task OnBind(ListenOptions options, CancellationToken onBindCancellationToken)
                var hasHttp1 = options.Protocols.HasFlag(HttpProtocols.Http1);
                var hasHttp2 = options.Protocols.HasFlag(HttpProtocols.Http2);
                var hasHttp3 = options.Protocols.HasFlag(HttpProtocols.Http3);
                var hasTls = options.IsTls;

                // Filter out invalid combinations.

                if (!hasTls)
                    // Http/1 without TLS, no-op HTTP/2 and 3.
                    if (hasHttp1)
                        hasHttp2 = false;
                        hasHttp3 = false;
                    // Http/3 requires TLS. Note we only let it fall back to HTTP/1, not HTTP/2
                    else if (hasHttp3)
                        throw new InvalidOperationException("HTTP/3 requires HTTPS.");

                // Quic isn't registered if it's not supported, throw if we can't fall back to 1 or 2
                if (hasHttp3 && _multiplexedTransportFactory is null && !(hasHttp1 || hasHttp2))
                    throw new InvalidOperationException("This platform doesn't support QUIC or HTTP/3.");

                // Disable adding alt-svc header if endpoint has configured not to or there is no
                // multiplexed transport factory, which happens if QUIC isn't supported.
                var addAltSvcHeader = !options.DisableAltSvcHeader && _multiplexedTransportFactory != null;

                var configuredEndpoint = options.EndPoint;

                // Add the HTTP middleware as the terminal connection middleware
                if (hasHttp1 || hasHttp2
                    || options.Protocols == HttpProtocols.None) // TODO a test fails because it doesn't throw an exception in the right place
                                                                // when there is no HttpProtocols in KestrelServer, can we remove/change the test?
                    if (_transportFactory is null)
                        throw new InvalidOperationException($"Cannot start HTTP/1.x or HTTP/2 server if no {nameof(IConnectionListenerFactory)} is registered.");

                    options.UseHttpServer(ServiceContext, application, options.Protocols, addAltSvcHeader);
                    var connectionDelegate = options.Build();

                    // Add the connection limit middleware
                    connectionDelegate = EnforceConnectionLimit(connectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);

                    options.EndPoint = await _transportManager.BindAsync(configuredEndpoint, connectionDelegate, options.EndpointConfig, onBindCancellationToken).ConfigureAwait(false);

                if (hasHttp3 && _multiplexedTransportFactory is not null)
                    // Check if a previous transport has changed the endpoint. If it has then the endpoint is dynamic and we can't guarantee it will work for other transports.
                    // For more details, see https://Github.com/dotnet/aspnetcore/issues/42982
                    if (!configuredEndpoint.Equals(options.EndPoint))
                        // 增加 ConnectionContext 中间件,后面处理Http请求会用到
                        options.UseHttp3Server(ServiceContext, application, options.Protocols, addAltSvcHeader);
                        var multiplexedConnectionDelegate = ((IMultiplexedConnectionBuilder)options).Build();

                        // Add the connection limit middleware
                        multiplexedConnectionDelegate = EnforceConnectionLimit(multiplexedConnectionDelegate, Options.Limits.MaxConcurrentConnections, Trace);
                        // 绑定
                        options.EndPoint = await _transportManager.BindAsync(configuredEndpoint, multiplexedConnectionDelegate, options, onBindCancellationToken).ConfigureAwait(false);

            AddressBindContext = new AddressBindContext(_serverAddresses, Options, Trace, OnBind);

            await BindAsync(cancellationToken).ConfigureAwait(false);
            // Don't log the error https://github.com/dotnet/aspnetcore/issues/29801

        // Register the options with the event source so it can be logged (if necessary)

5 开始绑定socket端口

public async Task<EndPoint> BindAsync(EndPoint endPoint, ConnectionDelegate connectionDelegate, EndpointConfig? endpointConfig, CancellationToken cancellationToken)
    if (_transportFactory is null)
        throw new InvalidOperationException($"Cannot bind with {nameof(ConnectionDelegate)} no {nameof(IConnectionListenerFactory)} is registered.");
    var transport = await _transportFactory.BindAsync(endPoint, cancellationToken).ConfigureAwait(false);
    StartAcceptLoop(new GenericConnectionListener(transport), c => connectionDelegate(c), endpointConfig);
    return transport.EndPoint;


ThreadPool.UnsafeQueueUserWorkItem(StartAcceptingConnectionsCore, listener, preferLocal: false);

7 通过 while循环不断监听socket连接请求

private void StartAcceptingConnectionsCore(IConnectionListener<T> listener)
    // REVIEW: Multiple accept loops in parallel?
    _ = AcceptConnectionsAsync();

    async Task AcceptConnectionsAsync()
            while (true)
                var connection = await listener.AcceptAsync();

                if (connection == null)
                    // We're done listening

                // Add the connection to the connection manager before we queue it for execution
                var id = _transportConnectionManager.GetNewConnectionId();
                var kestrelConnection = new KestrelConnection<T>(
                    id, _serviceContext, _transportConnectionManager, _connectionDelegate, connection, Log);

                _transportConnectionManager.AddConnection(id, kestrelConnection);


                ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
        catch (Exception ex)
            // REVIEW: If the accept loop ends should this trigger a server shutdown? It will manifest as a hang
            Log.LogCritical(0, ex, "The connection listener failed to accept any new connections.");

8 使用Socket AcceptAsync的方法

public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
    while (true)
            Debug.Assert(_listenSocket != null, "Bind must be called first.");

            var acceptSocket = await _listenSocket.AcceptAsync(cancellationToken);

            // Only apply no delay to Tcp based endpoints
            if (acceptSocket.LocalEndPoint is IPEndPoint)
                acceptSocket.NoDelay = _options.NoDelay;

            return _factory.Create(acceptSocket);
        catch (ObjectDisposedException)
            // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
            return null;
        catch (SocketException e) when (e.SocketErrorCode == SocketError.OperationAborted)
            // A call was made to UnbindAsync/DisposeAsync just return null which signals we're done
            return null;
        catch (SocketException)
            // The connection got reset while it was in the backlog, so we try again.
            SocketsLog.ConnectionReset(_logger, connectionId: "(null)");

9 当有连接过来后执行 KestrelConnection的Execute方法

internal async Task ExecuteAsync()
    var connectionContext = _transportConnection;



        using (BeginConnectionScope(connectionContext))
                await _connectionDelegate(connectionContext);
            catch (Exception ex)
                Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
        await FireOnCompletedAsync();


        // Dispose the transport connection, this needs to happen before removing it from the
        // connection manager so that we only signal completion of this connection after the transport
        // is properly torn down.
        await connectionContext.DisposeAsync();


10 监听到socket连接后,开始执行连接的委托,就是我们第4步注释提到的提前注入的一个中间件

public Task OnConnectionAsync(ConnectionContext connectionContext)
    var memoryPoolFeature = connectionContext.Features.Get<IMemoryPoolFeature>();
    var protocols = connectionContext.Features.Get<HttpProtocolsFeature>()?.HttpProtocols ?? _endpointDefaultProtocols;
    var localEndPoint = connectionContext.LocalEndPoint as IPEndPoint;
    var altSvcHeader = _addAltSvcHeader && localEndPoint != null ? HttpUtilities.GetEndpointAltSvc(localEndPoint, protocols) : null;

    var httpConnectionContext = new HttpConnectionContext(
        memoryPoolFeature?.MemoryPool ?? System.Buffers.MemoryPool<byte>.Shared,
        connectionContext.RemoteEndPoint as IPEndPoint);
    httpConnectionContext.Transport = connectionContext.Transport;

    var connection = new HttpConnection(httpConnectionContext);

    return connection.ProcessRequestsAsync(_application);

11 通过while循环监听Http请求,如果连接是keepAlive则不需要重复连接socket,一直在监听即可(如果是keeplive http连接不会断开)

private async Task ProcessRequests<TContext>(IHttpApplication<TContext> application) where TContext : notnull
        while (_keepAlive)
            if (_context.InitialExecutionContext is null)
                // If this is a first request on a non-Http2Connection, capture a clean ExecutionContext.
                _context.InitialExecutionContext = ExecutionContext.Capture();
                // Clear any AsyncLocals set during the request; back to a clean state ready for next request
                // And/or reset to Http2Connection's ExecutionContext giving access to the connection logging scope
                // and any other AsyncLocals set by connection middleware.


            var result = default(ReadResult);
            bool endConnection;
                if (BeginRead(out var awaitable))
                    result = await awaitable;
            } while (!TryParseRequest(result, out endConnection));

            if (endConnection)
                // Connection finished, stop processing requests

            var messageBody = CreateMessageBody();
            if (!messageBody.RequestKeepAlive)
                _keepAlive = false;

            IsUpgradableRequest = messageBody.RequestUpgrade;


            var context = application.CreateContext(this);


                // Run the application code for this request
                await application.ProcessRequestAsync(context);

                // Trigger OnStarting if it hasn't been called yet and the app hasn't
                // already failed. If an OnStarting callback throws we can go through
                // our normal error handling in ProduceEnd.
                // https://github.com/aspnet/KestrelHttpServer/issues/43
                if (!HasResponseStarted && _applicationException == null && _onStarting?.Count > 0)
                    await FireOnStarting();

                if (!_connectionAborted && !VerifyResponseContentLength(out var lengthException))
            catch (BadHttpRequestException ex)
                // Capture BadHttpRequestException for further processing
                // This has to be caught here so StatusCode is set properly before disposing the HttpContext
                // (DisposeContext logs StatusCode).
            catch (Exception ex)


            // At this point all user code that needs use to the request or response streams has completed.
            // Using these streams in the OnCompleted callback is not allowed.
                Debug.Assert(_bodyControl != null);
                await _bodyControl.StopAsync();
            catch (Exception ex)
                // BodyControl.StopAsync() can throw if the PipeWriter was completed prior to the application writing
                // enough bytes to satisfy the specified Content-Length. This risks double-logging the exception,
                // but this scenario generally indicates an app bug, so I don't want to risk not logging it.

            // 4XX responses are written by TryProduceInvalidRequestResponse during connection tear down.
            if (_requestRejectedException == null)
                if (!_connectionAborted)
                    // Call ProduceEnd() before consuming the rest of the request body to prevent
                    // delaying clients waiting for the chunk terminator:
                    // https://github.com/dotnet/corefx/issues/17330#issuecomment-288248663
                    // This also prevents the 100 Continue response from being sent if the app
                    // never tried to read the body.
                    // https://github.com/aspnet/KestrelHttpServer/issues/2102
                    // ProduceEnd() must be called before _application.DisposeContext(), to ensure
                    // HttpContext.Response.StatusCode is correctly set when
                    // IHttpContextFactory.Dispose(HttpContext) is called.
                    await ProduceEnd();
                else if (!HasResponseStarted)
                    // If the request was aborted and no response was sent, there's no
                    // meaningful status code to log.
                    StatusCode = 0;

            if (_onCompleted?.Count > 0)
                await FireOnCompleted();

            application.DisposeContext(context, _applicationException);

            // Even for non-keep-alive requests, try to consume the entire body to avoid RSTs.
            if (!_connectionAborted && _requestRejectedException == null && !messageBody.IsEmpty)
                await messageBody.ConsumeAsync();

            if (HasStartedConsumingRequestBody)
                await messageBody.StopAsync();


11 处理Http请求中间再执行我们在Http管道里的委托方法,用来进行HTTP的构造和HTTP Response,管道这里就不展开介绍了,可以看下官网的介绍



12 最后通过Http1OutputProducer最终响应结果

public ValueTask<FlushResult> WriteStreamSuffixAsync()
    ValueTask<FlushResult> result = default;

    lock (_contextLock)
        if (!_writeStreamSuffixCalled)
            if (_autoChunk)
                var writer = new BufferWriter<PipeWriter>(_pipeWriter);
                result = WriteAsyncInternal(ref writer, EndChunkedResponseBytes);
            else if (_unflushedBytes > 0)
                result = FlushAsync();

            _writeStreamSuffixCalled = true;

    return result;


