RabbitMQ 客户端源码系列 - Connection

前言

本次打算直接上干货分享 RabbitMQ Java 客户端一系列的客户源码分析 (com.rabbitmq:amqp-client:4.8.3)。

ps:最近接收到公司的端源任务就是阅读和分析 spring-rabbit、amqp-client,码系因此打算一同和大家分享 amqp-client。客户由于 RabbitMQ 是端源 Erlang 语言开发(暂时没有对这块分享的计划)。

友情提醒:本次分享适合的码系人群,需要对 RabbitMQ 有一定的客户了解。

RabbitMQ Getstarted: https://www.rabbitmq.com/#getstarted。端源Java Client API Guide: https://www.rabbitmq.com/api-guide.html。码系

废话不多话,客户开整!

Java Client Connection Demo

我们先看一个官网提供的端源 Java Client Connecting to RabbitMQ Demo。

ConnectionFactory factory = new ConnectionFactory();

// "guest"/"guest" by default,码系 limited to localhost connections

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

Connection conn = factory.newConnection();

Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

channel.close();

connection.close();AMQP 协议交互流程

已经使用过 RabbitMQ 的同学相信已经不陌生,因此就简单的客户描述下:与 RabbitMQ Broker 建立 Connection 和 Channel,发送消息后,端源关闭 Connection 和 Channel 的码系过程。下图是 针对这个过程使用 Wireshark 抓包查看整个 AMQP 协议的交互流程(172.30.0.74 为客户端即本机 ip;192.168.17.160 为 RabbitMQ Broker 的 ip)。

「client 与 broker 创建Connection、Channel、发送消息」

「client 与 broker 发送心跳(Heartbeat)、关闭Connection、云服务器Channel」

为了让读者更容易看得源码,我先给大家描述下 client 与 broker 之间 AMQP 协议的交互流程描述(AMQP 协议中 不少命令都是成对存在的,抓包协议中 Info 里的命令是 -,而代码里的是 驼峰式 此处以代码为准):

将 AMQP 0-9-1 的连接头写入底层套接字,包含指定的版本信息(客户端告诉 broker 自己使用的协议及版本,底层使用 java 自带的 socket)。客户端等待 broker 发送的 Connection.Start (broker 告诉客户端 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力)。客户端接收后 发送 Connection.StartOk (客户端告诉 broker 连接使用的帐号和密码、认证机制、语言环境、客户的信息以及能力)。客户端等待 broker 发送的 Connection.Tune (broker 与 客户端 进行参数协商)。客户端接收后 发送 Connection.TuneOk (客户端 参数 [ChannelMax、FrameMax、亿华云Heartbeat] 协商完成后告诉 broker)。客户端发送 Connection.Open (客户端 告诉 broker 打开一个连接,并请求设置_virtualHost [vhost])。broker 接收到后返回 Connection.OpenOk (client 对 vhost 进行验证,成功则返回如下此信息)。客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)。客户端发送 Confirm.Select,broker 接收到后返回 Confirm.SelectOk(客户端告诉 broker 消息需要使用 confirm的机制,broker收到并回复)。。客户端发送消息 Basic.Publish,broker 应答返回 Basic.Ack。期间 客户端和 broker 会相互检查彼此的心跳 heartbeat。客户端 关闭通道 Channel.Close,broker 应答返回 Channel.CloseOk。客户端 关闭连接 Connection.Close,broker 应答返回 Connection.CloseOk。源码分析

熟悉完AMQP 协议的交互流程易于后续理解源码,开始本次主要介绍 Connection 相关的源码:ConnectionFactory.newConnection --> AMQConnection.start。

「ConnectionFactory.newConnection()」public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)

throws IOException, TimeoutException {

if(this.metricsCollector == null) {

this.metricsCollector = new NoOpMetricsCollector();

}

// make sure we respect the provided thread factory

// 创建 socketFactory 和 初始化相应的配置

FrameHandlerFactory fhFactory = createFrameHandlerFactory();

// 初始化 Connection 涉及到的参数

ConnectionParams params = params(executor);

// set client-provided via a client property

if (clientProvidedName != null) {

Mapproperties = new HashMap(params.getClientProperties());

properties.put("connection_name", clientProvidedName);

params.setClientProperties(properties);

}

// 这块逻辑属于 rabbit提供自动回复连接的逻辑

if (isAutomaticRecoveryEnabled()) {

// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection

AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);

conn.init();

return conn;

} else {

List

addrs = addressResolver.getAddresses();

Exception lastException = null;

for (Address addr : addrs) {

try {

// 创建、连接 socket 并封装成 返回 SocketFrameHandler (socket 不采用Negale算法[Negale算法,大家有兴趣可以了解下这块针对socket缓存性能的优化])

FrameHandler handler = fhFactory.create(addr);

// 初始化配置、_channel0、_channelManager等等

AMQConnection conn = createConnection(params, handler, metricsCollector);

// 启动 AMQConnection 后续会进行详细介绍

conn.start();

this.metricsCollector.newConnection(conn);

return conn;

} catch (IOException e) {

lastException = e;

} catch (TimeoutException te) {

lastException = te;

}

}

if (lastException != null) {

if (lastException instanceof IOException) {

throw (IOException) lastException;

} else if (lastException instanceof TimeoutException) {

throw (TimeoutException) lastException;

}

}

throw new IOException("failed to connect");

}

}

