diff options
Diffstat (limited to 'vendor/cloud.google.com/go/bigtable/reader.go')
| -rw-r--r-- | vendor/cloud.google.com/go/bigtable/reader.go | 250 |
1 files changed, 0 insertions, 250 deletions
diff --git a/vendor/cloud.google.com/go/bigtable/reader.go b/vendor/cloud.google.com/go/bigtable/reader.go deleted file mode 100644 index 4af2f7020..000000000 --- a/vendor/cloud.google.com/go/bigtable/reader.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Copyright 2016 Google Inc. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package bigtable - -import ( - "bytes" - "fmt" - - btpb "google.golang.org/genproto/googleapis/bigtable/v2" -) - -// A Row is returned by ReadRows. The map is keyed by column family (the prefix -// of the column name before the colon). The values are the returned ReadItems -// for that column family in the order returned by Read. -type Row map[string][]ReadItem - -// Key returns the row's key, or "" if the row is empty. -func (r Row) Key() string { - for _, items := range r { - if len(items) > 0 { - return items[0].Row - } - } - return "" -} - -// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column. -type ReadItem struct { - Row, Column string - Timestamp Timestamp - Value []byte -} - -// The current state of the read rows state machine. -type rrState int64 - -const ( - newRow rrState = iota - rowInProgress - cellInProgress -) - -// chunkReader handles cell chunks from the read rows response and combines -// them into full Rows. -type chunkReader struct { - state rrState - curKey []byte - curFam string - curQual []byte - curTS int64 - curVal []byte - curRow Row - lastKey string -} - -// newChunkReader returns a new chunkReader for handling read rows responses. -func newChunkReader() *chunkReader { - return &chunkReader{state: newRow} -} - -// Process takes a cell chunk and returns a new Row if the given chunk -// completes a Row, or nil otherwise. -func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) { - var row Row - switch cr.state { - case newRow: - if err := cr.validateNewRow(cc); err != nil { - return nil, err - } - - cr.curRow = make(Row) - cr.curKey = cc.RowKey - cr.curFam = cc.FamilyName.Value - cr.curQual = cc.Qualifier.Value - cr.curTS = cc.TimestampMicros - row = cr.handleCellValue(cc) - - case rowInProgress: - if err := cr.validateRowInProgress(cc); err != nil { - return nil, err - } - - if cc.GetResetRow() { - cr.resetToNewRow() - return nil, nil - } - - if cc.FamilyName != nil { - cr.curFam = cc.FamilyName.Value - } - if cc.Qualifier != nil { - cr.curQual = cc.Qualifier.Value - } - cr.curTS = cc.TimestampMicros - row = cr.handleCellValue(cc) - - case cellInProgress: - if err := cr.validateCellInProgress(cc); err != nil { - return nil, err - } - if cc.GetResetRow() { - cr.resetToNewRow() - return nil, nil - } - row = cr.handleCellValue(cc) - } - - return row, nil -} - -// Close must be called after all cell chunks from the response -// have been processed. An error will be returned if the reader is -// in an invalid state, in which case the error should be propagated to the caller. -func (cr *chunkReader) Close() error { - if cr.state != newRow { - return fmt.Errorf("invalid state for end of stream %q", cr.state) - } - return nil -} - -// handleCellValue returns a Row if the cell value includes a commit, otherwise nil. -func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row { - if cc.ValueSize > 0 { - // ValueSize is specified so expect a split value of ValueSize bytes - if cr.curVal == nil { - cr.curVal = make([]byte, 0, cc.ValueSize) - } - cr.curVal = append(cr.curVal, cc.Value...) - cr.state = cellInProgress - } else { - // This cell is either the complete value or the last chunk of a split - if cr.curVal == nil { - cr.curVal = cc.Value - } else { - cr.curVal = append(cr.curVal, cc.Value...) - } - cr.finishCell() - - if cc.GetCommitRow() { - return cr.commitRow() - } else { - cr.state = rowInProgress - } - } - - return nil -} - -func (cr *chunkReader) finishCell() { - ri := ReadItem{ - Row: string(cr.curKey), - Column: fmt.Sprintf("%s:%s", cr.curFam, cr.curQual), - Timestamp: Timestamp(cr.curTS), - Value: cr.curVal, - } - cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri) - cr.curVal = nil -} - -func (cr *chunkReader) commitRow() Row { - row := cr.curRow - cr.lastKey = cr.curRow.Key() - cr.resetToNewRow() - return row -} - -func (cr *chunkReader) resetToNewRow() { - cr.curKey = nil - cr.curFam = "" - cr.curQual = nil - cr.curVal = nil - cr.curRow = nil - cr.curTS = 0 - cr.state = newRow -} - -func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error { - if cc.GetResetRow() { - return fmt.Errorf("reset_row not allowed between rows") - } - if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil { - return fmt.Errorf("missing key field for new row %v", cc) - } - if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) { - return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey)) - } - return nil -} - -func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { - if err := cr.validateRowStatus(cc); err != nil { - return err - } - if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) { - return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey) - } - if cc.FamilyName != nil && cc.Qualifier == nil { - return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName) - } - return nil -} - -func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { - if err := cr.validateRowStatus(cc); err != nil { - return err - } - if cr.curVal == nil { - return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc) - } - if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) { - return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc) - } - return nil -} - -func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool { - return cc.RowKey != nil || - cc.FamilyName != nil || - cc.Qualifier != nil || - cc.TimestampMicros != 0 -} - -// Validate a RowStatus, commit or reset, if present. -func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error { - // Resets can't be specified with any other part of a cell - if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) || - cc.Value != nil || - cc.ValueSize != 0 || - cc.Labels != nil) { - return fmt.Errorf("reset must not be specified with other fields %v", cc) - } - if cc.GetCommitRow() && cc.ValueSize > 0 { - return fmt.Errorf("commit row found in between chunks in a cell") - } - return nil -} |
