Skip to content

Commit

Permalink
Merge pull request #5 from IBM/interfaceAndMultiQuerySupport
Browse files Browse the repository at this point in the history
Changes added to support multiquery in simple select
  • Loading branch information
sandippawar1412 committed Aug 27, 2019
2 parents 69fc89b + 18fb19c commit 9e07f2f
Showing 1 changed file with 64 additions and 52 deletions.
116 changes: 64 additions & 52 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,48 +1140,8 @@ func (cn *conn) getFileFromBE(logDir string, filename string, logType int) bool
return status
}

func (cn *conn) simpleQuery(query string) (res *rows, err error) {

defer cn.errRecover(&err)
func (cn *conn) connNextResultSet(query string) (res *rows, err error) {
var filename readBuf

if cn.status == CONN_EXECUTING || cn.status == CONN_FETCHING {
cn.status = CONN_CONNECTED
cn.Sock_clear_socket()
} else if cn.status == CONN_CANCELLED {
// Control will reach here only when the query was really huge and
// even after Cancel request sent, it took too long to cancel and
// Conn_clear_sock returned as data was not yet available
cn.Sock_clear_socket()
}
elog.Infoln("Processing query:", query)

buffer := &writeBuf{
buf: []byte{'P', '\xFF', '\xFF', '\xFF', '\xFF'},
pos: 1,
}

if cn.commandNumber != -1 {
cn.commandNumber++
buffer = &writeBuf{
buf: []byte{'P', '\x00', '\x00', '\x00', byte(cn.commandNumber)},
pos: 1,
}
if cn.commandNumber > 100000 {
cn.commandNumber = 1
}
}

buffer.string(query)
elog.Debugln(chopPath(funName()), "Buffer sent to nps: ", buffer.buf)

_, err = cn.c.Write(buffer.buf)
if err != nil {
panic(err)
}

cn.status = CONN_EXECUTING

for {
response, err := cn.recvSingleByte()
if err != nil {
Expand All @@ -1192,19 +1152,18 @@ func (cn *conn) simpleQuery(query string) (res *rows, err error) {
switch response {

case 'A': /* Asynchronous Messages are ignored */
fallthrough
case 0: /* Ignored any null characters */
fallthrough
case 'L': /* This is receieved from server for batch queries after processing rows */
break
case 'C': /* portal query command, no tuples returned */
length, _ := cn.recv_n_bytes(4)
responseBuf, err := cn.recv_n_bytes(int(length.int32()))
elog.Debugf(chopPath(funName()), "response received from backend: %s \n", responseBuf.string())
return res, err
responseBuf, _ := cn.recv_n_bytes(int(length.int32()))
resStr := fmt.Sprintf("%s", responseBuf.string())
elog.Debugf(chopPath(funName()), "response received from backend: %s \n", resStr)
break
case 'Z':
length, _ := cn.recv_n_bytes(4)
responseBuf, err := cn.recv_n_bytes(int(length.int32()))
cn.processReadyForQuery(&responseBuf)
if res == nil && err == nil {
err = errUnexpectedReady
}
return res, err
case 'N':
length, _ := cn.recv_n_bytes(4)
Expand All @@ -1215,7 +1174,6 @@ func (cn *conn) simpleQuery(query string) (res *rows, err error) {
responseBuf, _ := cn.recv_n_bytes(int(length.int32()))
elog.Debugf(chopPath(funName()), "response received from backend: %s \n", responseBuf.string())
break

case 'T':
length, _ := cn.recv_n_bytes(4)
responseBuf, err := cn.recv_n_bytes(int(length.int32()))
Expand Down Expand Up @@ -1265,6 +1223,51 @@ func (cn *conn) simpleQuery(query string) (res *rows, err error) {
}
}

func (cn *conn) simpleQuery(query string) (res *rows, err error) {

defer cn.errRecover(&err)

if cn.status == CONN_EXECUTING || cn.status == CONN_FETCHING {
cn.status = CONN_CONNECTED
cn.Sock_clear_socket()
} else if cn.status == CONN_CANCELLED {
// Control will reach here only when the query was really huge and
// even after Cancel request sent, it took too long to cancel and
// Conn_clear_sock returned as data was not yet available
cn.Sock_clear_socket()
}
elog.Infoln("Processing query:", query)

buffer := &writeBuf{
buf: []byte{'P', '\xFF', '\xFF', '\xFF', '\xFF'},
pos: 1,
}

if cn.commandNumber != -1 {
cn.commandNumber++
buffer = &writeBuf{
buf: []byte{'P', '\x00', '\x00', '\x00', byte(cn.commandNumber)},
pos: 1,
}
if cn.commandNumber > 100000 {
cn.commandNumber = 1
}
}

buffer.string(query)
elog.Debugln(chopPath(funName()), "Buffer sent to nps: ", buffer.buf)

_, err = cn.c.Write(buffer.buf)
if err != nil {
panic(err)
}

cn.status = CONN_EXECUTING

return cn.connNextResultSet(query)

}

type noRows struct{}

var emptyRows noRows
Expand Down Expand Up @@ -2015,7 +2018,12 @@ func (res *rows) Next(dest []driver.Value) (err error) {

case 'C':
elog.Debugln(chopPath(funName()), "All Rows fetched")
res.done = true
re, _ := cn.connNextResultSet("")
if re == nil {
res.done = true
} else {
res.next = &re.rowsHeader
}
return io.EOF

case 'D':
Expand All @@ -2026,6 +2034,7 @@ func (res *rows) Next(dest []driver.Value) (err error) {
cn.saveMessage(response, &responseBuf)
res.readTuples(dest)
return

case 'X': // get dbos tuple descriptor
cn.recv_n_bytes(4)
length, _ := cn.recv_n_bytes(4)
Expand All @@ -2039,6 +2048,9 @@ func (res *rows) Next(dest []driver.Value) (err error) {
res.status = PGRES_TUPLES_OK
res.Res_read_dbos_tuple(dest)
return /* continue reading */
case 0:
res.done = true
return io.EOF
default:
elog.Fatalf(chopPath(funName()), "Unknown response: %d", response)
}
Expand Down

0 comments on commit 9e07f2f

Please sign in to comment.