AMQP 协议的云南idc服务商交互流程中 1~6 的逻辑属于 AMQConnection.start() 的重点逻辑,也是本次给大家主要介绍的点。

public void start()

throws IOException, TimeoutException {

// 初始化工作线程

initializeConsumerWorkService();

// 初始化心跳发送

initializeHeartbeatSender();

// 将 Connection标志位 启动

this._running = true;

// 确认客户端 第一件事 发送header头部协议

AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =

new AMQChannel.SimpleBlockingRpcContinuation();

// 进入Rpc队列进行阻塞,等待broker返回 connection.start method

_channel0.enqueueRpc(connStartBlocker);

try {

// The following two lines are akin to AMQChannels

// transmit() method for this pseudo-RPC.

_frameHandler.setTimeout(handshakeTimeout);

// 1. 发送header头部协议 AMQP 0-9-1

_frameHandler.sendHeader();

} catch (IOException ioe) {

_frameHandler.close();

throw ioe;

}

// 初始化启动 startMainLoop -- 为了接收和处理broker发送的消息

this._frameHandler.initialize(this);

AMQP.Connection.Start connStart;

AMQP.Connection.Tune connTune = null;

try {

// 2. 客户端等待 broker 发送的 Connection.Start

connStart =

(AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

// 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力

_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

Version serverVersion =

new Version(connStart.getVersionMajor(),

connStart.getVersionMinor());

// 版本比对

if (!Version.checkVersion(clientVersion, serverVersion)) {

throw new ProtocolVersionMismatchException(clientVersion,

serverVersion);

}

String[] mechanisms = connStart.getMechanisms().toString().split(" ");

SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);

if (sm == null) {

throw new IOException("No compatible authentication mechanism found - " +

"server offered [" + connStart.getMechanisms() + "]");

}

String username = credentialsProvider.getUsername();

String password = credentialsProvider.getPassword();

LongString challenge = null;

LongString response = sm.handleChallenge(null, username, password);

do {

// 3. 客户端接收后 发送 `Connection.StartOk`

Method method = (challenge == null)

? new AMQP.Connection.StartOk.Builder()

.clientProperties(_clientProperties)

.mechanism(sm.getName())

.response(response)

.build()

: new AMQP.Connection.SecureOk.Builder().response(response).build();

try {

Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();

if (serverResponse instanceof AMQP.Connection.Tune) {

// 4. 客户端等待 broker 发送的 Connection.Tune

connTune = (AMQP.Connection.Tune) serverResponse;

} else {

challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();

response = sm.handleChallenge(challenge, username, password);

}

} catch (ShutdownSignalException e) {

Method shutdownMethod = e.getReason();

if (shutdownMethod instanceof AMQP.Connection.Close) {

AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;

if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {

throw new AuthenticationFailureException(shutdownClose.getReplyText());

}

}

throw new PossibleAuthenticationFailureException(e);

}

} while (connTune == null);

} catch (TimeoutException te) {

_frameHandler.close();

throw te;

} catch (ShutdownSignalException sse) {

_frameHandler.close();

throw AMQChannel.wrap(sse);

} catch(IOException ioe) {

_frameHandler.close();

throw ioe;

}

try {

// 最大通道数

int channelMax =

negotiateChannelMax(this.requestedChannelMax,

connTune.getChannelMax());

_channelManager = instantiateChannelManager(channelMax, threadFactory);

// 帧最大的大小

int frameMax =

negotiatedMaxValue(this.requestedFrameMax,

connTune.getFrameMax());

this._frameMax = frameMax;

// 心跳

int heartbeat =

negotiatedMaxValue(this.requestedHeartbeat,

connTune.getHeartbeat());

setHeartbeat(heartbeat);

// 5. 客户端接收后 发送 Connection.TuneOk

_channel0.transmit(new AMQP.Connection.TuneOk.Builder()

.channelMax(channelMax)

.frameMax(frameMax)

.heartbeat(heartbeat)

.build());

// 6. 客户端发送 Channel.Open

_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()

.virtualHost(_virtualHost)

.build());

} catch (IOException ioe) {

_heartbeatSender.shutdown();

_frameHandler.close();

throw ioe;

} catch (ShutdownSignalException sse) {

_heartbeatSender.shutdown();

_frameHandler.close();

throw AMQChannel.wrap(sse);

}

// We can now respond to errors having finished tailoring the connection

this._inConnectionNegotiation = false;

}最后

本次分享的目的,先让读者对于 RabbitMQ Client 与 RabbitMQ Broker 根据 AMQP 协议交互流程有个大体的认识,并根据分析 Connection 源码有一定认知,其中还有很多 Connection 细节源码需要读者慢慢体会。

数据库
上一篇:2、根据用户基础选择访问提供程序。由于互联问题的存在,接入商的选择也非常重要,如果用户群主要在联通,尽量选择联通接入较好的接入商,如果用户群主要在电信,那么选择电信接入较好的接入商。如果用户组位于国家/地区,则选择更好的访问提供程序进行交互。
下一篇:4、注册门槛低