Открыть боковую панель
Баринов Сергей Владимирович
Unofficial Ydb client library written on rust
Коммиты
22d081a3
Коммит
22d081a3
создал
Май 06, 2023
по автору
Баринов Сергей Владимирович
Просмотр файлов
что-то еще
владелец
20b766ac
Изменения
5
Скрыть пробелы
Построчно
Рядом
src/client.rs
Просмотр файла @
22d081a3
...
...
@@ -8,7 +8,7 @@ use tonic::codegen::InterceptedService;
use
tonic
::
service
::
Interceptor
;
use
tonic
::
transport
::{
Endpoint
,
Channel
,
Uri
};
use
crate
::
exper
::
YdbResponse
;
use
crate
::
payload
::
YdbResponse
;
use
crate
::
generated
::
ydb
::
discovery
::
v1
::
DiscoveryServiceClient
;
use
crate
::
generated
::
ydb
::
table
::
query
::
Query
;
use
crate
::
generated
::
ydb
::
table
::
transaction_control
::
TxSelector
;
...
...
@@ -119,6 +119,7 @@ impl<C: Credentials> Drop for YdbService<C> {
}
});
}
log
::
debug!
(
"YdbService closed"
);
}
}
...
...
@@ -188,9 +189,6 @@ pub struct YdbTransaction<'a, C: Credentials> {
}
impl
<
'a
,
C
:
Credentials
>
YdbTransaction
<
'a
,
C
>
{
fn
client
(
&
mut
self
)
->
&
mut
TableClientWithSession
<
'a
,
C
>
{
&
mut
self
.client
}
pub
async
fn
create
(
mut
client
:
TableClientWithSession
<
'a
,
C
>
)
->
Result
<
YdbTransaction
<
'a
,
C
>
,
YdbError
>
{
let
tx_settings
=
Some
(
TransactionSettings
{
tx_mode
:
Some
(
TxMode
::
SerializableReadWrite
(
Default
::
default
()))});
let
response
=
client
.begin_transaction
(
BeginTransactionRequest
{
tx_settings
,
..
Default
::
default
()})
.await
?
;
...
...
@@ -230,18 +228,3 @@ impl<'a, C: Credentials> YdbTransaction<'a, C> {
}
}
struct
X
<
'a
>
{
x
:
Option
<
Y
<
'a
>>
}
struct
Y
<
'a
>
{
v
:
&
'a
mut
u32
,
}
impl
<
'a
>
X
<
'a
>
{
fn
x
(
&
mut
self
)
->
&
'a
mut
Y
{
self
.x
.as_mut
()
.unwrap
()
}
}
\ Нет новой строки в конце файла
src/lib.rs
0 → 100644
Просмотр файла @
22d081a3
pub
mod
generated
;
pub
mod
client
;
pub
mod
pool
;
mod
payload
;
pub
use
payload
::
*
;
src/main.rs
Просмотр файла @
22d081a3
#![allow(dead_code)]
use
std
::{
env
,
time
::
Duration
};
use
deadpool
::
managed
::
PoolBuilder
;
use
tokio
::
sync
::
futures
;
use
ydb_unofficial
::
generated
::
ydb
::
table
::{
ExecuteScanQueryRequest
,
ExecuteSchemeQueryRequest
};
//use ydb_grpc::ydb_proto::{discovery::{v1::discovery_service_client::DiscoveryServiceClient, WhoAmIRequest, ListEndpointsRequest}, table::{v1::table_service_client::TableServiceClient, CreateSessionRequest}};
use
exper
::
YdbResponse
;
use
pool
::
YdbPoolBuilder
;
use
ydb_unofficial
::
{
YdbResponse
,
generated
::
ydb
::
r
#
type
::
PrimitiveTypeId
}
;
use
ydb_unofficial
::
pool
::
YdbPoolBuilder
;
use
tonic
::
transport
::
Uri
;
use
ydb_unofficial
::{
pool
::
ConnectionManager
,
client
::
YdbError
,
generated
::
ydb
::{
table
::{
CreateTableRequest
,
ColumnMeta
}}};
use
crate
::
generated
::
ydb
::{
table
::{
ExecuteDataQueryRequest
,
query
::
Query
,
self
,
TransactionControl
,
TransactionSettings
,
transaction_settings
::
TxMode
,
transaction_control
::
TxSelector
},
discovery
::
ListEndpointsRequest
};
use
self
::
client
::
YdbService
;
mod
pool
;
mod
client
;
mod
exper
;
mod
generated
;
use
ydb_unofficial
::
generated
::
ydb
::{
table
::{
ExecuteDataQueryRequest
,
query
::
Query
,
self
,
TransactionControl
,
TransactionSettings
,
transaction_settings
::
TxMode
,
transaction_control
::
TxSelector
},
discovery
::
ListEndpointsRequest
};
#[tokio::main]
...
...
@@ -27,14 +21,18 @@ pub async fn main() {
//let db_name = "/local";
let
creds
=
env
::
var
(
"TOKEN"
)
.unwrap
();
let
uri
:
Uri
=
url
.try_into
()
.unwrap
();
let
ep
=
client
::
create_endpoint
(
url
.try_into
()
.unwrap
());
let
ep
=
ydb_unofficial
::
client
::
create_endpoint
(
url
.try_into
()
.unwrap
());
let
channel
=
ep
.connect
()
.await
.unwrap
();
let
pool
=
YdbPoolBuilder
::
new
(
creds
,
db_name
.try_into
()
.unwrap
(),
uri
.try_into
()
.unwrap
())
.build
()
.unwrap
();
{
let
f1
=
create_table2
(
&
pool
,
db_name
);
let
f2
=
create_table3
(
&
pool
,
db_name
);
let
res
=
tokio
::
try_join!
(
f1
,
f2
)
.unwrap
();
if
false
{
let
mut
service
=
pool
.get
()
.await
.unwrap
();
//client::Client::new(url, db_name, creds.to_owned()).await.unwrap();
let
mut
discovery
=
service
.discovery
();
let
mut
discovery
=
pool
.get
()
.await
.unwrap
();
let
mut
discovery
=
discovery
.discovery
();
//let mut client = DiscoveryServiceClient::connect("test").await.unwrap();
let
response
=
discovery
.list_endpoints
(
ListEndpointsRequest
{
database
:
db_name
.into
(),
..
Default
::
default
()})
.await
.unwrap
();
let
payload
=
response
.into_inner
()
.payload
()
.unwrap
();
...
...
@@ -43,7 +41,7 @@ pub async fn main() {
//let mut table_client = TableServiceClient::connect("").await.unwrap();
let
query
=
"SELECT 1+1 as sum, 2*2 as mul"
;
let
session
=
service
.table
()
.await
.unwrap
();
let
mut
transaction
=
client
::
YdbTransaction
::
create
(
session
)
.await
.unwrap
();
let
mut
transaction
=
ydb_unofficial
::
client
::
YdbTransaction
::
create
(
session
)
.await
.unwrap
();
let
x
=
transaction
.execute_data_query
(
ExecuteDataQueryRequest
{
query
:
Some
(
table
::
Query
{
query
:
Some
(
Query
::
YqlText
(
query
.into
()))}),
..
Default
::
default
()
...
...
@@ -70,13 +68,55 @@ pub async fn main() {
tokio
::
time
::
sleep
(
Duration
::
from_secs
(
1
))
.await
;
}
async
fn
create_table2
(
pool
:
&
deadpool
::
managed
::
Pool
<
ConnectionManager
<
String
>>
,
db_name
:
&
str
)
->
Result
<
(),
YdbError
>
{
let
mut
conn
=
pool
.get
()
.await
?
;
let
mut
conn
=
conn
.table
()
.await
?
;
let
response
=
conn
.execute_scheme_query
(
ExecuteSchemeQueryRequest
{
yql_text
:
"create table my_table2(id uint64 not null, value utf8, primary key(id))"
.to_owned
(),
..
Default
::
default
()
})
.await
?
;
log
::
error!
(
"response: {response:?}"
);
Ok
(())
}
async
fn
create_table3
(
pool
:
&
deadpool
::
managed
::
Pool
<
ConnectionManager
<
String
>>
,
db_name
:
&
str
)
->
Result
<
(),
YdbError
>
{
let
mut
conn
=
pool
.get
()
.await
?
;
let
mut
conn
=
conn
.table
()
.await
?
;
let
response
=
conn
.execute_scheme_query
(
ExecuteSchemeQueryRequest
{
yql_text
:
"create table my_table3(id uint64 not null, value utf8 not null, primary key(id))"
.to_owned
(),
..
Default
::
default
()
})
.await
?
;
log
::
error!
(
"response: {response:?}"
);
Ok
(())
}
async
fn
create_table
(
pool
:
&
deadpool
::
managed
::
Pool
<
ConnectionManager
<
String
>>
,
db_name
:
&
str
)
->
Result
<
(),
YdbError
>
{
let
str_type
=
ydb_unofficial
::
generated
::
ydb
::
Type
{
r
#
type
:
Some
(
ydb_unofficial
::
generated
::
ydb
::
r
#
type
::
Type
::
TypeId
(
PrimitiveTypeId
::
Utf8
as
i32
))};
let
str_nullable_type
=
ydb_unofficial
::
generated
::
ydb
::
Type
{
r
#
type
:
Some
(
ydb_unofficial
::
generated
::
ydb
::
r
#
type
::
Type
::
OptionalType
(
Box
::
new
(
ydb_unofficial
::
generated
::
ydb
::
OptionalType
{
item
:
Some
(
Box
::
new
(
ydb_unofficial
::
generated
::
ydb
::
Type
{
r
#
type
:
Some
(
ydb_unofficial
::
generated
::
ydb
::
r
#
type
::
Type
::
TypeId
(
PrimitiveTypeId
::
Utf8
as
i32
))}
))})
))};
let
req
=
CreateTableRequest
{
path
:
format!
(
"{db_name}/my_table"
),
columns
:
vec!
[
ColumnMeta
{
name
:
"id"
.to_owned
(),
r
#
type
:
Some
(
str_type
.clone
()),
family
:
""
.to_owned
()
},
ColumnMeta
{
name
:
"value1"
.to_owned
(),
r
#
type
:
Some
(
str_nullable_type
),
family
:
""
.to_owned
()
}
],
primary_key
:
vec!
[
"id"
.to_owned
()],
indexes
:
vec!
[],
..
Default
::
default
()
};
let
mut
conn
=
pool
.get
()
.await
?
;
let
mut
conn
=
conn
.table
()
.await
?
;
let
result
=
conn
.create_table
(
req
)
.await
?
;
log
::
error!
(
"result of create table: {result:?}"
);
Ok
(())
}
fn
init_logger
()
{
use
simplelog
::
*
;
let
mut
builder
=
ConfigBuilder
::
new
();
builder
.set_time_level
(
LevelFilter
::
Error
);
TermLogger
::
init
(
LevelFilter
::
Debug
,
builder
.build
(),
TerminalMode
::
Mixed
,
ColorChoice
::
Auto
)
.unwrap
();
TermLogger
::
init
(
LevelFilter
::
Info
,
builder
.build
(),
TerminalMode
::
Mixed
,
ColorChoice
::
Auto
)
.unwrap
();
}
...
...
src/
exper
.rs
→
src/
payload
.rs
Просмотр файла @
22d081a3
Файл перемещен
src/pool.rs
Просмотр файла @
22d081a3
...
...
@@ -6,36 +6,13 @@ use deadpool::managed::{Manager, Pool, PoolBuilder, PoolConfig, Hook};
use
tonic
::
transport
::{
Endpoint
,
Uri
};
use
tower
::
ServiceExt
;
use
crate
::
exper
::
YdbResponse
;
use
crate
::
payload
::
YdbResponse
;
use
crate
::
generated
::
ydb
::
discovery
::{
EndpointInfo
,
ListEndpointsRequest
};
use
crate
::
client
::{
Credentials
,
YdbService
,
AsciiValue
,
YdbError
};
struct
YdbEndpoint
{
inner
:
Endpoint
,
load_factor
:
f32
,
}
type
YdbEndpoints
=
std
::
sync
::
RwLock
<
Vec
<
EndpointInfo
>>
;
impl
Into
<
Endpoint
>
for
YdbEndpoint
{
fn
into
(
self
)
->
Endpoint
{
self
.inner
}
}
impl
From
<&
EndpointInfo
>
for
YdbEndpoint
{
fn
from
(
info
:
&
EndpointInfo
)
->
Self
{
let
uri
:
tonic
::
transport
::
Uri
=
format!
(
"{}:{}"
,
info
.address
,
info
.port
)
.try_into
()
.unwrap
();
let
mut
inner
=
Endpoint
::
from
(
uri
)
.tcp_keepalive
(
Some
(
std
::
time
::
Duration
::
from_secs
(
15
)));
if
info
.ssl
{
inner
=
inner
.tls_config
(
Default
::
default
())
.unwrap
()
}
Self
{
inner
,
load_factor
:
info
.load_factor
,
}
}
}
fn
make_endpoint
(
info
:
&
EndpointInfo
)
->
Endpoint
{
let
uri
:
tonic
::
transport
::
Uri
=
format!
(
"{}://{}:{}"
,
info
.scheme
(),
info
.address
,
info
.port
)
.try_into
()
.unwrap
();
...
...
Редактирование
Предварительный просмотр
Поддерживает Markdown
0%
Попробовать снова
или
прикрепить новый файл
.
Отмена
You are about to add
0
people
to the discussion. Proceed with caution.
Сначала завершите редактирование этого сообщения!
Отмена
Пожалуйста,
зарегистрируйтесь
или
войдите
чтобы прокомментировать