Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

DO NOT MERGE: Filehasher #2022

Open
wants to merge 67 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
a8f3fc4
storage: Elaborate comment on Encryption.Reset
nolash Dec 4, 2019
22839fb
storage: Add Reset() to interface
nolash Dec 4, 2019
df3f3b7
file: Add reference filehasher pre-cleanup state
nolash Nov 27, 2019
472b6fd
file: Reference hasher commenting and partial cleanup
nolash Nov 27, 2019
01d5af2
file: Replaced hash data size calc with util.go function, add comments
nolash Nov 28, 2019
872e93e
file: Add test for dangling chunk, correct last test case param
nolash Nov 28, 2019
181e0bf
file: Add correct, non-optimized new hasher implementation
nolash Nov 27, 2019
bc1e288
file: Fix races
nolash Nov 28, 2019
300d373
file: Use all test cases in vector test even if fail
nolash Nov 28, 2019
8d3e020
file: Use local chan pointer to avoid tight loop in doneC
nolash Nov 28, 2019
781cb6b
file: Fix race in target members read
nolash Nov 28, 2019
a528496
file: Cleanup commented code
nolash Nov 28, 2019
a418656
file: Remove (properly) tight loop in doneC select
nolash Nov 28, 2019
f76e00f
file: Set target doneC to nil if target set on job create
nolash Nov 28, 2019
4b87da3
file: Move serialdata generation in bench outside bench loop
nolash Nov 28, 2019
988ff37
file: Lock dummy writer data buffer access
nolash Nov 28, 2019
243b92b
file: Remove commented code
nolash Nov 28, 2019
e491ee0
file: Update Write and Sum to correct job object interface
nolash Nov 28, 2019
bec33a0
file: Add hasher benchmark
nolash Nov 30, 2019
45db7d9
file: Make Hasher writers asynchronous within jobs
nolash Nov 30, 2019
21d25b7
file: Add explicit proof of ref hash on chunkSize*branches+chunkSize
nolash Dec 1, 2019
04705b6
file: Complete comments
nolash Dec 1, 2019
b4ba44a
file: Delint
nolash Dec 3, 2019
e4f8fc6
file: Rename files and extract index, target to separate files
nolash Dec 3, 2019
d300b86
file: Extract job sum into separate method
nolash Dec 3, 2019
92c2c7e
file: Extract split to separate file, implement bmt.SectionWriter
nolash Dec 3, 2019
5407b62
file: Add comments
nolash Dec 3, 2019
e5d29cb
file, param: Add package for global interfaces and settings
nolash Dec 3, 2019
91c0df1
file: Separate splitter and hasher in different packages
nolash Dec 3, 2019
e1f4283
file: Adjust interface to add context
nolash Dec 3, 2019
835c4b4
file: Add SectionWriter storer interface for chunks
nolash Dec 4, 2019
4f26df0
file: Add test for FileStore SectionWriter
nolash Dec 4, 2019
eaf0af1
file: Add FileStore integration test with Splitter
nolash Dec 4, 2019
e5fd998
file, bmt: Wrap bmt.Hasher in param.SectionWriter
nolash Dec 4, 2019
a0e9ffd
file: Add test, chunk.Store sink for both data+intermediate hashers
nolash Dec 4, 2019
9dab8da
file: Add encrypt writer stub and cache store as local testutil
nolash Dec 4, 2019
afc7001
file: Test encryption pipeline with one chunk
nolash Dec 4, 2019
3953663
file: Add key derivation to filehasher encrypt
nolash Dec 4, 2019
ef446f4
file, param: Add test to verify buffer-neutral encryption
nolash Dec 4, 2019
d0c7833
file: Add first part of intermediate chunk test
nolash Dec 4, 2019
fef9be3
file: Add Encryption.Reset to file/encrypt Reset
nolash Dec 4, 2019
77d25a3
file: Add multi-section write capability to job
nolash Dec 6, 2019
35b91ae
file, bmt, param: Add dynamic branch/section write and tests
nolash Dec 6, 2019
3132569
file: Add test for endcount calculation with differing sectionsize/br…
nolash Dec 6, 2019
d783260
file: Use same bmt for data and intermediate
nolash Dec 6, 2019
e778505
file, param, bmt: Amend split and store after bmt change
nolash Dec 6, 2019
5dc2d85
file: Amend encrypt to compile, fix benchmarks
nolash Dec 6, 2019
57c79ed
file, bmt, param: Add Connect method to IF to set underlying writer
nolash Dec 6, 2019
edd6f6e
file: Simplify code by adding function for BMT hashfunc create
nolash Dec 6, 2019
888d38f
file: Extract generate span sizes to separate function
nolash Dec 9, 2019
f790e33
file, bmt: Simplify ReferenceHasher
nolash Dec 9, 2019
1c877e4
file: Fix wrong call in ReferenceHasher benchmark
nolash Dec 9, 2019
83edc4b
bmt, param: Use hash.Hash for SectionWriter, implement in bmt.Hasher
nolash Dec 9, 2019
d18b51c
file: Move reference hasher to own package, bmt compiles w changes
nolash Dec 9, 2019
63d2149
bmt: Tests pass after changes to interface
nolash Dec 9, 2019
685fa40
bmt: Fix zerohash bug in sync BMT
nolash Dec 9, 2019
677d023
bmt, param: Cleanup, implement proper seek for async
nolash Dec 9, 2019
84749e0
file: Fix file/testutillocal/Cache
nolash Dec 9, 2019
b28375a
file: Move reference back, make hasher tests compile
nolash Dec 9, 2019
28e424c
file, bmt: Implement param.SectionWriter for all in file
nolash Dec 10, 2019
d64359a
bmt: Set lock across SeekSection() and Write() for async
nolash Dec 10, 2019
9dc2351
file: Fix dummySectionWriter test fails
nolash Dec 10, 2019
fad18c1
bmt, file, param: Make all tests in file/hasher pass
nolash Dec 10, 2019
94c6cb6
bmt: rehabilitate file/encrypt, make bmt tests pass
nolash Dec 10, 2019
12355cb
storage, bmt: Replace ResetWithLength with param.SectionWriter.SpanBytes
nolash Dec 10, 2019
0fbe5f8
bmt, param: Cleanup + implement SetSpanBytes()
nolash Dec 10, 2019
4d85352
file, bmt: Cleanup
nolash Dec 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 175 additions & 48 deletions bmt/bmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
package bmt

import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash"
"strings"
"sync"
"sync/atomic"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/param"
)

/*
Expand Down Expand Up @@ -60,6 +66,10 @@ const (
PoolSize = 8
)

var (
	// zeroSpan is the 8-byte all-zero span value.
	zeroSpan = make([]byte, 8)
)

// BaseHasherFunc is a hash.Hash constructor function used for the base hash of the BMT.
// implemented by Keccak256 SHA3 sha3.NewLegacyKeccak256
type BaseHasherFunc func() hash.Hash
Expand All @@ -75,8 +85,10 @@ type BaseHasherFunc func() hash.Hash
// the tree and itself in a state reusable for hashing a new chunk
// - generates and verifies segment inclusion proofs (TODO:)
type Hasher struct {
	// NOTE(review): the field list below appears to contain both pre- and
	// post-change lines of a diff (pool and bmt occur twice) — reconcile
	// before building.
	pool *TreePool // BMT resource pool
	bmt *tree // prebuilt BMT resource for flowcontrol and proofs
	pool *TreePool // BMT resource pool
	bmt *tree // prebuilt BMT resource for flowcontrol and proofs
	size int // bytes written to Hasher since last Reset()
	cursor int // cursor to write to on next Write() call
}

// New creates a reusable BMT Hasher that
Expand Down Expand Up @@ -276,14 +288,56 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
}
}

// SetWriter is a no-op for the synchronous hasher: writer chaining is not
// supported, so a warning is logged and the receiver is returned unchanged.
// Implements param.SectionWriter
func (h *Hasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter {
	log.Warn("Synchasher does not currently support SectionWriter chaining")
	return h
}

// SectionSize returns the byte size of one section (a single segment).
// Implements param.SectionWriter
func (h *Hasher) SectionSize() int {
	return h.pool.SegmentSize
}

// SetLength is intentionally a no-op for the synchronous hasher, which
// tracks the length from the bytes actually written (see Write and Sum).
// Implements param.SectionWriter
func (h *Hasher) SetLength(length int) {
}

// SetSpan sets the span — the length of the data subsumed under the hash —
// from an integer length, serialized via LengthToSpan.
// Implements param.SectionWriter
func (h *Hasher) SetSpan(length int) {
	h.getTree().span = LengthToSpan(length)
}

// SetSpanBytes sets the span from its ready-made 8-byte serialization.
// The input is copied so the caller's buffer is not retained.
// Implements storage.SwarmHash
func (h *Hasher) SetSpanBytes(b []byte) {
	span := make([]byte, 8)
	copy(span, b)
	h.getTree().span = span
}

// Branches returns the branching factor (the segment count of the pool).
// Implements param.SectionWriter
func (h *Hasher) Branches() int {
	return h.pool.SegmentCount
}

// Init is intentionally a no-op for the synchronous hasher; it needs neither
// the context nor the error callback.
// Implements param.SectionWriter
func (h *Hasher) Init(_ context.Context, _ func(error)) {
}

// Size returns the digest size
// Implements hash.Hash in param.SectionWriter
func (h *Hasher) Size() int {
	return h.pool.SegmentSize
}

// SeekSection sets the section that will be written to on the next Write().
// Unlike AsyncHasher.SeekSection, no locking is performed here.
func (h *Hasher) SeekSection(offset int) {
	h.cursor = offset
}

// BlockSize returns the block size (two segments)
// Implements hash.Hash in param.SectionWriter
func (h *Hasher) BlockSize() int {
	return 2 * h.pool.SegmentSize
}
Expand All @@ -293,31 +347,35 @@ func (h *Hasher) BlockSize() int {
// Sum returns the BMT root hash of the buffer, appended to b.
// hash.Hash interface Sum method appends the byte slice to the underlying
// data before it calculates and returns the hash of the chunk
// caller must make sure Sum is not called concurrently with Write, writeSection
// Implements hash.Hash in param.SectionWriter
func (h *Hasher) Sum(b []byte) (s []byte) {
	t := h.getTree()
	// nothing has been written since Reset: return the precomputed zero-hash
	// for the pool's full depth
	if h.size == 0 && t.offset == 0 {
		h.releaseTree()
		return h.pool.zerohashes[h.pool.Depth]
	}
	// write the last section with final flag set to true
	go h.writeSection(t.cursor, t.section, true, true)
	// wait for the result
	s = <-t.result
	// if the span was not set explicitly (SetSpan / SetSpanBytes), derive it
	// from the number of bytes written since Reset
	if t.span == nil {
		t.span = LengthToSpan(h.size)
	}
	// copy the span reference before the tree is released back to the pool
	span := t.span
	// release the tree resource back to the pool
	h.releaseTree()
	// b + sha3(span + BMT(pure_chunk))
	if len(span) == 0 {
		return append(b, s...)
	}
	return doSum(h.pool.hasher(), b, span, s)
}

// methods needed to implement the SwarmHash and the io.Writer interfaces

// Write calls sequentially add to the buffer to be hashed,
// with every full segment calls writeSection in a go routine
// Implements hash.Hash (io.Writer) in param.SectionWriter
func (h *Hasher) Write(b []byte) (int, error) {
l := len(b)
if l == 0 || l > h.pool.Size {
return 0, nil
}
h.size += len(b)
t := h.getTree()
secsize := 2 * h.pool.SegmentSize
// calculate length of missing bit to complete current open section
Expand Down Expand Up @@ -359,20 +417,13 @@ func (h *Hasher) Write(b []byte) (int, error) {
}

// Reset clears the write cursor and byte count and releases the tree
// resource; it must be called before writing fresh data to the hasher.
// Implements hash.Hash in param.SectionWriter
func (h *Hasher) Reset() {
	h.cursor, h.size = 0, 0
	h.releaseTree()
}

// methods needed to implement the SwarmHash interface

// ResetWithLength needs to be called before writing to the hasher
// the argument is supposed to be the byte slice binary representation of
// the length of the data subsumed under the hash, i.e., span
// NOTE(review): this change set appears to replace ResetWithLength with
// Reset + SetSpanBytes (see commit "Replace ResetWithLength with
// param.SectionWriter.SpanBytes") — confirm callers are migrated.
func (h *Hasher) ResetWithLength(span []byte) {
	h.Reset()
	h.getTree().span = span
}

// releaseTree gives back the Tree to the pool whereby it unlocks
// it resets tree, segment and index
func (h *Hasher) releaseTree() {
Expand All @@ -395,30 +446,30 @@ func (h *Hasher) releaseTree() {
}

// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes
// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters
// NOTE(review): the return literal below appears to contain both pre- and
// post-change lines of a diff (duplicate fields) — reconcile before building.
func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher {
	// double-width sections double the section size ...
	secsize := h.pool.SegmentSize
	if double {
		secsize *= 2
	}
	// ... and halve the section count, keeping total input size unchanged
	seccount := h.pool.SegmentCount
	if double {
		seccount /= 2
	}
	write := func(i int, section []byte, final bool) {
		h.writeSection(i, section, double, final)
	}
	return &AsyncHasher{
		Hasher: h,
		double: double,
		secsize: secsize,
		write: write,
		Hasher: h,
		double: double,
		secsize: secsize,
		seccount: seccount,
		write: write,
		jobSize: 0,
		sought: true,
	}
}

// SectionWriter is an asynchronous segment/section writer interface
// NOTE(review): appears superseded in this change by param.SectionWriter —
// confirm whether this declaration is still needed.
type SectionWriter interface {
	Reset() // standard init to be called before reuse
	Write(index int, data []byte) // write into section of index
	Sum(b []byte, length int, span []byte) []byte // returns the hash of the buffer
	SectionSize() int // size of the async section unit to use
}

// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
// AsyncHasher is unsafe and does not check indexes and section data lengths
// it must be used with the right indexes and length and the right number of sections
// NOTE(review): the field list below appears to contain both pre- and
// post-change lines of a diff (duplicate fields) — reconcile before building.
type AsyncHasher struct {
	*Hasher // extends the Hasher
	mtx sync.Mutex // to lock the cursor access
	double bool // whether to use double segments (call Hasher.writeSection)
	secsize int // size of base section (size of hash or double)
	write func(i int, section []byte, final bool)
	*Hasher // extends the Hasher
	mtx sync.Mutex // to lock the cursor access
	double bool // whether to use double segments (call Hasher.writeSection)
	secsize int // size of base section (size of hash or double)
	seccount int // base section count
	write func(i int, section []byte, final bool)
	errFunc func(error) // error callback supplied via Init, used by SetWriter
	all bool // if all written in one go, temporary workaround
	sought bool // whether a SeekSection has been issued since last Reset
	jobSize int // length of the job, set via SetLength, read by Sum
}

// Init stores the error-reporting callback used by other methods (e.g. SetWriter).
// Implements param.SectionWriter
// TODO context should be implemented all across (ie original TODO in TreePool.reserve())
func (sw *AsyncHasher) Init(_ context.Context, errFunc func(error)) {
	sw.errFunc = errFunc
}

// Reset returns the AsyncHasher to its initial state so it can be reused,
// delegating to the underlying Hasher's Reset as the final step.
// Implements param.SectionWriter
func (sw *AsyncHasher) Reset() {
	sw.jobSize = 0
	sw.all = false
	sw.sought = true
	sw.Hasher.Reset()
}

// SetLength records the known length of the input; Sum reads it back.
// Implements param.SectionWriter
func (sw *AsyncHasher) SetLength(length int) {
	sw.jobSize = length
}

// SetWriter is a no-op for the AsyncHasher: writer chaining is not supported.
// The error callback registered via Init is notified; if Init has not been
// called yet, the error is logged instead of dereferencing a nil function.
// Implements param.SectionWriter
func (sw *AsyncHasher) SetWriter(_ param.SectionWriterFunc) param.SectionWriter {
	err := errors.New("Asynchasher does not currently support SectionWriter chaining")
	if sw.errFunc != nil {
		sw.errFunc(err)
	} else {
		log.Warn(err.Error())
	}
	return sw
}

// SectionSize returns the size of async section unit to use
// (the base segment size, or double that when double mode is on).
// Implements param.SectionWriter
func (sw *AsyncHasher) SectionSize() int {
	return sw.secsize
}

// Branches returns the branching factor, which is equivalent to the number
// of sections in the BMT input
// Implements param.SectionWriter
func (sw *AsyncHasher) Branches() int {
	return sw.seccount
}

// SeekSection sets the cursor where the next Write() will write
// It locks the cursor until Write() is called; if no Write() is called, it will hang.
// NOTE: the mutex is deliberately left locked here and released in Write(),
// so every SeekSection() must be paired with exactly one Write().
// Implements param.SectionWriter
func (sw *AsyncHasher) SeekSection(offset int) {
	sw.mtx.Lock()
	sw.Hasher.SeekSection(offset)
}

// Write writes to the current position cursor of the Hasher
// The cursor must first be manually set with SeekSection()
// The method will NOT advance the cursor.
// The mutex acquired in SeekSection() is released here via the deferred Unlock.
// Implements hash.hash in param.SectionWriter
func (sw *AsyncHasher) Write(section []byte) (int, error) {
	defer sw.mtx.Unlock()
	// size grows by the written amount even when the same section is
	// rewritten (see TODO on Hasher.writeSection)
	sw.Hasher.size += len(section)
	return sw.writeSection(sw.Hasher.cursor, section)
}

// Write writes the i-th section of the BMT base
// this function can and is meant to be called concurrently
// it sets max segment threadsafely
func (sw *AsyncHasher) Write(i int, section []byte) {
sw.mtx.Lock()
defer sw.mtx.Unlock()
func (sw *AsyncHasher) writeSection(i int, section []byte) (int, error) {
// TODO: Temporary workaround for chunkwise write
if i < 0 {
sw.Hasher.cursor = 0
sw.Hasher.Reset()
sw.Hasher.SetLength(len(section))
sw.Hasher.Write(section)
sw.all = true
return len(section), nil
}
//sw.mtx.Lock() // this lock is now set in SeekSection
// defer sw.mtk.Unlock() // this unlock is still left in Write()
t := sw.getTree()
// cursor keeps track of the rightmost section written so far
// if index is lower than cursor then just write non-final section as is
if i < t.cursor {
// if index is not the rightmost, safe to write section
go sw.write(i, section, false)
return
return len(section), nil
}
// if there is a previous rightmost section safe to write section
if t.offset > 0 {
Expand All @@ -470,7 +582,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
t.section = make([]byte, sw.secsize)
copy(t.section, section)
go sw.write(i, t.section, true)
return
return len(section), nil
}
// the rightmost section just changed, so we write the previous one as non-final
go sw.write(t.cursor, t.section, false)
Expand All @@ -481,6 +593,7 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
t.offset = i*sw.secsize + 1
t.section = make([]byte, sw.secsize)
copy(t.section, section)
return len(section), nil
}

// Sum can be called any time once the length and the span is known
Expand All @@ -492,12 +605,20 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
// length: known length of the input (unsafe; undefined if out of range)
// meta: metadata to hash together with BMT root for the final digest
// e.g., span for protection against existential forgery
func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) {
//
// Implements hash.hash in param.SectionWriter
func (sw *AsyncHasher) Sum(b []byte) (s []byte) {
if sw.all {
return sw.Hasher.Sum(nil)
}
sw.mtx.Lock()
t := sw.getTree()
length := sw.jobSize
if length == 0 {
sw.releaseTree()
sw.mtx.Unlock()
s = sw.pool.zerohashes[sw.pool.Depth]
return
} else {
// for non-zero input the rightmost section is written to the tree asynchronously
// if the actual last section has been written (t.cursor == length/t.secsize)
Expand All @@ -515,15 +636,13 @@ func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) {
}
// relesase the tree back to the pool
sw.releaseTree()
// if no meta is given just append digest to b
if len(meta) == 0 {
return append(b, s...)
}
meta := t.span
// hash together meta and BMT root hash using the pools
return doSum(sw.pool.hasher(), b, meta, s)
}

// writeSection writes the hash of i-th section into level 1 node of the BMT tree
// TODO: h.size increases even on multiple writes to the same section of a section
func (h *Hasher) writeSection(i int, section []byte, double bool, final bool) {
// select the leaf node for the section
var n *node
Expand Down Expand Up @@ -688,3 +807,11 @@ func calculateDepthFor(n int) (d int) {
}
return d + 1
}

// creates a binary span size representation
// to pass to bmt.SectionWriter
func LengthToSpan(length int) []byte {
spanBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(spanBytes, uint64(length))
return spanBytes
}
Loading