Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GooglePubSub: emulator does not handle authentication #523

Closed
yanns opened this issue Oct 13, 2017 · 7 comments
Closed

GooglePubSub: emulator does not handle authentication #523

yanns opened this issue Oct 13, 2017 · 7 comments

Comments

@yanns
Copy link

yanns commented Oct 13, 2017

When using the emulator in docker-compose:

pubsub:
  container_name: pubsub
  image: google/cloud-sdk
  command: gcloud beta emulators pubsub start --host-port 0.0.0.0:8042
  ports:
   - "8042:8042"

and setting the PUBSUB_EMULATOR_HOST to localhost:8042
(#299)

then I have the following error:

[info]   exception thrown spray.json.DeserializationException: Object is missing required member 'access_token'
[info]   	at spray.json.package$.deserializationError(package.scala:23)
[info]   	at spray.json.ProductFormats.fromField(ProductFormats.scala:60)
[info]   	at spray.json.ProductFormats.fromField$(ProductFormats.scala:50)
[info]   	at spray.json.DefaultJsonProtocol$.fromField(DefaultJsonProtocol.scala:30)
[info]   	at spray.json.ProductFormatsInstances$$anon$3.read(ProductFormatsInstances.scala:78)
[info]   	at spray.json.ProductFormatsInstances$$anon$3.read(ProductFormatsInstances.scala:68)
[info]   	at akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.$anonfun$sprayJsonUnmarshaller$1(SprayJsonSupport.scala:30)
[info]   	at akka.http.scaladsl.util.FastFuture$.$anonfun$map$1(FastFuture.scala:23)
[info]   	at akka.http.scaladsl.util.FastFuture$.strictTransform$1(FastFuture.scala:41)
[info]   	at akka.http.scaladsl.util.FastFuture$.$anonfun$transformWith$3(FastFuture.scala:51)
[info]   	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
[info]   	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
[info]   	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
[info]   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[info]   	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
[info]   	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
[info]   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
[info]   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
[info]   	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info]   	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info]   	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info]   	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info]   Caused by: java.util.NoSuchElementException: key not found: access_token
[info]   	at scala.collection.immutable.Map$Map2.apply(Map.scala:129)
[info]   	at spray.json.ProductFormats.fromField(ProductFormats.scala:57)
[info]   	... 20 more

And I can see this log in the emulator:

[pubsub] INFO: Unknown request URI: /oauth2/v4/token

The solution would be:
when we are running against the simulator, the authentication should be by-passed.

@ilijaljubicic
Copy link

Could not get pull subscription working on emulated pubsub even with #605

session tries to get token and that seems to break things
even when subscription sends token when using emulated pubsub

When added small if-check into getToken inside Session.scala (just to debug)
everything started to work:

  def getToken()(implicit as: ActorSystem, materializer: Materializer): Future[String] = {
    import materializer.executionContext

    if (httpApi.isEmulated) {
      Future.successful("NO_TOKEN")
    } else {
      maybeAccessToken
        .getOrElse(getNewToken())
        .flatMap { result =>
          if (expiresSoon(result)) {
            getNewToken()
          } else {
            Future.successful(result)
          }
        }
        .map(_.accessToken)
    }
  }

@ennru
Copy link
Member

ennru commented Jan 22, 2018

@lukasz-golebiewski Do you have any input on this?

@ilijaljubicic
Copy link

ilijaljubicic commented Jan 23, 2018

issues seems to be that Session object passed to GooglePubSubSource
when in emulated mode doesn't check for being in emulated mode,
and then getToken method :
val req = session.getToken() called from GooglePubSubSource:createLogic

actually return failed future since there is an error :
"akka.http.scaladsl.unmarshalling.Unmarshaller$UnsupportedContentTypeException: Unsupported Content-Type, supported: application/json"

I guess emulator returns non json response in this case,logs from from emulator are:
pubsub] Jan 23, 2018 9:07:08 AM com.google.cloud.iam.testing.v1.shared.authorization.AuthInterceptor interceptCall
[pubsub] INFO: Authentication interceptor: Header value is null
[pubsub] Jan 23, 2018 9:07:08 AM com.google.cloud.iam.testing.v1.shared.authorization.AuthInterceptor interceptCall
[pubsub] INFO: Authentication interceptor: invalid or missing token

so failed future fails the whole stage :

 private val callback = getAsyncCallback[(Try[PullResponse], Materializer)] {
        case (Failure(tr), _) =>
          failStage(tr)

there is several places where isEmulated check could be put..
to me looks the best to be put in Session.scala .

Session.scala:

  def getToken()(implicit as: ActorSystem, materializer: Materializer): Future[String] = {
    import materializer.executionContext
    if (httpApi.isEmulated) {
      Future.successful("NO_TOKEN")
    } else {
      maybeAccessToken
        .getOrElse(getNewToken())
        .flatMap { result =>
          if (expiresSoon(result)) {
            getNewToken()
          } else {
            Future.successful(result)
          }
        }
        .map(_.accessToken)
    }
  }

@rcoh
Copy link

rcoh commented Jul 2, 2018

Seems like a small fix and it's been open for 2 years, I asssume you'd accept a PR?

@ennru
Copy link
Member

ennru commented Jul 3, 2018

You're more than welcome!

@armanbilge
Copy link
Contributor

This will be fixed by #2613.

@seglo
Copy link
Member

seglo commented Mar 26, 2021

Fixed in #2613

@seglo seglo closed this as completed Mar 26, 2021
@seglo seglo added this to the 3.0.0-M1 milestone Mar 26, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants