proto 基础语法: //指定proto3语法 syntax ="proto3"; //指定作用域 package xxx; //java_multiple_files = true; 表示在生成Java代码时,每个`.proto`文件都会生成一个独立的Java文件 //声明 rpc 服务接口 //关键字: service 声明需要生成的服务接口"类"service Greeter { // 关键字: rpc 声明服务方法,包括方法名、请求消息(请求体)、相应消息(响应体) rpc SayHello(HelloRequest) returns (HelloResponse); } //声明请求、响应消息 //关键字: message 声明请求体和响应体 message HelloRequest { //标识号 1 //编号的范围为1 ~ 536,870,911(2^29-1),其中19000~19999不可用。因为Protobuf协议的实现过程中对预留了这些编号 string name = 1; //表示一个人有多个号码 repeated string phone = 3; } message HelloResponse { string message = 1; } 调用类型:
Unary RPC一元RPC调用,也叫简单RPC调用
rpc SayHello(HelloRequest) returns (HelloResponse);【服务端Server Stream RPC】流式RPC. 客户端向服务端发送单个请求,服务端以流的方式返回一系列消息。客户端从流中读取消息,直到没有更多的消息。当然,返回的消息当然不是乱序的,gRPC保证单个请求中的消息顺序
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);【客户端Client Stream RPC】流式RPC调用。客户端向服务端请求一系列的消息,一旦客户端完成消息写入,就会等待服务端读取所有消息并处理它们。gRPC同样会保证单个请求中消息的顺序性
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);【双向流式调用Bidirectional Streaming RPC】就是客户端和服务端均以流的方式进行读写消息。这两个流式完全独立的,因此,服务端和客户端可以按照他们喜欢的方式写入和读取流。比如:服务端可以在等待所有的客户端消息发送到后,再处理;也可以交替读取请求、写入响应信息等。当然,每个流中的消息顺序是可以保证的。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);Channels通道
gRPC通道提供了一条链接到指定主机和端口号的服务端的链接。他在创建客户端stub的时候使用。客户端可以指定通道参数来修改gRPC的默认行为,例如:打开或者关闭消息压缩。一个通道是有连接和空闲两个状态的
mvn 插件整合,proto编译,生成代码
大致流程:
服务端代码:
import com.meta.HtMetaInfoSyncServiceGrpc; import com.meta.HtMetaSyncRequest; import com.meta.HtMetaSyncResponse; import com.meta.SyncType; import com.util.JsonUtil; import com.vo.HtMetaClusterInfoVo; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import net.devh.boot.grpc.server.service.GrpcService; import java.util.List; /** * @author 何永豪 * @className HtMetaSyncService * @description TODO * @date 2023/11/6 15:25 */ @Slf4j @GrpcService public class HtMetaSyncService extends HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceImplBase { @Override public void syncMeta(HtMetaSyncRequest request, StreamObserveryml
server: port: 8088 spring: application: name: local-client grpc: client: # gRPC配置的名字,GrpcClient注解会用到 local-grpc-server: # gRPC服务端地址 address: 'static://127.0.0.1:9898' enableKeepAlive: true keepAliveWithoutCalls: true #认证类型,无加密 negotiationType: plaintext客户端stub
import com.meta.HtMetaInfoSyncServiceGrpc; import com.meta.HtMetaSyncRequest; import com.meta.HtMetaSyncResponse; import com.meta.SyncType; import io.grpc.StatusRuntimeException; import net.devh.boot.grpc.client.inject.GrpcClient; import org.springframework.stereotype.Service; /** * @author xiaoshu */ @Service public class HtMetaInfoSyncClient { @GrpcClient("local-grpc-server") private HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceBlockingStub stub; public String syncMeta(final String json,final SyncType syncType) { try { HtMetaSyncResponse htMetaSyncResponse = stub.syncMeta(( HtMetaSyncRequest.newBuilder() .setSyncJson(json) .setSyncType(syncType) .build())); String code = htMetaSyncResponse.getCode(); String msg = htMetaSyncResponse.getMsg(); return code+":"+msg; } catch (final StatusRuntimeException e) { return"FAILED with"+ e.getStatus().getCode().name(); } } } 实际调用: @Autowired private HtMetaInfoSyncClient htMetaInfoSyncClient; @RequestMapping("/htMetaSync") public String htMetaSync() { String json=""; return htMetaInfoSyncClient.syncMeta(json, SyncType.ADD); } 原生集成启动grpc服务监听:
import com.annotation.GrpcService; import io.grpc.BindableService; import io.grpc.Server; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.CommandLineRunner; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.core.type.AnnotatedTypeMetadata; import javax.annotation.Resource; import java.io.File; import java.io.IOException; import java.lang.annotation.Annotation; import java.util.Map; import java.util.stream.Stream; /** * @author 何永豪 * @className GrpcServer * @description TODO * @date 2023/11/7 15:56 */ @Slf4j public class GrpcServer implements CommandLineRunner, DisposableBean { private Server server; @Resource private AbstractApplicationContext applicationContext; @Resource private GrpcProperties grpcProperties; @Override public void destroy() { stop(); } @Override public void run(String... args) throws Exception { start(); } private SslContextBuilder getSslContextBuilder(){ SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(new File(grpcProperties.getServerCertPath()), new File(grpcProperties.getServerPrivateKeyPath())); sslContextBuilder.trustManager(new File(grpcProperties.getServerTrustCertPath())); sslContextBuilder.clientAuth(ClientAuth.REQUIRE); return GrpcSslContexts.configure(sslContextBuilder); } private void start() throws IOException { NettyServerBuilder nettyServerBuilder = NettyServerBuilder .forPort(grpcProperties.getPort()); // .sslContext(getSslContextBuilder().build() // ); scanBeanWithAnnotation(GrpcService.class, BindableService.class) .forEach(e->{ BindableService bindableService = applicationContext.getBeanFactory().getBean(e, BindableService.class); nettyServerBuilder.addService(bindableService.bindService()); }); server = nettyServerBuilder.build().start(); log.info("grpc start listen {}",grpcProperties.getPort()); Thread thread = new Thread(() -> { try { GrpcServer.this.blockUntilShutdown(); } catch (InterruptedException e) { log.error("grpc server stopped"); throw new RuntimeException(e); } }); thread.setDaemon(false); thread.start(); } private void stop(){ if (server !=null){ server.shutdown(); } } private void blockUntilShutdown() throws InterruptedException { if (server !=null ){ server.awaitTermination(); } } private服务端:
@Slf4j @GrpcService public class HtMetaSyncService extends HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceImplBase { @Override public void syncMeta(HtMetaSyncRequest request, StreamObserver客户端
import com.config.GrpcProperties; import com.meta.HtMetaInfoSyncServiceGrpc; import com.meta.HtMetaSyncRequest; import com.meta.HtMetaSyncResponse; import com.meta.SyncType; import io.grpc.ManagedChannel; import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyChannelBuilder; import lombok.SneakyThrows; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; /** * @author xiaoshu */ @Component public class HtMetaInfoSyncClient { @Resource private GrpcProperties grpcProperties; @SneakyThrows public String syncMeta(final String json,final SyncType syncType) { ManagedChannel channel = null; try { channel=NettyChannelBuilder. forAddress(grpcProperties.getServerIp(),grpcProperties.getPort()) //非加密连接 .usePlaintext() //加密 //.sslContext(grpcProperties.buildClentSslContext()) .build(); HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceBlockingStub stub = HtMetaInfoSyncServiceGrpc.newBlockingStub(channel); HtMetaSyncResponse htMetaSyncResponse = stub.syncMeta(( HtMetaSyncRequest.newBuilder() .setSyncJson(json) .setSyncType(syncType) .build())); String code = htMetaSyncResponse.getCode(); String msg = htMetaSyncResponse.getMsg(); return code+":"+msg; } catch (final StatusRuntimeException e) { return"FAILED with"+ e.getStatus().getCode().name(); }finally { if (Objects.nonNull(channel)){ channel.shutdown(); } } } } import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 何永豪 * @className GrpcConfig * @description TODO * @date 2023/11/7 16:58 */ @Configuration public class GrpcConfig { @Bean @ConditionalOnProperty(value ="grpc.enable",havingValue ="true",matchIfMissing = true) public GrpcServer grpcServer(){ return new GrpcServer(); } } @Component @ConfigurationProperties(prefix ="grpc") @Data public class GrpcProperties { private Integer port; private String serverIp; private String serverCertPath; private String serverPrivateKeyPath; private String serverTrustCertPath; private String clientCertPath; private String clientCertChainPath; private String clientPrivateKeyPath; } server: port: 8077 spring: application: name: grpc-api grpc: #监听端口 port: 6100 #目标IP server-ip: 127.0.0.1 #ssl配置 server-cert-path: /ca/server/server.crt server-private-key-path: /ca/server/server.pem server-trust-cert-path: /ca/server/ca.crt