redesign create_accounts_for_transaction
This commit is contained in:
parent
3465af2a13
commit
1569a6cd2f
|
@ -244,30 +244,16 @@ impl PostgresSession {
|
|||
&self,
|
||||
accounts: HashSet<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
// create temp table
|
||||
let temp_table = self.get_new_temp_table();
|
||||
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
let statement = "CREATE TEMP TABLE temp_new_accounts_raw(
|
||||
key TEXT
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
)";
|
||||
self.client.execute(statement, &[]).await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement = r#"
|
||||
COPY temp_new_accounts_raw(
|
||||
key
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#.to_string();
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
|
@ -282,22 +268,18 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let statement = r#"
|
||||
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key FROM temp_new_accounts_raw
|
||||
ON CONFLICT DO NOTHING
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"inserted {} account keys into accounts table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue