Коммит 07c1f836 создал по автору Баринов Сергей Владимирович's avatar Баринов Сергей Владимирович
Просмотр файлов

some updates

владелец fb6c44a1
......@@ -8,23 +8,12 @@ use tonic::transport::{Endpoint, Channel, Uri};
use crate::generated::ydb::discovery::v1::DiscoveryServiceClient;
use crate::generated::ydb::discovery::{ListEndpointsResult, ListEndpointsRequest};
use crate::generated::ydb::table::v1::table_service_client::TableServiceClient;
//use ydb_grpc::ydb_proto::discovery::{v1::discovery_service_client::DiscoveryServiceClient, WhoAmIRequest, WhoAmIResponse, ListEndpointsRequest, WhoAmIResult, ListEndpointsResult};
pub type AsciiValue = tonic::metadata::MetadataValue<tonic::metadata::Ascii>;
pub fn create_ydb_service<C: Credentials>(channel: Channel, db_name: String, creds: C) -> YdbService<C> {
let db_name = db_name.try_into().unwrap();
let interceptor = DBInterceptor {db_name, creds};
let service = tower::ServiceBuilder::new()
.layer(tonic::service::interceptor(interceptor))
.layer_fn(|x|x)
.service(channel);
YdbService(service)
}
pub fn create_endpoint(uri: Uri) -> Endpoint {
let mut res = Endpoint::from(uri);
if matches!(res.uri().scheme_str(), Some("grpcs")) {
......@@ -34,50 +23,6 @@ pub fn create_endpoint(uri: Uri) -> Endpoint {
}
#[derive(Debug)]
pub struct Client<C: Credentials + Clone> {
endpoints: Vec<Endpoint>,
channel: Channel,
interceptor: DBInterceptor<C>,
}
impl<C: Credentials + Clone> Client<C> {
pub async fn new<E: TryInto<Endpoint>>(endpoint: E, db_name: &str, creds: C) -> Result<Self, Box<dyn Error>> where E::Error : std::error::Error + 'static {
let db_name = db_name.try_into()?;
let endpoint: Endpoint = endpoint.try_into()?;
let endpoint = endpoint.tcp_keepalive(Some(std::time::Duration::from_secs(15)));
//.tls_config(tonic::transport::ClientTlsConfig::new())?;
let channel = endpoint.connect().await?;
let interceptor = DBInterceptor {db_name, creds};
let result = Self::list_endpoints(channel.clone(), interceptor.clone()).await?;
let mut endpoints = Vec::with_capacity(result.endpoints.len());
for e in result.endpoints {
println!("endpoint: {e:?}");
let endpoint: Endpoint = e.address.try_into()?;
endpoints.push(Self::correct_endpoint(endpoint));
}
let me = Self{endpoints, channel, interceptor};
Ok(me)
}
fn correct_endpoint(endpoint: Endpoint) -> Endpoint {
match endpoint.uri().scheme_str() {
Some("grpcs") => endpoint.tls_config(tonic::transport::ClientTlsConfig::new()).unwrap(),
_ => endpoint
}.tcp_keepalive(Some(std::time::Duration::from_secs(15)))
}
async fn list_endpoints(channel: Channel, interceptor: DBInterceptor<C>) -> Result<ListEndpointsResult, Box<dyn Error>> {
let database = interceptor.db_name();
let req = ListEndpointsRequest {
database,
..ListEndpointsRequest::default()
};
println!("req: {req:?}\n");
let mut discovery = DiscoveryServiceClient::with_interceptor(channel, interceptor);
let response = discovery.list_endpoints(req).await?.into_inner();
ListEndpointsResult::decode(response.operation.unwrap().result.unwrap().value.as_slice()).map_err(Into::into)
}
}
pub trait Credentials: Clone {
fn token(&self) -> AsciiValue;
}
......@@ -97,12 +42,6 @@ pub struct DBInterceptor<C: Clone> {
creds: C
}
impl<C: Clone> DBInterceptor<C> {
fn db_name(&self) -> String {
self.db_name.to_str().unwrap().to_owned()
}
}
impl<C: Credentials> Interceptor for DBInterceptor<C> {
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
let headers = request.metadata_mut();
......@@ -134,4 +73,21 @@ impl<C: Credentials> Service<Body> for YdbService<C> {
fn call(&mut self, request: tonic::codegen::http::Request<tonic::body::BoxBody>) -> Self::Future {
self.0.call(request)
}
}
impl<C: Credentials> YdbService<C> {
pub fn new(channel: Channel, db_name: AsciiValue, creds: C) -> Self {
let interceptor = DBInterceptor {db_name, creds};
let service = tower::ServiceBuilder::new()
.layer(tonic::service::interceptor(interceptor))
.layer_fn(|x|x)
.service(channel);
YdbService(service)
}
pub fn discovery(self) -> DiscoveryServiceClient<Self> {
DiscoveryServiceClient::new(self)
}
pub fn table(self) -> TableServiceClient<Self> {
TableServiceClient::new(self)
}
}
\ Нет новой строки в конце файла
......@@ -31,22 +31,22 @@ pub async fn main() {
let db_name = "/ru-central1/b1gtv82sacrcnutlfktm/etn8sgrgdbp7jqv64k9f";
//let url = "grpcs://localhost:2135";
//let db_name = "/local";
let creds = "t1.9euelZrOz5mWmMaQnI2eno-TlJbLyO3rnpWamcmNip2Tk46RxpyZlpuTyo_l8_cQOgBe-e8uCAIV_t3z91BofV357y4IAhX-.aXebaJZxBI7mtfYjDDRNT1opYrO1e1g8dlC4AzYQstnYxxB5KS32uDwLWi7UXxSUG-ay6r2I5CJhyfjnnnhWCA";
let creds = env!("TOKEN");
let tls_config = ClientTlsConfig::new().ca_certificate(CERT.clone());
//println!("tls config: {tls_config:?}");
let ep = client::create_endpoint(url.try_into().unwrap()).tls_config(tls_config).unwrap();
let channel = ep.connect().await.unwrap();
let service = client::create_ydb_service(channel, db_name.into(), creds.to_owned());
let service = YdbService::new(channel, db_name.try_into().unwrap(), creds.to_owned());
//client::Client::new(url, db_name, creds.to_owned()).await.unwrap();
let mut client = create_discovery_client(&service);
let mut client = service.clone().discovery();
//let mut client = DiscoveryServiceClient::connect("test").await.unwrap();
let response = client.list_endpoints(ListEndpointsRequest{database: db_name.into(), ..Default::default()}).await.unwrap();
let payload = response.into_inner().payload().unwrap();
println!("payload: {payload:?}");
let table_client = TableServiceClient::new(service.clone());
let table_client = service.clone().table();
//let mut table_client = TableServiceClient::connect("").await.unwrap();
......@@ -95,7 +95,7 @@ impl<T> Baz<T> where T: Foo, T::Inner: Bar,
}
fn test() {
let baz = Baz::new(1);
let baz: Baz<i32> = Baz::new(1);
let s = baz.foo();
}
......
Поддерживает Markdown
0% или .
You are about to add 0 people to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Пожалуйста, зарегистрируйтесь или чтобы прокомментировать